一、什么是 Hive Catalog?

Hive Catalog 是 Apache Hive 提供的元数据管理服务,用于存储表结构(schema)、数据库、分区信息、表位置等元数据。它通常依赖于关系型数据库(如 MySQL、PostgreSQL)作为底层存储。

Flink 和 Spark 都支持集成 Hive Catalog,从而可以直接读写 Hive 表,复用 Hive 的元数据,实现与 Hive 生态系统的无缝集成。


1. 简介

Flink 1.11 开始,Flink 原生支持 Hive Catalog。通过 Hive Catalog,Flink SQL 可以:

  • 读取/写入 Hive 表(包括分区表)

  • 使用 Hive UDF

  • 复用 Hive 的 metastore 元数据

2. 使用步骤

以 Flink 1.20.3 + Hive 2.3.9 为例

(1)添加依赖

  • 如果使用scala或者java api需要在pom中添加下面依赖

需要添加 Flink 与 Hive 集成的 connector:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.12</artifactId>
    <version>1.20.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>2.3.9</version>
    <scope>provided</scope>
</dependency>
  • 基于flink sql使用,需要将hive相关jar复制到flink的lib目录下

具体见:https://www.pandadt.tech/arc/flinkpei-zhi  -> flink的hive依赖

注意:Hive 版本需与 Flink 官方兼容列表匹配。

(2)配置 Hive Catalog

这一步的作用是使用flink连接hive的metastore。下面介绍两种注册方式:

  • 在代码中注册 Hive Catalog:

// Java 示例
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/path/to/hive/conf"; // 包含 hive-site.xml 的目录

HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog(name, hiveCatalog);

// 设置当前 catalog 为 Hive
tableEnv.useCatalog(name);
  • 在 SQL CLI中注册hive catlog:

执行:

$FLINK_HOME/bin/yarn-session.sh -d -nm "FlinkSQLSession"

$FLINK_HOME/bin/sql-client.sh embedded -s application_1769863234861_0048

注:yarn sessionid可以通过yarn application -list获取

CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'hive-version' = '2.3.9',
  'default-database' = 'ods_paimon',
  'hive-conf-dir' = '/opt/apps/bigdata/apache-hive-2.3.9-bin/conf'
);

USE CATALOG myhive;

确保hive的配置文件中包含配置hive.metastore.uris:

<property>

<name>hive.metastore.uris</name>

<value>thrift://192.168.1.211:9083</value>

</property>

(3)操作 Hive 表

创建 Catalog 后,就可以在其中创建和操作表了。

  • 创建 Hive 兼容表:如果想创建 Hive 也能查询的表,有两种方法:

方法A:使用 Hive 方言(推荐)

-- 切换到 Hive 方言
SET 'table.sql-dialect'='hive';

-- 创建 Hive 格式的表
create external table my_hive_table (
    id int,
    name string
) partitioned by (dt string) 
stored as parquet
location '/user/paimon/warehouse/ods_paimon/my_hive_table'
TBLPROPERTIES (
  'sink.partition-commit.policy.kind' = 'metastore,success-file',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1h',
  'partition.time-extractor.timestamp-pattern' = '$dt'
)
;

-- 切换回默认的 Flink 方言
SET 'table.sql-dialect'='default';

方法B:在通用 DDL 中指定连接器

CREATE EXTERNAL TABLE my_hive_table (
    id INT,
    name STRING
) WITH (
    'connector' = 'hive',  -- 关键属性,声明为 Hive 表
    'sink.partition-commit.policy.kind' = 'metastore,success-file'
);
  • 创建通用表(以 Kafka 源表为例):这种表专供 Flink 使用。

create table ods_kafka_source_rt (
  source_db STRING METADATA FROM 'value.source.database' VIRTUAL,
  source_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  operation_ts TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- 源数据库的时间戳
  process_ts TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,           -- Debezium处理时间


  before ROW<
      `id` BIGINT,
      `name` STRING,
      `create_time` TIMESTAMP(3)
  >,

  after ROW<
      `id` BIGINT,
      `name` STRING,
      `create_time` TIMESTAMP(3)
  >,

  id AS after.id,
  name AS after.name,
  create_time AS after.create_time,

  WATERMARK FOR create_time AS create_time - INTERVAL '10' SECOND
) with (
  'connector' = 'kafka',
  'topic' = 'mysql183_v2.finance.kafka_source',
  'properties.bootstrap.servers' = '192.168.1.211:9092,192.168.1.212:9092,192.168.1.213:9092',
  'properties.group.id' = 'flink_debezium_finance_ods',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json',
  'value.debezium-json.schema-include' = 'true',
  'value.fields-include' = 'ALL'
);
  • 查询与写入数据:像使用普通 Flink 表一样操作。

