🧩 前提条件

  • 已安装并启动 Apache DolphinScheduler(建议 v3.0+)

  • 已安装 Apache Flink(Standalone 或 YARN 模式均可)

  • DolphinScheduler 的 Worker 节点可以访问 Flink 安装目录或能通过命令行执行 flink 命令

  • 若使用 YARN 模式,确保 Hadoop 和 YARN 环境已配置好


如果部署的standalone服务,在standalone-server/conf/dolphinscheduler_env.sh中添加flink配置:

export FLINK_HOME=/home/phoenix/apps/flink-1.20.3
export HADOOP_HOME=/home/phoenix/apps/hadoop-2.10.2
export HADOOP_CONF_HOME=/home/phoenix/apps/hadoop-2.10.2/etc/hadoop
PATH=$PATH:$FLINK_HOME/bin:$HADOOP_HOME/bin

配置号以后重启dolphin服务。

  1. 进入项目 → 工作流定义创建工作流

  2. 拖拽左侧 “Flink” 节点到画布中

  3. 双击该节点进行配置

配置 Flink 任务参数:

参数

说明

示例

任务名称

自定义

MyFlinkJob

运行模式

Local / Standalone / Yarn Per-Job / Yarn Session

Yarn Per-Job

Flink Version

Flink 版本(仅用于显示)

1.17

Program Type

主类语言类型

JAVA / SCALA / PYTHON

Main Class

Flink 作业主类(Java/Scala)

com.example.WordCount

Main Jar Package

上传的 JAR 包路径(相对路径,见下文)

flink-examples.jar

Deploy Mode

部署模式(通常与运行模式一致)

yarn-per-job

Slot Number

TaskManager Slot 数量

2

TaskManager Memory

TM 内存(MB)

2048

JobManager Memory

JM 内存(MB)

1024

Args

传递给 main 方法的参数

--input hdfs:///data/input --output hdfs:///data/output

Other Parameters

其他 Flink CLI 参数

-Dexecution.checkpointing.interval=60000

⚠️ 注意Main Jar Package 必须是 资源中心 中已上传的 JAR 文件(见下一步)。

