1. What is the Flink Hive Catalog?
HiveCatalog is a Catalog implementation provided by Flink that lets Flink read and write the databases, tables, partitions, functions, and other metadata defined in the Hive Metastore. Through HiveCatalog:
- Flink can reuse existing Hive table definitions without redeclaring their schemas;
- Flink-specific metadata (such as Kafka or MySQL tables) can be persisted to the Hive Metastore and reused across sessions;
- metadata management gets a single, unified entry point that bridges the metadata boundary between stream and batch processing.
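A minimal sketch of registering Flink's built-in Hive catalog from SQL is shown below; the catalog name and the hive-conf-dir path are assumptions, so point them at your own hive-site.xml location:
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/opt/apps/bigdata/apache-hive-2.3.9-bin/conf', -- assumed path to the directory holding hive-site.xml
    'default-database' = 'default'
);
USE CATALOG myhive;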
2. Registering a Hive catalog with the Flink SQL client
Start the Flink SQL client
Run:
1) First, create the YARN session application:
$FLINK_HOME/bin/yarn-session.sh -d -nm "FlinkSQLSession"
2) Start the Flink SQL client against the YARN application created above:
$FLINK_HOME/bin/sql-client.sh embedded -s application_1770088725820_0028
Note: the YARN application id can be obtained with yarn application -list.
Create the Hive catalog
Paimon Hive catalog: supports creating Paimon tables
CREATE CATALOG my_paimon_hive WITH (
    'type' = 'paimon',
    'metastore' = 'hive',
    'uri' = 'thrift://192.168.1.211:9083',
    'hive-conf-dir' = '/opt/apps/bigdata/apache-hive-2.3.9-bin/conf',
    'hadoop-conf-dir' = '/opt/apps/bigdata/hadoop-2.10.2/etc/hadoop',
    'warehouse' = 'hdfs://m-bigdata1:9000/user/paimon/warehouse',
    'default-database' = 'ods_paimon'
);
USE CATALOG my_paimon_hive;
Configure the Flink checkpoint interval
The Paimon sink writes data when a checkpoint is triggered (similar to FileSink). If checkpointing is not enabled, or the checkpoint interval is very long, data stays buffered in memory and is not flushed to storage immediately.
-- Enable checkpointing, once every 30 seconds
SET 'execution.checkpointing.interval' = '30s';
-- Optional: set the checkpoint timeout
SET 'execution.checkpointing.timeout' = '5min';
Create the Kafka source table
-- read the raw messages
create TEMPORARY table ods_kafka_source_raw_rt (
raw_message string
) WITH (
'connector' = 'kafka',
'topic' = 'mysql183_v2.finance.kafka_source',
'properties.bootstrap.servers' = '192.168.1.211:9092,192.168.1.212:9092,192.168.1.213:9092',
'properties.group.id' = 'flink_debezium_finance_ods_raw',
'scan.startup.mode' = 'earliest-offset',
'format' = 'raw'
);
Read the data:
SELECT
-- extract the payload-level JSON string
JSON_VALUE(raw_message, '$.payload') AS payload_json,
-- extract the operation type (op)
JSON_VALUE(raw_message, '$.payload.op') AS op,
-- extract the after fields (structured parsing)
JSON_VALUE(raw_message, '$.payload.after.id') AS after_id,
JSON_VALUE(raw_message, '$.payload.after.name') AS after_name,
JSON_VALUE(raw_message, '$.payload.after.create_time') AS after_create_time,
-- extract the before fields (for an INSERT the before image is null, which is expected)
JSON_VALUE(raw_message, '$.payload.before.id') AS before_id,
-- extract the source metadata (e.g. database and table name)
JSON_VALUE(raw_message, '$.payload.source.db') AS source_db,
JSON_VALUE(raw_message, '$.payload.source.table') AS source_table
FROM ods_kafka_source_raw_rt;
Create the Paimon table
create table ods_finance_kafka_source_rt (
id int,
name string,
dt string
) partitioned by (dt) -- assuming daily partitions
with (
'connector' = 'paimon',
'bucket' = '3', -- number of buckets, adjust to the actual workload
'bucket-key' = 'id',
'sink.partition-commit.policy.kind' = 'metastore,success-file', -- partition commit policies
'file.format' = 'parquet',
'metastore.partitioned-table'= 'true'
);
Write the Kafka data into Paimon
insert into ods_paimon.ods_finance_kafka_source_rt partition(dt='2026-02-01')
select
cast(JSON_VALUE(raw_message, '$.payload.after.id') as int) id,
JSON_VALUE(raw_message, '$.payload.after.name') name
from ods_kafka_source_raw_rt;
3. Registering a Hive catalog with the Scala / Java API
References:
https://paimon.apache.org/docs/1.3/flink/sql-ddl/
https://paimon.apache.org/docs/1.3/maintenance/configurations/#coreoptions
https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/connectors/table/kafka/
https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/connectors/table/formats/debezium/
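As a minimal Java sketch (the class name is made up, and the catalog options simply mirror the SQL client example above; see the linked docs for the authoritative option names), the same Paimon Hive catalog can be registered through the Table API:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RegisterPaimonHiveCatalog { // hypothetical class name
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The Paimon sink commits data on checkpoint, so enable checkpointing (30s, matching the SQL example)
        env.enableCheckpointing(30_000L);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Register the catalog with the same options used in the SQL client example
        tEnv.executeSql(
            "CREATE CATALOG my_paimon_hive WITH (\n"
                + " 'type' = 'paimon',\n"
                + " 'metastore' = 'hive',\n"
                + " 'uri' = 'thrift://192.168.1.211:9083',\n"
                + " 'hive-conf-dir' = '/opt/apps/bigdata/apache-hive-2.3.9-bin/conf',\n"
                + " 'hadoop-conf-dir' = '/opt/apps/bigdata/hadoop-2.10.2/etc/hadoop',\n"
                + " 'warehouse' = 'hdfs://m-bigdata1:9000/user/paimon/warehouse',\n"
                + " 'default-database' = 'ods_paimon'\n"
                + ")");
        tEnv.executeSql("USE CATALOG my_paimon_hive");

        // Subsequent tEnv.executeSql(...) calls can then create tables and submit INSERT jobs
        // exactly as in the SQL client walkthrough above.
    }
}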