Flink DataStream读取Kafka写入Paimon(使用Hive Metastore)
在湖仓一体的实时数仓架构中,Paimon 凭借其流批一体、高吞吐、低延迟的存储特性,成为了数据湖建设的核心存储引擎;而Flink 作为主流的实时计算框架,与 Kafka、Paimon 的生态融合性极佳,是实现实时数据采集、处理、入湖的最优组合之一。本文将详细讲解基于 Flink DataStream API 实现 Kafka 多 Topic 数据读取,并将数据写入由 Hive Metastore 管理的 Paimon 表的完整方案,实现实时数仓 ODS 层的高效数据采集,且写入后的 Paimon 表可直接基于 Hive 进行离线统计分析,真正做到流批一体。
一、实时数仓数据采集整体流程
首先看一下实时数据采集的完整方案,覆盖从 MySQL 业务数据库到 Paimon 数据湖的全链路实时同步,解决了业务数据的实时采集、传输、处理和持久化问题,同时通过 Hive Metastore 管理 Paimon 元数据,打通了流处理和批处理的元数据壁垒。
整体技术流程如下:
Binlog 采集:通过 Debezium 捕获 MySQL 的 Binlog 日志,将数据库的增删改操作解析为标准化的 CDC(变更数据捕获)事件;
实时传输:Debezium 将解析后的 CDC 事件发送至 Kafka,Kafka 作为实时数据总线,承接多业务、多表的实时数据,支持高并发生产和消费;
实时计算:基于 Flink StreamExecutionEnvironment 构建实时作业,通过 KafkaSource 读取 Kafka 中多 Topic 的 CDC 数据,形成 DataStream 流数据;
数据处理:Flink 对原始流数据进行解析、分流等轻量处理,适配 Paimon 表的写入格式;
数据入湖:通过 Paimon 提供的 FlinkSinkBuilder,将处理后的数据写入 Paimon 表,Paimon 的元数据由 Hive Metastore 统一管理,底层数据存储在 HDFS;
离线分析:Hive 可直接识别由 Hive Metastore 管理的 Paimon 表,无需额外适配,即可基于 Hive 进行离线统计、报表分析等操作。
核心组件链路:MySQL Binlog → Debezium → Kafka → Flink DataStream → Paimon(Hive Metastore管理)→ HDFS
同时 Paimon 与 Hive 联动,支持Hive → Paimon表的直接查询分析。

