【kafka】2022-02-25【常用命令】

2022-02-25  本文已影响0人  SweetMojito

本人很懒,笔记这些都写在有道云;此外,感觉kafka的资料是真的多,且官网关于命令是真的很详细了,把自己常用的命令记录与分享下。

备注:命令基于kafka-2.1.1,如有雷同,联系侵删

# topic 相关

./kafka-topics.sh --zookeeper=zk_server --describe --topic test_topic

./kafka-topics.sh --zookeeper=zk_server --delete --topic test_topic

./kafka-topics.sh --zookeeper=zk_server --create --partitions 1 --replication-factor 3 --topic test_topic

# 可以将副本指定到对应的节点,结合写个小工具生成分布字符串,方便做隔离,

./kafka-topics.sh --zookeeper=zk_server --create --replica-assignment 1:3,2:1,3:2 --topic test_topic

# 查看topic列表,看是否创建成功

./kafka-topics.sh --zookeeper=zk_server --list

./kafka-topics.sh --zookeeper=zk_server --alter --topic test_topic --partitions 3

./kafka-topics.sh --zookeeper=zk_server --alter --partitions 3 --topic test_topic --replica-assignment 1:3,2:1,3:2

./kafka-configs.sh --zookeeper=zk_server --alter --entity-name test_topic --entity-type topics --delete-config retention.ms

# 迁移相关,可先修改保存时间避免大量数据同步

./kafka-configs.sh --zookeeper=zk_server --alter --entity-type topics --add-config retention.ms=86400000 --entity-name test_topic

# 第一步:指定topic

echo {\"topics\": [{\"topic\": \"test_topic\"}],\"version\": 1} > ../conf/topic.json

# 第二步:生成分配策略

./kafka-reassign-partitions.sh --zookeeper=zk_server --topics-to-move-json-file ../conf/topic.json --broker-list "1,2,3" --generate

# 第三步:不限速执行迁移(没有流量推荐使用,有流量慎重使用)

./kafka-reassign-partitions.sh --zookeeper=zk_server --reassignment-json-file ../conf/reassignment.json --execute

# 第四步:限速执行迁移(推荐使用),迁移完成需执行下面的命令

./kafka-reassign-partitions.sh --zookeeper=zk_server --reassignment-json-file ../conf/reassignment.json --execute --throttle 52400000

# 切换目录迁移:any同时限速(如果是鉴权集群可能要加一些账密配置)

./kafka-reassign-partitions.sh --zookeeper=zk_server --bootstrap-server broker:port --reassignment-json-file ../conf/reassignment.json --execute --replica-alter-log-dirs-throttle 52400000

# 第五步:查看迁移进度,如果全部迁移完成执行该命令移除限速

./kafka-reassign-partitions.sh --zookeeper=zk_server --reassignment-json-file ../conf/reassignment.json --verify

./kafka-reassign-partitions.sh --zookeeper=zk_server --bootstrap-server broker:port --reassignment-json-file ../conf/reassignment.json --verify

# 指定均衡

./kafka-preferred-replica-election.sh --zookeeper=zk_server --path-to-json-file ../conf/reassignment.json

# 全局均衡(生产慎用)

./kafka-preferred-replica-election.sh --zookeeper=zk_server

# 授权相关,查看授权列表

./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --list

# 查看用户信息

./kafka-configs.sh --zookeeper=zk_server --describe --entity-name test_user --entity-type users

# 添加账户

./kafka-configs.sh --zookeeper=zk_server --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=test_user]' --entity-type users --entity-name test_user

# 添加写

./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --add --allow-principal User:'test_user' --allow-host '*' --operation Write --topic 'test_topic'

# 添加读

./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --add --allow-principal User:'test_user' --allow-host '*' --operation Read --topic 'test_topic' --group 'group_test_topic'

# 事务-账户幂等写入集群

./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --add --allow-principal User:'test_user' --allow-host '*' --operation IdempotentWrite --cluster kafka-cluster

# 事务-账户授事务ID的所有权限

