在湖仓一体的实时数仓架构中,Paimon 凭借其流批一体、高吞吐、低延迟的存储特性,成为了数据湖建设的核心存储引擎;而Flink 作为主流的实时计算框架,与 Kafka、Paimon 的生态融合性极佳,是实现实时数据采集、处理、入湖的最优组合之一。本文将详细讲解基于 Flink DataStream API 实现 Kafka 多 Topic 数据读取,并将数据写入由 Hive Metastore 管理的 Paimon 表的完整方案,实现实时数仓 ODS 层的高效数据采集,且写入后的 Paimon 表可直接基于 Hive 进行离线统计分析,真正做到流批一体。

一、实时数仓数据采集整体流程

首先看一下实时数据采集的完整方案,覆盖从 MySQL 业务数据库到 Paimon 数据湖的全链路实时同步,解决了业务数据的实时采集、传输、处理和持久化问题,同时通过 Hive Metastore 管理 Paimon 元数据,打通了流处理和批处理的元数据壁垒。

整体技术流程如下:

  1. Binlog 采集:通过 Debezium 捕获 MySQL 的 Binlog 日志,将数据库的增删改操作解析为标准化的 CDC(变更数据捕获)事件;

  2. 实时传输:Debezium 将解析后的 CDC 事件发送至 Kafka,Kafka 作为实时数据总线,承接多业务、多表的实时数据,支持高并发生产和消费;

  3. 实时计算:基于 Flink StreamExecutionEnvironment 构建实时作业,通过 KafkaSource 读取 Kafka 中多 Topic 的 CDC 数据,形成 DataStream 流数据;

  4. 数据处理:Flink 对原始流数据进行解析、分流等轻量处理,适配 Paimon 表的写入格式;

  5. 数据入湖:通过 Paimon 提供的 FlinkSinkBuilder,将处理后的数据写入 Paimon 表,Paimon 的元数据由 Hive Metastore 统一管理,底层数据存储在 HDFS;

  6. 离线分析:Hive 可直接识别由 Hive Metastore 管理的 Paimon 表,无需额外适配,即可基于 Hive 进行离线统计、报表分析等操作。

核心组件链路:MySQL Binlog → Debezium → Kafka → Flink DataStream → Paimon(Hive Metastore管理)→ HDFS

同时 Paimon 与 Hive 联动,支持Hive → Paimon表的直接查询分析。

二、核心实现方案(Java API)

本次方案采用 Flink DataStream API 开发,可对 Kafka 原始数据进行自定义解析和分流处理,同时实现了Paimon 表自动创建多 Topic 正则匹配读取Exactly-Once 语义保证Hive 元数据联动等核心特性,以下是完整的代码实现和模块详解。

2.1 核心依赖说明

实现本方案需引入 Flink、Kafka、Paimon、Hive、Jackson 等核心依赖(Maven),关键依赖包括:

  • Flink Streamning & Kafka Connector:实现 Kafka 数据读取

  • Paimon Flink & Catalog:实现 Paimon 表的创建和写入

  • Paimon Hive Metastore:实现 Paimon 元数据与 Hive 联动

  • Jackson-databind:实现 JSON 格式 CDC 数据解析

  • Hive Metastore Client:对接 Hive 元数据服务

2.2 完整代码实现

下面代码是flink同步kafka多个topic数据写入paimon的逻辑。

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.panda.bigdata.utils.DateTimeHelper;
import com.panda.bigdata.utils.MultiConfigHelper;
import com.panda.bigdata.utils.SafeStringSchema;
import com.panda.bigdata.utils.TimestampValidator;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * 采集kafka多个topic数据到paimon表中,paimon元数据写入hive,后续可以在hive中查看paimon表
 * 
 * 提交命令:
 */
public class FlinkKafka2PaimonMultiOutStream {


    // 定义 Side Output Tags
    private static final OutputTag<String> stream_paimon_tag = new OutputTag<>("stream-paimon"){};
    // private static final OutputTag<String> stream_doris_tag = new OutputTag<>("stream-doris"){};
    // 主流
    // private static final String stream_main_tag = "stream-other";

