flink提供两种主流提交模式(生产推荐的 Application 模式 和 共享资源的 Session 模式)。
Application Mode(应用模式):每个作业启动一个独立的 Flink 集群,作业的main方法直接运行在 YARN 的 JobManager 上。
Session Mode(会话模式):先启动一个共享的 Flink 集群(Session),再提交多个作业到这个集群共享资源。
一、前置条件
在执行提交命令前,确保满足以下条件:
已配置
FLINK_HOME环境变量(指向 Flink 安装目录);已配置
HADOOP_CONF_DIR(指向 Hadoop 的 conf 目录,让 Flink 能连接 YARN);YARN 集群正常运行,Flink Jar 包已完成本地测试;
建议将 Flink 依赖包预上传到 HDFS(避免每次提交重复上传):
hdfs dfs -mkdir -p /flink/lib
hdfs dfs -put $FLINK_HOME/lib/* /flink/lib/二、生产环境核心命令(推荐 Application模式)
1. YARN Application Mode(Per-Job 集群模式,生产首选)
该模式下每个作业独立占用 YARN 资源,隔离性强,故障不会影响其他作业,是 Flink 1.13 + 官方推荐的生产模式。
# 1. 配置环境变量(若未全局配置)
export HADOOP_CONF_DIR=/opt/hadoop/etc/hadoop # 替换为你的Hadoop conf路径
export FLINK_HOME=/opt/flink-1.17.0 # 替换为你的Flink版本路径
# 2. 提交Jar包核心命令
$FLINK_HOME/bin/flink run-application \ # 指定使用application模式
-t yarn-application \ # 指定使用YARN模式
-s hdfs:///user/flink/checkpoint/a89b89825fe6776304366aa7b07da249/chk-168 \ #从checkpoint目录下恢复(可选)
-D jobmanager.memory.process.size=2048m \ # JobManager总进程内存(生产至少2G)
-D taskmanager.memory.process.size=2048m \ # TaskManager总进程内存(生产至少4G)
-D taskmanager.numberOfTaskSlots=1 \ # 每个TM的任务槽数(对应CPU核数,1槽=1核)
-D parallelism.default=1 \ # 作业默认并行度(根据集群资源调整)
-D yarn.application.queue=root.default \ # 指定YARN生产队列(避免占用测试队列)
-D yarn.provided.lib.dirs="hdfs://192.168.1.211:9000/user/flink/lib" \ # 预上传的Flink依赖包路径(加速提交)
-D restart-strategy=fixed-delay \ # 生产级重启策略:固定延迟重启
-D restart-strategy.fixed-delay.attempts=3 \ # 最大重启次数
-D restart-strategy.fixed-delay.delay=10s \ # 重启间隔
-D fs.default-scheme=hdfs://192.168.1.211:9000 \ # 默认文件系统(替换为你的NameNode地址)
-D yarn.log-aggregation.enable=true \ # 开启YARN日志聚合(方便排查问题)
-c com.panda.bigdata.kafka2paimon.FlinkKafka2PaimonMultiOutStream \ # 主类路径
/home/phoenix/git/my_bigdata/target/my_bigdata-1.0-SNAPSHOT.jar \ # 替换为业务Jar包路径
参数1 \ # 作业自定义参数1(示例)
参数2 \ # 作业自定义参数2(示例)样例如下:
$FLINK_HOME/bin/flink run-application -d \
-t yarn-application \
-D yarn.application.name="flink_kafka2paimon_prod_v1" \
-s hdfs:///user/flink/checkpoint/a89b89825fe6776304366aa7b07da249/chk-216 \
-D containerized.master.env.JAVA_HOME=/home/phoenix/apps/jdk-19.0.2 \
-D containerized.taskmanager.env.JAVA_HOME=/home/phoenix/apps/jdk-19.0.2 \
-D containerized.jobmanager.env.JAVA_HOME=/home/phoenix/apps/jdk-19.0.2 \
-D jobmanager.memory.process.size=2048m \
-D taskmanager.memory.process.size=2048m \
-D taskmanager.numberOfTaskSlots=1 \
-D parallelism.default=1 \
-D yarn.application.queue=root.phoenix \
-D yarn.log-aggregation.enable=true \
-c com.panda.bigdata.kafka2paimon.FlinkKafka2PaimonMultiOutStream \
/home/phoenix/git/my_bigdata/target/my_bigdata-1.0-SNAPSHOT.jar关键参数解释:
flink提交模式对客户端位置的影响:
Job Client 是用户与 Flink 集群的桥梁,与核心组件的交互流程如下:
客户端解析用户代码 → 生成 JobGraph
客户端上传 JobGraph 及依赖到服务器
客户端向 JobManager 提交任务并申请资源
JobManager 调度任务到 TaskManager 执行
执行过程中,客户端与 JobManager 保持心跳与状态同步
任务结束后,JobManager 向客户端返回结果
2. YARN Session 模式(适合多小作业共享资源)
若你有多个轻量级作业,可先启动共享的 YARN Session,再提交作业(隔离性弱,故障会影响所有作业):
# 第一步:启动YARN Session(后台运行)
$FLINK_HOME/bin/yarn-session.sh \
-n 3 \ # TaskManager数量
-s 4 \ # 每个TM的slot数
-jm 4096m \ # JM内存
-tm 8192m \ # TM内存
-que production \ # YARN队列
-d # 后台运行(不占用终端)
# 第二步:提交Jar包到已启动的Session
$FLINK_HOME/bin/flink run \
-yid application_1740000000000_1234 \# 指定要关联的YARN Session ID
-p 8 \ # 作业并行度
/path/to/your/business-job.jar \
--order-topic order_data
三、生产环境额外建议
日志配置:修改
flink-conf.yaml,将日志输出到 HDFS,方便持久化查询:yarn.log.dir: hdfs:///flink/logs log.file: hdfs:///flink/logs/${yarn.application.id}作业启停:
查看 YARN 上的 Flink 作业:
yarn application -list停止作业:
yarn application -kill <application-id>或flink cancel <job-id>
资源监控:通过 YARN ResourceManager UI(默认 8088)或 Flink WebUI(YARN Session 模式下可通过
yarn application -status <app-id>获取)监控作业资源使用。
总结
生产环境优先选择YARN Application Mode(Per-Job),通过
-t yarn-application指定,隔离性和稳定性最优;核心配置必须包含:检查点(EXACTLY_ONCE)、重启策略、状态后端(RocksDB)、YARN 队列;
预上传 Flink 依赖到 HDFS(
yarn.provided.lib.dirs)可大幅提升提交速度,日志聚合开启便于问题排查。
评论