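Prerequisites

Before carrying out the steps below, make sure the following conditions are met:

  • ✅ MySQL has binlog enabled (ROW format)

  • ✅ The Kafka cluster is running

  • ✅ The Kafka Connect service is running (port 8083 reachable)

  • ✅ The Debezium MySQL connector plugin is installed in Kafka Connect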

🚀 Detailed Implementation Steps

Step 1: Configure the MySQL Database

1.1 Check and Configure the MySQL Binlog

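-- Connect to MySQL (run this in the shell)
mysql -u root -p

-- Check the current binlog configuration
-- (required: log_bin = ON, binlog_format = ROW, binlog_row_image = FULL)
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'binlog_row_image';

-- If ROW format is not enabled, edit the MySQL configuration file (my.cnf or my.ini)
-- [mysqld]
-- server-id        = 1
-- log_bin          = mysql-bin
-- binlog_format    = ROW
-- binlog_row_image = FULL
-- expire_logs_days = 10
-- max_binlog_size  = 100M
-- On MySQL 8.0+, expire_logs_days is deprecated; use binlog_expire_logs_seconds = 864000 (10 days) instead

-- After restarting MySQL, create a dedicated user for Debezium
CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;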
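The three SHOW VARIABLES checks can also be scripted. Below is a minimal sketch in bash, assuming the mysql client is on PATH; check_binlog_vars is a hypothetical helper name. It reads the tab-separated name/value lines that mysql -N prints when its output is piped, and reports every setting that differs from what Debezium expects.

```shell
#!/usr/bin/env bash
# Hypothetical helper: verify the binlog settings Debezium needs.
# Reads "name<TAB>value" lines, as printed by: mysql -N -e "SHOW VARIABLES ..."
check_binlog_vars() {
  local name value ok=0
  local log_bin="" format="" image=""
  while IFS=$'\t' read -r name value; do
    case "$name" in
      log_bin)          log_bin="$value" ;;
      binlog_format)    format="$value" ;;
      binlog_row_image) image="$value" ;;
    esac
  done
  # Report each setting that does not have the required value.
  [ "$log_bin" = "ON" ] || { echo "log_bin is '$log_bin', expected ON"; ok=1; }
  [ "$format" = "ROW" ] || { echo "binlog_format is '$format', expected ROW"; ok=1; }
  [ "$image" = "FULL" ] || { echo "binlog_row_image is '$image', expected FULL"; ok=1; }
  [ "$ok" -eq 0 ] && echo "binlog settings OK"
  return "$ok"
}
```

Example: mysql -u root -p -N -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format','binlog_row_image')" | check_binlog_vars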

Step 2: Create the Debezium Connector Configuration File

Create the file debezium-mysql-connector-finance.json. The Kafka Connect REST API only accepts strict JSON, so the file must not contain // comments. The properties are grouped, in order, into: connector class, MySQL connection, database/table filtering, snapshot, binlog reading, schema history storage, event handling, converters, heartbeat, error handling, and metrics.

{
  "name": "debezium-mysql-connector-finance",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",

    "database.hostname": "192.168.1.183",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "188056",
    "database.server.name": "mysql183",
    "topic.prefix": "mysql183",

    "database.include.list": "finance",

    "snapshot.mode": "initial",
    "snapshot.locking.mode": "minimal",
    "snapshot.fetch.size": 2000,
    "snapshot.delay.ms": 0,

    "binlog.buffer.size": 0,
    "max.batch.size": 2048,
    "max.queue.size": 8192,
    "poll.interval.ms": 500,
    "connect.timeout.ms": 30000,
    "connect.keep.alive": true,
    "connect.keep.alive.interval.ms": 30000,

    "schema.history.internal.kafka.bootstrap.servers": "192.168.1.211:9092,192.168.1.212:9092,192.168.1.213:9092",
    "schema.history.internal.kafka.topic": "mysql183-schema-history-finance",
    "schema.history.internal.kafka.topic.partitions": 1,
    "schema.history.internal.kafka.topic.replication.factor": 3,
    "schema.history.internal.kafka.topic.cleanup.policy": "compact",
    "schema.history.internal.store.only.captured.tables.ddl": true,
    "schema.history.internal.skip.unparseable.ddl": true,

    "tombstones.on.delete": true,
    "event.processing.failure.handling.mode": "fail",
    "errors.log.enable": true,
    "errors.log.include.messages": true,

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": true,
    "value.converter.schemas.enable": true,

    "heartbeat.interval.ms": 5000,
    "heartbeat.topics.prefix": "debezium_heartbeats",

    "errors.retry.delay.max.ms": 60000,
    "errors.tolerance": "none",
    "errors.deadletterqueue.topic.name": "mysql183-errors",
    "errors.deadletterqueue.topic.replication.factor": 1,

    "metrics.enabled": true,
    "metrics.accumulation.level": "CONNECTOR"
  }
}

Notes on the configuration:

  • database.password must match the password of the debezium user created in Step 1 ('dbz').

  • database.server.id must be unique among the MySQL server and all of its replication clients, including other Debezium connectors.

  • topic.prefix determines the topic names (<topic.prefix>.<database>.<table>). database.server.name is the Debezium 1.x setting that topic.prefix replaced in Debezium 2.0.

  • Without table.include.list, every table in the finance database is captured. To capture only some tables, add for example "table.include.list": "finance.users,finance.orders,finance.order_items". database.exclude.list (e.g. "mysql,sys,information_schema,performance_schema") is an alternative to database.include.list; do not set both.

  • To exclude sensitive columns, add "column.exclude.list": "finance.users.password". The older name column.blacklist was removed in Debezium 2.0.

  • The errors.deadletterqueue.* settings only apply to sink connectors; Kafka Connect does not use them for a source connector like this one.
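Because Kafka Connect parses the request body as strict JSON, // comments of the kind often used to annotate connector examples are rejected. If you prefer to keep an annotated working copy, a minimal bash sketch can strip full-line comments before the file is sent; strip_json_comments and the file name debezium-mysql-connector-finance.annotated.json are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print a connector config file with full-line // comments removed,
# so an annotated working copy can be sent to the Kafka Connect REST API as strict JSON.
# Only lines whose first non-blank characters are // are dropped; // inside values
# (for example in URLs) is left untouched.
strip_json_comments() {
  sed -E '/^[[:space:]]*\/\//d' "$1"
}
```

Usage: strip_json_comments debezium-mysql-connector-finance.annotated.json | curl -s -X POST -H "Content-Type: application/json" --data-binary @- http://192.168.1.211:8083/connectors (curl reads the body from stdin with @-). Running the stripped output through jq empty first is a quick syntax check.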

Step 3: Register the Debezium Connector

3.1 Register the Connector via the REST API

# Register the Debezium MySQL connector
curl -X POST \
  -H "Content-Type: application/json" \
  --data-binary @/home/phoenix/apps/kafka_2.13-3.9.1/debezium/debezium-mysql-connector-finance.json \
  http://192.168.1.211:8083/connectors

# Expected response on success (HTTP 201 Created):
# {"name":"debezium-mysql-connector-finance","config":{...},"tasks":[]}

3.2 Monitor Connector Startup

# Follow the Kafka Connect log during connector startup (the log file location depends on the worker's log4j configuration)
tail -f /home/phoenix/apps/kafka_2.13-3.9.1/logs/connect.log | grep -E "(debezium-mysql-connector-finance|MySqlConnector)"

# Or check the connector's task status directly
curl -s http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/status | \
  jq '.tasks[] | {id: .id, state: .state, trace: .trace}'
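Right after registration the status endpoint may still report an empty task list. A minimal polling sketch in bash, assuming curl and jq are installed; CONNECT_URL, CONNECTOR and the functions fetch_states, all_running and wait_for_running are hypothetical names. It treats the connector as ready only when the connector and at least one task report RUNNING.

```shell
#!/usr/bin/env bash
# Hypothetical polling helper for the Kafka Connect status endpoint.
CONNECT_URL="${CONNECT_URL:-http://192.168.1.211:8083}"
CONNECTOR="${CONNECTOR:-debezium-mysql-connector-finance}"

# Print the connector state followed by one line per task state (requires curl and jq).
fetch_states() {
  curl -s "$CONNECT_URL/connectors/$CONNECTOR/status" | jq -r '.connector.state, .tasks[].state'
}

# Succeed only if stdin holds the connector state plus at least one task state, all RUNNING.
all_running() {
  local state count=0
  while read -r state; do
    [ -n "$state" ] || continue
    [ "$state" = "RUNNING" ] || return 1
    count=$((count + 1))
  done
  [ "$count" -ge 2 ]
}

# Poll every 2 seconds, up to $1 attempts (default 30).
wait_for_running() {
  local attempts="${1:-30}" i
  for ((i = 1; i <= attempts; i++)); do
    if fetch_states | all_running; then
      echo "$CONNECTOR is RUNNING"
      return 0
    fi
    sleep 2
  done
  echo "$CONNECTOR not RUNNING after $attempts attempts" >&2
  return 1
}
```

A FAILED task stops the check early on each attempt; its stack trace is in the trace field shown by the jq command above.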

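Other useful commands:

# List connectors

curl -s http://192.168.1.211:8083/connectors | jq .

# Show connector details

curl -s http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance | jq .

# Get the current configuration

curl -s http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/config | jq .

# Check connector status

curl -s http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/status | jq .

# Pause the connector (stops reading the binlog but keeps its state and offsets)

curl -X PUT http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/pause

# Resume the connector

curl -X PUT http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/resume

# Restart the connector (includeTasks=true also restarts its tasks; without it only the connector instance restarts)

curl -X POST "http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/restart?includeTasks=true"

# Delete the connector (removes its configuration; committed offsets remain in the offsets topic)

curl -X DELETE http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance

# Clear the offsets (restart capture from scratch)

# Note: requires Kafka 3.6+ and a STOPPED connector; once resumed, it starts over with a snapshot (snapshot.mode = initial)

curl -X PUT http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/stop
curl -X DELETE http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/offsets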
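The stop, delete-offsets and resume sequence can be wrapped in one function. A sketch assuming Kafka 3.6 or later, where KIP-875 provides the stop and offsets endpoints and lets resume restart a STOPPED connector; reset_offsets and the RUN dry-run switch are hypothetical. If the DELETE is rejected because the stop has not completed yet, retrying after a few seconds is usually enough.

```shell
#!/usr/bin/env bash
# Hypothetical helper: reset the source offsets of a connector (Kafka 3.6+ REST API).
CONNECT_URL="${CONNECT_URL:-http://192.168.1.211:8083}"
CONNECTOR="${CONNECTOR:-debezium-mysql-connector-finance}"
# Set RUN=echo to print the requests instead of sending them (dry run).
RUN="${RUN:-}"

reset_offsets() {
  local base="$CONNECT_URL/connectors/$CONNECTOR"
  # 1. Stop the connector: offsets can only be changed while it is STOPPED.
  $RUN curl -sSf -X PUT "$base/stop" || return 1
  # 2. Delete the committed source offsets.
  $RUN curl -sSf -X DELETE "$base/offsets" || return 1
  # 3. Resume: with no offsets and snapshot.mode=initial, a new snapshot starts.
  $RUN curl -sSf -X PUT "$base/resume"
}
```

curl -f makes each step fail on an HTTP error status, so a rejected stop or delete ends the sequence instead of resuming with the old offsets.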

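Step 4: Verify Kafka Topic Creation

4.1 List the Created Topics

# List all Kafka topics
kafka-topics.sh --bootstrap-server 192.168.1.211:9092 --list

# Expected topics include (one change-event topic per captured table, for example):
# - mysql183.finance.users                # change events for the users table
# - mysql183.finance.orders               # change events for the orders table
# - mysql183.finance.order_items          # change events for the order_items table
# - mysql183                              # schema change topic (named after topic.prefix)
# - mysql183-schema-history-finance       # internal schema history (schema.history.internal.kafka.topic)
# - debezium_heartbeats.mysql183          # heartbeat topic (<heartbeat.topics.prefix>.<topic.prefix>)
# - connect-configs, connect-offsets, connect-statuses  # Kafka Connect internal topics (names set in the worker configuration)

# Show topic details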
kafka-topics.sh --bootstrap-server 192.168.1.211:9092 \
  --topic mysql183.finance.app_stock_index_percnet_df \
  --describe
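Checking the list by eye gets tedious as topics accumulate. A minimal bash sketch that reports which of the fixed, configuration-derived topic names are missing; missing_topics and PREFIX are hypothetical names, and the three expected names are taken from topic.prefix, schema.history.internal.kafka.topic and heartbeat.topics.prefix in the configuration above. Per-table topics are not checked because they depend on which tables exist.

```shell
#!/usr/bin/env bash
# Hypothetical check: report which expected Debezium topics are missing.
# Reads a topic list (one name per line) on stdin, e.g. from kafka-topics.sh --list.
PREFIX="${PREFIX:-mysql183}"

missing_topics() {
  local listing topic status=0
  listing="$(cat)"
  for topic in "$PREFIX" "$PREFIX-schema-history-finance" "debezium_heartbeats.$PREFIX"; do
    # -x: whole-line match, -F: literal string (topic names contain dots)
    if ! grep -qxF -- "$topic" <<<"$listing"; then
      echo "missing: $topic"
      status=1
    fi
  done
  return "$status"
}
```

Usage: kafka-topics.sh --bootstrap-server 192.168.1.211:9092 --list | missing_topics. The heartbeat topic only appears once the first heartbeat has been sent.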

4.2 Inspect the Initial Snapshot Data

# Consume the initial snapshot data (snapshot records carry "op": "r")
kafka-console-consumer.sh \
  --bootstrap-server 192.168.1.211:9092 \
  --topic mysql183.finance.app_stock_index_percnet_df \
  --from-beginning \
  --timeout-ms 10000 \
  --property print.key=true \
  --property key.separator=" | " \
  --max-messages 5

# View the database schema history
kafka-console-consumer.sh \
  --bootstrap-server 192.168.1.211:9092 \
  --topic mysql183-schema-history-finance \
  --from-beginning \
  --max-messages 3

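Step 5: Understand the CDC Event Structure

5.1 Event Types

The examples below show only the event payload and come from a different setup (database ecommerce, logical server name mysql-server); with the configuration above, source.name would be mysql183 and source.db would be finance. Because value.converter.schemas.enable is true, each Kafka message actually wraps this payload as {"schema": {...}, "payload": {...}}.

Insert event example: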

{
  "before": null,
  "after": {
    "id": 4,
    "username": "alice_jones",
    "email": "alice@example.com",
    "full_name": "Alice Jones",
    "status": "active",
    "created_at": 1704038400000,
    "updated_at": 1704038400000,
    "last_login": null
  },
  "source": {
    "version": "2.5.0.Final",
    "connector": "mysql",
    "name": "mysql-server",
    "ts_ms": 1704038400123,
    "snapshot": "false",
    "db": "ecommerce",
    "table": "users",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 457892,
    "row": 0,
    "thread": 7,
    "query": null
  },
  "op": "c",
  "ts_ms": 1704038400123,
  "transaction": null
}

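Update event example (status changes from active to inactive; with binlog_row_image = FULL, before holds the complete old row):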

{
  "before": {
    "id": 3,
    "username": "bob_wilson",
    "email": "bob@example.com",
    "full_name": "Bob Wilson",
    "status": "active",
    "created_at": 1703952000000,
    "updated_at": 1703952000000,
    "last_login": null
  },
  "after": {
    "id": 3,
    "username": "bob_wilson",
    "email": "bob@example.com",
    "full_name": "Bob Wilson",
    "status": "inactive",
    "created_at": 1703952000000,
    "updated_at": 1704038400500,
    "last_login": null
  },
  "source": {
    "version": "2.5.0.Final",
    "connector": "mysql",
    "name": "mysql-server",
    "ts_ms": 1704038400500,
    "snapshot": "false",
    "db": "ecommerce",
    "table": "users",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 458123,
    "row": 0,
    "thread": 7,
    "query": null
  },
  "op": "u",
  "ts_ms": 1704038400500,
  "transaction": null
}

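5.2 Key Fields

| Field        | Description                                              | Example value                                           |
|--------------|----------------------------------------------------------|---------------------------------------------------------|
| op           | Operation type                                           | c = create, r = read (snapshot), u = update, d = delete |
| source.name  | Logical server name (the topic.prefix value)             | mysql-server                                            |
| source.db    | Database name                                            | ecommerce                                               |
| source.table | Table name                                               | users                                                   |
| source.ts_ms | Time the change was made in the source database (epoch ms) | 1704038400123                                         |
| source.file  | Binlog file name                                         | mysql-bin.000003                                        |
| source.pos   | Position in the binlog file                              | 457892                                                  |
| ts_ms        | Time the connector processed the event (epoch ms)        | 1704038400123                                           |
| before       | Row state before the change (null for c and r events)    | null or the full row                                    |
| after        | Row state after the change (null for d events)           | null or the full row                                    |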
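The op field is the quickest way to see what kind of changes a topic holds, for example how many snapshot reads versus live updates it contains. A minimal bash sketch; summarize_ops is a hypothetical helper that counts op codes read one per line and prints them in a fixed order, with any other value (such as the null printed for tombstones) ignored in the output.

```shell
#!/usr/bin/env bash
# Hypothetical helper: count change events per operation type.
# Reads one op code per line (c, r, u, d) and prints a count for each, in a fixed order.
summarize_ops() {
  awk '
    { count[$1]++ }
    END {
      n = split("c r u d", ops, " ")
      for (i = 1; i <= n; i++) printf "%s=%d\n", ops[i], count[ops[i]] + 0
    }'
}
```

Usage, assuming jq is installed and schemas are enabled so the op code sits under payload: kafka-console-consumer.sh --bootstrap-server 192.168.1.211:9092 --topic mysql183.finance.app_stock_index_percnet_df --from-beginning --timeout-ms 10000 | jq -r '.payload.op' | summarize_ops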

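Summary

The steps above have covered:

  1. MySQL configuration: binlog enabled and set to ROW format

  2. Connector configuration: a detailed Debezium MySQL connector configuration file

  3. Connector registration: the connector registered with Kafka Connect through the REST API

  4. Data verification: CDC events confirmed to be written to Kafka topics

  5. Real-time testing: live capture verified by running SQL statements

  6. Troubleshooting: methods for monitoring and diagnosing the connector

  7. Data consumption: how to consume and apply the CDC data

Key success factors:

  • ✅ MySQL binlog must use ROW format

  • ✅ The Debezium user needs sufficient privileges (REPLICATION SLAVE, etc.)

  • ✅ The Kafka Connect service must be running normally

  • ✅ The database connection details in the connector configuration must be correct

  • ✅ Network connectivity must be in place (MySQL → Kafka Connect)

References:

Debezium MySQL connector property reference:

https://debezium.io/documentation/reference/3.4/connectors/mysql.html#mysql-connector-properties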