    public static void main(String[] args) throws Exception {
        final Logger log = LoggerFactory.getLogger(FlinkKafka2PaimonMultiOutStream.class);
        String configPath = args[0];

        String kafka_bootstrap_servers = MultiConfigHelper.getProperty(configPath, "kafka_bootstrap_servers");
        String kafk_topic_pattern = MultiConfigHelper.getProperty(configPath, "kafka_topic_pattern");
        String kafka_group_id = MultiConfigHelper.getProperty(configPath, "kafka_group_id");
        String kafka_offsets_initializer = MultiConfigHelper.getProperty(configPath, "kafka_offsets_initializer");
        // String hive_catalog_name = MultiConfigHelper.getProperty(configPath, "hive_catalog_name");
        String hive_metastore_uri = MultiConfigHelper.getProperty(configPath, "hive_metastore_uri");
        String paimon_default_db = MultiConfigHelper.getProperty(configPath, "paimon_default_db");
        String paimon_warehouse_path = MultiConfigHelper.getProperty(configPath, "paimon_warehouse_path");
        String paimon_ods_table_name = MultiConfigHelper.getProperty(configPath, "paimon_ods_table_name");
        String paimon_ods_db_name = MultiConfigHelper.getProperty(configPath, "paimon_ods_db_name");
        String flink_checkpoint_path = MultiConfigHelper.getProperty(configPath, "flink_checkpoint_path");

        log.info("配置文件:{}", configPath);
        log.info("配置信息:{}, {}, {}, {}", kafka_bootstrap_servers, kafk_topic_pattern, kafka_group_id, kafka_offsets_initializer);

        Configuration flinkConf = new Configuration();
        flinkConf.set(RestOptions.BIND_PORT, "8189"); // Rest端口
        flinkConf.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
        flinkConf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, flink_checkpoint_path);
        flinkConf.setString("pipeline.object-reuse", "true"); // 启用对象复用
        flinkConf.setString("serialization.force-kryo", "false"); // 不强制所有类型用Kryo
        flinkConf.setString("serialization.kryo.registrator", "org.apache.flink.api.java.typeutils.runtime.kryo.FlinkKryoRegistrar");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(flinkConf);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
        env.enableCheckpointing(60000); // 建议开启检查点以保证 Exactly-Once
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
        // 关键:作业取消后保留Checkpoint(否则无法用于恢复)
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // 1. 配置并创建 Kafka Source
        OffsetsInitializer offsetsInitializer = OffsetsInitializer.earliest();
        if (kafka_offsets_initializer.equals("latest")){
            offsetsInitializer = OffsetsInitializer.latest();
        }
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                    .setBootstrapServers(kafka_bootstrap_servers)
                    .setTopicPattern(Pattern.compile(kafk_topic_pattern))
                    .setStartingOffsets(offsetsInitializer) // 从最早偏移量开始(可改为latest)
                    .setGroupId(kafka_group_id)
                    .setValueOnlyDeserializer(new SafeStringSchema()) // 读取原始JSON字符串
                    .build();

        DataStream<String> rawKafkaStream = env.fromSource(
                                kafkaSource, 
                                WatermarkStrategy.noWatermarks(), 
                                "flink_kafka2paimon_stream_" + kafk_topic_pattern.replace(".*", "")
                            );

        // 2. 解析 JSON 并进行分流
        SingleOutputStreamOperator<String> mainStream = rawKafkaStream
                .process(new ProcessFunction<String, String>() {
                    private static final long serialVersionUID = 1L;
                    // private final ObjectMapper objectMapper = new ObjectMapper();

                    @Override
                    public void processElement(String value, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
                        // JsonNode jsonNode = objectMapper.readTree(value);
                        
                        // String type = jsonNode.get("type").asText();

                        // 发送到主流
                        // out.collect(value);

                        //发送到paimon
                        ctx.output(stream_paimon_tag, value);
                        //发送到doris
                        // ctx.output(stream_doris_tag, value);
                    }
                });

        // --- DataStream API 写入 Paimon ---
        
        // 准备 Paimon Catalog 配置
        Options catalogOptions = new Options();
        catalogOptions.set("type", "paimon");
        catalogOptions.set("metastore", "hive");
        catalogOptions.set("uri", hive_metastore_uri);
        catalogOptions.set("default-database", paimon_default_db);
        catalogOptions.set("warehouse", paimon_warehouse_path);
        // hadoop-conf-dir 和 fs.defaultFS 可以通过 flink-conf.yaml 或环境变量设置,这里省略
        CatalogContext catalogContext = CatalogContext.create(catalogOptions);

        Catalog paimonCatalog = CatalogFactory.createCatalog(catalogContext);

        // 获取 Paimon 表实例
        Table paimonTable = getPaimonTable(paimonCatalog, paimon_ods_db_name, paimon_ods_table_name);
        // Table dorisTable = null;
        // Table tableOther = null;

        // 将分流后的数据写入对应的 Paimon 表
        DataStream<Row> paimonStream = mainStream.getSideOutput(stream_paimon_tag)
                .map(jsonStr -> getPaimonRowData(jsonStr))
        ;
        DataType rowDataType = DataTypes.ROW(
            DataTypes.FIELD("db_name", DataTypes.STRING()),
            DataTypes.FIELD("table_name", DataTypes.STRING()),
            DataTypes.FIELD("dt", DataTypes.STRING()),
            DataTypes.FIELD("create_time", DataTypes.STRING()),
            DataTypes.FIELD("raw_msg", DataTypes.STRING())
        );
        
