在湖仓一体的大数据架构趋势下,Paimon(原 Flink Table Store)凭借流批一体的特性成为数据湖建设的优选方案,而Flink作为新一代流批一体计算引擎,能高效实现实时数据的接入与处理,Hive Metastore则为跨引擎的元数据管理提供了标准化能力。本文将详细讲解如何通过 Flink Table API 实现 Kafka 实时数据读取,并将数据写入基于 Hive Metastore 管理元数据的 Paimon 数据湖,真正实现湖仓一体化的数仓建设。

一、技术选型与核心优势

1. 核心组件

  • Flink:作为计算引擎,提供 Table API/SQL 的声明式开发能力,原生支持流批一体,通过 Checkpoint 机制保证数据处理的 Exactly-Once 语义,是实时数据入湖的最佳选择。

  • Kafka:作为实时数据采集的中间层,承接上游业务系统的流式数据,提供高吞吐、低延迟的消息传递能力。

  • Paimon:新一代流批一体数据湖存储,支持 LSM 树存储结构、高效的 Changelog 生成与 Compaction 机制,完美适配 Flink 的实时写入与批式查询。

  • Hive Metastore:作为元数据中心,统一管理 Paimon 表的库、表、分区等元数据,实现 Flink、Hive、Spark 等多引擎的元数据互通,真正打通湖仓壁垒。

2. 方案核心优势

  1. 湖仓一体:基于 Hive Metastore 实现元数据统一管理,Paimon 表可直接被 Hive/Spark 查询,流批数据无缝互通;

  2. 语义保障:通过 Flink Checkpoint 机制实现数据写入的 Exactly-Once,避免数据重复或丢失;

  3. 灵活扩展:基于 Flink Table API/SQL 开发,支持动态调整源表 / 目标表配置,适配多业务场景;

  4. 性能优异:Paimon 的 LSM 树结构与 Compaction 机制有效解决小文件问题,兼顾写入与查询性能。

三、核心实现思路

本方案通过 Flink Table API 实现端到端的实时数据入湖,整体流程如下:

  1. 配置与传参解析:读取 Flink 配置文件,解析命令行传入的 Catalog / 表 DDL 与插入 SQL(基于 Base64 解码,避免特殊字符传参问题);

  2. Flink 环境初始化:配置 Flink 流执行环境,开启 Checkpoint、设置重启策略、启用对象复用等核心优化;

  3. 元数据层注册:通过 DDL 注册基于 Hive Metastore 的 Paimon Catalog,并设置为默认 Catalog;

  4. 源表 / 目标表注册:注册 Kafka 流式源表,创建 Paimon 目标表(元数据同步至 Hive Metastore);

  5. 数据写入执行:通过 INSERT SQL 将 Kafka 源表数据写入 Paimon 目标表,Flink 自动优化多流写入为单一 Job Graph,提升执行效率。

四、代码解析

以下是完整的实现代码,并对核心逻辑与关键配置进行逐段解析,代码采用Java开发,基于 Flink Table API 实现流处理。

1. 完整实现代码

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.panda.bigdata.utils.Base64Decoder;
import com.panda.bigdata.utils.MultiConfigHelper;

public class FlinkKafka2PaimonByTable {

    public static void main(String[] args) throws Exception {
        final Logger log = LoggerFactory.getLogger(FlinkKafka2PaimonByTable.class);

        String configPath = args[0]
        log.info("配置文件:%s".formatted(configPath));
        String kafkaSourceDDL = Base64Decoder.decodeBase64ToString(args[1]);
        String paimonTableDDL = Base64Decoder.decodeBase64ToString(args[2]);
        String insertSql = Base64Decoder.decodeBase64ToString(args[3]);

        log.info("## kafkaSourceDDL:%s".formatted(kafkaSourceDDL));
        log.info("## paimonTableDDL:%s".formatted(paimonTableDDL));
        log.info("## insertSql:%s".formatted(insertSql));

        String flink_checkpoint_path = MultiConfigHelper.getProperty(configPath, "flink_checkpoint_path");

        Configuration flinkConf = new Configuration();
        flinkConf.set(RestOptions.BIND_PORT, "8189"); // Rest端口
        flinkConf.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
        flinkConf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, flink_checkpoint_path);

