flink提供两种主流提交模式(生产推荐的 Application 模式 和 共享资源的 Session 模式)。

Application Mode(应用模式):每个作业启动一个独立的 Flink 集群,作业的main方法直接运行在 YARN 的 JobManager 上。

Session Mode(会话模式):先启动一个共享的 Flink 集群(Session),再提交多个作业到这个集群共享资源。

一、前置条件

在执行提交命令前,确保满足以下条件:

  1. 已配置FLINK_HOME环境变量(指向 Flink 安装目录);

  2. 已配置HADOOP_CONF_DIR(指向 Hadoop 的 conf 目录,让 Flink 能连接 YARN);

  3. YARN 集群正常运行,Flink Jar 包已完成本地测试;

  4. 建议将 Flink 依赖包预上传到 HDFS(避免每次提交重复上传):

hdfs dfs -mkdir -p /flink/lib
hdfs dfs -put $FLINK_HOME/lib/* /flink/lib/

二、生产环境核心命令(推荐 Application模式)

1. YARN Application Mode(Per-Job 集群模式,生产首选)

该模式下每个作业独立占用 YARN 资源,隔离性强,故障不会影响其他作业,是 Flink 1.13 + 官方推荐的生产模式。

# 1. 配置环境变量(若未全局配置)
export HADOOP_CONF_DIR=/opt/hadoop/etc/hadoop  # 替换为你的Hadoop conf路径
export FLINK_HOME=/opt/flink-1.17.0            # 替换为你的Flink版本路径

# 2. 提交Jar包核心命令
$FLINK_HOME/bin/flink run-application \        # 指定使用application模式
-t yarn-application \                          # 指定使用YARN模式
-s hdfs:///user/flink/checkpoint/a89b89825fe6776304366aa7b07da249/chk-168 \ #从checkpoint目录下恢复(可选)
-D jobmanager.memory.process.size=2048m \       # JobManager总进程内存(生产至少2G)
-D taskmanager.memory.process.size=2048m \      # TaskManager总进程内存(生产至少4G)
-D taskmanager.numberOfTaskSlots=1 \            # 每个TM的任务槽数(对应CPU核数,1槽=1核)
-D parallelism.default=1 \                      # 作业默认并行度(根据集群资源调整)
-D yarn.application.queue=root.default \          # 指定YARN生产队列(避免占用测试队列)
-D yarn.provided.lib.dirs="hdfs://192.168.1.211:9000/user/flink/lib" \ # 预上传的Flink依赖包路径(加速提交)
-D restart-strategy=fixed-delay \               # 生产级重启策略:固定延迟重启
-D restart-strategy.fixed-delay.attempts=3 \    # 最大重启次数
-D restart-strategy.fixed-delay.delay=10s \     # 重启间隔
-D fs.default-scheme=hdfs://192.168.1.211:9000 \  # 默认文件系统(替换为你的NameNode地址)
-D yarn.log-aggregation.enable=true \           # 开启YARN日志聚合(方便排查问题)
-c com.panda.bigdata.kafka2paimon.FlinkKafka2PaimonMultiOutStream \ # 主类路径
/home/phoenix/git/my_bigdata/target/my_bigdata-1.0-SNAPSHOT.jar \               # 替换为业务Jar包路径
参数1 \                     # 作业自定义参数1(示例)
参数2 \                       # 作业自定义参数2(示例)

样例如下:

$FLINK_HOME/bin/flink run-application -d \
-t yarn-application \
-D yarn.application.name="flink_kafka2paimon_prod_v1" \
-s hdfs:///user/flink/checkpoint/a89b89825fe6776304366aa7b07da249/chk-216 \
-D containerized.master.env.JAVA_HOME=/home/phoenix/apps/jdk-19.0.2 \
-D containerized.taskmanager.env.JAVA_HOME=/home/phoenix/apps/jdk-19.0.2 \
-D containerized.jobmanager.env.JAVA_HOME=/home/phoenix/apps/jdk-19.0.2 \
-D jobmanager.memory.process.size=2048m \
-D taskmanager.memory.process.size=2048m \
-D taskmanager.numberOfTaskSlots=1 \
-D parallelism.default=1 \
-D yarn.application.queue=root.phoenix \
-D yarn.log-aggregation.enable=true \
-c com.panda.bigdata.kafka2paimon.FlinkKafka2PaimonMultiOutStream \
/home/phoenix/git/my_bigdata/target/my_bigdata-1.0-SNAPSHOT.jar

关键参数解释:

参数

作用

生产建议值

jobmanager.memory.process.size

JM 总进程内存(含堆、堆外)

2G~8G(根据作业复杂度)

taskmanager.memory.process.size

TM 总进程内存

4G~16G(根据数据量)

taskmanager.numberOfTaskSlots

每个 TM 的 slot 数

2~8(匹配 CPU 核数)

parallelism.default

默认并行度

等于集群总 slot 数的 70%~80%

execution.checkpointing.mode

检查点模式

必须为EXACTLY_ONCE

restart-strategy

重启策略

生产用fixed-delay,避免无限制重启

yarn.provided.lib.dirs

Flink 依赖包 HDFS 路径

必配,提升提交速度 50%+

flink提交模式对客户端位置的影响:

提交模式

客户端行为

适用场景

Attached 模式(默认)

客户端保持运行,持续接收日志并发送心跳

关闭终端 / 断开连接会导致客户端退出、心跳中断

开发调试、短任务运行

Detached 模式(-d 参数)

提交后客户端立即退出,任务完全由集群托管

不再依赖客户端心跳,彻底解决心跳超时问题

生产环境长期运行任务(强烈推荐)

Job Client 是用户与 Flink 集群的桥梁,与核心组件的交互流程如下:

  1. 客户端解析用户代码 → 生成 JobGraph

  2. 客户端上传 JobGraph 及依赖到服务器

  3. 客户端向 JobManager 提交任务并申请资源

  4. JobManager 调度任务到 TaskManager 执行

  5. 执行过程中,客户端与 JobManager 保持心跳与状态同步

  6. 任务结束后,JobManager 向客户端返回结果

2. YARN Session 模式(适合多小作业共享资源)

若你有多个轻量级作业,可先启动共享的 YARN Session,再提交作业(隔离性弱,故障会影响所有作业):

# 第一步:启动YARN Session(后台运行)
$FLINK_HOME/bin/yarn-session.sh \
-n 3 \          # TaskManager数量
-s 4 \          # 每个TM的slot数
-jm 4096m \     # JM内存
-tm 8192m \     # TM内存
-que production \ # YARN队列
-d             # 后台运行(不占用终端)

# 第二步:提交Jar包到已启动的Session
$FLINK_HOME/bin/flink run \
-yid application_1740000000000_1234 \# 指定要关联的YARN Session ID
-p 8 \          # 作业并行度
/path/to/your/business-job.jar \
--order-topic order_data

三、生产环境额外建议

  1. 日志配置:修改flink-conf.yaml,将日志输出到 HDFS,方便持久化查询:

    yarn.log.dir: hdfs:///flink/logs
    log.file: hdfs:///flink/logs/${yarn.application.id}
  2. 作业启停

    • 查看 YARN 上的 Flink 作业:yarn application -list

    • 停止作业:yarn application -kill <application-id>flink cancel <job-id>

  3. 资源监控:通过 YARN ResourceManager UI(默认 8088)或 Flink WebUI(YARN Session 模式下可通过yarn application -status <app-id>获取)监控作业资源使用。

总结

  1. 生产环境优先选择YARN Application Mode(Per-Job),通过-t yarn-application指定,隔离性和稳定性最优;

  2. 核心配置必须包含:检查点(EXACTLY_ONCE)、重启策略、状态后端(RocksDB)、YARN 队列;

  3. 预上传 Flink 依赖到 HDFS(yarn.provided.lib.dirs)可大幅提升提交速度,日志聚合开启便于问题排查。