先决条件确认

在执行以下步骤前,请确保已满足以下条件:

  • ✅ MySQL已开启Binlog(ROW模式)

  • ✅ Kafka集群已运行

  • ✅ Kafka Connect服务已启动(端口8083可用)

  • ✅ Debezium MySQL连接器插件已安装到Kafka Connect

🚀 详细实施步骤

步骤1:配置MySQL数据库

1.1 检查并配置MySQL Binlog

-- Connect to MySQL first. This is a shell command, not SQL; run it from a terminal:
--   mysql -u root -p

-- Inspect the current binlog configuration.
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'binlog_row_image';

-- If ROW format is not enabled, edit the MySQL config file (my.cnf or my.ini):
-- [mysqld]
-- server-id                  = 1
-- log_bin                    = mysql-bin
-- binlog_format              = ROW
-- binlog_row_image           = FULL
-- binlog_expire_logs_seconds = 864000   # 10 days; on MySQL 5.7 use expire_logs_days = 10 instead
-- max_binlog_size            = 100M

-- After restarting MySQL, create the dedicated Debezium user.
-- The password must match "database.password" in the connector configuration.
CREATE USER IF NOT EXISTS 'debezium'@'%' IDENTIFIED BY 'dbz';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
-- Not strictly required after GRANT, kept for older setups.
FLUSH PRIVILEGES;

1.2 创建测试数据库和表

-- Create the test database.
CREATE DATABASE IF NOT EXISTS finance;
USE finance;

-- Users table.
CREATE TABLE IF NOT EXISTS users (
    id INT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    full_name VARCHAR(100),
    status ENUM('active', 'inactive', 'suspended') DEFAULT 'active',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    last_login TIMESTAMP NULL
) ENGINE=InnoDB;

-- Orders table.
CREATE TABLE IF NOT EXISTS orders (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id INT NOT NULL,
    order_number VARCHAR(20) UNIQUE NOT NULL,
    total_amount DECIMAL(15, 2) NOT NULL,
    currency VARCHAR(3) DEFAULT 'USD',
    status ENUM('pending', 'paid', 'shipped', 'delivered', 'cancelled') DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_user_id (user_id),
    INDEX idx_created_at (created_at)
) ENGINE=InnoDB;

-- Order items table.
-- NOTE: rows removed through ON DELETE CASCADE are deleted inside InnoDB and are
-- not written to the binlog as row events, so Debezium emits no delete events for
-- them. Delete order_items explicitly when the change must be captured.
CREATE TABLE IF NOT EXISTS order_items (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    order_id BIGINT NOT NULL,
    product_id INT NOT NULL,
    product_name VARCHAR(200) NOT NULL,
    quantity INT NOT NULL DEFAULT 1,
    unit_price DECIMAL(10, 2) NOT NULL,
    -- Same precision as orders.total_amount so quantity * unit_price has headroom.
    subtotal DECIMAL(15, 2) GENERATED ALWAYS AS (quantity * unit_price) STORED,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (order_id) REFERENCES orders(id) ON DELETE CASCADE
) ENGINE=InnoDB;

-- Seed data. The inserts assume freshly created, empty tables: the literal
-- user_id / order_id values below rely on AUTO_INCREMENT starting at 1.
INSERT INTO users (username, email, full_name) VALUES
('john_doe', 'john@example.com', 'John Doe'),
('jane_smith', 'jane@example.com', 'Jane Smith'),
('bob_wilson', 'bob@example.com', 'Bob Wilson');

-- total_amount equals the sum of the order's item subtotals.
INSERT INTO orders (user_id, order_number, total_amount, status) VALUES
(1, 'ORD-2024-001', 299.97, 'delivered'),
(2, 'ORD-2024-002', 89.98, 'shipped'),
(1, 'ORD-2024-003', 150.50, 'pending');

INSERT INTO order_items (order_id, product_id, product_name, quantity, unit_price) VALUES
(1, 101, 'Wireless Headphones', 1, 199.99),
(1, 102, 'Phone Case', 2, 49.99),
(2, 103, 'USB-C Cable', 1, 19.99),
(2, 104, 'Power Bank', 1, 69.99),
(3, 105, 'Smart Watch', 1, 150.50);

步骤2:创建Debezium连接器配置文件

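创建 debezium-mysql-connector-finance.json 文件(注意:Kafka Connect REST API 只接受标准JSON,文件中不能包含 // 注释;database.password 必须与步骤1.1中创建 debezium 用户时设置的密码一致;如只需同步部分表,可添加 table.include.list,例如 finance.users,finance.orders;排除敏感列请使用 column.exclude.list):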

{
  "name": "debezium-mysql-connector-finance",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",

    "database.hostname": "192.168.1.183",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "188056",

    "topic.prefix": "mysql183",

    "database.include.list": "finance",

    "snapshot.mode": "initial",
    "snapshot.locking.mode": "minimal",
    "snapshot.fetch.size": 2000,
    "snapshot.delay.ms": 0,

    "binlog.buffer.size": 0,
    "max.batch.size": 2048,
    "max.queue.size": 8192,
    "poll.interval.ms": 500,
    "connect.timeout.ms": 30000,
    "connect.keep.alive": true,
    "connect.keep.alive.interval.ms": 30000,

    "schema.history.internal.kafka.bootstrap.servers": "192.168.1.211:9092,192.168.1.212:9092,192.168.1.213:9092",
    "schema.history.internal.kafka.topic": "mysql183-schema-history-finance",
    "schema.history.internal.kafka.topic.partitions": 1,
    "schema.history.internal.kafka.topic.replication.factor": 3,
    "schema.history.internal.kafka.topic.cleanup.policy": "compact",
    "schema.history.internal.store.only.monitored.tables.ddl": true,
    "schema.history.internal.skip.unparseable.ddl": true,

    "tombstones.on.delete": true,
    "event.processing.failure.handling.mode": "fail",
    "errors.log.enable": true,
    "errors.log.include.messages": true,

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": true,
    "value.converter.schemas.enable": true,

    "heartbeat.interval.ms": 5000,
    "heartbeat.topics.prefix": "debezium_heartbeats",

    "errors.retry.delay.max.ms": 60000,
    "errors.tolerance": "none"
  }
}

步骤3:注册Debezium连接器

3.1 使用REST API注册连接器

# Kafka Connect REST endpoint and connector name shared by the commands below.
CONNECT_URL="http://192.168.1.211:8083"
CONNECTOR="debezium-mysql-connector-finance"

# Register the Debezium MySQL connector from the JSON file created in step 2.
curl --request POST \
  --header "Content-Type: application/json" \
  --data-binary @/home/phoenix/apps/kafka_2.13-3.9.1/debezium/debezium-mysql-connector-finance.json \
  "${CONNECT_URL}/connectors"

# Expected response on success:
# {"name":"debezium-mysql-connector-finance","config":{...},"tasks":[]}

# Confirm the connector is listed.
curl -s "${CONNECT_URL}/connectors" | jq .

# Connector details.
curl -s "${CONNECT_URL}/connectors/${CONNECTOR}" | jq .

# Current configuration.
curl "${CONNECT_URL}/connectors/${CONNECTOR}/config" | jq

# Connector and task status.
curl -s "${CONNECT_URL}/connectors/${CONNECTOR}/status" | jq .

# Pause the connector (stops reading the binlog but keeps its state).
curl --request PUT "${CONNECT_URL}/connectors/${CONNECTOR}/pause"
# Resume the connector.
curl --request PUT "${CONNECT_URL}/connectors/${CONNECTOR}/resume"

# Restart the connector.
curl --request POST "${CONNECT_URL}/connectors/${CONNECTOR}/restart"

3.2 监控连接器启动过程

# Follow the Kafka Connect log, keeping only the lines about this connector.
tail -f /home/phoenix/apps/kafka_2.13-3.9.1/logs/connect.log \
  | grep -E "(debezium-mysql-connector-finance|MySqlConnector)"

# Alternatively, query the state of every connector task directly.
curl -s http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/status \
  | jq '.tasks[] | {id, state, trace}'

步骤4:验证Kafka Topic创建

4.1 查看创建的Topic

# List all Kafka topics.
kafka-topics.sh --bootstrap-server 192.168.1.211:9092 --list

# Topics expected once the connector is running:
# - mysql183.finance.users             # change events of the users table
# - mysql183.finance.orders            # change events of the orders table
# - mysql183.finance.order_items       # change events of the order_items table
# - mysql183                           # schema change (DDL) events, named after topic.prefix
# - mysql183-schema-history-finance    # internal schema history (schema.history.internal.kafka.topic)
# - debezium_heartbeats.mysql183       # heartbeat topic: <heartbeat.topics.prefix>.<topic.prefix>
# - connect-configs, connect-offsets, connect-statuses  # Kafka Connect internal topics (names depend on the worker config)

# Describe one of the data topics.
kafka-topics.sh --bootstrap-server 192.168.1.211:9092 \
  --topic mysql183.finance.users \
  --describe

4.2 查看初始快照数据

# Consume the initial snapshot of the users table (events with op = "r").
kafka-console-consumer.sh \
  --bootstrap-server 192.168.1.211:9092 \
  --topic mysql183.finance.users \
  --from-beginning \
  --timeout-ms 10000 \
  --property print.key=true \
  --property key.separator=" | " \
  --max-messages 5

# Inspect the schema history recorded by the connector.
kafka-console-consumer.sh \
  --bootstrap-server 192.168.1.211:9092 \
  --topic mysql183-schema-history-finance \
  --from-beginning \
  --max-messages 3

步骤5:测试实时数据变更捕获

5.1 实时监控Kafka中的变更事件

创建实时监控脚本 monitor-debezium.sh

#!/bin/bash
# monitor-debezium.sh - poll the Debezium CDC topics and the connector status.
#
# Every setting below can be overridden from the environment, e.g.
#   BOOTSTRAP_SERVER=localhost:9092 ./monitor-debezium.sh

BOOTSTRAP_SERVER="${BOOTSTRAP_SERVER:-192.168.1.211:9092}"
# Debezium names data topics <topic.prefix>.<database>.<table>.
TOPIC_PREFIX="${TOPIC_PREFIX:-mysql183.finance}"
CONNECT_URL="${CONNECT_URL:-http://192.168.1.211:8083}"
CONNECTOR_NAME="${CONNECTOR_NAME:-debezium-mysql-connector-finance}"

echo "=== 开始监控Debezium CDC事件流 ==="
echo "监控主题: ${TOPIC_PREFIX}.*"
echo "按 Ctrl+C 停止监控"
echo ""

# Poll every captured table topic in a loop.
while true; do
  echo "====== $(date '+%Y-%m-%d %H:%M:%S') ======"

  for topic in users orders order_items; do
    full_topic="${TOPIC_PREFIX}.${topic}"

    echo "--- ${full_topic} ---"

    # Wait up to 5 seconds for the next NEW message on the topic; without
    # --from-beginning the consumer does not replay existing messages.
    kafka-console-consumer.sh \
      --bootstrap-server "${BOOTSTRAP_SERVER}" \
      --topic "${full_topic}" \
      --max-messages 1 \
      --timeout-ms 5000 \
      --property print.timestamp=true \
      --property print.key=true \
      --property key.separator=" | " 2>/dev/null || echo "无新消息"

    # Sum the end offsets of all partitions (output format: topic:partition:offset).
    # kafka-get-offsets.sh ships with Kafka 3.0+ and replaces the direct
    # kafka.tools.GetOffsetShell class invocation.
    count=$(kafka-get-offsets.sh \
      --bootstrap-server "${BOOTSTRAP_SERVER}" \
      --topic "${full_topic}" \
      --time -1 2>/dev/null | awk -F: '{sum += $3} END {print sum + 0}')

    echo "消息总数: ${count:-0}"
    echo ""
  done

  # Report the state of every connector task.
  echo "--- 连接器状态 ---"
  curl -s "${CONNECT_URL}/connectors/${CONNECTOR_NAME}/status" | \
    jq -r '.tasks[] | "状态: \(.state) | 工作线程: \(.worker_id)"'

  echo ""
  sleep 5
done

5.2 查看特定操作的CDC事件

# kafka-console-consumer.sh rejects --offset unless --partition is also given.
# "--offset latest" waits for the NEXT event, so start the consumer first and
# then run the SQL statement in another MySQL session.
# With schemas.enable=true the change event sits under ".payload".

# Insert event, e.g. after:
#   INSERT INTO finance.users (username, email, full_name)
#   VALUES ('alice_jones', 'alice@example.com', 'Alice Jones');
kafka-console-consumer.sh \
  --bootstrap-server 192.168.1.211:9092 \
  --topic mysql183.finance.users \
  --partition 0 \
  --offset latest \
  --max-messages 1 \
  --property print.key=false | jq '.payload // .'

# Update event (contains before/after), e.g. after:
#   UPDATE finance.orders SET status = 'paid' WHERE id = 3;
kafka-console-consumer.sh \
  --bootstrap-server 192.168.1.211:9092 \
  --topic mysql183.finance.orders \
  --partition 0 \
  --offset latest \
  --max-messages 1 \
  --property print.key=false | jq '.payload // .'

# Delete event, e.g. after:
#   DELETE FROM finance.order_items WHERE id = 5;
kafka-console-consumer.sh \
  --bootstrap-server 192.168.1.211:9092 \
  --topic mysql183.finance.order_items \
  --partition 0 \
  --offset latest \
  --max-messages 1 \
  --property print.key=false | jq '.payload // .'

步骤6:理解CDC事件数据结构

6.1 事件类型解析

插入事件示例(启用 schemas.enable 时,消息结构为 {"schema": ..., "payload": ...},以下内容对应其中的 payload 部分):

{
  "before": null,
  "after": {
    "id": 4,
    "username": "alice_jones",
    "email": "alice@example.com",
    "full_name": "Alice Jones",
    "status": "active",
    "created_at": "2023-12-31T16:00:00Z",
    "updated_at": "2023-12-31T16:00:00Z",
    "last_login": null
  },
  "source": {
    "version": "2.5.0.Final",
    "connector": "mysql",
    "name": "mysql183",
    "ts_ms": 1704038400123,
    "snapshot": "false",
    "db": "finance",
    "table": "users",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 457892,
    "row": 0,
    "thread": 7,
    "query": null
  },
  "op": "c",
  "ts_ms": 1704038400123,
  "transaction": null
}

更新事件示例:

{
  "before": {
    "id": 3,
    "username": "bob_wilson",
    "email": "bob@example.com",
    "full_name": "Bob Wilson",
    "status": "active",
    "created_at": "2023-12-30T16:00:00Z",
    "updated_at": "2023-12-30T16:00:00Z",
    "last_login": null
  },
  "after": {
    "id": 3,
    "username": "bob_wilson",
    "email": "bob@example.com",
    "full_name": "Bob Wilson",
    "status": "inactive",
    "created_at": "2023-12-30T16:00:00Z",
    "updated_at": "2023-12-31T16:00:00Z",
    "last_login": null
  },
  "source": {
    "version": "2.5.0.Final",
    "connector": "mysql",
    "name": "mysql183",
    "ts_ms": 1704038400500,
    "snapshot": "false",
    "db": "finance",
    "table": "users",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 458123,
    "row": 0,
    "thread": 7,
    "query": null
  },
  "op": "u",
  "ts_ms": 1704038400500,
  "transaction": null
}

6.2 关键字段说明

| 字段 | 说明 | 示例值 |
| --- | --- | --- |
| op | 操作类型 | c=创建, r=读取(快照), u=更新, d=删除 |
| source.name | 逻辑服务器名(即 topic.prefix) | mysql183 |
| source.db | 数据库名 | finance |
| source.table | 表名 | users |
| source.ts_ms | 源库时间戳 | 1704038400123 |
| source.file | Binlog文件名 | mysql-bin.000003 |
| source.pos | Binlog位置 | 457892 |
| ts_ms | 处理时间戳 | 1704038400123 |
| before | 变更前数据 | null 或 完整行数据 |
| after | 变更后数据 | null 或 完整行数据 |

步骤7:验证与问题排查

7.1 验证数据一致性

# 1. Count the rows of each captured table in MySQL.
mysql -u root -p -e "
USE finance;
SELECT 'users' as table_name, COUNT(*) as count FROM users
UNION ALL
SELECT 'orders', COUNT(*) FROM orders
UNION ALL
SELECT 'order_items', COUNT(*) FROM order_items;
"

# 2. Count the messages in each Kafka topic by summing partition end offsets
#    (output format: topic:partition:offset).
#    The totals equal the row counts only right after the initial snapshot:
#    every later update adds a message, and every delete adds a delete event
#    plus a tombstone.
for topic in mysql183.finance.users mysql183.finance.orders mysql183.finance.order_items; do
  count=$(kafka-get-offsets.sh \
    --bootstrap-server 192.168.1.211:9092 \
    --topic "$topic" \
    --time -1 2>/dev/null | awk -F: '{sum += $3} END {print sum + 0}')
  echo "${topic}: ${count:-0} 条消息"
done

7.2 常见问题排查

# 1. Show the error stack trace of the first task, if it has one.
curl -s http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/status | \
  jq -r '.tasks[0].trace // "无错误信息"'

# 2. Show the binlog position the connector has committed.
#    The offsets endpoint (Kafka Connect 3.5+) returns the current source offsets
#    directly, so there is no need to scan the connect-offsets topic.
curl -s http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/offsets | jq '.'

# 3. Restart the connector when something is wrong.
#    A plain /restart restarts only the connector instance; includeTasks and
#    onlyFailed (Kafka Connect 3.0+) also restart its failed tasks.
curl -X POST \
  "http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/restart?includeTasks=true&onlyFailed=true"

# 4. Show the connector configuration.
curl -s http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/config | jq '.'

# 5. Summarize the connector and task states.
#    The status response carries no metrics; Debezium metrics are exposed
#    through JMX on the Kafka Connect worker.
curl -s http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/status | \
  jq '{connector: .connector.state, tasks: [.tasks[] | {id, state}]}'

7.3 重置连接器(谨慎操作)

# Reset the connector so that it starts over with a fresh snapshot.
# Offsets can only be reset while the connector still exists and is in the
# STOPPED state (Kafka Connect 3.6+), so reset them BEFORE deleting it.

# 1. Stop the connector (shuts down its tasks, keeps its config and offsets).
curl -X PUT http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/stop

# 2. Clear the stored offsets; the next start begins with a new snapshot.
# NOTE(review): for a clean re-snapshot Debezium setups commonly also switch to a
# new schema.history.internal.kafka.topic - confirm against the Debezium docs.
curl -X DELETE \
  http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance/offsets

# 3. Delete the connector (removes its configuration).
curl -X DELETE http://192.168.1.211:8083/connectors/debezium-mysql-connector-finance

# 4. Register the connector again.
curl -X POST \
  -H "Content-Type: application/json" \
  --data-binary @/home/phoenix/apps/kafka_2.13-3.9.1/debezium/debezium-mysql-connector-finance.json \
  http://192.168.1.211:8083/connectors

步骤8:消费CDC数据进行应用

8.1 使用Kafka消费者读取数据

# debezium_consumer.py
"""Print the Debezium change events of the finance database from Kafka."""
import json

BOOTSTRAP_SERVERS = '192.168.1.211:9092'
# Regex over topic names; data topics are <topic.prefix>.<database>.<table>.
TOPIC_PATTERN = r'^mysql183\.finance\..*'


def decode_event(raw):
    """Decode a raw Kafka message value into a Debezium change event.

    Args:
        raw: message value as bytes, or None for a tombstone record.

    Returns:
        The change-event dict, or None for a tombstone. When the JSON converter
        runs with schemas.enable=true the message is {"schema": ..., "payload": ...};
        in that case only the payload is returned.
    """
    if raw is None:
        # Tombstones (tombstones.on.delete=true) carry a key but no value.
        return None
    event = json.loads(raw.decode('utf-8'))
    if isinstance(event, dict) and 'schema' in event and 'payload' in event:
        return event['payload']
    return event


def format_row(row):
    """Render a row image (the before/after part of an event) as indented JSON."""
    return json.dumps(row, indent=2, ensure_ascii=False)


def describe_event(topic, event):
    """Return the lines to print for one change event read from `topic`."""
    source = event.get('source') or {}
    op = event.get('op')
    lines = [
        f"\n=== Topic: {topic} ===",
        f"操作类型: {op}",
        f"数据库: {source.get('db')}",
        f"数据表: {source.get('table')}",
    ]
    if op in ('c', 'r'):  # create or snapshot read
        lines.append(f"新增数据: {format_row(event.get('after'))}")
    elif op == 'u':  # update
        lines.append(f"变更前: {format_row(event.get('before'))}")
        lines.append(f"变更后: {format_row(event.get('after'))}")
    elif op == 'd':  # delete
        lines.append(f"删除数据: {format_row(event.get('before'))}")
    return lines


def main():
    """Subscribe to the finance CDC topics and print every event until Ctrl+C."""
    # Imported here so the helpers above stay importable without kafka-python.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        bootstrap_servers=BOOTSTRAP_SERVERS,
        group_id='debezium-consumer-group',
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        value_deserializer=decode_event,
    )
    consumer.subscribe(pattern=TOPIC_PATTERN)

    print("开始消费Debezium CDC事件...")
    try:
        for message in consumer:
            if message.value is None:
                # Tombstone following a delete event; nothing to display.
                continue
            for line in describe_event(message.topic, message.value):
                print(line)
    except KeyboardInterrupt:
        print("\n停止消费")
    finally:
        consumer.close()


if __name__ == '__main__':
    main()

8.2 使用kcat命令行工具消费

# Install kcat (formerly kafkacat)
# Ubuntu: sudo apt-get install kcat
# macOS: brew install kcat

# Follow every finance change topic.
# Regex subscriptions only work in balanced-consumer mode (-G <group>) and the
# pattern must start with '^'; plain -C -t takes a single literal topic name.
kcat -b 192.168.1.211:9092 -G kcat-cdc-debug \
  -X auto.offset.reset=earliest -q -u -f \
  'Topic: %t\nKey: %k\nValue: %s\nPartition: %p\nOffset: %o\n-----------\n' \
  '^mysql183\.finance\..*'

# Pretty-print the change events of one topic.
# With schemas.enable=true the event sits under ".payload".
kcat -b 192.168.1.211:9092 -t mysql183.finance.users \
  -C -o beginning -q -u -f '%s\n' | jq '.payload // .'

# Count events per operation type.
# -e makes kcat exit at the end of the topic; without it the pipeline never
# finishes and sort/uniq print nothing.
kcat -b 192.168.1.211:9092 -t mysql183.finance.users \
  -C -o beginning -e -q -u -f '%s\n' | \
  jq -r '(.payload // .).op // empty' | sort | uniq -c

总结

通过以上详细步骤,已经完成了:

  1. MySQL配置:确保Binlog正确开启并配置了ROW模式

  2. 连接器配置:创建了详细的Debezium MySQL连接器配置文件

  3. 连接器注册:通过REST API将连接器注册到Kafka Connect

  4. 数据验证:确认CDC事件正确写入Kafka Topic

  5. 实时测试:通过执行SQL操作验证实时捕获功能

  6. 问题排查:提供了完整的监控和诊断方法

  7. 数据消费:展示了如何消费和应用CDC数据

关键成功因素:

  • ✅ MySQL Binlog必须是ROW模式

  • ✅ Debezium用户需要足够的权限(REPLICATION SLAVE等)

  • ✅ Kafka Connect服务必须正常运行

  • ✅ 连接器配置中的数据库连接信息必须正确

  • ✅ 确保网络连通性(Kafka Connect → MySQL,Kafka Connect → Kafka)

参考:

debezium的属性列表:

https://debezium.io/documentation/reference/3.4/connectors/mysql.html#mysql-connector-properties