创建topic:

/home/phoenix/apps/kafka_2.13-3.9.1/bin/kafka-topics.sh \
  --create \
  --bootstrap-server 192.168.1.211:9092 \
  --topic mysql-bin-tab1 \
  --partitions 1 --replication-factor 3 \
  --config cleanup.policy=compact

删除topic:

/home/phoenix/apps/kafka_2.13-3.9.1/bin/kafka-topics.sh \
  --delete \
  --topic mysql-bin-tab1 \
  --bootstrap-server 192.168.1.211:9092

列出所有Kafka Topic:

/home/phoenix/apps/kafka_2.13-3.9.1/bin/kafka-topics.sh \
  --bootstrap-server 192.168.1.211:9092 \
  --list

查看Topic详细信息:

/home/phoenix/apps/kafka_2.13-3.9.1/bin/kafka-topics.sh \
--bootstrap-server 192.168.1.211:9092 \
 --topic mysql-bin-tab1   --describe

# 查看Topic的详细配置
kafka-configs.sh --bootstrap-server 192.168.1.211:9092 \
  --entity-type topics \
  --entity-name your_topic \
  --describe

调整保留时间(数据会自动清理):

# kafka默认保存7天
# retention.ms=604800000 表示保留时间为 604,800,000 毫秒(即7天)。如果值为 -1,则表示永久保留。
/home/phoenix/apps/kafka_2.13-3.9.1/bin/kafka-configs.sh \
--bootstrap-server 192.168.1.211:9092 --entity-type topics \
  --entity-name topicname_ \
  --alter --add-config retention.ms=1000

消费:

kafka-console-consumer.sh \
  --bootstrap-server 192.168.1.211:9092 \
  --topic connect-offsets \
  --from-beginning \
  --max-messages 3

kafka-console-consumer.sh \
  --bootstrap-server 192.168.1.211:9092 \
  --topic mysql-server.ecommerce.orders \
  --partition 0 \
  --offset latest \
  --max-messages 1 \
  --property print.key=false | jq '.'