        flinkConf.setString("pipeline.object-reuse", "true"); // 启用对象复用
        flinkConf.setString("serialization.force-kryo", "false"); // 不强制所有类型用Kryo
        flinkConf.setString("serialization.kryo.registrator", "org.apache.flink.api.java.typeutils.runtime.kryo.FlinkKryoRegistrar");
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(flinkConf);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
        env.enableCheckpointing(60000); // 建议开启检查点以保证 Exactly-Once
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(5);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
        // 关键:作业取消后保留Checkpoint(否则无法用于恢复)
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 1. 注册 Paimon Catalog (在 Java 代码中)

        String hiveCatalogDDL= """
        create catalog my_paimon_catalog_byhive with (
            'type' = 'paimon',
            'metastore' = 'hive',
            'uri' = 'thrift://192.168.1.211:9083',
            'hive-conf-dir' = '/opt/apps/bigdata/apache-hive-2.3.9-bin/conf',
            'hadoop-conf-dir' = '/opt/apps/bigdata/hadoop-2.10.2/etc/hadoop',
            'warehouse' = 'hdfs://192.168.1.211:9000/user/paimon/warehouse',
            'default-database'='ods_paimon'
        );        
        """;
        tEnv.executeSql(hiveCatalogDDL);
        tEnv.useCatalog("my_paimon_catalog_byhive");

        // 2. 在 Flink SQL 中注册 Kafka Source 表
        tEnv.executeSql(kafkaSourceDDL);

        // 3. 创建paimon目标表
        tEnv.executeSql(paimonTableDDL);

        // 4. 执行 SQL 查询并插入到不同的 Paimon 表
        tEnv.executeSql(insertSql);

        // 如果有INSERT语句,flink会自动触发job,没有insert语句需要显示执行env.execute()
        // env.execute("Flink Kafka to Paimon MultiStream Job (SQL API)");
        
        // **重要**: 当有多个 INSERT 语句时,Flink 会将它们优化成一个 Job Graph。
        // executeSql 对于 DML 语句 (如 INSERT) 会立即触发 job 提交。
        // 因此,上面三个 executeSql 调用会启动一个包含三个 sink 的单一 Flink 作业。
        // env.execute("Flink Kafka to Paimon MultiStream Job (SQL API)");
    }
}

2. 核心逻辑解析

(1)传参与配置解析

  • 配置文件默认路径为项目资源目录,支持命令行自定义传入,提高灵活性;

  • 所有 DDL 与 SQL 均通过Base64 编码后从命令行传入,解码后执行,避免特殊字符(如空格、引号、回车)在命令行传参时被转义,导致语法错误;

  • 读取配置文件中的 Checkpoint 存储路径,解耦配置与代码,便于生产环境调整。

  • 对象复用pipeline.object-reuse=true,减少 Flink 运行时对象创建与 GC 开销,提升实时处理性能;

  • 序列化优化:不强制 Kryo 序列化,仅对自定义类型使用 Kryo,兼顾序列化效率与兼容性;

  • Rest 端口:自定义绑定 8189 端口,避免与 Flink 集群默认端口冲突。

(3)Checkpoint 核心配置(Exactly-Once 保障)

Checkpoint 是 Flink 实现 Exactly-Once 的核心,本方案的关键配置如下:

  • 检查点间隔:60s,可根据业务实时性要求调整(间隔过小会生成大量小文件,过大则增加数据延迟);

  • 模式EXACTLY_ONCE,保证数据在生产、传输、处理、写入全链路仅被处理一次,需要配合paimon表的PRIMARY KEY配置

  • 外部化检查点RETAIN_ON_CANCELLATION,作业手动取消后保留 Checkpoint 文件,支持故障后基于最新 Checkpoint 恢复,避免数据重跑;

  • 并发控制:最大并发 1 个,最小暂停 5s,避免多个 Checkpoint 同时执行导致的资源竞争,保证 Checkpoint 成功率。

