《Kafka 实战》笔记2

2019-02-17  本文已影响0人  恶魔幻心

consumer group重新设置位移

前提是 consumer group不 能处于运行状态,也就是说它必须是 inactive 的

第一步是确定消费者组下 topic的作用域,当前支持 3种作用域,它们分别如下。

--all-topics:为消费者组下所有 topic 的所有分区调整位移 。

--topic t1,--topic t2:为指定的若干个 topic 的所有分区调整位移 。

--topic t1:0,1,2:为 topic的指定分区调整位移。

确定了 topic作用域之后,第二步就是确定位移重设策略 。当前支持如下 8种设置规则 。

--to-earliest:把位移调整到分区当前最早位移处 。

--to-latest:把位移调整到分区当前最新位移处。

--to-current:把位移调整到分区当前位移处 。

--to-offset <offset>:把位移调整到指定位移处。

--shift-by N:把位移调整到当前位移+N处。 N可以是负值。

--to-datetime <datetime>:把位移调整到大于给定时间的最早位移处 。datatime 格式是yyyy-MM-ddTHH:mm:ss.xxx,比如 20l7-08-04T00:00:00.000。

--by-duration <duration>:把位移调整到距离当前时间指定间隔的位移处 。duration格式 是 PnDTnHnMnS,比如 PT0H5M0S。

--from-file <file>:从csv文件中读取位置调整策略

最后一步是确定执行方案,当前支持如下 3种方案 。

不加任何参数:只是打印位移调整方案,不实际执行 。

--execute:执行真正的位移调整。

--export:把位移调整方案保存成 csv 格式并输出到控制台,方便用户保存成 csv 文件,供后续结合from-file参数使用 。

测试

终端下启动 一个 console consumer程序,组名设置为 test-group:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic canal-example --from-beginning --consumer-property group.id=test-group

待运行一段时间后按下 Ctrl+C 组合键关闭 console consumer 程序,将 test-group 设置为inactive状态 。 之后运行 kafka-consumer-groups.sh脚本,确定当前 group 的消费进度:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --describe

最早:--to-earliest

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --topic canal-example --to-earliest --execute

大于指定时间:--to-datetime 

bin/kafka- consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets  --all-topics  --to-datetime 2018-04-01T14:30:00. 000

获取 topic 当前消息数

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1

bin/kafka-run-class .sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -2

--time -1 命令表示要获取指定 topic所有分区当前的最大位移:而一time -2表示获取当前最早位移。随着集群的不断运行,topic 的数据可能会被移除 一部分 ,因--time 斗的结果其实表示的是历史上该 topic 生产的最大消息数。如果用户要统计当前的消息总数就必须减去 --time -2 的结果 。

上一篇下一篇

猜你喜欢

热点阅读