        FlinkSinkBuilder flinkBuilder = new FlinkSinkBuilder(paimonTable)
            .forRow(paimonStream, rowDataType)
            ;
        flinkBuilder.build();

        env.execute("Flink Kafka 2 Paimon MultiStream Job (DataStream API)");
    }

    // 组装paimon行数据
    public static Row getPaimonRowData(String jsonStr) {
        String db_name = "";
        String table_name = "";
        String dt = DateTimeHelper.getCurrentTimestampFormatted(DateTimeHelper.DATE_MONTH_FORMATTER);
        String create_time = DateTimeHelper.getCurrentTimestampFormatted(DateTimeHelper.STANDARD_FORMATTER);
        
        try {
            ObjectMapper mapper = new ObjectMapper();
            JsonNode rootNode = mapper.readTree(jsonStr);

            JsonNode createTimeNode = rootNode.path("payload").path("after").path("create_time");
            
            if (!createTimeNode.isMissingNode() && !createTimeNode.isNull()) {
                // createTimeStr发现两种格式:1756940435000(13位时间戳) 和 2025-08-30T04:39:46Z(utc时间字符串),分别处理
                String createTimeStr = createTimeNode.asText();

                if (createTimeStr.contains("T") && createTimeStr.contains("z")) {
                    dt = DateTimeHelper.convertIsoToFormatted(createTimeStr, DateTimeHelper.DATE_MONTH_FORMATTER);

                } else if(TimestampValidator.isValidTimestamp(createTimeStr)){
                    dt = DateTimeHelper.convertTimestampToCustomFormat(createTimeStr, DateTimeHelper.DATE_MONTH_FORMATTER);
                }
            }

            db_name = rootNode.path("payload").path("source").path("db").asText();
            table_name = rootNode.path("payload").path("source").path("table").asText();
            if(db_name == null || db_name == ""){
                db_name = "other";
            }
            if(table_name == null  || table_name == ""){
                table_name = "other";
            }
        } catch(Exception e){
            System.out.println(e);
        }
        
        return Row.of(db_name, table_name, dt, create_time, jsonStr);
    }

    // 获取Paimon表
    private static Table getPaimonTable(Catalog catalog, String database, String tableName) {
        Identifier identifier = Identifier.create(database, tableName);
        Table table = null;
        try {
            List<String> tableList = catalog.listTables(database);
            if (!tableList.contains(tableName)) {
                System.out.println("Creating paimon table: " + tableName);
                
                // 2.5 创建表
                Map<String, String> tableOptions = new HashMap<>();
                tableOptions.put("file.format", "parquet"); // 文件格式:parquet
                // 分区提交策略
                tableOptions.put("sink.partition-commit.policy.kind", "metastore,success-file");
                // 启用Hive Metastore分区表
                tableOptions.put("metastore.partitioned-table", "true");
                tableOptions.put("hive.input.recursive.dir", "true");
                tableOptions.put("hive.input.dir.recursion.level", "1");


                Schema schema = Schema.newBuilder()
                    .column("db_name", org.apache.paimon.types.DataTypes.STRING())         // 数据库名(分区字段)
                    .column("table_name", org.apache.paimon.types.DataTypes.STRING())            // 表名(分区字段)
                    .column("dt", org.apache.paimon.types.DataTypes.STRING())               // 日期分区(如yyyyMM,分区字段)
                    .column("create_time", org.apache.paimon.types.DataTypes.STRING())
                    .column("raw_msg", org.apache.paimon.types.DataTypes.STRING())
                    .partitionKeys("db_name", "table_name", "dt")
                    .options(tableOptions)
                    .build();

                catalog.createTable(identifier, schema, true);
            }

            table = catalog.getTable(identifier);
        } catch (Exception e) {
            throw new RuntimeException("创建paimon表失败: " + tableName, e);
        }
        return table;
    }
}

2.3 核心模块详解

本次作业重点配置了 Flink 的Checkpoint重启策略,保证了数据写入的 Exactly-Once 语义,同时提升了作业的容错性:

  • 开启 60 秒一次的 Checkpoint,设置为 EXACTLY_ONCE 模式,确保数据仅被写入一次;

  • 作业取消后保留 Checkpoint,支持故障后无缝恢复;

  • 固定延迟重启策略,作业异常时自动重启 3 次,降低人工介入成本;

  • 启用对象复用,提升 Flink 作业的运行性能。

