Prerequisites
Before performing the steps below, make sure the following conditions are met:
✅ MySQL binlog is enabled (ROW mode)
✅ The Kafka cluster is running
✅ The Kafka Connect service is started (port 8083 reachable)
✅ The Debezium MySQL connector plugin is installed in Kafka Connect
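A quick way to sanity-check these prerequisites before starting (a minimal sketch; the MySQL root account and the 192.168.1.x addresses are assumptions taken from the environment used later in this guide):
# Verify the binlog settings (run against the MySQL host)
mysql -u root -p -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format','binlog_row_image');"
# Verify the Kafka Connect REST API answers on port 8083
curl -s http://192.168.1.211:8083/ | jq .
# Verify the Debezium MySQL connector plugin is installed
curl -s http://192.168.1.211:8083/connector-plugins | jq '.[] | select(.class | contains("MySqlConnector"))'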
🚀 Detailed Implementation Steps


Step 1: Configure the MySQL Database
1.1 Check and Configure the MySQL Binlog
-- Connect to MySQL
mysql -u root -p
-- Check the current binlog configuration
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'binlog_row_image';
-- If ROW mode is not enabled, edit the MySQL configuration file (my.cnf or my.ini):
-- [mysqld]
-- server-id = 1
-- log_bin = mysql-bin
-- binlog_format = ROW
-- binlog_row_image = FULL
-- expire_logs_days = 10
-- max_binlog_size = 100M
-- After restarting MySQL, create a dedicated user for Debezium
CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;

1.2 Create a Test Database and Tables
-- Create the test database
CREATE DATABASE IF NOT EXISTS finance;
USE finance;
-- Create the users table
CREATE TABLE users (
id INT PRIMARY KEY AUTO_INCREMENT,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
full_name VARCHAR(100),
status ENUM('active', 'inactive', 'suspended') DEFAULT 'active',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
last_login TIMESTAMP NULL
) ENGINE=InnoDB;
-- Create the orders table
CREATE TABLE orders (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id INT NOT NULL,
order_number VARCHAR(20) UNIQUE NOT NULL,
total_amount DECIMAL(15, 2) NOT NULL,
currency VARCHAR(3) DEFAULT 'USD',
status ENUM('pending', 'paid', 'shipped', 'delivered', 'cancelled') DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_user_id (user_id),
INDEX idx_created_at (created_at)
) ENGINE=InnoDB;
-- Create the order_items table
CREATE TABLE order_items (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
order_id BIGINT NOT NULL,
product_id INT NOT NULL,
product_name VARCHAR(200) NOT NULL,
quantity INT NOT NULL DEFAULT 1,
unit_price DECIMAL(10, 2) NOT NULL,
subtotal DECIMAL(10, 2) GENERATED ALWAYS AS (quantity * unit_price) STORED,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (order_id) REFERENCES orders(id) ON DELETE CASCADE
) ENGINE=InnoDB;
-- Insert initial test data
INSERT INTO users (username, email, full_name) VALUES
('john_doe', 'john@example.com', 'John Doe'),
('jane_smith', 'jane@example.com', 'Jane Smith'),
('bob_wilson', 'bob@example.com', 'Bob Wilson');
INSERT INTO orders (user_id, order_number, total_amount, status) VALUES
(1, 'ORD-2024-001', 299.97, 'delivered'),
(2, 'ORD-2024-002', 89.99, 'shipped'),
(1, 'ORD-2024-003', 150.50, 'pending');
INSERT INTO order_items (order_id, product_id, product_name, quantity, unit_price) VALUES
(1, 101, 'Wireless Headphones', 1, 199.99),
(1, 102, 'Phone Case', 2, 49.99),
(2, 103, 'USB-C Cable', 1, 19.99),
(2, 104, 'Power Bank', 1, 69.99),
(3, 105, 'Smart Watch', 1, 150.50);

Step 2: Create the Debezium Connector Configuration File
Create a file named debezium-mysql-connector-finance.json. The // comments below are explanatory only and must be removed before the file is POSTed, since JSON does not allow comments:
{
"name": "debezium-mysql-connector-finance",
"config": {
// Basic connector settings
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
// MySQL connection settings
"database.hostname": "192.168.1.183",
"database.port": "3306",
"database.user": "debezium",
"database.password": "123456",
"database.server.id": "188056",
"database.server.name": "mysql183",
"topic.prefix": "mysql183",
// Database and table filtering
"database.include.list": "finance",
// To capture the whole database, the table include list can be omitted
// "table.include.list": "finance.users,finance.orders,finance.order_items",
// "database.exclude.list": "mysql,sys,information_schema,performance_schema",
// Snapshot settings
"snapshot.mode": "initial",
"snapshot.locking.mode": "minimal",
"snapshot.fetch.size": 2000,
"snapshot.delay.ms": 0,
// Binlog reading settings
"binlog.buffer.size": 0,
"max.batch.size": 2048,
"max.queue.size": 8192,
"poll.interval.ms": 500,
"connect.timeout.ms": 30000,
"connect.keep.alive": true,
"connect.keep.alive.interval.ms": 30000,
// Storage for the captured schema (DDL) history
"schema.history.internal.kafka.bootstrap.servers": "192.168.1.211:9092,192.168.1.212:9092,192.168.1.213:9092",
"schema.history.internal.kafka.topic": "mysql183-schema-history-finance",
"schema.history.internal.kafka.topic.partitions": 1,
"schema.history.internal.kafka.topic.replication.factor": 3,
"schema.history.internal.kafka.topic.cleanup.policy": "compact",
"schema.history.internal.store.only.monitored.tables.ddl": true,
"schema.history.internal.skip.unparseable.ddl": true,
// Event handling
"tombstones.on.delete": true,
"event.processing.failure.handling.mode": "fail",
"errors.log.enable": true,
"errors.log.include.messages": true,
// Exclude sensitive columns ("column.exclude.list" replaces the deprecated "column.blacklist")
// "column.exclude.list": "finance.users.password",
// Kafka Connect converters
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": true,
"value.converter.schemas.enable": true,
// Heartbeat settings (for monitoring)
"heartbeat.interval.ms": 5000,
"heartbeat.topics.prefix": "debezium_heartbeats",
// Error handling (note: Kafka Connect dead letter queues apply only to sink connectors, so the two deadletterqueue settings below have no effect for this source connector)
"errors.retry.delay.max.ms": 60000,
"errors.tolerance": "none",
"errors.deadletterqueue.topic.name": "mysql183-errors",
"errors.deadletterqueue.topic.replication.factor": 1,
// Metrics
"metrics.enabled": true,
"metrics.accumulation.level": "CONNECTOR"
}
}

Step 3: Register the Debezium Connector
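Before registering the connector in 3.1, you can optionally ask Kafka Connect to validate the configuration. The validate endpoint expects only the inner config object, so jq is used to extract it first; a sketch assuming the file path used in step 3.1 (remember to strip the // comments from the file, since neither jq nor Kafka Connect accepts them):
# Validate the connector configuration without creating the connector
jq .config /home/phoenix/apps/kafka_2.13-3.9.1/debezium/debezium-mysql-connector-finance.json | \
curl -s -X PUT -H "Content-Type: application/json" --data-binary @- \
http://192.168.1.211:8083/connector-plugins/io.debezium.connector.mysql.MySqlConnector/config/validate | \
jq '{error_count, errors: [.configs[] | select(.value.errors | length > 0) | {name: .value.name, errors: .value.errors}]}'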
3.1 Register the Connector via the REST API
# Register the Debezium MySQL connector
curl -X POST \
-H "Content-Type: application/json" \
--data-binary @/home/phoenix/apps/kafka_2.13-3.9.1/debezium/debezium-mysql-connector-finance.json \
http://192.168.1.211:8083/connectors
# Expected response on success:
# {"name":"debezium-mysql-connector-finance","config":{...},"tasks":[]}
# Verify that the connector was created
curl -s http://192.168.1.211:8083/connectors | jq .
# View connector details
curl -s http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance | jq .
# Get the current configuration
curl http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/config | jq
# Check the connector status
curl -s http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/status | jq .
# Pause the connector (stops reading the binlog but keeps its state)
curl -X PUT http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/pause
# Resume the connector
curl -X PUT http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/resume
# Restart the connector
curl -X POST http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/restart

3.2 Monitor the Connector Startup
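One way to watch the startup is to poll the status endpoint until the task reports RUNNING (a minimal sketch; connector name and host as configured in step 3.1). The log and status commands below work as well:
# Wait for the connector task to reach the RUNNING state
while true; do
  state=$(curl -s http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/status | jq -r '.tasks[0].state // "PENDING"')
  echo "$(date '+%H:%M:%S') task state: ${state}"
  [ "${state}" = "RUNNING" ] && break
  sleep 3
done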
# Tail the Kafka Connect log to watch the connector start up
tail -f /home/phoenix/apps/kafka_2.13-3.9.1/logs/connect.log | grep -E "(debezium-mysql-connector-finance|MySqlConnector)"
# Or check the connector task state directly
curl -s http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/status | \
jq '.tasks[] | {id: .id, state: .state, trace: .trace}'

Step 4: Verify Kafka Topic Creation
4.1 List the Created Topics
# List all Kafka topics
kafka-topics.sh --bootstrap-server 192.168.1.211:9092 --list
# You should see topics such as:
# - mysql183.finance.users            # change events for the users table
# - mysql183.finance.orders           # change events for the orders table
# - mysql183.finance.order_items      # change events for the order_items table
# - mysql183                          # schema change events (named after topic.prefix)
# - mysql183-schema-history-finance   # internal schema history
# - debezium_heartbeats.mysql183      # heartbeat topic
# - connect-configs, connect-offsets, connect-statuses   # Kafka Connect internal topics
# Describe a topic
kafka-topics.sh --bootstrap-server 192.168.1.211:9092 \
--topic mysql183.finance.users \
--describe

4.2 View the Initial Snapshot Data
# Consume the initial snapshot data
kafka-console-consumer.sh \
--bootstrap-server 192.168.1.211:9092 \
--topic mysql183.finance.users \
--from-beginning \
--timeout-ms 10000 \
--property print.key=true \
--property key.separator=" | " \
--max-messages 5
# View the schema change history
kafka-console-consumer.sh \
--bootstrap-server 192.168.1.211:9092 \
--topic mysql183-schema-history-finance \
--from-beginning \
--max-messages 3

Step 5: Test Real-Time Change Data Capture
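The subsections below only read from Kafka, so first generate some changes to capture. A sketch of test DML against the tables from step 1.2; the INSERT and UPDATE deliberately match the sample events shown in step 6:
# Produce one insert, one update and one delete to observe in Kafka
mysql -u root -p finance -e "
INSERT INTO users (username, email, full_name) VALUES ('alice_jones', 'alice@example.com', 'Alice Jones');
UPDATE users SET status = 'inactive' WHERE username = 'bob_wilson';
DELETE FROM order_items WHERE product_name = 'Smart Watch';
"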
5.1 Monitor Change Events in Kafka in Real Time
Create a monitoring script, monitor-debezium.sh:
#!/bin/bash
# monitor-debezium.sh - monitor Debezium CDC events in real time
BOOTSTRAP_SERVER="192.168.1.211:9092"
TOPIC_PREFIX="mysql183.finance"
echo "=== Monitoring the Debezium CDC event stream ==="
echo "Topics: ${TOPIC_PREFIX}.*"
echo "Press Ctrl+C to stop"
echo ""
# Monitor all finance-related topics
while true; do
  echo "====== $(date '+%Y-%m-%d %H:%M:%S') ======"
  # Check the latest message of each topic
  for topic in users orders order_items; do
    full_topic="${TOPIC_PREFIX}.${topic}"
    echo "--- ${full_topic} ---"
    # Read one message from the topic
    kafka-console-consumer.sh \
      --bootstrap-server ${BOOTSTRAP_SERVER} \
      --topic ${full_topic} \
      --max-messages 1 \
      --timeout-ms 5000 \
      --property print.timestamp=true \
      --property print.key=true \
      --property key.separator=" | " 2>/dev/null || echo "no new messages"
    # Count the messages currently in the topic
    count=$(kafka-get-offsets.sh \
      --bootstrap-server ${BOOTSTRAP_SERVER} \
      --topic ${full_topic} \
      --time -1 2>/dev/null | awk -F: '{sum += $3} END {print sum}')
    echo "total messages: ${count:-0}"
    echo ""
  done
  # Check the connector status
  echo "--- connector status ---"
  curl -s http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/status | \
    jq -r '.tasks[0] | "state: \(.state) | worker: \(.worker_id)"'
  echo ""
  sleep 5
done

5.2 View CDC Events for Specific Operations
# View the detailed structure of an insert event
# (start the consumer first, then run the corresponding DML; --offset requires --partition,
#  and the command prints the next event that arrives on partition 0)
kafka-console-consumer.sh \
--bootstrap-server 192.168.1.211:9092 \
--topic mysql183.finance.users \
--partition 0 \
--offset latest \
--max-messages 1 \
--property print.key=false | jq '.payload'
# View the detailed structure of an update event (contains before/after)
kafka-console-consumer.sh \
--bootstrap-server 192.168.1.211:9092 \
--topic mysql183.finance.orders \
--partition 0 \
--offset latest \
--max-messages 1 \
--property print.key=false | jq '.payload'
# View a delete event
kafka-console-consumer.sh \
--bootstrap-server 192.168.1.211:9092 \
--topic mysql183.finance.order_items \
--partition 0 \
--offset latest \
--max-messages 1 \
--property print.key=false | jq '.payload'

Step 6: Understanding the CDC Event Structure
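One note before reading the samples: because value.converter.schemas.enable is set to true in the connector configuration, every record on the wire is wrapped in a {"schema": ..., "payload": ...} envelope, and the examples below show only the payload part. A quick way to strip the envelope when inspecting raw messages:
# Print the Debezium payload of the first message, without the JSON-converter schema envelope
kafka-console-consumer.sh \
--bootstrap-server 192.168.1.211:9092 \
--topic mysql183.finance.users \
--from-beginning \
--max-messages 1 | jq '.payload'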
6.1 Event Types
Insert event example (payload only):
{
"before": null,
"after": {
"id": 4,
"username": "alice_jones",
"email": "alice@example.com",
"full_name": "Alice Jones",
"status": "active",
"created_at": 1704038400000,
"updated_at": 1704038400000,
"last_login": null
},
"source": {
"version": "2.5.0.Final",
"connector": "mysql",
"name": "mysql-server",
"ts_ms": 1704038400123,
"snapshot": "false",
"db": "ecommerce",
"table": "users",
"server_id": 1,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 457892,
"row": 0,
"thread": 7,
"query": null
},
"op": "c",
"ts_ms": 1704038400123,
"transaction": null
}
Update event example (payload only):
{
"before": {
"id": 3,
"username": "bob_wilson",
"email": "bob@example.com",
"full_name": "Bob Wilson",
"status": "active",
"created_at": 1703952000000,
"updated_at": 1703952000000,
"last_login": null
},
"after": {
"id": 3,
"username": "bob_wilson",
"email": "bob@example.com",
"full_name": "Bob Wilson",
"status": "inactive",
"created_at": 1703952000000,
"updated_at": 1704038400500,
"last_login": null
},
"source": {
"version": "2.5.0.Final",
"connector": "mysql",
"name": "mysql-server",
"ts_ms": 1704038400500,
"snapshot": "false",
"db": "ecommerce",
"table": "users",
"server_id": 1,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 458123,
"row": 0,
"thread": 7,
"query": null
},
"op": "u",
"ts_ms": 1704038400500,
"transaction": null
}

6.2 Key Field Reference
op: operation type, "c" (create/insert), "u" (update), "d" (delete), or "r" (read, emitted during the initial snapshot)
before / after: the row state before and after the change; before is null for inserts, after is null for deletes
source: metadata about the origin, including the database (db), table, binlog file and position (file/pos), server_id, and the time the change was written to the binlog (source.ts_ms)
ts_ms (top level): the time Debezium processed the event; comparing it with source.ts_ms gives a rough capture lag
transaction: transaction metadata, populated only when transaction metadata topics are enabled
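As a quick illustration of these fields, the one-liner below (a sketch using the users topic from earlier) summarizes each event as operation, table, row id, and processing timestamp:
# Summarize events by op, table, row id and processing timestamp
kafka-console-consumer.sh \
--bootstrap-server 192.168.1.211:9092 \
--topic mysql183.finance.users \
--from-beginning \
--timeout-ms 10000 2>/dev/null | \
jq -r '.payload | "\(.op)\t\(.source.db).\(.source.table)\tid=\(.after.id // .before.id)\tts=\(.ts_ms)"'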
Step 7: Verification and Troubleshooting
7.1 Verify Data Consistency
# 1. Count the rows in MySQL
mysql -u root -p -e "
USE finance;
SELECT 'users' as table_name, COUNT(*) as count FROM users
UNION ALL
SELECT 'orders', COUNT(*) FROM orders
UNION ALL
SELECT 'order_items', COUNT(*) FROM order_items;
"
# 2. Count the messages in the Kafka topics
# (the counts only match right after the initial snapshot; every later update, delete and tombstone adds another message)
for topic in mysql183.finance.users mysql183.finance.orders mysql183.finance.order_items; do
count=$(kafka-get-offsets.sh \
--bootstrap-server 192.168.1.211:9092 \
--topic $topic \
--time -1 2>/dev/null | awk -F: '{sum += $3} END {print sum}')
echo "${topic}: ${count:-0} messages"
done

7.2 Troubleshooting Common Issues
# 1. Check for connector errors
curl -s http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/status | \
jq -r '.tasks[0].trace // "no errors"'
# 2. Check the binlog position
# View the binlog position currently being processed
kafka-console-consumer.sh \
--bootstrap-server 192.168.1.211:9092 \
--topic connect-offsets \
--from-beginning \
--property print.key=true \
--property key.separator=" | " \
--max-messages 3 | grep debezium-mysql-connector-finance
# 3. Restart the connector (if something is wrong)
curl -X POST http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/restart
# 4. View the connector configuration
curl -s http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/config | jq '.'
# 5. Monitor connector metrics
# Note: the Connect status endpoint does not return metrics; Debezium exposes them as JMX MBeans
# (e.g. debezium.mysql:type=connector-metrics,context=streaming,server=mysql183)

7.3 Reset the Connector (Use with Caution)
# Stop the connector first (Kafka Connect 3.6+; offsets can only be deleted while the connector is STOPPED)
curl -X PUT http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/stop
# Clear the offsets
# Note: capture will restart from a fresh snapshot
curl -X DELETE \
http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/offsets
# Delete the connector (removes its configuration)
curl -X DELETE http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance
# Re-register the connector
curl -X POST \
-H "Content-Type: application/json" \
--data-binary @/home/phoenix/apps/kafka_2.13-3.9.1/debezium/debezium-mysql-connector-finance.json \
http://192.168.1.211:8083/connectors

Step 8: Consume the CDC Data in Applications
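The example in 8.1 below uses the kafka-python client; install it and run the script as follows (the file name matches the comment at the top of the script):
# Install the kafka-python client and run the consumer from section 8.1
pip install kafka-python
python debezium_consumer.py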
8.1 Read the Data with a Kafka Consumer
# debezium_consumer.py
from kafka import KafkaConsumer
import json

# Configure the Kafka consumer (tombstone messages have a null value, so guard the deserializer)
consumer = KafkaConsumer(
    bootstrap_servers='192.168.1.211:9092',
    group_id='debezium-consumer-group',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=lambda x: json.loads(x.decode('utf-8')) if x is not None else None
)

# Subscribe to the Debezium topics by regex pattern
consumer.subscribe(pattern=r'^mysql183\.finance\..*')

print("Consuming Debezium CDC events...")
try:
    for message in consumer:
        value = message.value
        if value is None:  # tombstone record emitted after a delete
            continue
        # With schemas.enable=true the JSON converter wraps the event in a schema/payload envelope
        event = value.get('payload', value)
        if not event or 'op' not in event:
            continue
        print(f"\n=== Topic: {message.topic} ===")
        print(f"Operation: {event['op']}")
        print(f"Database: {event['source']['db']}")
        print(f"Table: {event['source']['table']}")
        if event['op'] in ('c', 'r'):  # insert or snapshot read
            print(f"New row: {json.dumps(event['after'], indent=2, ensure_ascii=False)}")
        elif event['op'] == 'u':  # update
            print(f"Before: {json.dumps(event['before'], indent=2, ensure_ascii=False)}")
            print(f"After:  {json.dumps(event['after'], indent=2, ensure_ascii=False)}")
        elif event['op'] == 'd':  # delete
            print(f"Deleted row: {json.dumps(event['before'], indent=2, ensure_ascii=False)}")
except KeyboardInterrupt:
    print("\nStopping consumer")
finally:
    consumer.close()

8.2 Consume with the kcat Command-Line Tool
# Install kcat (formerly kafkacat)
# Ubuntu: sudo apt-get install kcat
# macOS: brew install kcat
# Consume events from a table topic in real time
# (kcat -C reads a single topic and does not expand wildcards; repeat per topic, or use -G consumer-group mode for several topics)
kcat -b 192.168.1.211:9092 -t mysql183.finance.users \
-C -o beginning -q -u -f \
'Topic: %t\nKey: %k\nValue: %s\nPartition: %p\nOffset: %o\n-----------\n'
# Pretty-print the JSON
kcat -b 192.168.1.211:9092 -t mysql183.finance.users \
-C -o beginning -q -u -f '%s\n' | jq '.'
# Count events by operation type (-e exits once the end of the topic is reached)
kcat -b 192.168.1.211:9092 -t mysql183.finance.users \
-C -o beginning -e -q -u -f '%s\n' | \
jq -r '.payload.op // .op' | sort | uniq -c

Summary
Following the steps above, we have completed:
MySQL configuration: the binlog is enabled and set to ROW mode
Connector configuration: a detailed Debezium MySQL connector configuration file
Connector registration: the connector registered with Kafka Connect via the REST API
Data verification: CDC events confirmed to be written to the Kafka topics
Real-time testing: real-time capture verified by executing SQL changes
Troubleshooting: complete monitoring and diagnostic methods
Data consumption: how to consume and apply the CDC data
Key success factors:
✅ The MySQL binlog must be in ROW mode
✅ The Debezium user needs sufficient privileges (REPLICATION SLAVE, etc.)
✅ The Kafka Connect service must be running
✅ The database connection details in the connector configuration must be correct
✅ Network connectivity must be in place (MySQL → Kafka Connect)
Reference:
Debezium MySQL connector property list:
https://debezium.io/documentation/reference/3.4/connectors/mysql.html#mysql-connector-properties