一、什么是 Hive Catalog?
Hive Catalog 是 Apache Hive 提供的元数据管理服务,用于存储表结构(schema)、数据库、分区信息、表位置等元数据。它通常依赖于关系型数据库(如 MySQL、PostgreSQL)作为底层存储。
Flink 和 Spark 都支持集成 Hive Catalog,从而可以直接读写 Hive 表,复用 Hive 的元数据,实现与 Hive 生态系统的无缝集成。
二、Flink 中的 Hive Catalog
1. 简介
从 Flink 1.11 开始,Flink 原生支持 Hive Catalog。通过 Hive Catalog,Flink SQL 可以:
读取/写入 Hive 表(包括分区表)
使用 Hive UDF
复用 Hive 的 metastore 元数据
2. 使用步骤
以 Flink 1.20.3 + Hive 2.3.9 为例
(1)添加依赖
如果使用scala或者java api需要在pom中添加下面依赖
需要添加 Flink 与 Hive 集成的 connector:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>1.20.3</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.3.9</version>
<scope>provided</scope>
</dependency>基于flink sql使用,需要将hive相关jar复制到flink的lib目录下
具体见:https://www.pandadt.tech/arc/flinkpei-zhi -> flink的hive依赖注意:Hive 版本需与 Flink 官方兼容列表匹配。
(2)配置 Hive Catalog
这一步的作用是使用flink连接hive的metastore。下面介绍两种注册方式:
在代码中注册 Hive Catalog:
// Java 示例
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/path/to/hive/conf"; // 包含 hive-site.xml 的目录
HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog(name, hiveCatalog);
// 设置当前 catalog 为 Hive
tableEnv.useCatalog(name);在 SQL CLI中注册hive catlog:
执行:
$FLINK_HOME/bin/yarn-session.sh -d -nm "FlinkSQLSession"
$FLINK_HOME/bin/sql-client.sh embedded -s application_1769863234861_0048
注:yarn sessionid可以通过yarn application -list获取
CREATE CATALOG myhive WITH (
'type' = 'hive',
'hive-version' = '2.3.9',
'default-database' = 'ods_paimon',
'hive-conf-dir' = '/opt/apps/bigdata/apache-hive-2.3.9-bin/conf'
);
USE CATALOG myhive;确保hive的配置文件中包含配置hive.metastore.uris:
<property>
<name>hive.metastore.uris</name>
<value>thrift://192.168.1.211:9083</value>
</property>
(3)操作 Hive 表
创建 Catalog 后,就可以在其中创建和操作表了。
创建 Hive 兼容表:如果想创建 Hive 也能查询的表,有两种方法:
方法A:使用 Hive 方言(推荐)
-- 切换到 Hive 方言
SET 'table.sql-dialect'='hive';
-- 创建 Hive 格式的表
create external table my_hive_table (
id int,
name string
) partitioned by (dt string)
stored as parquet
location '/user/paimon/warehouse/ods_paimon/my_hive_table'
TBLPROPERTIES (
'sink.partition-commit.policy.kind' = 'metastore,success-file',
'sink.partition-commit.trigger' = 'partition-time',
'sink.partition-commit.delay' = '1h',
'partition.time-extractor.timestamp-pattern' = '$dt'
)
;
-- 切换回默认的 Flink 方言
SET 'table.sql-dialect'='default';方法B:在通用 DDL 中指定连接器
CREATE EXTERNAL TABLE my_hive_table (
id INT,
name STRING
) WITH (
'connector' = 'hive', -- 关键属性,声明为 Hive 表
'sink.partition-commit.policy.kind' = 'metastore,success-file'
);创建通用表(以 Kafka 源表为例):这种表专供 Flink 使用。
create table ods_kafka_source_rt (
source_db STRING METADATA FROM 'value.source.database' VIRTUAL,
source_table STRING METADATA FROM 'value.source.table' VIRTUAL,
operation_ts TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, -- 源数据库的时间戳
process_ts TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, -- Debezium处理时间
before ROW<
`id` BIGINT,
`name` STRING,
`create_time` TIMESTAMP(3)
>,
after ROW<
`id` BIGINT,
`name` STRING,
`create_time` TIMESTAMP(3)
>,
id AS after.id,
name AS after.name,
create_time AS after.create_time,
WATERMARK FOR create_time AS create_time - INTERVAL '10' SECOND
) with (
'connector' = 'kafka',
'topic' = 'mysql183_v2.finance.kafka_source',
'properties.bootstrap.servers' = '192.168.1.211:9092,192.168.1.212:9092,192.168.1.213:9092',
'properties.group.id' = 'flink_debezium_finance_ods',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'debezium-json',
'value.debezium-json.schema-include' = 'true',
'value.fields-include' = 'ALL'
);查询与写入数据:像使用普通 Flink 表一样操作。
-- 从 Kafka 表读取,写入 Hive 表
insert into my_hive_table partition(dt='2026-02-02')
select 1 id, 'zhangsan' name;
或者
insert into my_hive_table
select 2 id, 'lisi' name, '2026-02-02' dt;
insert into my_hive_table
select id, name, create_time from ods_kafka_source_rt;
-- 直接查询 Hive 表
select * from my_hive_table;注意:Flink 写入 Hive 表时,默认使用批处理模式(Batch Mode),且需确保表格式兼容(如 ORC、Parquet)。
三、Spark 中的 Hive Catalog
1. 简介
Spark 从早期版本就深度集成 Hive。启用 Hive 支持后,Spark SQL 可以:
访问 Hive metastore 中定义的表
执行 HiveQL 语法(部分)
使用 Hive SerDe 和 UDF
Spark 3.0 引入了 Catalog 插件机制,使得用户可以配置多个 Catalog(如内置的 in-memory catalog、Hive catalog、自定义 catalog 等),并通过统一的 SQL 接口访问。
Spark 的“Hive Catalog”其实就是其默认的 catalog 实现(当启用 Hive 支持时)。
2. 使用方式
(1)启用 Hive catalog支持
使用spark-sql启用:
spark-sql --master yarn \
--conf spark.driver.host=192.168.1.211 \
--conf spark.driver.advertiseAddress=192.168.1.211在创建
SparkSession时启用 Hive:
// Scala 示例
val spark = SparkSession.builder()
.appName("HiveExample")
.config("spark.sql.warehouse.dir", "/user/hive/warehouse")
.enableHiveSupport() // 关键!
.getOrCreate()PySpark:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("HiveExample") \
.config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
.enableHiveSupport() \
.getOrCreate()必须将
hive-site.xml放在 Spark 的conf/目录下,以便连接 Hive metastore。
(2)操作 Hive 表
-- 在 Spark SQL 中直接使用 Hive 表
SHOW DATABASES;
USE default;
SHOW TABLES;
-- 建表
create table if not exists ods_spark.employees (
id int,
name string
)
partitioned by (dt string)
stored as parquet
location '/user/spark/warehouse/ods_spark/employees'
;
insert into table employees partition(dt='2026-02-03')
select 1 id, 'zhangsan' name;
SELECT * FROM employees;也可以通过 DataFrame API:
df = spark.table("default.my_hive_table")
df.show()(3)写入 Hive 表
df.write.mode("overwrite").insertInto("my_hive_table")
# 或
df.write.mode("overwrite").saveAsTable("new_hive_table")注意:
insertInto要求表已存在;saveAsTable会创建新表。
四、对比总结
五、注意事项
版本兼容性:Flink/Spark 与 Hive 版本必须匹配,参考官方文档。
权限问题:确保 Flink/Spark 有权限访问 HDFS 和 Hive metastore 数据库。
Hive 配置文件:
hive-site.xml必须正确配置 metastore URI、warehouse 目录等。表格式支持:建议使用 Parquet、ORC 等列式存储格式,避免纯文本格式性能问题。
评论