flink提供两种主流提交模式(生产推荐的 Application 模式 和 共享资源的 Session 模式)。

Application Mode(应用模式):每个作业启动一个独立的 Flink 集群,作业的main方法直接运行在 YARN 的 JobManager 上。

Session Mode(会话模式):先启动一个共享的 Flink 集群(Session),再提交多个作业到这个集群共享资源。

一、前置条件

在执行提交命令前,确保满足以下条件:

  1. 已配置FLINK_HOME环境变量(指向 Flink 安装目录);

  2. 已配置HADOOP_CONF_DIR(指向 Hadoop 的 conf 目录,让 Flink 能连接 YARN);

  3. YARN 集群正常运行,Flink Jar 包已完成本地测试;

  4. 建议将 Flink 依赖包预上传到 HDFS(避免每次提交重复上传):

hdfs dfs -mkdir -p /flink/lib
hdfs dfs -put $FLINK_HOME/lib/* /flink/lib/

二、生产环境核心命令(推荐 Application模式)

1. YARN Application Mode(Per-Job 集群模式,生产首选)

该模式下每个作业独立占用 YARN 资源,隔离性强,故障不会影响其他作业,是 Flink 1.13 + 官方推荐的生产模式。

# 1. 配置环境变量(若未全局配置)
export HADOOP_CONF_DIR=/opt/hadoop/etc/hadoop  # 替换为你的Hadoop conf路径
export FLINK_HOME=/opt/flink-1.17.0            # 替换为你的Flink版本路径

# 2. 提交Jar包核心命令
$FLINK_HOME/bin/flink run-application \        # 指定使用application模式
-t yarn-application \                          # 指定使用YARN模式
-s hdfs:///user/flink/checkpoint/a89b89825fe6776304366aa7b07da249/chk-168 \ #从checkpoint目录下恢复(可选)
-D jobmanager.memory.process.size=2048m \       # JobManager总进程内存(生产至少2G)
-D taskmanager.memory.process.size=2048m \      # TaskManager总进程内存(生产至少4G)
-D taskmanager.numberOfTaskSlots=1 \            # 每个TM的任务槽数(对应CPU核数,1槽=1核)
-D parallelism.default=1 \                      # 作业默认并行度(根据集群资源调整)
-D yarn.application.queue=root.default \          # 指定YARN生产队列(避免占用测试队列)
-D yarn.provided.lib.dirs="hdfs://192.168.1.211:9000/user/flink/lib" \ # 预上传的Flink依赖包路径(加速提交)
-D restart-strategy=fixed-delay \               # 生产级重启策略:固定延迟重启
-D restart-strategy.fixed-delay.attempts=3 \    # 最大重启次数
-D restart-strategy.fixed-delay.delay=10s \     # 重启间隔
-D fs.default-scheme=hdfs://192.168.1.211:9000 \  # 默认文件系统(替换为你的NameNode地址)
-D yarn.log-aggregation.enable=true \           # 开启YARN日志聚合(方便排查问题)
-D yarn.ship-files=/home/phoenix/data/dolphinscheduler332/resources/flink/kafka2paimon_withhivecatalog.conf \
-c com.panda.bigdata.kafka2paimon.FlinkKafka2PaimonMultiOutStream \ # 主类路径
/home/phoenix/git/my_bigdata/target/my_bigdata-1.0-SNAPSHOT.jar \               # 替换为业务Jar包路径
参数1 \                     # 作业自定义参数1(示例)
参数2 \                       # 作业自定义参数2(示例)

样例如下:

######################### 提交样例1:传递一个参数 ###########################
$FLINK_HOME/bin/flink run-application -d \
-t yarn-application \
-D yarn.application.name="flink_kafka2paimon_prod_v1" \
-s hdfs:///user/flink/checkpoint/a89b89825fe6776304366aa7b07da249/chk-216 \
-D containerized.master.env.JAVA_HOME=/home/phoenix/apps/jdk-19.0.2 \
-D containerized.taskmanager.env.JAVA_HOME=/home/phoenix/apps/jdk-19.0.2 \
-D containerized.jobmanager.env.JAVA_HOME=/home/phoenix/apps/jdk-19.0.2 \
-D jobmanager.memory.process.size=2048m \
-D taskmanager.memory.process.size=2048m \
-D taskmanager.numberOfTaskSlots=1 \
-D parallelism.default=1 \
-D yarn.application.queue=root.phoenix \
-D yarn.log-aggregation.enable=true \
-D yarn.ship-files=/home/phoenix/data/dolphinscheduler332/phoenix/resources/flink/kafka2paimon_withhivecatalog.conf \
-c com.panda.bigdata.kafka2paimon.FlinkKafka2PaimonMultiOutStream \
/home/phoenix/git/my_bigdata/target/my_bigdata-1.0-SNAPSHOT.jar \
kafka2paimon_withhivecatalog.conf

######################### 提交样例1:传递多个参数 ###########################
$FLINK_HOME/bin/flink run-application -d \
-t yarn-application \
-D yarn.application.name="flink_kafka2paimon_bytable_v1" \
-D containerized.master.env.JAVA_HOME=/home/phoenix/apps/jdk-19.0.2 \
-D containerized.taskmanager.env.JAVA_HOME=/home/phoenix/apps/jdk-19.0.2 \
-D containerized.jobmanager.env.JAVA_HOME=/home/phoenix/apps/jdk-19.0.2 \
-D jobmanager.memory.process.size=1024m \
-D taskmanager.memory.process.size=1024m \
-D taskmanager.numberOfTaskSlots=1 \
-D parallelism.default=1 \
-D yarn.application.queue=root.phoenix \
-D yarn.log-aggregation.enable=true \
-D yarn.ship-files=/opt/apps/bigdata/apache-hive-2.3.9-bin/conf \
-D yarn.ship-files=/opt/apps/bigdata/hadoop-2.10.2/etc/hadoop \
-D yarn.ship-files=/home/phoenix/data/dolphinscheduler332/phoenix/resources/flink/flink_kafka2paimon_bytable.conf \
-c com.panda.bigdata.kafka2paimon.FlinkKafka2PaimonByTable \
/home/phoenix/data/dolphinscheduler332/phoenix/resources/flink/my_bigdata-1.0-SNAPSHOT.jar \
flink_kafka2paimon_bytable.conf \
$catalogName $hiveCatalogDDL $kafkaSourceDDL $paimonTableDDL $insertSql

关键参数解释:

参数

作用

生产建议值

jobmanager.memory.process.size

JM 总进程内存(含堆、堆外)

2G~8G(根据作业复杂度)

taskmanager.memory.process.size

TM 总进程内存

4G~16G(根据数据量)

taskmanager.numberOfTaskSlots

每个 TM 的 slot 数

2~8(匹配 CPU 核数)

parallelism.default

默认并行度

等于集群总 slot 数的 70%~80%

execution.checkpointing.mode

检查点模式

必须为EXACTLY_ONCE

restart-strategy

重启策略

生产用fixed-delay,避免无限制重启

yarn.provided.lib.dirs

Flink 依赖包 HDFS 路径

必配,提升提交速度 50%+

yarn.ship-files

配置选项(Configuration Option),用于指定需要随 Flink 应用程序一起分发到 YARN 集群中各个节点的额外文件。文件在提交job的服务器中存在。

containerized.jobmanager.env.JAVA_HOME

containerized.jobmanager.env.JAVA_HOME

设置yarn容器的java路径,yarn容器可以直接共享宿主机文件系统(容器中配置路径需要和宿主机一致)

flink提交模式对客户端位置的影响:

提交模式

客户端行为

适用场景

Attached 模式(默认)

客户端保持运行,持续接收日志并发送心跳

关闭终端 / 断开连接会导致客户端退出、心跳中断

开发调试、短任务运行

Detached 模式(-d 参数)

提交后客户端立即退出,任务完全由集群托管

不再依赖客户端心跳,彻底解决心跳超时问题

生产环境长期运行任务(强烈推荐)

Job Client 是用户与 Flink 集群的桥梁,与核心组件的交互流程如下:

  1. 客户端解析用户代码 → 生成 JobGraph

  2. 客户端上传 JobGraph 及依赖到服务器

  3. 客户端向 JobManager 提交任务并申请资源

  4. JobManager 调度任务到 TaskManager 执行

  5. 执行过程中,客户端与 JobManager 保持心跳与状态同步

  6. 任务结束后,JobManager 向客户端返回结果

2. YARN Session 模式(适合多小作业共享资源)

若你有多个轻量级作业,可先启动共享的 YARN Session,再提交作业(隔离性弱,故障会影响所有作业):

# 第一步:启动YARN Session(后台运行)
$FLINK_HOME/bin/yarn-session.sh \
-n 3 \          # TaskManager数量
-s 4 \          # 每个TM的slot数
-jm 4096m \     # JM内存
-tm 8192m \     # TM内存
-que production \ # YARN队列
-d             # 后台运行(不占用终端)

# 第二步:提交Jar包到已启动的Session
$FLINK_HOME/bin/flink run \
-yid application_1740000000000_1234 \# 指定要关联的YARN Session ID
-p 8 \          # 作业并行度
/path/to/your/business-job.jar \
--order-topic order_data

三、生产环境额外建议

  1. 日志配置:修改flink-conf.yaml,将日志输出到 HDFS,方便持久化查询:

    yarn.log.dir: hdfs:///flink/logs
    log.file: hdfs:///flink/logs/${yarn.application.id}
  2. 作业启停

    • 查看 YARN 上的 Flink 作业:yarn application -list

    • 停止作业:yarn application -kill <application-id>flink cancel <job-id>

  3. 资源监控:通过 YARN ResourceManager UI(默认 8088)或 Flink WebUI(YARN Session 模式下可通过yarn application -status <app-id>获取)监控作业资源使用。

四、监控flink作业是否正常运行

flink作业提交后需要实时监控作业运行是否正常,可以使用多种方式实现。

自动化、可扩展的方案。Flink 提供了强大的 Metrics 系统,可以将内部指标暴露给外部监控系统(如 Prometheus, InfluxDB, Datadog 等)。然后在Prometheus中配置报警规则。

2、自定义脚本,简单方便(开发环境)

使用时将flink job name作为参数传入即可监控flink作业的状态。

vim monitor_flink_job.sh

#!/bin/bash

set -e
yarn_app_name="$1"

# === 提取 application_id ===
APP_ID=$(yarn application -list 2>/dev/null | grep "$yarn_app_name" | awk '{print $1}')

if [ -z "$APP_ID" ]; then
  echo "[ERROR] Failed to extract YARN application ID from output."
  echo "Please ensure the Flink command outputs an application ID (e.g., in yarn-application mode)."
  exit 1
fi

echo "[INFO] Detected YARN Application ID: $APP_ID"

# === 监控 YARN 应用状态 ===
SLEEP_INTERVAL=300  # 每300秒检查一次
# MAX_CHECKS=$(( MAX_WAIT_MINUTES * 60 / SLEEP_INTERVAL ))
CHECK_COUNT=0

while [ true ]; do
  sleep $SLEEP_INTERVAL
  CHECK_COUNT=$((CHECK_COUNT + 1))

  # 获取应用状态(使用 yarn application -status)
  STATUS_OUTPUT=$(yarn application -status "$APP_ID" 2>/dev/null || true)
  STATE_LINE=$(echo "$STATUS_OUTPUT" | grep -i "State :" || echo "")
  FINAL_STATE_LINE=$(echo "$STATUS_OUTPUT" | grep -i "Final-State :" || echo "")

  # 优先看 Final-State(作业结束后才有),否则看 State
  if [[ -n "$FINAL_STATE_LINE" ]]; then
    STATE=$(echo "$FINAL_STATE_LINE" | awk -F: '{print $2}' | tr -d '[:space:]')
  elif [[ -n "$STATE_LINE" ]]; then
    STATE=$(echo "$STATE_LINE" | awk -F: '{print $2}' | tr -d '[:space:]')
  else
    # 无法获取状态,可能应用已不存在
    echo "[WARN] Cannot retrieve status for $APP_ID. It may have been removed from YARN."
    echo "[INFO] Checking if app exists via 'yarn application -list'..."
    if yarn application -list 2>/dev/null | grep -q "$APP_ID"; then
      echo "[INFO] App still exists but status unknown. Continue monitoring..."
      continue
    else
      echo "[ERROR] Application $APP_ID not found in YARN. Assuming it failed or was killed."
      exit 1
    fi
  fi

  echo "[INFO] Current state of $APP_ID: $STATE (check $CHECK_COUNT)"

  case "$STATE" in
    "FINISHED")
      echo "[SUCCESS] Application $APP_ID finished successfully."
      exit 0
      ;;
    "FAILED"|"KILLED"|"LOST")
      echo "[ERROR] Application $APP_ID ended with state: $STATE"
      exit 1
      ;;
    "ACCEPTED"|"RUNNING"|"SUBMITTED")
      # 正常运行中,继续监控
      continue
      ;;
    *)
      echo "[WARN] Unknown state: $STATE. Continue monitoring..."
      continue
      ;;
  esac
done

# 超时
echo "[ERROR] Timeout after $MAX_WAIT_MINUTES minutes. Application $APP_ID did not finish."
exit 1

总结

  1. 生产环境优先选择YARN Application Mode(Per-Job),通过-t yarn-application指定,隔离性和稳定性最优;

  2. 核心配置必须包含:检查点(EXACTLY_ONCE)、重启策略、状态后端(RocksDB)、YARN 队列;

  3. 预上传 Flink 依赖到 HDFS(yarn.provided.lib.dirs)可大幅提升提交速度,日志聚合开启便于问题排查。