-- 从 Kafka 表读取,写入 Hive 表
insert into my_hive_table partition(dt='2026-02-02')
select 1 id, 'zhangsan' name;
或者
insert into my_hive_table
select 2 id, 'lisi' name, '2026-02-02' dt;

insert into my_hive_table
select id, name, create_time from ods_kafka_source_rt;

-- 直接查询 Hive 表
select * from my_hive_table;

注意:Flink 写入 Hive 表时,默认使用批处理模式(Batch Mode),且需确保表格式兼容(如 ORC、Parquet)。


三、Spark 中的 Hive Catalog

1. 简介

Spark 从早期版本就深度集成 Hive。启用 Hive 支持后,Spark SQL 可以:

  • 访问 Hive metastore 中定义的表

  • 执行 HiveQL 语法(部分)

  • 使用 Hive SerDe 和 UDF

Spark 3.0 引入了 Catalog 插件机制,使得用户可以配置多个 Catalog(如内置的 in-memory catalog、Hive catalog、自定义 catalog 等),并通过统一的 SQL 接口访问。

Spark 的“Hive Catalog”其实就是其默认的 catalog 实现(当启用 Hive 支持时)。

2. 使用方式

(1)启用 Hive catalog支持

使用spark-sql启用:

spark-sql --master yarn \
  --conf spark.driver.host=192.168.1.211 \
  --conf spark.driver.advertiseAddress=192.168.1.211
  • 在创建 SparkSession 时启用 Hive:

// Scala 示例
val spark = SparkSession.builder()
  .appName("HiveExample")
  .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
  .enableHiveSupport()  // 关键!
  .getOrCreate()
  • PySpark:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("HiveExample") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .enableHiveSupport() \
    .getOrCreate()

必须将 hive-site.xml 放在 Spark 的 conf/ 目录下,以便连接 Hive metastore。

(2)操作 Hive 表

-- 在 Spark SQL 中直接使用 Hive 表
SHOW DATABASES;
USE default;
SHOW TABLES;

-- 建表
create table if not exists ods_spark.employees (
    id int,
    name string
) 
partitioned by (dt string) 
stored as parquet
location '/user/spark/warehouse/ods_spark/employees'
;

insert into table employees partition(dt='2026-02-03')
select 1 id, 'zhangsan' name;

SELECT * FROM employees;

也可以通过 DataFrame API:

df = spark.table("default.my_hive_table")
df.show()

(3)写入 Hive 表

df.write.mode("overwrite").insertInto("my_hive_table")
# 或
df.write.mode("overwrite").saveAsTable("new_hive_table")

注意:insertInto 要求表已存在;saveAsTable 会创建新表。


四、对比总结

特性

Flink Hive Catalog

Spark Hive Support

集成方式

通过 HiveCatalog 显式注册

通过 .enableHiveSupport() 启用

元数据读取

支持读取 Hive 表结构和分区

完全兼容 Hive metastore

写入 Hive 表

支持(批模式)

支持(流/批均可,但流写 Hive 需特殊处理)

实时性

Flink 更适合流式场景

Spark Structured Streaming 对 Hive 写入有限制

配置依赖

需要 hive-site.xml 和对应 JAR

同左


五、注意事项

  1. 版本兼容性:Flink/Spark 与 Hive 版本必须匹配,参考官方文档。

  2. 权限问题:确保 Flink/Spark 有权限访问 HDFS 和 Hive metastore 数据库。

  3. Hive 配置文件hive-site.xml 必须正确配置 metastore URI、warehouse 目录等。

  4. 表格式支持:建议使用 Parquet、ORC 等列式存储格式,避免纯文本格式性能问题。