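Prerequisites
Before carrying out the steps below, make sure the following conditions are met:
✅ MySQL has binlog enabled (ROW format)
✅ The Kafka cluster is running
✅ The Kafka Connect service is running (port 8083 reachable)
✅ The Debezium MySQL connector plugin is installed in Kafka Connect
🚀 Detailed implementation steps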


Step 1: Configure the MySQL database
1.1 Check and configure the MySQL binlog
-- Connect to MySQL (from the shell)
mysql -u root -p
-- Check the current binlog settings
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'binlog_row_image';
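The three queries above should report log_bin = ON, binlog_format = ROW and binlog_row_image = FULL. As a minimal sketch, assuming the query results have already been collected into a Python dict mapping Variable_name to Value (fetching them, for example with a MySQL client library, is not shown), the check can be automated; `check_binlog_settings` is a hypothetical helper, not part of MySQL or Debezium:

```python
# Hypothetical helper: validate SHOW VARIABLES results against what Debezium needs.
# Assumption: rows were already collected into {Variable_name: Value}.

REQUIRED_SETTINGS = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def check_binlog_settings(variables: dict[str, str]) -> list[str]:
    """Return a list of problems; an empty list means the binlog is ready for Debezium."""
    problems = []
    for name, expected in REQUIRED_SETTINGS.items():
        actual = variables.get(name)
        if actual is None:
            problems.append(f"{name}: not reported")
        elif actual.upper() != expected:
            problems.append(f"{name}: expected {expected}, got {actual}")
    return problems


if __name__ == "__main__":
    sample = {"log_bin": "ON", "binlog_format": "MIXED", "binlog_row_image": "FULL"}
    for problem in check_binlog_settings(sample):
        print(problem)
```

Any reported problem means the my.cnf changes below are needed.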
-- If ROW format is not enabled, edit the MySQL configuration file (my.cnf or my.ini):
-- [mysqld]
-- server-id = 1
-- log_bin = mysql-bin
-- binlog_format = ROW
-- binlog_row_image = FULL
-- expire_logs_days = 10   (deprecated in MySQL 8.0; use binlog_expire_logs_seconds = 864000 there)
-- max_binlog_size = 100M
-- After restarting MySQL, create a dedicated user for Debezium
CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
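FLUSH PRIVILEGES;
Step 2: Create the Debezium connector configuration file
Create the file debezium-mysql-connector-finance.json. The // comments in it are explanations only: JSON does not allow comments, so strip them before submitting the file to the Kafka Connect REST API, otherwise the request is rejected as invalid JSON.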
{
"name": "debezium-mysql-connector-finance",
"config": {
// Basic connector settings
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
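// MySQL connection settings
"database.hostname": "192.168.1.183",
"database.port": "3306",
"database.user": "debezium",
// Must match the password given to CREATE USER in Step 1
"database.password": "dbz",
"database.server.id": "188056",
// database.server.name is only read by Debezium 1.x; Debezium 2.x and later use topic.prefix instead
"database.server.name": "mysql183",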
"topic.prefix": "mysql183",
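// Database and table filters
"database.include.list": "finance",
// Optional: add table.include.list to capture only some tables; omit it to capture every table in the database
// "table.include.list": "finance.users,finance.orders,finance.order_items",
// "database.exclude.list": "mysql,sys,information_schema,performance_schema",  (cannot be combined with database.include.list)
// Snapshot settings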
"snapshot.mode": "initial",
"snapshot.locking.mode": "minimal",
"snapshot.fetch.size": 2000,
"snapshot.delay.ms": 0,
// Binlog reader, queue and connection settings
"binlog.buffer.size": 0,
"max.batch.size": 2048,
"max.queue.size": 8192,
"poll.interval.ms": 500,
"connect.timeout.ms": 30000,
"connect.keep.alive": true,
"connect.keep.alive.interval.ms": 30000,
// Schema (DDL) history storage
"schema.history.internal.kafka.bootstrap.servers": "192.168.1.211:9092,192.168.1.212:9092,192.168.1.213:9092",
"schema.history.internal.kafka.topic": "mysql183-schema-history-finance",
"schema.history.internal.kafka.topic.partitions": 1,
"schema.history.internal.kafka.topic.replication.factor": 3,
"schema.history.internal.kafka.topic.cleanup.policy": "compact",
"schema.history.internal.store.only.captured.tables.ddl": true,
"schema.history.internal.skip.unparseable.ddl": true,
// Event handling
"tombstones.on.delete": true,
"event.processing.failure.handling.mode": "fail",
"errors.log.enable": true,
"errors.log.include.messages": true,
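// Column exclusion (keeps sensitive columns out of the events); column.exclude.list replaces the removed column.blacklist
// "column.exclude.list": "finance.users.password",
// Kafka Connect converter settings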
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": true,
"value.converter.schemas.enable": true,
// Heartbeat settings (for monitoring)
"heartbeat.interval.ms": 5000,
"heartbeat.topics.prefix": "debezium_heartbeats",
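// Error handling (the errors.deadletterqueue.* settings apply only to sink connectors; they have no effect on a source connector such as Debezium)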
"errors.retry.delay.max.ms": 60000,
"errors.tolerance": "none",
"errors.deadletterqueue.topic.name": "mysql183-errors",
"errors.deadletterqueue.topic.replication.factor": 1,
// Metrics
"metrics.enabled": true,
"metrics.accumulation.level": "CONNECTOR"
}
}
Step 3: Register the Debezium connector
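Before registering, the annotated file from Step 2 has to become plain JSON. A minimal Python sketch that drops full-line // comments, parses the result and flags a few obvious configuration mistakes; `load_commented_json` and `check_connector_file` are hypothetical helpers, and the checks shown are only examples, not a complete validation:

```python
# Hypothetical helper: strip full-line // comments from the annotated connector
# file, check that the rest is valid JSON, and print the cleaned JSON.
import json
import sys


def load_commented_json(text: str) -> dict:
    """Drop lines whose first non-blank characters are '//' and parse the rest as JSON."""
    kept = [line for line in text.splitlines() if not line.lstrip().startswith("//")]
    return json.loads("\n".join(kept))


def check_connector_file(doc: dict) -> list[str]:
    """Return a few simple consistency problems found in a connector definition."""
    problems = []
    config = doc.get("config", {})
    if "name" not in doc:
        problems.append("missing top-level 'name'")
    if "database.include.list" in config and "database.exclude.list" in config:
        problems.append("database.include.list and database.exclude.list cannot be combined")
    if "topic.prefix" not in config:
        problems.append("missing topic.prefix")
    return problems


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage: python strip_json_comments.py debezium-mysql-connector-finance.json > connector.json
    with open(sys.argv[1], encoding="utf-8") as f:
        connector = load_commented_json(f.read())
    for problem in check_connector_file(connector):
        print("WARNING:", problem, file=sys.stderr)
    json.dump(connector, sys.stdout, indent=2, ensure_ascii=False)
```

Only whole-line comments are removed, so a "//" inside a value such as a URL is left alone. If the last property of an object is commented out, the preceding line keeps its trailing comma and json.loads will report it; fix that line by hand. The cleaned file can then be passed to curl with --data-binary.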
3.1 Register the connector through the REST API
# Register the Debezium MySQL connector
curl -X POST \
-H "Content-Type: application/json" \
--data-binary @/home/phoenix/apps/kafka_2.13-3.9.1/debezium/debezium-mysql-connector-finance.json \
http://192.168.1.211:8083/connectors
# Expected success response (HTTP 201 Created):
# {"name":"debezium-mysql-connector-finance","config":{...},"tasks":[]}
3.2 Monitor the connector startup
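Besides tailing the log, the status check in this step can be scripted. A minimal Python sketch using only the standard library; `summarize_status` and `wait_until_running` are hypothetical helpers, and they rely on the standard Kafka Connect /status response shape (connector.state, tasks[].state, tasks[].trace). Right after registration the endpoint may briefly answer 404, which urlopen raises as HTTPError; the sketch does not retry on that.

```python
# Hypothetical helpers: poll the Kafka Connect status endpoint until the
# connector and all of its tasks are RUNNING, or something reports FAILED.
import json
import time
import urllib.request

# URL used throughout this guide; adjust host, port and connector name as needed
STATUS_URL = "http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/status"


def summarize_status(status: dict) -> tuple[str, list[str]]:
    """Reduce a /status response to ("RUNNING" | "FAILED" | "PENDING", error traces)."""
    connector = status["connector"]
    tasks = status.get("tasks", [])
    traces = [f"task {t['id']}: {t.get('trace', '')}" for t in tasks if t["state"] == "FAILED"]
    if connector["state"] == "FAILED":
        traces.insert(0, f"connector: {connector.get('trace', '')}")
    if traces:
        return "FAILED", traces
    if connector["state"] == "RUNNING" and tasks and all(t["state"] == "RUNNING" for t in tasks):
        return "RUNNING", []
    # Tasks not created yet, or connector PAUSED / UNASSIGNED / RESTARTING
    return "PENDING", []


def wait_until_running(url: str = STATUS_URL, timeout_s: float = 120, interval_s: float = 5) -> str:
    """Poll until the connector is RUNNING or FAILED, or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    state = "PENDING"
    while time.monotonic() < deadline:
        with urllib.request.urlopen(url) as resp:
            state, traces = summarize_status(json.load(resp))
        for trace in traces:
            print(trace)
        if state != "PENDING":
            break
        time.sleep(interval_s)
    return state
```

"PENDING" is a label chosen for this sketch, not a Kafka Connect state.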
# Follow the Kafka Connect log to watch the connector start up
tail -f /home/phoenix/apps/kafka_2.13-3.9.1/logs/connect.log | grep -E "(debezium-mysql-connector-finance|MySqlConnector)"
# Or check the state of the connector's tasks directly
curl -s http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/status | \
jq '.tasks[] | {id: .id, state: .state, trace: .trace}'
Other useful commands:
# List connectors
curl -s http://192.168.1.211:8083/connectors | jq .
# Show connector details
curl -s http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance | jq .
# Get the current configuration
curl -s http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/config | jq .
# Check connector status
curl -s http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/status | jq .
# Pause the connector (stops reading the binlog but keeps its state and offsets)
curl -X PUT http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/pause
# Resume the connector
curl -X PUT http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/resume
# Restart the connector (append ?includeTasks=true to restart its tasks as well; Kafka 3.0+)
curl -X POST http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/restart
# Delete the connector (removes its configuration; committed offsets stay in the offsets topic)
curl -X DELETE http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance
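# Reset the offsets so capture starts over (Kafka 3.6+)
# Note: with snapshot.mode=initial the connector runs a new initial snapshot afterwards
# The connector must be STOPPED before its offsets can be deleted
curl -X PUT http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/stop
curl -X DELETE \
http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/offsets
# Start it again afterwards
curl -X PUT http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/resume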
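The same REST calls can be issued from code. A minimal Python sketch that only builds the requests (send each one with `urllib.request.urlopen(req)`); `connect_request`, `offset_reset_plan` and the action names are hypothetical. Offsets can only be deleted once the connector's status actually reports STOPPED, so a real script should poll the status endpoint between the stop and reset calls.

```python
# Hypothetical helpers: build (not send) the Kafka Connect REST requests for
# the connector management commands listed above.
import urllib.request

CONNECT_URL = "http://192.168.1.211:8083"  # Kafka Connect REST endpoint used in this guide
CONNECTOR = "debezium-mysql-connector-finance"

# Hypothetical action names -> (HTTP method, path under /connectors/<name>)
ACTIONS = {
    "status": ("GET", "/status"),
    "config": ("GET", "/config"),
    "pause": ("PUT", "/pause"),
    "resume": ("PUT", "/resume"),
    "restart": ("POST", "/restart"),
    "stop": ("PUT", "/stop"),                 # added by KIP-875
    "reset_offsets": ("DELETE", "/offsets"),  # Kafka 3.6+, connector must be STOPPED
    "delete": ("DELETE", ""),
}


def connect_request(action: str, base_url: str = CONNECT_URL, name: str = CONNECTOR) -> urllib.request.Request:
    """Build the REST request for one connector action."""
    method, suffix = ACTIONS[action]
    return urllib.request.Request(f"{base_url}/connectors/{name}{suffix}", method=method)


def offset_reset_plan(base_url: str = CONNECT_URL, name: str = CONNECTOR) -> list[urllib.request.Request]:
    """Stop, delete offsets, resume: the order Kafka Connect requires for an offset reset."""
    return [connect_request(a, base_url, name) for a in ("stop", "reset_offsets", "resume")]
```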
Step 4: Verify that the Kafka topics were created
4.1 List the created topics
# List all Kafka topics
kafka-topics.sh --bootstrap-server 192.168.1.211:9092 --list
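# Expected topics include:
# - mysql183.finance.users        # change events for the users table
# - mysql183.finance.orders       # change events for the orders table
# - mysql183.finance.order_items  # change events for the order_items table
# - mysql183                      # schema change topic (named after topic.prefix)
# - mysql183-schema-history-finance  # schema history topic (schema.history.internal.kafka.topic)
# - debezium_heartbeats.mysql183  # heartbeat topic (heartbeat.topics.prefix + topic.prefix)
# - connect-configs, connect-offsets, connect-statuses  # Kafka Connect internal topics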
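These names follow Debezium's default naming: data topics are <topic.prefix>.<database>.<table>, the schema change topic is named after topic.prefix itself, and the heartbeat topic is <heartbeat.topics.prefix>.<topic.prefix>. A small Python sketch that derives the expected list from the configuration values in this guide, assuming no topic-routing transform is configured; the function names are hypothetical and the table names are the examples from the list:

```python
# Hypothetical helpers reproducing Debezium's default MySQL topic naming.

def data_topic(prefix: str, database: str, table: str) -> str:
    """Data topic: <topic.prefix>.<database>.<table>."""
    return f"{prefix}.{database}.{table}"


def heartbeat_topic(heartbeat_prefix: str, prefix: str) -> str:
    """Heartbeat topic: <heartbeat.topics.prefix>.<topic.prefix>."""
    return f"{heartbeat_prefix}.{prefix}"


def expected_topics(prefix: str, database: str, tables: list[str],
                    heartbeat_prefix: str, history_topic: str) -> list[str]:
    topics = [data_topic(prefix, database, t) for t in tables]
    topics.append(prefix)         # schema change topic (include.schema.changes defaults to true)
    topics.append(history_topic)  # value of schema.history.internal.kafka.topic
    topics.append(heartbeat_topic(heartbeat_prefix, prefix))
    return topics


if __name__ == "__main__":
    for topic in expected_topics("mysql183", "finance", ["users", "orders", "order_items"],
                                 "debezium_heartbeats", "mysql183-schema-history-finance"):
        print(topic)
```

Comparing this list with the kafka-topics.sh --list output quickly shows which topics are missing. The heartbeat topic only appears after the first heartbeat has been emitted.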
# Show topic details
kafka-topics.sh --bootstrap-server 192.168.1.211:9092 \
--topic mysql183.finance.app_stock_index_percnet_df \
--describe
4.2 Inspect the initial snapshot data
# Consume the initial snapshot data
kafka-console-consumer.sh \
--bootstrap-server 192.168.1.211:9092 \
--topic mysql183.finance.app_stock_index_percnet_df \
--from-beginning \
--timeout-ms 10000 \
--property print.key=true \
--property key.separator=" | " \
--max-messages 5
# View the schema (DDL) history
kafka-console-consumer.sh \
--bootstrap-server 192.168.1.211:9092 \
--topic mysql183-schema-history-finance \
--from-beginning \
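--max-messages 3
Step 5: Understand the CDC event structure
5.1 Event types
Insert event example (with value.converter.schemas.enable=true each message value is wrapped as {"schema": ..., "payload": ...}; the examples below show only the payload):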
{
"before": null,
"after": {
"id": 4,
"username": "alice_jones",
"email": "alice@example.com",
"full_name": "Alice Jones",
"status": "active",
"created_at": 1704038400000,
"updated_at": 1704038400000,
"last_login": null
},
"source": {
"version": "2.5.0.Final",
"connector": "mysql",
"name": "mysql-server",
"ts_ms": 1704038400123,
"snapshot": "false",
"db": "ecommerce",
"table": "users",
"server_id": 1,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 457892,
"row": 0,
"thread": 7,
"query": null
},
"op": "c",
"ts_ms": 1704038400123,
"transaction": null
}
Update event example:
{
"before": {
"id": 3,
"username": "bob_wilson",
"email": "bob@example.com",
"full_name": "Bob Wilson",
"status": "active",
"created_at": 1703952000000,
"updated_at": 1703952000000,
"last_login": null
},
"after": {
"id": 3,
"username": "bob_wilson",
"email": "bob@example.com",
"full_name": "Bob Wilson",
"status": "inactive",
"created_at": 1703952000000,
"updated_at": 1704038400500,
"last_login": null
},
"source": {
"version": "2.5.0.Final",
"connector": "mysql",
"name": "mysql-server",
"ts_ms": 1704038400500,
"snapshot": "false",
"db": "ecommerce",
"table": "users",
"server_id": 1,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 458123,
"row": 0,
"thread": 7,
"query": null
},
"op": "u",
"ts_ms": 1704038400500,
"transaction": null
}
5.2 Key fields
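The fields that matter most when consuming these events: op (c = insert, u = update, d = delete, r = row read during a snapshot), before/after (row state before and after the change; before is null for inserts, after is null for deletes), source (origin metadata: db, table, binlog file and pos, and whether the event came from a snapshot), and two timestamps: source.ts_ms is when the change was made in the database, the top-level ts_ms is when Debezium processed it. A minimal Python sketch that pulls these out of one event, assuming the message value is available as a JSON string (for example from the console consumer output); `describe_event` is a hypothetical helper, not a Debezium API:

```python
# Hypothetical helper: summarize one Debezium change event.
import json

# Debezium op codes -> readable names
OPS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read", "t": "truncate"}


def describe_event(raw: str) -> dict:
    """Summarize one change event given the message value as a JSON string."""
    event = json.loads(raw)
    if event is None:
        # Tombstone: null value emitted after a delete when tombstones.on.delete=true
        return {"op": "tombstone"}
    # With schemas.enable=true the event sits under "payload"; otherwise it is the top level
    payload = event.get("payload", event)
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    source = payload.get("source") or {}
    # Only updates carry both images; list the columns whose values differ
    changed = []
    if before and after:
        changed = sorted(k for k in before.keys() | after.keys() if before.get(k) != after.get(k))
    lag_ms = None
    if "ts_ms" in payload and "ts_ms" in source:
        lag_ms = payload["ts_ms"] - source["ts_ms"]  # processing time minus change time
    return {
        "op": OPS.get(payload.get("op"), payload.get("op")),
        "table": f"{source.get('db')}.{source.get('table')}",
        "binlog": f"{source.get('file')}:{source.get('pos')}",
        "snapshot": source.get("snapshot"),
        "changed": changed,
        "lag_ms": lag_ms,
    }
```

Given the update example above, this would report op "update" with changed columns status and updated_at.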
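Summary
The steps above covered:
MySQL configuration: binlog enabled and set to ROW format
Connector configuration: a detailed Debezium MySQL connector configuration file
Connector registration: registering the connector with Kafka Connect through the REST API
Data verification: confirming that CDC events are written to Kafka topics
Real-time testing: verifying live capture by running SQL statements
Troubleshooting: monitoring and diagnostic methods
Data consumption: how to consume and apply the CDC data
Key success factors:
✅ The MySQL binlog must use ROW format
✅ The Debezium user needs sufficient privileges (REPLICATION SLAVE, etc.)
✅ The Kafka Connect service must be running normally
✅ The database connection details in the connector configuration must be correct
✅ MySQL must be reachable over the network from Kafka Connect
References:
Debezium MySQL connector property reference: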
https://debezium.io/documentation/reference/3.4/connectors/mysql.html#mysql-connector-properties