This is a Java API example of a Flink job that reads data from Kafka and writes it into Paimon.

The Paimon table is managed through the Hive Metastore, so once data has been written it can be queried and analyzed from Hive.

The job works as follows: it consumes data from multiple Kafka topics into a single Paimon table and registers the Paimon metadata in Hive, so the Paimon table can later be inspected directly from Hive.
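
The job reads its settings through MultiConfigHelper from a config file. Assuming MultiConfigHelper reads simple key=value pairs, a minimal sketch of what kafka2paimon_withhivecatalog.conf might contain is shown below; the keys are taken from the getProperty calls in the code, while every value is purely illustrative and must be adapted to your environment:

kafka_bootstrap_servers=broker1:9092,broker2:9092
kafka_topic_pattern=ods_.*
kafka_group_id=flink_kafka2paimon
kafka_offsets_initializer=earliest
hive_metastore_uri=thrift://hive-metastore-host:9083
paimon_default_db=default
paimon_warehouse_path=hdfs:///user/hive/warehouse/paimon
paimon_ods_db_name=ods
paimon_ods_table_name=ods_kafka_raw
flink_checkpoint_path=hdfs:///flink/checkpoints/kafka2paimon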

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.panda.bigdata.utils.DateTimeHelper;
import com.panda.bigdata.utils.MultiConfigHelper;
import com.panda.bigdata.utils.SafeStringSchema;
import com.panda.bigdata.utils.TimestampValidator;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * Consumes data from multiple Kafka topics into a Paimon table; the Paimon metadata is
 * registered in Hive so the table can later be queried from Hive.
 *
 * Submit command:
 */
public class FlinkKafka2PaimonMultiOutStream {


    // Side output tags
    private static final OutputTag<String> stream_paimon_tag = new OutputTag<>("stream-paimon"){};
    // private static final OutputTag<String> stream_doris_tag = new OutputTag<>("stream-doris"){};
    // Main stream
    // private static final String stream_main_tag = "stream-other";

    public static void main(String[] args) throws Exception {
        final Logger log = LoggerFactory.getLogger(FlinkKafka2PaimonMultiOutStream.class);
        String configPath = args.length > 0 ? args[0] : "/home/phoenix/git/my_bigdata/src/main/resources/config/kafka2paimon_withhivecatalog.conf";

        String kafka_bootstrap_servers = MultiConfigHelper.getProperty(configPath, "kafka_bootstrap_servers");
        String kafka_topic_pattern = MultiConfigHelper.getProperty(configPath, "kafka_topic_pattern");
        String kafka_group_id = MultiConfigHelper.getProperty(configPath, "kafka_group_id");
        String kafka_offsets_initializer = MultiConfigHelper.getProperty(configPath, "kafka_offsets_initializer");
        // String hive_catalog_name = MultiConfigHelper.getProperty(configPath, "hive_catalog_name");
        String hive_metastore_uri = MultiConfigHelper.getProperty(configPath, "hive_metastore_uri");
        String paimon_default_db = MultiConfigHelper.getProperty(configPath, "paimon_default_db");
        String paimon_warehouse_path = MultiConfigHelper.getProperty(configPath, "paimon_warehouse_path");
        String paimon_ods_table_name = MultiConfigHelper.getProperty(configPath, "paimon_ods_table_name");
        String paimon_ods_db_name = MultiConfigHelper.getProperty(configPath, "paimon_ods_db_name");
        String flink_checkpoint_path = MultiConfigHelper.getProperty(configPath, "flink_checkpoint_path");

        log.info("配置文件:{}", configPath);
        log.info("配置信息:{}, {}, {}, {}", kafka_bootstrap_servers, kafk_topic_pattern, kafka_group_id, kafka_offsets_initializer);

        Configuration flinkConf = new Configuration();
        flinkConf.set(RestOptions.BIND_PORT, "8189"); // REST port
        flinkConf.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
        flinkConf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, flink_checkpoint_path);
        flinkConf.setString("pipeline.object-reuse", "true"); // 启用对象复用
        flinkConf.setString("serialization.force-kryo", "false"); // 不强制所有类型用Kryo
        flinkConf.setString("serialization.kryo.registrator", "org.apache.flink.api.java.typeutils.runtime.kryo.FlinkKryoRegistrar");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(flinkConf);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
        env.enableCheckpointing(60000); // enable checkpointing (recommended for exactly-once guarantees)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
        // Important: retain checkpoints after the job is cancelled, otherwise they cannot be used for recovery
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // 1. Configure and create the Kafka source
        OffsetsInitializer offsetsInitializer = OffsetsInitializer.earliest();
        if ("latest".equals(kafka_offsets_initializer)) {
            offsetsInitializer = OffsetsInitializer.latest();
        }
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                    .setBootstrapServers(kafka_bootstrap_servers)
                    .setTopicPattern(Pattern.compile(kafka_topic_pattern))
                    .setStartingOffsets(offsetsInitializer) // starting offsets from config: earliest by default, latest if configured
                    .setGroupId(kafka_group_id)
                    .setValueOnlyDeserializer(new SafeStringSchema()) // read the raw JSON string
                    .build();

        DataStream<String> rawKafkaStream = env.fromSource(
                                kafkaSource, 
                                WatermarkStrategy.noWatermarks(), 
                                "flink_kafka2paimon_stream_" + kafk_topic_pattern.replace(".*", "")
                            );

        // 2. Parse the JSON and split the stream into side outputs
        SingleOutputStreamOperator<String> mainStream = rawKafkaStream
                .process(new ProcessFunction<String, String>() {
                    private static final long serialVersionUID = 1L;
                    // private final ObjectMapper objectMapper = new ObjectMapper();

                    @Override
                    public void processElement(String value, ProcessFunction<String, String>.Context ctx, Collector<String> out) throws Exception {
                        // JsonNode jsonNode = objectMapper.readTree(value);
                        
                        // String type = jsonNode.get("type").asText();

                        // Emit to the main stream
                        // out.collect(value);

                        // Emit to the Paimon side output
                        ctx.output(stream_paimon_tag, value);
                        // Emit to the Doris side output
                        // ctx.output(stream_doris_tag, value);
                    }
                });

        // --- Write to Paimon with the DataStream API ---
        
        // Prepare the Paimon catalog configuration
        Options catalogOptions = new Options();
        catalogOptions.set("type", "paimon");
        catalogOptions.set("metastore", "hive");
        catalogOptions.set("uri", hive_metastore_uri);
        catalogOptions.set("default-database", paimon_default_db);
        catalogOptions.set("warehouse", paimon_warehouse_path);
        // hadoop-conf-dir and fs.defaultFS can be set via flink-conf.yaml or environment variables; omitted here
        CatalogContext catalogContext = CatalogContext.create(catalogOptions);

        Catalog paimonCatalog = CatalogFactory.createCatalog(catalogContext);

        // Get the Paimon table instance (creating it if it does not exist)
        Table paimonTable = getPaimonTable(paimonCatalog, paimon_ods_db_name, paimon_ods_table_name);
        // Table dorisTable = null;
        // Table tableOther = null;

        // Write the side-output stream into the Paimon table
        DataStream<Row> paimonStream = mainStream.getSideOutput(stream_paimon_tag)
                .map(jsonStr -> getPaimonRowData(jsonStr));
        DataType rowDataType = DataTypes.ROW(
            DataTypes.FIELD("db_name", DataTypes.STRING()),
            DataTypes.FIELD("table_name", DataTypes.STRING()),
            DataTypes.FIELD("dt", DataTypes.STRING()),
            DataTypes.FIELD("create_time", DataTypes.STRING()),
            DataTypes.FIELD("raw_msg", DataTypes.STRING())
        );
        
        FlinkSinkBuilder flinkBuilder = new FlinkSinkBuilder(paimonTable)
            .forRow(paimonStream, rowDataType);
        flinkBuilder.build();

        env.execute("Flink Kafka 2 Paimon MultiStream Job (DataStream API)");
    }

    // Assemble a Paimon row from the raw JSON message
    public static Row getPaimonRowData(String jsonStr) {
        String db_name = "";
        String table_name = "";
        String dt = DateTimeHelper.getCurrentTimestampFormatted(DateTimeHelper.DATE_MONTH_FORMATTER);
        String create_time = DateTimeHelper.getCurrentTimestampFormatted(DateTimeHelper.STANDARD_FORMATTER);
        
        try {
            // Note: constructing an ObjectMapper per record is simple but not free; a shared static instance would be cheaper
            ObjectMapper mapper = new ObjectMapper();
            JsonNode rootNode = mapper.readTree(jsonStr);

            JsonNode createTimeNode = rootNode.path("payload").path("after").path("create_time");

            if (!createTimeNode.isMissingNode() && !createTimeNode.isNull()) {
                // create_time arrives in two formats: 1756940435000 (13-digit epoch millis) and 2025-08-30T04:39:46Z (UTC ISO string); handle each separately
                String createTimeStr = createTimeNode.asText();

                if (createTimeStr.contains("T") && createTimeStr.contains("Z")) {
                    dt = DateTimeHelper.convertIsoToFormatted(createTimeStr, DateTimeHelper.DATE_MONTH_FORMATTER);
                } else if (TimestampValidator.isValidTimestamp(createTimeStr)) {
                    dt = DateTimeHelper.convertTimestampToCustomFormat(createTimeStr, DateTimeHelper.DATE_MONTH_FORMATTER);
                }
            }

            db_name = rootNode.path("payload").path("source").path("db").asText();
            table_name = rootNode.path("payload").path("source").path("table").asText();
            if (db_name == null || db_name.isEmpty()) {
                db_name = "other";
            }
            if (table_name == null || table_name.isEmpty()) {
                table_name = "other";
            }
        } catch (Exception e) {
            System.out.println(e);
        }
        
        return Row.of(db_name, table_name, dt, create_time, jsonStr);
    }
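
    // For reference, a hypothetical example of the Debezium-style message that getPaimonRowData
    // expects; the field paths match those parsed above, but the concrete values are illustrative only:
    //
    // {
    //   "payload": {
    //     "source": { "db": "order_db", "table": "order_info" },
    //     "after":  { "id": 1, "create_time": "2025-08-30T04:39:46Z" }
    //   }
    // }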

    // Get the Paimon table, creating it if it does not exist
    private static Table getPaimonTable(Catalog catalog, String database, String tableName) {
        Identifier identifier = Identifier.create(database, tableName);
        Table table = null;
        try {
            List<String> tableList = catalog.listTables(database);
            if (!tableList.contains(tableName)) {
                System.out.println("Creating paimon table: " + tableName);
                
                // Create the table
                Map<String, String> tableOptions = new HashMap<>();
                tableOptions.put("file.format", "parquet"); // file format: parquet
                // Partition commit policy
                tableOptions.put("sink.partition-commit.policy.kind", "metastore,success-file");
                // Register partitions in the Hive Metastore
                tableOptions.put("metastore.partitioned-table", "true");
                tableOptions.put("hive.input.recursive.dir", "true");
                tableOptions.put("hive.input.dir.recursion.level", "1");

                Schema schema = Schema.newBuilder()
                    .column("db_name", org.apache.paimon.types.DataTypes.STRING())     // database name (partition key)
                    .column("table_name", org.apache.paimon.types.DataTypes.STRING())  // table name (partition key)
                    .column("dt", org.apache.paimon.types.DataTypes.STRING())          // date partition (e.g. yyyyMM, partition key)
                    .column("create_time", org.apache.paimon.types.DataTypes.STRING())
                    .column("raw_msg", org.apache.paimon.types.DataTypes.STRING())
                    .partitionKeys("db_name", "table_name", "dt")
                    .options(tableOptions)
                    .build();

                catalog.createTable(identifier, schema, true);
            }

            table = catalog.getTable(identifier);
        } catch (Exception e) {
            throw new RuntimeException("Failed to create or load Paimon table: " + tableName, e);
        }
        return table;
    }
}
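
Once the job has been running and a checkpoint has committed a snapshot, the table can be analyzed from Hive as described above. A sketch of such a query, using the hypothetical database and table names from the config example (substitute your own paimon_ods_db_name / paimon_ods_table_name; reading Paimon tables from Hive also requires the paimon-hive-connector jar on Hive's classpath):

SELECT db_name, table_name, dt, COUNT(*) AS cnt
FROM ods.ods_kafka_raw
GROUP BY db_name, table_name, dt;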