二、核心实现方案(Java API)
本次方案采用 Flink DataStream API 开发,可对 Kafka 原始数据进行自定义解析和分流处理,同时实现了Paimon 表自动创建、多 Topic 正则匹配读取、Exactly-Once 语义保证、Hive 元数据联动等核心特性,以下是完整的代码实现和模块详解。
2.1 核心依赖说明
实现本方案需引入 Flink、Kafka、Paimon、Hive、Jackson 等核心依赖(Maven),关键依赖包括:
Flink Streamning & Kafka Connector:实现 Kafka 数据读取
Paimon Flink & Catalog:实现 Paimon 表的创建和写入
Paimon Hive Metastore:实现 Paimon 元数据与 Hive 联动
Jackson-databind:实现 JSON 格式 CDC 数据解析
Hive Metastore Client:对接 Hive 元数据服务
2.2 完整代码实现
下面代码是flink同步kafka多个topic数据写入paimon的逻辑。
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.panda.bigdata.utils.DateTimeHelper;
import com.panda.bigdata.utils.MultiConfigHelper;
import com.panda.bigdata.utils.SafeStringSchema;
import com.panda.bigdata.utils.TimestampValidator;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
/**
* 采集kafka多个topic数据到paimon表中,paimon元数据写入hive,后续可以在hive中查看paimon表
*
* 提交命令:
*/
public class FlinkKafka2PaimonMultiOutStream {
// 定义 Side Output Tags
private static final OutputTag<String> stream_paimon_tag = new OutputTag<>("stream-paimon"){};
// private static final OutputTag<String> stream_doris_tag = new OutputTag<>("stream-doris"){};
// 主流
// private static final String stream_main_tag = "stream-other";
public static void main(String[] args) throws Exception {
final Logger log = LoggerFactory.getLogger(FlinkKafka2PaimonMultiOutStream.class);
String configPath = args[0];
String kafka_bootstrap_servers = MultiConfigHelper.getProperty(configPath, "kafka_bootstrap_servers");
String kafk_topic_pattern = MultiConfigHelper.getProperty(configPath, "kafka_topic_pattern");
String kafka_group_id = MultiConfigHelper.getProperty(configPath, "kafka_group_id");
String kafka_offsets_initializer = MultiConfigHelper.getProperty(configPath, "kafka_offsets_initializer");
// String hive_catalog_name = MultiConfigHelper.getProperty(configPath, "hive_catalog_name");
String hive_metastore_uri = MultiConfigHelper.getProperty(configPath, "hive_metastore_uri");
String paimon_default_db = MultiConfigHelper.getProperty(configPath, "paimon_default_db");
String paimon_warehouse_path = MultiConfigHelper.getProperty(configPath, "paimon_warehouse_path");
String paimon_ods_table_name = MultiConfigHelper.getProperty(configPath, "paimon_ods_table_name");
String paimon_ods_db_name = MultiConfigHelper.getProperty(configPath, "paimon_ods_db_name");
String flink_checkpoint_path = MultiConfigHelper.getProperty(configPath, "flink_checkpoint_path");
log.info("配置文件:{}", configPath);
log.info("配置信息:{}, {}, {}, {}", kafka_bootstrap_servers, kafk_topic_pattern, kafka_group_id, kafka_offsets_initializer);
Configuration flinkConf = new Configuration();
flinkConf.set(RestOptions.BIND_PORT, "8189"); // Rest端口
flinkConf.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
flinkConf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, flink_checkpoint_path);
flinkConf.setString("pipeline.object-reuse", "true"); // 启用对象复用
flinkConf.setString("serialization.force-kryo", "false"); // 不强制所有类型用Kryo
flinkConf.setString("serialization.kryo.registrator", "org.apache.flink.api.java.typeutils.runtime.kryo.FlinkKryoRegistrar");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(flinkConf);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
env.enableCheckpointing(60000); // 建议开启检查点以保证 Exactly-Once
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
// 关键:作业取消后保留Checkpoint(否则无法用于恢复)
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 1. 配置并创建 Kafka Source
OffsetsInitializer offsetsInitializer = OffsetsInitializer.earliest();
if (kafka_offsets_initializer.equals("latest")){
offsetsInitializer = OffsetsInitializer.latest();
}
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(kafka_bootstrap_servers)
.setTopicPattern(Pattern.compile(kafk_topic_pattern))
.setStartingOffsets(offsetsInitializer) // 从最早偏移量开始(可改为latest)
.setGroupId(kafka_group_id)
.setValueOnlyDeserializer(new SafeStringSchema()) // 读取原始JSON字符串
.build();
DataStream<String> rawKafkaStream = env.fromSource(
kafkaSource,
WatermarkStrategy.noWatermarks(),
"flink_kafka2paimon_stream_" + kafk_topic_pattern.replace(".*", "")
);
// 2. 解析 JSON 并进行分流
SingleOutputStreamOperator<String> mainStream = rawKafkaStream
.process(new ProcessFunction<String, String>() {
private static final long serialVersionUID = 1L;
// private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void processElement(String value, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
// JsonNode jsonNode = objectMapper.readTree(value);
// String type = jsonNode.get("type").asText();
// 发送到主流
// out.collect(value);
//发送到paimon
ctx.output(stream_paimon_tag, value);
//发送到doris
// ctx.output(stream_doris_tag, value);
}
});
// --- DataStream API 写入 Paimon ---
// 准备 Paimon Catalog 配置
Options catalogOptions = new Options();
catalogOptions.set("type", "paimon");
catalogOptions.set("metastore", "hive");
catalogOptions.set("uri", hive_metastore_uri);
catalogOptions.set("default-database", paimon_default_db);
catalogOptions.set("warehouse", paimon_warehouse_path);
// hadoop-conf-dir 和 fs.defaultFS 可以通过 flink-conf.yaml 或环境变量设置,这里省略
CatalogContext catalogContext = CatalogContext.create(catalogOptions);
Catalog paimonCatalog = CatalogFactory.createCatalog(catalogContext);
// 获取 Paimon 表实例
Table paimonTable = getPaimonTable(paimonCatalog, paimon_ods_db_name, paimon_ods_table_name);
// Table dorisTable = null;
// Table tableOther = null;
// 将分流后的数据写入对应的 Paimon 表
DataStream<Row> paimonStream = mainStream.getSideOutput(stream_paimon_tag)
.map(jsonStr -> getPaimonRowData(jsonStr))
;
DataType rowDataType = DataTypes.ROW(
DataTypes.FIELD("db_name", DataTypes.STRING()),
DataTypes.FIELD("table_name", DataTypes.STRING()),
DataTypes.FIELD("dt", DataTypes.STRING()),
DataTypes.FIELD("create_time", DataTypes.STRING()),
DataTypes.FIELD("raw_msg", DataTypes.STRING())
);
FlinkSinkBuilder flinkBuilder = new FlinkSinkBuilder(paimonTable)
.forRow(paimonStream, rowDataType)
;
flinkBuilder.build();
env.execute("Flink Kafka 2 Paimon MultiStream Job (DataStream API)");
}
// 组装paimon行数据
public static Row getPaimonRowData(String jsonStr) {
String db_name = "";
String table_name = "";
String dt = DateTimeHelper.getCurrentTimestampFormatted(DateTimeHelper.DATE_MONTH_FORMATTER);
String create_time = DateTimeHelper.getCurrentTimestampFormatted(DateTimeHelper.STANDARD_FORMATTER);
try {
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(jsonStr);
JsonNode createTimeNode = rootNode.path("payload").path("after").path("create_time");
if (!createTimeNode.isMissingNode() && !createTimeNode.isNull()) {
// createTimeStr发现两种格式:1756940435000(13位时间戳) 和 2025-08-30T04:39:46Z(utc时间字符串),分别处理
String createTimeStr = createTimeNode.asText();
if (createTimeStr.contains("T") && createTimeStr.contains("z")) {
dt = DateTimeHelper.convertIsoToFormatted(createTimeStr, DateTimeHelper.DATE_MONTH_FORMATTER);
} else if(TimestampValidator.isValidTimestamp(createTimeStr)){
dt = DateTimeHelper.convertTimestampToCustomFormat(createTimeStr, DateTimeHelper.DATE_MONTH_FORMATTER);
}
}
db_name = rootNode.path("payload").path("source").path("db").asText();
table_name = rootNode.path("payload").path("source").path("table").asText();
if(db_name == null || db_name == ""){
db_name = "other";
}
if(table_name == null || table_name == ""){
table_name = "other";
}
} catch(Exception e){
System.out.println(e);
}
return Row.of(db_name, table_name, dt, create_time, jsonStr);
}
// 获取Paimon表
private static Table getPaimonTable(Catalog catalog, String database, String tableName) {
Identifier identifier = Identifier.create(database, tableName);
Table table = null;
try {
List<String> tableList = catalog.listTables(database);
if (!tableList.contains(tableName)) {
System.out.println("Creating paimon table: " + tableName);
// 2.5 创建表
Map<String, String> tableOptions = new HashMap<>();
tableOptions.put("file.format", "parquet"); // 文件格式:parquet
// 分区提交策略
tableOptions.put("sink.partition-commit.policy.kind", "metastore,success-file");
// 启用Hive Metastore分区表
tableOptions.put("metastore.partitioned-table", "true");
tableOptions.put("hive.input.recursive.dir", "true");
tableOptions.put("hive.input.dir.recursion.level", "1");
Schema schema = Schema.newBuilder()
.column("db_name", org.apache.paimon.types.DataTypes.STRING()) // 数据库名(分区字段)
.column("table_name", org.apache.paimon.types.DataTypes.STRING()) // 表名(分区字段)
.column("dt", org.apache.paimon.types.DataTypes.STRING()) // 日期分区(如yyyyMM,分区字段)
.column("create_time", org.apache.paimon.types.DataTypes.STRING())
.column("raw_msg", org.apache.paimon.types.DataTypes.STRING())
.partitionKeys("db_name", "table_name", "dt")
.options(tableOptions)
.build();
catalog.createTable(identifier, schema, true);
}
table = catalog.getTable(identifier);
} catch (Exception e) {
throw new RuntimeException("创建paimon表失败: " + tableName, e);
}
return table;
}
}2.3 核心模块详解
(1)Flink 环境与容错配置
本次作业重点配置了 Flink 的Checkpoint和重启策略,保证了数据写入的 Exactly-Once 语义,同时提升了作业的容错性:
开启 60 秒一次的 Checkpoint,设置为 EXACTLY_ONCE 模式,确保数据仅被写入一次;
作业取消后保留 Checkpoint,支持故障后无缝恢复;
固定延迟重启策略,作业异常时自动重启 3 次,降低人工介入成本;
启用对象复用,提升 Flink 作业的运行性能。
(2)Kafka 多 Topic 读取
通过setTopicPattern实现 Kafka 多 Topic 的正则匹配读取,无需逐个配置 Topic,适配业务表不断扩展的场景;同时支持配置起始偏移量(earliest/latest),满足首次全量同步和后续增量同步的需求。
(3)数据分流与解析
通过 Flink 的侧输出流(OutputTag) 实现数据分流,本次将数据全部发送至 Paimon 侧输出流,可轻松扩展至 Doris、ClickHouse 等其他存储;getPaimonRowData方法实现了 CDC 数据的自适应解析,支持 13 位时间戳和 ISO UTC 时间两种格式,同时提取源库名、表名,补充分区字段,保留原始消息,既保证了数据的规范性,又保留了全量原始数据。
(4)Paimon Catalog 与 Hive Metastore 联动
通过 Paimon 的CatalogFactory创建对接 Hive Metastore 的 Catalog,核心配置metastore: hive和hive.metastore.uri,实现 Paimon 元数据由 Hive 统一管理,让 Hive 可直接识别 Paimon 表。
(5)Paimon 表自动创建
getPaimonTable方法实现了 Paimon 表的懒加载创建,若表不存在则根据预定义的结构自动创建,无需手动在 Hive 或 Paimon 中建表,简化了运维流程;同时配置了 Parquet 列存格式、复合分区键(db_name/table_name/dt),提升了数据的存储和查询效率。
(6)Paimon 数据写入
通过 Paimon 提供的FlinkSinkBuilder,将 Flink 的 DataStream<Row>数据直接写入 Paimon 表,底层由 Paimon 优化写入逻辑,支持高吞吐、低延迟的实时写入,同时结合 Flink 的 Checkpoint 保证 Exactly-Once。
三、核心配置文件说明
本次方案将所有核心参数抽离至配置文件(kafka2paimon_withhivecatalog.conf),实现代码与配置解耦,便于不同环境(测试 / 生产)的参数修改,核心配置项如下:
# Kafka配置
kafka_bootstrap_servers=kafka-1:9092,kafka-2:9092,kafka-3:9092
kafka_topic_pattern=cdc_.* # 正则匹配所有以cdc_开头的Topic
kafka_group_id=flink_kafka2paimon
kafka_offsets_initializer=latest # 起始偏移量:earliest/latest
# Hive Metastore配置
hive_metastore_uri=thrift://hive-metastore:9083
# Paimon配置
paimon_default_db=default
paimon_warehouse_path=hdfs://hdfs-cluster/paimon/warehouse
paimon_ods_db_name=ods_paimon
paimon_ods_table_name=ods_cdc_all
# Flink Checkpoint配置
flink_checkpoint_path=hdfs://hdfs-cluster/flink/checkpoint/kafka2paimon
四、作业提交与运行注意事项
Flink 集群配置:确保 Flink 集群的
flink-conf.yaml配置了 HDFS 和 Hive 的相关依赖,同时配置了足够的内存和并行度,适配 Kafka 的高吞吐;依赖包准备:将 Flink、Kafka、Paimon、Hive 的核心依赖包放入 Flink 的
lib目录,或通过-yD参数指定依赖包路径,避免作业运行时出现类缺失;Hive Metastore 联动:确保 Hive Metastore 服务正常运行,且 Paimon 的 warehouse 路径在 HDFS 上有读写权限,Hive 可访问该路径;
作业提交命令:通过 Flink run 命令提交作业,指定配置文件路径,示例:
flink run-application -d \ -t yarn-application \ -c com.panda.bigdata.flink.FlinkKafka2PaimonMultiOutStream \ flink-bigdata-1.0-SNAPSHOT.jar \ /opt/conf/kafka2paimon_withhivecatalog.conf日志排查:作业运行过程中,可通过 Flink Web UI 或 Yarn 日志查看作业状态,若出现数据解析失败,日志会打印原始数据和异常信息,便于问题定位;
Hive 查询验证:作业运行后,可直接在 Hive 中执行
show tables in ods_paimon;和select * from ods_paimon.ods_cdc_all limit 10;验证数据是否写入成功。
五、总结
本文实现的基于 Flink DataStream API 的 Kafka 到 Paimon 的实时同步方案,打通了从业务数据库到数据湖的全链路实时采集,同时通过 Hive Metastore 管理 Paimon 元数据,实现了流批一体的数据分析。该方案兼具实时性、可靠性、可扩展性和易用性,解决了传统数仓中流批数据分离、元数据不一致、运维成本高等问题,是湖仓一体架构下实时数仓 ODS 层建设的优质方案。
评论