Kafka

onPartitionRevoked和onPartitionAs

2022-04-20  本文已影响0人  Alen_ab56

第一个是刚开始重平衡 还没开始

第二是已经分配好了

建议你在第一个回调里提交offset

第二个回调里获取offset进行重置

public void onPartitionsRevoked(Collection<TopicPartition> collection) {

                consumer.commitAsync(); // 提交偏移量

            }

            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

                // 获取该分区下已消费的偏移量

                long commitedOffset = -1;

                for (TopicPartition topicPartition : partitions) {

                    // 获取该分区下已消费的偏移量

                    commitedOffset = consumer.committed(topicPartition).offset();

                    // 重置偏移量到上一次提交的偏移量下一个位置处开始消费

                    consumer.seek(topicPartition, commitedOffset + 1);

                }

            }

assign不受这个影响

这个是subscribe方式进行消费

上一篇 下一篇

猜你喜欢

热点阅读