HiveCatalog is a Catalog implementation provided by Flink that lets Flink read and write the databases, tables, partitions, functions, and other metadata defined in a Hive Metastore. With HiveCatalog (see the sketch after this list):

  • Flink can reuse existing Hive table definitions without re-declaring their schemas;

  • Flink-specific metadata (e.g. Kafka tables, MySQL tables) can be persisted to the Hive Metastore and survives across sessions;

  • it provides a single entry point for metadata management, bridging the metadata boundary between stream and batch processing.
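For reference, Flink's native Hive catalog (type = 'hive') can be registered directly from SQL. This is a minimal sketch that reuses the hive-conf-dir from the environment described below; treat the database name and paths as placeholders for your own deployment, and note that the Flink Hive connector and Hive client jars must be on the classpath:

CREATE CATALOG my_hive WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/opt/apps/bigdata/apache-hive-2.3.9-bin/conf'
);

USE CATALOG my_hive;

The rest of this walkthrough uses Paimon's catalog implementation backed by the same Hive Metastore; its DDL is shown below.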

Steps:

1) First, start a YARN session application

$FLINK_HOME/bin/yarn-session.sh -d -nm "FlinkSQLSession"

2) Attach the Flink SQL client to the YARN application created above

$FLINK_HOME/bin/sql-client.sh embedded -s application_1770088725820_0028

Note: the YARN application id can be obtained with yarn application -list.

Create the Hive catalog

Paimon Hive catalog: supports creating Paimon tables

CREATE CATALOG my_paimon_hive WITH (
    'type' = 'paimon',
    'metastore' = 'hive',
    'uri' = 'thrift://192.168.1.211:9083',
    'hive-conf-dir' = '/opt/apps/bigdata/apache-hive-2.3.9-bin/conf',
    'hadoop-conf-dir' = '/opt/apps/bigdata/hadoop-2.10.2/etc/hadoop',
    'warehouse' = 'hdfs://m-bigdata1:9000/user/paimon/warehouse',
    'default-database'='ods_paimon'
);

USE CATALOG my_paimon_hive;
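You can verify that the catalog is active and that the default database from the DDL is in effect with standard Flink SQL statements:

SHOW CURRENT CATALOG;
SHOW CURRENT DATABASE;
SHOW TABLES;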

The Paimon sink commits writes on checkpoints (similar to FileSink). If checkpointing is not enabled, or the checkpoint interval is very long, data is buffered in memory and is not flushed to storage right away.

-- Enable checkpointing, once every 30 seconds
SET 'execution.checkpointing.interval' = '30s';

-- Optional: set the checkpoint timeout
SET 'execution.checkpointing.timeout' = '5min';

Create the Kafka source table

-- Read the raw Debezium messages as plain strings
CREATE TEMPORARY TABLE ods_kafka_source_raw_rt (
    raw_message string
) WITH (
    'connector' = 'kafka',
    'topic' = 'mysql183_v2.finance.kafka_source',
    'properties.bootstrap.servers' = '192.168.1.211:9092,192.168.1.212:9092,192.168.1.213:9092',
    'properties.group.id' = 'flink_debezium_finance_ods_raw',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'raw'  
);
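As an alternative to parsing the raw envelope by hand, Flink's debezium-json format can decode the same topic into a changelog stream directly. This is a minimal sketch, assuming the Debezium connector publishes the standard schema/payload envelope (hence schemas.enable = 'true'); the column list and the new consumer group id are illustrative and must match your source table:

CREATE TEMPORARY TABLE ods_kafka_source_dbz_rt (
    id int,
    name string
) WITH (
    'connector' = 'kafka',
    'topic' = 'mysql183_v2.finance.kafka_source',
    'properties.bootstrap.servers' = '192.168.1.211:9092,192.168.1.212:9092,192.168.1.213:9092',
    'properties.group.id' = 'flink_debezium_finance_ods_dbz',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json',
    'debezium-json.schemas.enable' = 'true'
);

With this table, before/after extraction and op handling are done by the format. The remaining steps keep using the raw table and JSON_VALUE for full control over the envelope.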

Query the data:

SELECT
  -- Extract the payload JSON string
  JSON_VALUE(raw_message, '$.payload') AS payload_json,
  
  -- Extract the operation type (op)
  JSON_VALUE(raw_message, '$.payload.op') AS op,
  
  -- Extract fields from 'after' (the new row image)
  JSON_VALUE(raw_message, '$.payload.after.id') AS after_id,
  JSON_VALUE(raw_message, '$.payload.after.name') AS after_name,
  JSON_VALUE(raw_message, '$.payload.after.create_time') AS after_create_time,
  
  -- Extract 'before' fields (null for INSERT operations, which is expected)
  JSON_VALUE(raw_message, '$.payload.before.id') AS before_id,
  
  -- Extract source metadata (database and table name)
  JSON_VALUE(raw_message, '$.payload.source.db') AS source_db,
  JSON_VALUE(raw_message, '$.payload.source.table') AS source_table
FROM ods_kafka_source_raw_rt;

Create the Paimon table

create table ods_finance_kafka_source_rt (
    id int,
    name string,
    dt string
) partitioned by (dt)  -- assuming daily partitioning
with (
    'connector' = 'paimon',
    'bucket' = '3',  -- number of buckets; adjust to the actual data volume
    'bucket-key' = 'id',
    'sink.partition-commit.policy.kind' = 'metastore,success-file',  -- partition commit policies
    'file.format' = 'parquet',
    'metastore.partitioned-table'= 'true'
);
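After the DDL you can check how the table was resolved, including the options filled in by the catalog:

DESCRIBE ods_finance_kafka_source_rt;
SHOW CREATE TABLE ods_finance_kafka_source_rt;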

Write the Kafka data into Paimon

insert into ods_paimon.ods_finance_kafka_source_rt partition(dt='2026-02-01')
select 
  cast(JSON_VALUE(raw_message, '$.payload.after.id') as int) id,
  JSON_VALUE(raw_message, '$.payload.after.name') name
from ods_kafka_source_raw_rt;
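The statement above pins all rows to the fixed partition dt='2026-02-01'. In a continuously running job you would normally derive dt from the event itself; the sketch below assumes create_time arrives as epoch milliseconds (a common Debezium encoding for MySQL temporal columns) and should be adapted if your topic carries a formatted string instead:

insert into ods_paimon.ods_finance_kafka_source_rt
select
  cast(JSON_VALUE(raw_message, '$.payload.after.id') as int) id,
  JSON_VALUE(raw_message, '$.payload.after.name') name,
  DATE_FORMAT(
    TO_TIMESTAMP_LTZ(cast(JSON_VALUE(raw_message, '$.payload.after.create_time') as bigint), 3),
    'yyyy-MM-dd') dt
from ods_kafka_source_raw_rt;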

3. Register the Hive catalog with the Scala / Java API
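The catalog can also be registered programmatically with the Table API by executing the same DDL. Below is a minimal Java sketch (the Scala version is analogous), assuming the paimon-flink bundle and the Hive/Hadoop dependencies are on the classpath; the connection options mirror the SQL client example above:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RegisterPaimonHiveCatalog {
    public static void main(String[] args) {
        // Streaming Table API environment
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Register the same Paimon Hive catalog used in the SQL client session
        tEnv.executeSql(
                "CREATE CATALOG my_paimon_hive WITH (\n"
                + "  'type' = 'paimon',\n"
                + "  'metastore' = 'hive',\n"
                + "  'uri' = 'thrift://192.168.1.211:9083',\n"
                + "  'hive-conf-dir' = '/opt/apps/bigdata/apache-hive-2.3.9-bin/conf',\n"
                + "  'hadoop-conf-dir' = '/opt/apps/bigdata/hadoop-2.10.2/etc/hadoop',\n"
                + "  'warehouse' = 'hdfs://m-bigdata1:9000/user/paimon/warehouse',\n"
                + "  'default-database' = 'ods_paimon'\n"
                + ")");
        tEnv.executeSql("USE CATALOG my_paimon_hive");

        // Subsequent SQL (DDL, INSERT INTO ...) now runs against the Paimon Hive catalog
        tEnv.executeSql("SHOW TABLES").print();
    }
}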

References:

https://paimon.apache.org/docs/1.3/flink/sql-ddl/

https://paimon.apache.org/docs/1.3/maintenance/configurations/#coreoptions

https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/connectors/table/kafka/

https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/connectors/table/formats/debezium/