./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --add --allow-principal User:'test_user' --allow-host '*' --operation ALL --transactional-id '*'

# 移除授权(生产慎用)

# ./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --remove --allow-principal User:'test_user' --operation Read --topic 'test_topic' --group 'group_test_topic' --allow-host '*'

# ./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --remove --allow-principal User:'test_user' --operation Write --topic 'test_topic' --allow-host '*'

# LAG

./kafka-consumer-groups.sh --bootstrap-server broker:port --describe --group group_test_topic

# 消费内部topic数据

./kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server broker:port --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"  --max-messages 10000 > ./group.txt

./kafka-console-consumer.sh --topic __consumer_offsets --partition 1 --bootstrap-server broker:port --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

# 设置到某个时间-新加坡不用调整时差,国内有8h时差,比如:国内需要设置为14:00开始,则:2021-02-25T06:00:00.000

./kafka-consumer-groups.sh --bootstrap-server broker:port --topic test_topic --reset-offsets --to-datetime 2021-06-25T23:00:00.000 --group group_test_topic --execute

# 控制台生产

./kafka-console-producer.sh --topic resource_author_pond --broker-list broker:port

# 控制台消费

./kafka-console-consumer.sh --bootstrap-server broker:port --topic test_topic --from-beginning

./kafka-console-consumer.sh --bootstrap-server broker:port --max-messages 100 --topic test_topic > ./records.txt

# 限速相关

# 用户-clientId

./kafka-configs.sh --zookeeper=zk_server --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=700,request_percentage=200' --entity-type users --entity-name test_user --entity-type clients --entity-name my_client_id

# 删除

./kafka-configs.sh --zookeeper=zk_server --alter --delete-config 'producer_byte_rate,consumer_byte_rate,request_percentage' --entity-type users --entity-name test_user --entity-type clients --entity-name my_client_id

# clientId

./kafka-configs.sh --zookeeper=zk_server --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-name clientA

# 删除

./kafka-configs.sh --zookeeper=zk_server --alter --delete-config 'producer_byte_rate,consumer_byte_rate,request_percentage' --entity-type clients --entity-name my_client_id

# 添加限速[关于broker建的副本同步]

./kafka-configs.sh --zookeeper=zk_server --alter --add-config 'leader.replication.throttled.rate=629145600,follower.replication.throttled.rate=629145600' --entity-type brokers --entity-name 1

# 必须同步设置topic

./kafka-configs.sh --zookeeper=zk_server --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test_topic

# 移除限速

./kafka-configs.sh --zookeeper=zk_server --alter --delete-config 'leader.replication.throttled.rate,follower.replication.throttled.rate' --entity-type brokers --entity-name 1

./kafka-configs.sh --zookeeper=zk_server --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas' --entity-type topics --entity-name test_topic

./kafka-configs.sh --zookeeper=zk_server --alter --delete-config 'consumer_byte_rate' --entity-type users --entity-name test_user --entity-type clients --entity-name 'my_client_id'

# 查看文件内容

./kafka-run-class.sh kafka.tools.DumpLogSegments --files ./xxxxx.log --print-data-log | less

## 修改消费组偏移量,最近

./kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server broker:port --describe --group group_test_topic

./kafka-consumer-groups.sh --bootstrap-server broker:port --topic test_topic --reset-offsets --to-earliest --execute --group group_test_topic

## 修改消费组偏移量,任意

./kafka-consumer-groups.sh --bootstrap-server broker:port --topic test_topic --reset-offsets --to-offset 174000 --execute --group group_test_topic

## 修改消费组偏移量,最开始

./kafka-consumer-groups.sh --bootstrap-server broker:port --topic test_topic --reset-offsets --to-latest --execute --group group_test_topic

# 压测

./kafka-producer-perf-test.sh --topic tiger_test --num-records 5000000000000 --record-size 10240 --throughput 500000 --producer-props bootstrap.servers=broker:port

./kafka-consumer-perf-test.sh --broker-list broker:port --topic test_topic --fetch-size 1048576 --messages 10000000 --threads 1

上一篇下一篇

猜你喜欢

热点阅读