【kafka】2022-02-25【常用命令】
本人很懒,笔记这些都写在有道云;此外,感觉kafka的资料是真的多,且官网关于命令是真的很详细了,把自己常用的命令记录与分享下。
备注:命令基于kafka-2.1.1,如有雷同,联系侵删
# topic 相关
./kafka-topics.sh --zookeeper=zk_server --describe --topic test_topic
./kafka-topics.sh --zookeeper=zk_server --delete --topic test_topic
./kafka-topics.sh --zookeeper=zk_server --create --partitions 1 --replication-factor 3 --topic test_topic
# 可以将副本指定到对应的节点,结合写个小工具生成分布字符串,方便做隔离,
./kafka-topics.sh --zookeeper=zk_server --create --replica-assignment 1:3,2:1,3:2 --topic test_topic
# 查看topic列表,看是否创建成功
./kafka-topics.sh --zookeeper=zk_server --list
./kafka-topics.sh --zookeeper=zk_server --alter --topic test_topic --partitions 3
./kafka-topics.sh --zookeeper=zk_server --alter --partitions 3 --topic test_topic --replica-assignment 1:3,2:1,3:2
./kafka-configs.sh --zookeeper=zk_server --alter --entity-name test_topic --entity-type topics --delete-config retention.ms
# 迁移相关,可先修改保存时间避免大量数据同步
./kafka-configs.sh --zookeeper=zk_server --alter --entity-type topics --add-config retention.ms=86400000 --entity-name test_topic
# 第一步:指定topic
echo {\"topics\": [{\"topic\": \"test_topic\"}],\"version\": 1} > ../conf/topic.json
# 第二步:生成分配策略
./kafka-reassign-partitions.sh --zookeeper=zk_server --topics-to-move-json-file ../conf/topic.json --broker-list "1,2,3" --generate
# 第三步:不限速执行迁移(没有流量推荐使用,有流量慎重使用)
./kafka-reassign-partitions.sh --zookeeper=zk_server --reassignment-json-file ../conf/reassignment.json --execute
# 第四步:限速执行迁移(推荐使用),迁移完成需执行下面的命令
./kafka-reassign-partitions.sh --zookeeper=zk_server --reassignment-json-file ../conf/reassignment.json --execute --throttle 52400000
# 切换目录迁移:any同时限速(如果是鉴权集群可能要加一些账密配置)
./kafka-reassign-partitions.sh --zookeeper=zk_server --bootstrap-server broker:port --reassignment-json-file ../conf/reassignment.json --execute --replica-alter-log-dirs-throttle 52400000
# 第五步:查看迁移进度,如果全部迁移完成执行该命令移除限速
./kafka-reassign-partitions.sh --zookeeper=zk_server --reassignment-json-file ../conf/reassignment.json --verify
./kafka-reassign-partitions.sh --zookeeper=zk_server --bootstrap-server broker:port --reassignment-json-file ../conf/reassignment.json --verify
# 指定均衡
./kafka-preferred-replica-election.sh --zookeeper=zk_server --path-to-json-file ../conf/reassignment.json
# 全局均衡(生产慎用)
./kafka-preferred-replica-election.sh --zookeeper=zk_server
# 授权相关,查看授权列表
./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --list
# 查看用户信息
./kafka-configs.sh --zookeeper=zk_server --describe --entity-name test_user --entity-type users
# 添加账户
./kafka-configs.sh --zookeeper=zk_server --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=test_user]' --entity-type users --entity-name test_user
# 添加写
./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --add --allow-principal User:'test_user' --allow-host '*' --operation Write --topic 'test_topic'
# 添加读
./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --add --allow-principal User:'test_user' --allow-host '*' --operation Read --topic 'test_topic' --group 'group_test_topic'
# 事务-账户幂等写入集群
./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --add --allow-principal User:'test_user' --allow-host '*' --operation IdempotentWrite --cluster kafka-cluster
# 事务-账户授事务ID的所有权限
./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --add --allow-principal User:'test_user' --allow-host '*' --operation ALL --transactional-id '*'
# 移除授权(生产慎用)
# ./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --remove --allow-principal User:'test_user' --operation Read --topic 'test_topic' --group 'group_test_topic' --allow-host '*'
# ./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --remove --allow-principal User:'test_user' --operation Write --topic 'test_topic' --allow-host '*'
# LAG
./kafka-consumer-groups.sh --bootstrap-server broker:port --describe --group group_test_topic
# 消费内部topic数据
./kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server broker:port --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --max-messages 10000 > ./group.txt
./kafka-console-consumer.sh --topic __consumer_offsets --partition 1 --bootstrap-server broker:port --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
# 设置到某个时间-新加坡不用调整时差,国内有8h时差,比如:国内需要设置为14:00开始,则:2021-02-25T06:00:00.000
./kafka-consumer-groups.sh --bootstrap-server broker:port --topic test_topic --reset-offsets --to-datetime 2021-06-25T23:00:00.000 --group group_test_topic --execute
# 控制台生产
./kafka-console-producer.sh --topic resource_author_pond --broker-list broker:port
# 控制台消费
./kafka-console-consumer.sh --bootstrap-server broker:port --topic test_topic --from-beginning
./kafka-console-consumer.sh --bootstrap-server broker:port --max-messages 100 --topic test_topic > ./records.txt
# 限速相关
# 用户-clientId
./kafka-configs.sh --zookeeper=zk_server --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=700,request_percentage=200' --entity-type users --entity-name test_user --entity-type clients --entity-name my_client_id
# 删除
./kafka-configs.sh --zookeeper=zk_server --alter --delete-config 'producer_byte_rate,consumer_byte_rate,request_percentage' --entity-type users --entity-name test_user --entity-type clients --entity-name my_client_id
# clientId
./kafka-configs.sh --zookeeper=zk_server --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-name clientA
# 删除
./kafka-configs.sh --zookeeper=zk_server --alter --delete-config 'producer_byte_rate,consumer_byte_rate,request_percentage' --entity-type clients --entity-name my_client_id
# 添加限速[关于broker建的副本同步]
./kafka-configs.sh --zookeeper=zk_server --alter --add-config 'leader.replication.throttled.rate=629145600,follower.replication.throttled.rate=629145600' --entity-type brokers --entity-name 1
# 必须同步设置topic
./kafka-configs.sh --zookeeper=zk_server --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test_topic
# 移除限速
./kafka-configs.sh --zookeeper=zk_server --alter --delete-config 'leader.replication.throttled.rate,follower.replication.throttled.rate' --entity-type brokers --entity-name 1
./kafka-configs.sh --zookeeper=zk_server --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas' --entity-type topics --entity-name test_topic
./kafka-configs.sh --zookeeper=zk_server --alter --delete-config 'consumer_byte_rate' --entity-type users --entity-name test_user --entity-type clients --entity-name 'my_client_id'
# 查看文件内容
./kafka-run-class.sh kafka.tools.DumpLogSegments --files ./xxxxx.log --print-data-log | less
## 修改消费组偏移量,最近
./kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server broker:port --describe --group group_test_topic
./kafka-consumer-groups.sh --bootstrap-server broker:port --topic test_topic --reset-offsets --to-earliest --execute --group group_test_topic
## 修改消费组偏移量,任意
./kafka-consumer-groups.sh --bootstrap-server broker:port --topic test_topic --reset-offsets --to-offset 174000 --execute --group group_test_topic
## 修改消费组偏移量,最开始
./kafka-consumer-groups.sh --bootstrap-server broker:port --topic test_topic --reset-offsets --to-latest --execute --group group_test_topic
# 压测
./kafka-producer-perf-test.sh --topic tiger_test --num-records 5000000000000 --record-size 10240 --throughput 500000 --producer-props bootstrap.servers=broker:port
./kafka-consumer-perf-test.sh --broker-list broker:port --topic test_topic --fetch-size 1048576 --messages 10000000 --threads 1