上传 Flink JAR 到资源中心:

  1. 在顶部菜单点击 资源中心文件管理

  2. 点击 上传,选择你的 Flink 作业 JAR 包(如 my-flink-job.jar

  3. 上传后,路径会显示为:my-flink-job.jar(根目录)或 /dir/my-flink-job.jar

在 Flink 任务节点的 Main Jar Package 字段中填写该路径(不含前导 /

问题:

提交flink任务后,dolphin显示任务完成,但是flink实时任务会长期执行,dolphin结束则flink client结束,flink job会因为没有flink client心跳而取消整个作业。

提交flink job后,定期检测yarn application的状态,dolphin任务一直执行,便于在flink作业失败后及时报警。

1)在dolphin的资源中心中创建shell脚本:

#!/bin/bash

set -e

# === 参数解析 ===
if [ $# -lt 1 ]; then
  echo "Usage: $0 \"<full_flink_submit_command>\" [max_wait_minutes]"
  echo "Example: $0 \"flink run-application -t yarn-application -c MyApp my.jar\" 60"
  exit 1
fi

FLINK_SUBMIT_CMD="$1"
MAX_WAIT_MINUTES=${2:-360}  # 默认最多等 6 小时

# === 提交 Flink 作业 ===
echo "[INFO] Submitting Flink job with command:"
echo "$FLINK_SUBMIT_CMD"
echo "----------------------------------------"

# 执行提交并捕获输出(同时显示到控制台)
OUTPUT=$(eval "$FLINK_SUBMIT_CMD" 2>&1)
EXIT_CODE=$?

# 打印提交结果
echo "$OUTPUT"

# 如果提交本身失败,直接退出
if [ $EXIT_CODE -ne 0 ]; then
  echo "[ERROR] Flink submit command failed!"
  exit $EXIT_CODE
fi

# === 提取 application_id ===
APP_ID=$(echo "$OUTPUT" | grep -o 'application_[0-9_]\+' | head -n1)

if [ -z "$APP_ID" ]; then
  echo "[ERROR] Failed to extract YARN application ID from output."
  echo "Please ensure the Flink command outputs an application ID (e.g., in yarn-application mode)."
  exit 1
fi

echo "[INFO] Detected YARN Application ID: $APP_ID"

# === 监控 YARN 应用状态 ===
SLEEP_INTERVAL=300  # 每300秒检查一次
# MAX_CHECKS=$(( MAX_WAIT_MINUTES * 60 / SLEEP_INTERVAL ))
CHECK_COUNT=0

while [ true ]; do
  sleep $SLEEP_INTERVAL
  CHECK_COUNT=$((CHECK_COUNT + 1))

  # 获取应用状态(使用 yarn application -status)
  STATUS_OUTPUT=$(yarn application -status "$APP_ID" 2>/dev/null || true)
  STATE_LINE=$(echo "$STATUS_OUTPUT" | grep -i "State :" || echo "")
  FINAL_STATE_LINE=$(echo "$STATUS_OUTPUT" | grep -i "Final-State :" || echo "")

  # 优先看 Final-State(作业结束后才有),否则看 State
  if [[ -n "$FINAL_STATE_LINE" ]]; then
    STATE=$(echo "$FINAL_STATE_LINE" | awk -F: '{print $2}' | tr -d '[:space:]')
  elif [[ -n "$STATE_LINE" ]]; then
    STATE=$(echo "$STATE_LINE" | awk -F: '{print $2}' | tr -d '[:space:]')
  else
    # 无法获取状态,可能应用已不存在
    echo "[WARN] Cannot retrieve status for $APP_ID. It may have been removed from YARN."
    echo "[INFO] Checking if app exists via 'yarn application -list'..."
    if yarn application -list 2>/dev/null | grep -q "$APP_ID"; then
      echo "[INFO] App still exists but status unknown. Continue monitoring..."
      continue
    else
      echo "[ERROR] Application $APP_ID not found in YARN. Assuming it failed or was killed."
      exit 1
    fi
  fi

  echo "[INFO] Current state of $APP_ID: $STATE (check $CHECK_COUNT)"

  case "$STATE" in
    "FINISHED")
      echo "[SUCCESS] Application $APP_ID finished successfully."
      exit 0
      ;;
    "FAILED"|"KILLED"|"LOST")
      echo "[ERROR] Application $APP_ID ended with state: $STATE"
      exit 1
      ;;
    "ACCEPTED"|"RUNNING"|"SUBMITTED")
      # 正常运行中,继续监控
      continue
      ;;
    *)
      echo "[WARN] Unknown state: $STATE. Continue monitoring..."
      continue
      ;;
  esac
done

# 超时
echo "[ERROR] Timeout after $MAX_WAIT_MINUTES minutes. Application $APP_ID did not finish."
exit 1

2) 在dolphin中配置shell任务:

flink/submit_flink_job.sh "$FLINK_HOME/bin/flink run-application -d \
-t yarn-application \
-D yarn.application.name="flink_kafka2paimon_prod_v2" \
-s hdfs:///user/flink/checkpoint/FlinkKafka2PaimonMultiOutStream/5be817b630ac8425afdb8c3b64e3fbfc/chk-292 \
-D containerized.master.env.JAVA_HOME=/home/phoenix/apps/jdk-19.0.2 \
-D containerized.taskmanager.env.JAVA_HOME=/home/phoenix/apps/jdk-19.0.2 \
-D containerized.jobmanager.env.JAVA_HOME=/home/phoenix/apps/jdk-19.0.2 \
-D jobmanager.memory.process.size=2048m \
-D taskmanager.memory.process.size=2048m \
-D taskmanager.numberOfTaskSlots=1 \
-D parallelism.default=1 \
-D yarn.application.queue=root.phoenix \
-D yarn.log-aggregation.enable=true \
-c com.panda.bigdata.kafka2paimon.FlinkKafka2PaimonMultiOutStream \
flink/my_bigdata-1.0-SNAPSHOT.jar"