(2)Kafka 多 Topic 读取

通过setTopicPattern实现 Kafka 多 Topic 的正则匹配读取,无需逐个配置 Topic,适配业务表不断扩展的场景;同时支持配置起始偏移量(earliest/latest),满足首次全量同步和后续增量同步的需求。

(3)数据分流与解析

通过 Flink 的侧输出流(OutputTag) 实现数据分流,本次将数据全部发送至 Paimon 侧输出流,可轻松扩展至 Doris、ClickHouse 等其他存储;getPaimonRowData方法实现了 CDC 数据的自适应解析,支持 13 位时间戳和 ISO UTC 时间两种格式,同时提取源库名、表名,补充分区字段,保留原始消息,既保证了数据的规范性,又保留了全量原始数据。

(4)Paimon Catalog 与 Hive Metastore 联动

通过 Paimon 的CatalogFactory创建对接 Hive Metastore 的 Catalog,核心配置metastore: hivehive.metastore.uri,实现 Paimon 元数据由 Hive 统一管理,让 Hive 可直接识别 Paimon 表。

(5)Paimon 表自动创建

getPaimonTable方法实现了 Paimon 表的懒加载创建,若表不存在则根据预定义的结构自动创建,无需手动在 Hive 或 Paimon 中建表,简化了运维流程;同时配置了 Parquet 列存格式、复合分区键(db_name/table_name/dt),提升了数据的存储和查询效率。

(6)Paimon 数据写入

通过 Paimon 提供的FlinkSinkBuilder,将 Flink 的 DataStream<Row>数据直接写入 Paimon 表,底层由 Paimon 优化写入逻辑,支持高吞吐、低延迟的实时写入,同时结合 Flink 的 Checkpoint 保证 Exactly-Once。

三、核心配置文件说明

本次方案将所有核心参数抽离至配置文件(kafka2paimon_withhivecatalog.conf),实现代码与配置解耦,便于不同环境(测试 / 生产)的参数修改,核心配置项如下:

# Kafka配置
kafka_bootstrap_servers=kafka-1:9092,kafka-2:9092,kafka-3:9092
kafka_topic_pattern=cdc_.* # 正则匹配所有以cdc_开头的Topic
kafka_group_id=flink_kafka2paimon
kafka_offsets_initializer=latest # 起始偏移量:earliest/latest

# Hive Metastore配置
hive_metastore_uri=thrift://hive-metastore:9083

# Paimon配置
paimon_default_db=default
paimon_warehouse_path=hdfs://hdfs-cluster/paimon/warehouse
paimon_ods_db_name=ods_paimon
paimon_ods_table_name=ods_cdc_all

# Flink Checkpoint配置
flink_checkpoint_path=hdfs://hdfs-cluster/flink/checkpoint/kafka2paimon

四、作业提交与运行注意事项

  1. Flink 集群配置:确保 Flink 集群的flink-conf.yaml配置了 HDFS 和 Hive 的相关依赖,同时配置了足够的内存和并行度,适配 Kafka 的高吞吐;

  2. 依赖包准备:将 Flink、Kafka、Paimon、Hive 的核心依赖包放入 Flink 的lib目录,或通过-yD参数指定依赖包路径,避免作业运行时出现类缺失;

  3. Hive Metastore 联动:确保 Hive Metastore 服务正常运行,且 Paimon 的 warehouse 路径在 HDFS 上有读写权限,Hive 可访问该路径;

  4. 作业提交命令:通过 Flink run 命令提交作业,指定配置文件路径,示例:

    flink run-application -d \
    -t yarn-application \
    -c com.panda.bigdata.flink.FlinkKafka2PaimonMultiOutStream \
    flink-bigdata-1.0-SNAPSHOT.jar \
    /opt/conf/kafka2paimon_withhivecatalog.conf

  5. 日志排查:作业运行过程中,可通过 Flink Web UI 或 Yarn 日志查看作业状态,若出现数据解析失败,日志会打印原始数据和异常信息,便于问题定位;

  6. Hive 查询验证:作业运行后,可直接在 Hive 中执行show tables in ods_paimon;select * from ods_paimon.ods_cdc_all limit 10;验证数据是否写入成功。

五、总结

本文实现的基于 Flink DataStream API 的 Kafka 到 Paimon 的实时同步方案,打通了从业务数据库到数据湖的全链路实时采集,同时通过 Hive Metastore 管理 Paimon 元数据,实现了流批一体的数据分析。该方案兼具实时性、可靠性、可扩展性和易用性,解决了传统数仓中流批数据分离、元数据不一致、运维成本高等问题,是湖仓一体架构下实时数仓 ODS 层建设的优质方案。