kafka Consumer — offset的控制
前言
在N久之前,曾写过kafka 生产者使用详解,
今天补上关于 offset 相关的内容。
那么本文主要涉及:
- Kafka 消费者的两个大版本
- 消费者的基本使用流程
- 重点:offset 的控制
消費者版本
-
开源之初使用Scala 语言编写的客户端,
我们可以称之为旧消费者客户端(Old Consumer)
或 Scala 消费者客户端; -
第二个是从Kafka 0.9. x 版本开始推出的使用Java 编写的客户端,
我们可以称之为新消费者客户端( New Consumer )
或Java 消费者客户端,
它弥补了旧客户端中存在的诸多设计缺陷,
不过我不建议你在0.9.x 使用该客户端,
该新客户端再 0.10.0 才算比较稳定了
这里额外提一句就是,客户端从scala 语言转向 java,
并不是 java 比 scala 要怎么怎么样,
仅仅只是因为社区的开发者换人了~~~~
开发一个消费者的正常流程
一个正常的消费逻辑需要具备以下几个步骤:
- 配置消费者客户端参数及创建相应的消费者实例。
- 订阅主题。
- 拉取消息并消费。
- 提交消费位移。
- 关闭消费者实例。
消费者可以订阅多个Topic,
consumer.subscribe(Arrays.asList("t1","t2"))),
如果订阅多次,后面的会覆盖前面的,
所以取消订阅其实也可以去订阅一个空集合。
订阅支持正则表达式:
consumer.subscribe(Pattern.compile("topic .*"));
这样订阅后,如果kafka后面新增了满足该正则的 Topic也会被该消费者消费
消费者也可以直接订阅某个分区的数据,
这里我们贴下代码,如下:
List<TopicPartition> partitions = new ArrayList<>();
// 查询kafka分区信息
List<Partitioninfo> partitioninfos = consumer.partitionsFor( topic );
if (partitioninfos != null) {
for (Partitioninfo tpinfo : partitioninfos) {
partitions.add(new TopicPartition( tpinfo.topic(), tpinfo.partition() )) ;
consumer.assign( partitions ) ;
值得注意的是:
subscribe订阅是具有分区在均衡能力的,
而 assign 是没有的
这里我们只是简单的过了一下 消费者,
因为不是本文的重点,
如果要详细了解的话,
还是去看看这篇 kafka 生产者使用详解
Offset 提交
这里指的是消费者消费的位移,
而不是Kafka端储存的消息的 offset,
这其中的区别希望读者清楚,不要混淆了。
对于offset 的提交,
我们要清楚一点
如果我们消费到了 offset=x 的消息
那么提交的应该是 offset=x+1,
而不是 offset=x
kafka的提交方式分为两种:
自动提交
在Kafka 中默认的消费位移的提交方式是自动提交,
这个由消费者客户端参数enable.auto.commit
配置,
默认值为true。
当然这个默认的自动提交不是每消费一条消息就提交一次,
而是定期提交,
这个定期的周期时间由客户端参数auto.commit.interval.ms
配置,
默认值为5 秒,
此参数生效的前提是enable.auto.commit
参数为true。
自动位移提交的动作是在poll()方法的逻辑里完成的,
在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,
如果可以,那么就会提交上一轮消费的位移。
手动提交
-
commitSync() 同步提交
-
批量提交
该方式的最大问题在于数据是批量处理,
当部分数据完成消费,
还没来得及提交offset就被中断,
则会使得下次消费会重复消费那部分已经消费过的数据。consumer.commitSync()
会在消费完数据后,
将消费完消费的offset+1
提交.
直接使用如下:final int minBatchSize = 200; List<ConsumerRecord> buffer= new ArrayList<>() ; while ( isRunning.get() ) { ConsumerRecords<String , String> records = consumer . poll(1000) ; for (ConsumerRecord<String , String> record : records) { buffer.add(record); if (buffer.size() >= minBatchSize) { //do some logical processing with buffer . consumer.commitSync() ; buffer.clear(); } }
-
单条消息提交一次
该方式每消费一次,就保存一次。
虽然在很大程度上避免了重复消费,
但是其性能是极其低下的,
基本不在企业级考虑的范围,
并且也不是完全的能做到精准一次消费while ( isRunning. get () ) { ConsumerRecords<String , String> records= consumer.poll(1000) ; for (ConsumerRecord<String , String> record : records) { //do some logical processing. //读取消费的消息的 offset long offset= record.offset() ; TopicPartition partition =new TopicPartition(record.topic() , record.partition()) ; // 提交位移 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1))) ; } }
-
按分区提交
该方式其实是综合了 批量提交 和 单条消息提交一次,
按分区的小批次提交,
如果你要使用同步提交的方式,
那么建议你使用该方式try { while (isRunning.get() ) { ConsumerRecords<String , String> records= consumer .poll(1000); for (TopicPartition partition : records.partitions( )) { //取出每个分区的消息 List<ConsumerRecord<String, String> partitionRecords = records . records(partition) for (ConsumerRecord<String , String> record : partitionRecords) { //消费该分区的消息***** //********* //将该分区的 offset 提交 long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1) .offset() ; consumer.commitSync(Collections.singletonMap ( partition , new OffsetAndMetadata(lastConsumedOffset + 1)) ); } } } }finally { consumer.close(); }
-
-
commitAsync() 异步提交
//三个重载方法 public void commitAsync() public void commitAsync(OffsetCommitCallback callback) public void commitAsync(final Map<TopicPartition , OffsetAndMetadata> offsets ,OffsetCommitCallback callback)
commitAsync
属于异步提交,
也就是不会阻塞线程,
比起同步提交commitSync
具有更好的性能。
这里我们主要来讨论下OffsetCommitCallback callback
回调的使用,
理解起来很简单,我们每提交一次 Offset,
callback 都会告诉我们是否提交成功。
那么如果我们提交失败了怎么办呢??-
一般的想法就是:失败了?那重新提交呗。
这种方式是否可行?我们看下面这个列子。
如果一个消费者消费到了 offset=10,
我们就异步提交了 offset=11,
继续拉取消息 offset=11-20,
这个时候 提交的 offset=11 还没有返回成功,
我们提交 offset=21,
返回 offset=21 提交成功。
OK,现在提交 offset=1的那条消息返回了,
并且是失败的,
那么如果你去重试,
提交 offset=11 就会覆盖掉 已经提交的 offset=21
很明显这不是我们想要的。 -
正确的做法:
这个时候需要客户端维护一个序列号,
每次提交成功都 +1,
重试的时候进行对比,
不合法就不需要重试了。
当然实际情况,
一般提交offset不会失败,
并且就算失败一次也不会有问题,
因为后面每次消费一样会进行offset提交,
而对于消费者正常退出,
我们可以使用,commitSync
同步提交,
保证offset的正确。try { while(isRunning.get()) { //poll records and do some log 工cal processing . consumer . commitAsync() ; } ) finally { try { consumer.commitSync() ; ) finally { consumer.close() ; }}
-
再均衡导致的重复消费:
再均衡发生的时候也可能会导致消费者的offset来不及提交,
这时候我们需要在监听到再均衡发生的时候进行一次offset提交://该对象需要保存该消费者消费的分区的最新的 offset //本段代码中没有体现,可以在消费数据之后 进行更新该对象 Map<TopicPartition , OffsetAndMetadata> currentOffsets =new HashMap<>() ; consumer.subscribe(Arrays .asList( topic) , new ConsumerRebalanceListener () { //发生在 再均衡 之前,并且消费者停止读取消息的时候 @Override public void onPartitionsRevoked(Collection<TopicPart ition> partitions) { consume.commitSync(currentOffsets) ; currentOffsets.clear(); } @Override public void onPartitions Assigned(Collection<TopicPartition > partitions) { //do nothing . } } );
-
最后,我们来总结下:
-
一般来说,我们不会使用自动提交的方式管理 offset,
虽然简单,但是缺乏很好的控制,
不过如果能满足业务要求,
那么还是果断的使用起来吧 -
对于手动提交,
一般我们都是使用异步提交的方式,
在考虑准确的消费的情况下,兼顾的效率。 -
同步提交一般用来辅助异步提交,
对于一些特殊情况,保证offset的正确提交。 -
我们考虑到了再均衡的影响,并做了相关的处理
-
对于消费者异常退出 和 崩溃:
很遗憾的是如果出现异常和崩溃,
我们的消费还是很难做到精准的一次消费,
不过一般来说,
以上这些方法是绝对满足大部分企业大部分的业务的需求。
如果你实在要保证精准的一次消费,
你可能还需要一些其他的辅助,
比如:消费和提交 当做一次事务,
或者 重复消费是幂等 等等方式。要精准一次消费,
还得依靠开发人员来自己保证,
当然,如果你使用 Kafka 的stream 方式消费,
是可以做到精准一次消费的,
不过这不在本文的讨论范围了...