Kafka Consumer

2019-05-16  本文已影响0人  ands999
新版本consumer

Kafka检查点机制(checkpointing)会定期对offset进行持久化,从而简化应答机制。

位移(offset)

这里的位移指的是consumer端的offset,不同于分区日志中的位移。
每个consumer实例都会为它消费的分区维护属于自己的位置信息,来记录当前消费了多少条消息,即分区中当前最新消费消息的位置。这即是位移(offset)。
把消费端的位移保存在服务器端,会有以下三个问题:

位移提交

consumer定期向Kafka集群汇报消费数据的进度,这被称为位移提交(offset submit)。
旧版本consumer会定期把位移信息提交到ZK下的固定节点上。
新版本consumer把位移信息提交到Kafka的内部topic(__consumer_offsets)上。有50个分区。
Kafka日志文件目录中至少包括一个日志文件(.log)和两个索引文件(.index和.timeindex)。

位移管理

offset是实现消息交付语义保证的基石。
常见3种消息交付语义保证:

从0.11.0.0版本开始正式支持事务以及精确一次处理语义。

新版本consumer位移管理

consumer在Kafka集群的所有broker中选择一个broker作为消费者组的coordinator,用于实现组成员管理、消费分配方案制定以及提交位移等。选择coordinator的依据是内部topic(__consumer_offsets)。
consumer提交位移是通过向coordinator发送位移提交请求。
默认情况下,consumer是自动提交位移。
手动提交位移分为同步和异步两种方式。

消息轮询

consumer是用来读取消息的,而且要能够同时读取多个topic的多个分区的消息。若要实现并行的消息读取,一种方法是使用多线程,为每个要读取的分区都创建一个专有的线程去消费(旧版consumer模式)。另一种方法是采用类似于epoll或select等,即使用一个线程同时管理多个Socket连接,同时与多个broker通信,实现消息的并行读取(新版consumer模式)。
一旦consumer订阅了topic。所有的消费逻辑包括rebalance、消息获取、coordinator管理、异步任务结果的处理和位移提交等都会在poll方法的中被执行。这样可以在一个线程管理所有的consumer I/O操作。
当poll方法被首次调用时,新的消费者组会被创建并根据对应的位移重设策略(auto.offest.reset)来设定消费者组的位移。一旦consumer开始提交位移。每个后续的rebalance完成后都会将位置设置为上次已提交的位移。
一定要在finally中手动关闭consumer,这样不仅可以清除consumer创建的Socket资源,还会通知消费者组coordinator主动离组从而更快开启新一轮rebalance。
新版本的consumer不是线程安全的。但可以安全的在另一个线程中调用consumer.wakeup()。
poll方法使用建议:

消费者组

消费者使用消费者组(即group.id)来标记自己,topic的消息都会被发送到消费者组的一个消费者实例上。

consumer group是用于实现伸缩性、容错性的consumer机制。组内多个consumer实例可以同时读取消息。某个consumer出现故障,consumer group会立即将已崩溃consumer负责的分区转交给其他consumer,从而保证整个group可以继续工作,不丢失数据。
Kafka目前只能提供单个分区内的消息顺序,而不能维护topic级别的消息顺序,如果要实现topic级别的消息读取顺序,只能通过使consumer group中只有一个consumer实例的方式来间接实现。

消费者组重平衡(consumer group rebalance)

consumer group的rebalance本质上是一组协议,规定了消费者组达成分配订阅的topic的所有分区一致结果的过程。
和旧版本consumer依托于ZK进行rebalance不同,新版本consumer使用Kafka的全新组协调协议。对于每个组而言,Kafka的某个broker会被选举为组协调者。coordinator负责对组的状态进行管理,它的主要职责是当新成员到达时促成组内所有成员达成新的分区分配方案,即coordinator负责对组执行rebalance操作。
发生下面变更,会触发rebalance:

consumer的3种分区分配策略:range策略(新版本默认)、round-robin策略和sticky策略。

rebalance generation

隔离每次rebalance的数据,rebalance generation用于标识某次rebalance。有利于防止无效的offset提交。

rebalance流程

consumer group在执行rebalance前需要确定coordinator所在的broker,并创建与该broker相互通信的Socket连接。确定coordinator的算法与确定offset被提交到__consumer_offsets目标分区的算法是相同的。算法如下:

rebalance监听器

rebalance监听器常用于手动提交位移到第三方存储或者在rebalance前后执行审计操作。

解序列化
多线程消费实例
consumer group状态机

新版本consumer依赖于broker端的组协调者(coordinator)来管理组内的所有consumer实例并负责把分配方案下发到每个consumer上。分配方案是由组内的leader consumer根据指定的分区分配策略制定的。

group管理协议

coordinator的组管理协议分两个阶段,即组成员加入阶段和状态同步阶段。第一个阶段用于为group指定active成员并从中选出leader consumer。第二个阶段则让leader consumer制定分配方案并同步到其他组成员中。
可以理解为,第一阶段收集所有consumer的topic订阅信息,第二个阶段利用上面信息给consumer分配要消费的分区。

参考

《Apache Kafka实战》

上一篇 下一篇

猜你喜欢

热点阅读