Where Kafka consumers store their offsets

2022-02-16  Shaw_Young

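A consumer can fail in the middle of consuming (power loss, a crash, and so on). Once it recovers, it has to resume from the position it had reached before the failure, so the consumer must continuously record which offset it has consumed up to.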
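To make the recovery argument concrete, here is a minimal sketch in plain Python. It is a toy model, not Kafka code: a list stands in for one partition, a dict stands in for the offset store, and the names `consume`, `store`, and `demo-group` are made up for illustration. It assumes the stored value is the next offset to read.

```python
# Toy model only: a list plays the role of one partition and a dict plays
# the role of the offset store. None of these names are Kafka APIs.

def consume(log, store, key, crash_after=None):
    """Read from the last recorded offset; optionally stop early to mimic a crash."""
    position = store.get(key, 0)              # resume point: next offset to read
    seen = []
    for count, offset in enumerate(range(position, len(log)), start=1):
        seen.append(log[offset])
        store[key] = offset + 1               # record the NEXT offset to read
        if crash_after is not None and count == crash_after:
            break                             # simulated failure
    return seen

log = ["m0", "m1", "m2", "m3", "m4"]
store = {}
key = ("demo-group", "bigdata", 0)            # hypothetical (group, topic, partition)

first_run = consume(log, store, key, crash_after=2)
second_run = consume(log, store, key)         # "restarted" consumer

print(first_run)    # ['m0', 'm1']
print(second_run)   # ['m2', 'm3', 'm4']
```

Because the offset lives outside the consumer, the second run picks up at `m2` instead of starting over or skipping records.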

Create a topic

kafka-topics.sh --create --topic bigdata --zookeeper zkKafka1:2181 --partitions 2 --replication-factor 2

Created topic "bigdata".

Connect to the topic and get ready to send messages

kafka-console-producer.sh --broker-list zkKafka1:9092 --topic bigdata
> hello

Start a consumer to read the messages

kafka-console-consumer.sh --zookeeper zkKafka1:2181 --topic bigdata

Inspect ZooKeeper

zkCli.sh

[zk: localhost:2181(CONNECTED) 0] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, kafka-manager, latest_producer_id_block, zookeeper]

[zk: localhost:2181(CONNECTED) 2] ls /brokers
[ids, seqid, topics]

[zk: localhost:2181(CONNECTED) 3] ls /brokers/ids
[0, 1, 2]

[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics
[__consumer_offsets, bigdata, first]

[zk: localhost:2181(CONNECTED) 5] ls /consumers
# consumer groups
[console-consumer-90976]

[zk: localhost:2181(CONNECTED) 6] ls /consumers/console-consumer-90976 
[ids, offsets, owners]

[zk: localhost:2181(CONNECTED) 7] ls /consumers/console-consumer-90976/offsets
[bigdata]

[zk: localhost:2181(CONNECTED) 8] ls /consumers/console-consumer-90976/offsets/bigdata 
[0, 1]

[zk: localhost:2181(CONNECTED) 10] get /consumers/console-consumer-90976/offsets/bigdata/0
1
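The znode layout walked through above follows the pattern /consumers/<group>/offsets/<topic>/<partition>, with the offset stored as the node's value. A small sketch of that mapping (the helper names `zk_offset_path` and `parse_zk_offset_path` are hypothetical, written only for this illustration):

```python
def zk_offset_path(group, topic, partition):
    """Build the znode path under which an offset is stored in the session above."""
    return f"/consumers/{group}/offsets/{topic}/{partition}"

def parse_zk_offset_path(path):
    """Split such a path back into (group, topic, partition)."""
    parts = path.strip("/").split("/")
    # Expected shape: consumers/<group>/offsets/<topic>/<partition>
    if len(parts) != 5 or parts[0] != "consumers" or parts[2] != "offsets":
        raise ValueError(f"not a consumer offset path: {path}")
    return parts[1], parts[3], int(parts[4])

path = zk_offset_path("console-consumer-90976", "bigdata", 0)
print(path)                        # /consumers/console-consumer-90976/offsets/bigdata/0
print(parse_zk_offset_path(path))  # ('console-consumer-90976', 'bigdata', 0)
```

In the session above, `get` on that path returned 1 after the single message `hello` had been consumed from partition 0.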

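Before Kafka 0.9, the consumer stored its offsets in ZooKeeper by default. Starting with 0.9, the consumer stores its offsets by default in a built-in Kafka topic named __consumer_offsets. The consumer above was started with --zookeeper, which is why its group and offsets appeared under /consumers in ZooKeeper. A consumer started with --bootstrap-server, as in the steps that follow, commits to __consumer_offsets instead.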

1) Edit the configuration file consumer.properties

exclude.internal.topics=false

2) Read the offsets

# Start a new consumer
kafka-console-consumer.sh --bootstrap-server zkKafka1:9092 --topic bigdata

bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper zkKafka1:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

[console-consumer-17599,bigdata,0]::[OffsetMetadata[2,NO_METADATA],CommitTime 1644921362785,ExpirationTime 1645007762785]
[console-consumer-17599,bigdata,1]::[OffsetMetadata[2,NO_METADATA],CommitTime 1644921362785,ExpirationTime 1645007762785]
[console-consumer-17599,bigdata,0]::[OffsetMetadata[3,NO_METADATA],CommitTime 1644921367772,ExpirationTime 1645007767772]
[console-consumer-17599,bigdata,1]::[OffsetMetadata[2,NO_METADATA],CommitTime 1644921367772,ExpirationTime 1645007767772]
[console-consumer-17599,bigdata,0]::[OffsetMetadata[3,NO_METADATA],CommitTime 1644921372775,ExpirationTime 1645007772775]
[console-consumer-17599,bigdata,1]::[OffsetMetadata[2,NO_METADATA],CommitTime 1644921372775,ExpirationTime 1645007772775]
[console-consumer-17599,bigdata,0]::[OffsetMetadata[3,NO_METADATA],CommitTime 1644921377777,ExpirationTime 1645007777777]
[console-consumer-17599,bigdata,1]::[OffsetMetadata[2,NO_METADATA],CommitTime 1644921377777,ExpirationTime 1645007777777]
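
Each record above has the shape [group,topic,partition]::[OffsetMetadata[offset,metadata],CommitTime ms,ExpirationTime ms]. The sketch below parses that text form and keeps the most recent offset per (group, topic, partition). The regular expression is inferred from the output shown here only, so treat it as an assumption rather than a stable format. `latest_offsets` and `retention_hours` are hypothetical helper names.

```python
import re

# Pattern inferred from the formatter output shown above (an assumption, not a spec).
LINE = re.compile(
    r"\[(?P<group>[^,]+),(?P<topic>[^,]+),(?P<partition>\d+)\]::"
    r"\[OffsetMetadata\[(?P<offset>\d+),(?P<metadata>[^\]]*)\],"
    r"CommitTime (?P<commit>\d+),ExpirationTime (?P<expire>\d+)\]"
)

def latest_offsets(lines):
    """Return {(group, topic, partition): offset}; later lines overwrite earlier ones."""
    latest = {}
    for line in lines:
        m = LINE.match(line.strip())
        if m is None:
            continue                      # skip anything that is not an offset record
        key = (m["group"], m["topic"], int(m["partition"]))
        latest[key] = int(m["offset"])
    return latest

def retention_hours(line):
    """Hours between CommitTime and ExpirationTime for one record."""
    m = LINE.match(line.strip())
    return (int(m["expire"]) - int(m["commit"])) / 3_600_000

sample = [
    "[console-consumer-17599,bigdata,0]::[OffsetMetadata[2,NO_METADATA],CommitTime 1644921362785,ExpirationTime 1645007762785]",
    "[console-consumer-17599,bigdata,1]::[OffsetMetadata[2,NO_METADATA],CommitTime 1644921362785,ExpirationTime 1645007762785]",
    "[console-consumer-17599,bigdata,0]::[OffsetMetadata[3,NO_METADATA],CommitTime 1644921367772,ExpirationTime 1645007767772]",
]

print(latest_offsets(sample))
# {('console-consumer-17599', 'bigdata', 0): 3, ('console-consumer-17599', 'bigdata', 1): 2}
print(retention_hours(sample[0]))   # 24.0
```

Two things can be read off the timestamps: successive commits are about five seconds apart, which is consistent with periodic auto-commit (auto.commit.interval.ms defaults to 5000), and each record expires exactly 24 hours after its commit time, which would match a broker offset retention of one day on this cluster.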