(4)Paimon Catalog 注册

通过执行 DDL 创建基于 Hive Metastore 的 Paimon Catalog,该 Catalog 的元数据会同步至 Hive Metastore,Paimon 表可直接在 Hive 中通过show tables查看并查询。

(5)多流写入优化

当存在多个 INSERT 语句时,Flink Table API 会自动将其优化为单一 Job Graph,所有写入任务共享同一个 Flink 作业,减少资源占用;executeSql执行 DML 语句时会直接提交作业,无需手动调用env.execute()

五、运行部署步骤

1. 配置文件编写

flink_kafka2paimon_bytable.conf中配置 Checkpoint 存储路径(建议使用 HDFS,保证高可用):

# Flink Checkpoint存储路径(可以是本地文件,也可以是HDFS文件)
flink_checkpoint_path=/home/phoenix/data/dolphinscheduler332/phoenix/resources/flink/flink_kafka2paimon_bytable.conf 

2. DDL/SQL Base64 编码

将 Kafka 源表 DDL、Paimon 目标表 DDL、INSERT SQL 分别进行 Base64 编码,例如在线编码或通过代码工具编码,避免命令行传参问题。

kafkaSourceDDL=$(echo "
create temporary table ods_kafka_source_raw_test_rt (
    raw_message string
) WITH (
    'connector' = 'kafka',
    'topic' = 'mysql.finance.kafka_source',
    'properties.bootstrap.servers' = '192.168.1.211:9092,192.168.1.212:9092,192.168.1.213:9092',
    'properties.group.id' = 'flink_debezium_finance_ods_raw',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'raw'  
);
" | base64 | tr -d '[:space:]')

paimonTableDDL=$(echo "
create table if not exists ods_paimon.ods_paimon_mysql_table_di_test (
    create_time string comment '创建时间',
    db_name string comment '数据库名称',
    table_name string comment '表名称',
    dt_month string comment '日期:yyyy-MM',
    
    raw_msg string comment '原始数据'
) partitioned by (db_name, table_name, dt_month)  
with (
    'connector' = 'paimon',
    'bucket' = '1',  -- 桶的数量,根据实际情况调整
    'bucket-key' = 'create_time',
    'sink.partition-commit.policy.kind' = 'metastore,success-file',
    'file.format' = 'parquet',
    'metastore.partitioned-table'= 'true',
    'hive.input.recursive.dir'='true',
    'hive.input.dir.recursion.level'='1'
);
" | base64 | tr -d '[:space:]')

insertSql=$(echo "
insert into ods_paimon.ods_paimon_mysql_table_di_test
select 
    date_format(now(), 'yyyy-MM-dd HH:mm:ss') create_time,
    raw_message,
    json_value(raw_message, '$.payload.source.db') db_name,
    json_value(raw_message, '$.payload.source.table') table_name,
    date_format(now(), 'yyyyMM') dt_month
from ods_kafka_source_raw_test_rt;
" | base64 | tr -d '[:space:]')

3. 项目打包

通过 Maven 进行项目打包,排除依赖(Flink 核心依赖由集群提供):

mvn clean package -DskipTests -Pflink-cluster

执行以下命令提交作业(替换实际参数):

$FLINK_HOME/bin/flink run-application -d \
-t yarn-application \
-D yarn.application.name="flink_kafka2paimon_bytable_v1" \
-D containerized.master.env.JAVA_HOME=/home/phoenix/apps/jdk-19.0.2 \
-D containerized.taskmanager.env.JAVA_HOME=/home/phoenix/apps/jdk-19.0.2 \
-D containerized.jobmanager.env.JAVA_HOME=/home/phoenix/apps/jdk-19.0.2 \
-D jobmanager.memory.process.size=1024m \
-D taskmanager.memory.process.size=1024m \
-D taskmanager.numberOfTaskSlots=1 \
-D parallelism.default=1 \
-D yarn.application.queue=root.phoenix \
-D yarn.log-aggregation.enable=true \
-D yarn.ship-files=/opt/apps/bigdata/apache-hive-2.3.9-bin/conf \
-D yarn.ship-files=/opt/apps/bigdata/hadoop-2.10.2/etc/hadoop \
-D yarn.ship-files=/home/phoenix/data/dolphinscheduler332/phoenix/resources/flink/flink_kafka2paimon_bytable.conf \
-c com.panda.bigdata.kafka2paimon.FlinkKafka2PaimonByTable \
/home/phoenix/data/dolphinscheduler332/phoenix/resources/flink/my_bigdata-1.0-SNAPSHOT.jar \
flink_kafka2paimon_bytable.conf \
$kafkaSourceDDL $paimonTableDDL $insertSql
  • -d:后台运行作业;

  • yarn.ship-files:用于指定需要随 Flink 应用程序一起分发到 YARN 集群中各个节点的额外文件。

六、生产环境优化建议

  1. 资源配置:根据数据量调整 TaskManager 的taskmanager.numberOfTaskSlots、JobManager 的jobmanager.memory.process.size,保证资源充足;

  2. 并发度:设置 Kafka Source 与 Paimon Sink 的并发度,建议与 Kafka 分区数、Paimon 分桶数一致,避免数据倾斜;

  3. 重启策略:生产环境可使用failure-rate重启策略,避免作业因临时故障频繁重启。

2. Paimon 表配置优化

Paimon 表的配置直接影响写入性能与小文件治理,核心配置如下:

CREATE TABLE paimon_table (
  id BIGINT PRIMARY KEY,
  name STRING,
  create_time TIMESTAMP(3)
) PARTITIONED BY (dt STRING)
WITH (
  'connector' = 'paimon',
  'bucket' = '8', -- 分桶数,建议与Sink并发度一致
  'target-file-size' = '128MB', -- 目标文件大小,减少小文件
  'write-buffer-size' = '256MB', -- 写缓冲区大小,减少刷盘次数
  'write-buffer-spillable' = 'true', -- 开启缓冲区溢出,避免OOM
  'changelog-producer' = 'full-compaction' -- 基于Compaction生成Changelog,兼顾性能与实时性
);

3. 小文件治理

Paimon 的小文件问题主要由 Checkpoint 间隔过小、并发度过高导致,可通过以下方式治理:

  1. 适当增大 Checkpoint 间隔(如 60s-300s),平衡实时性与文件数量;

  2. 开启 Paimon 自动 Compaction,设置compaction.trigger.ratio=0.8,当小文件占比达到 80% 时自动合并;

  3. 限制 Sink 并发度,避免每个并发子任务都生成小文件。

4. 监控告警

  1. 监控 Flink 作业的 Checkpoint 成功率、延迟,当成功率低于 95% 时及时告警;

  2. 监控 Paimon 表的文件数量、Compaction 执行情况,避免小文件堆积;

  3. 监控 Hive Metastore 的运行状态,保证元数据同步正常。

七、总结

通过 Flink Table API 实现了 Kafka 实时数据到 Paimon 数据湖的写入,并基于 Hive Metastore 实现了元数据的统一管理,成功落地湖仓一体的大数据架构。该方案不仅保证了数据实时写入,还实现了 Flink、Hive、Spark 多引擎的元数据互通,让流式数据既能实时入湖,又能被批式引擎高效查询。

在生产环境中,可基于该方案进行扩展:例如增加数据清洗、维度关联等业务逻辑,或结合 Paimon 的 Changelog 特性实现数仓的实时分层(DWD→DWS→ADS)。同时,通过合理的配置调优与小文件治理,可兼顾系统的写入性能与查询性能,为企业的实时数仓建设提供可靠的技术支撑。