无镜--kafka之消费者(七)
书接上回。在写完上一篇文章后,回想内容时,其实还有疑问还没有解开:消费组状态从等待同步状态转换到准备再平衡状态,会不会生成延迟对象?
要解开以上的疑问,就得看看协调者是怎么处理消费者的同步组请求。
协调者处理同步组请求
消费组的状态进入等待同步后,协调者就会发送加入组请求响应给所有的消费者(注意这个时候消费组中的所有消费者都发送了加入组请求,并协调者启动了心跳机制对消费者进行监控)。收到加入组请求响应后,如果是非主消费者会立即发送同步组请求,协调者接收到非主消费者的同步组请求只会设置其对应的消费者元数据中的回调方法。主消费者还没有发送同步组请求,就不会发送同步组响应给消费者,只能将回调方法暂时存放到消费者元数据中。如果是主消费者,会马上执行分区分配算法,完成后会发送同步组请求到协调者。协调者在收到主消费者的同步组请求后,这时就会发送同步组响应给发送了同步请求的消费者(注意这时有可能还有的消费者没有发送同步组请求到协调者,这个消费者的同步回调方法对象就是为空,不会收到协调者的同步组请求响应)。改变消费组的状态为稳定。
协调者在处理同步组请求时,重点分析从两种状态进入:准备再平衡和稳定(如果消费组的状态本来就是等待同步状态,这种状态本身就在等消费者的同步请求)
准备再平衡状态
从上一节的分析了解到,第一个消费者发送加入组请求时,消费组的状态会变到等待同步,并且第一个消费者会作为主消费者执行分区分配。
当有第二个消费者作为新的消费者发送加入组请求,会将消费组的状态改为准备再平衡。当第一个消费者执行完分区分配后(在这种情况下分区的分配其实是无效的,因为消费组里面又多了一个消费者)。会发送同步组请求到协调者,由于现在消费组处于准备再平衡状态,就不能接受第一个消费者的分区分配结果。这时需要返回一个错误信息给第一个消费者,让它重新发送加入组请求。
稳定状态
稳定状态下并不意味着消费组中的所有消费者都发送了同步组请求,但是都已经发送了加入组请求。协调者在没有收到主消费者的同步组请求之前,收到的任何非主消费者的同步组请求都只会设置其消费者元数据中的回调方法。当收到主消费者的同步组请求后,协调者会立即发送同步组响应给所有发了同步组请求的消费者,并之后更改消费组的状态为稳定状态。在此之后的消费者发送同步组请求,协调者会直接发送同步组响应给消费者,因为这时协调者已经存了该消费者的分区分配信息。
消费组状态转换
再聊准备再平衡
一次再平衡的正常消费组的状态变化过程是:稳定-->准备再平衡-->等待同步-->稳定。消费组从等待同步转到准备再平衡,从稳定转到准备再平衡,只要是进入准备再平衡状态都会创建一个延迟操作。协调者处理多个消费者的请求,并不会创建多个延迟的操作对象,当遇到消费组中的一个消费者发送加入组请求把消费组的状态改为准备再平衡,这时就会为消费组创建一个延迟操作对象,等消费组中其他的消费者发送加入组请求时,需要做的是去尝试完成该延迟操作对象。等消费组中所有的消费者都发送加入组请求后,就完成该延迟操作对象。更新消费组的状态为等待同步。
协调者处理同步组请求,如果是非主消费者发送同步组请求,它没有带分区分配结果,协调者在处理请求时只会更新消费者元数据的回调方法,结束流程。如果是主消费者发送的同步组请求,因为带有分区分配结果,会将分区分配结果同步到内部主题中,并设置消费者元数据的回调方法,最后会调用每个发送同步组请求的消费者的回调方法,发送带有分区结果的同步响应给消费者,改变消费组的状态为稳定。
协调者在处理加入组请求和同步组请求,会对消费组元数据进行加锁。只有协调者释放了消费组元数据的锁,才可以处理其他消费者的请求。协调者处理同一个消费者的加入组请求和同步组请求这个顺序是固定的。协调者在处理不同消费者的请求时,会出现锁等待,比如:协调者在处理第一个消费者的加入组请求,就不能同时处理其他消费者的加入组请求;协调者正在处理第一个消费者的同步组请求,就不能同时处理其他消费者的加入组请求和同步组请求(注意,在协调者把主消费者的分区分配结果保存到内部主题中时,这个时候是不需要操作消费组元数据的,此时会释放锁)。
协调者者处理不同事件,会根据状态机的状态执行不同的动作,而执行动作又会间接影响状态机的状态。
协调者处理加入组请求
协调者处理消费者的加入组请求,根据消费组的不同状态分别执行不同的分支流程:
消费组状态为准备再平衡
第一个消费者C1加入组,从稳定到准备再平衡再到等待同步;第二个消费者C2加入组,从等待同步到准备再平衡;这个情况下新的消费者C3和旧消费者C1都会向协调者发送加入组请求从准备再平衡状态进去(C3是新加入,C1是重新加入)。
新消费者先于旧消费者发送加入组
1,新消费者在旧消费者重新发送加入组之前,先发送了加入组请求,对应创建消费者元数据,并加入组到消费组元数据中。
2,旧消费者重新发送加入组请求,对应会更新消费者元数据信息。
旧消费者先于新消费者发送加入组
协调者处理旧消费者的加入组,会完成延迟操作,并更改消费组状态为等待同步。然后新消费者发送加入组请求就是从等待同步状态进入,然后改变消费组状态为准备再平衡,其他消费者重新发送加入组请求。
消费组状态为等待同步
在等待同步状态时,新消费者加入组和前面讲的过程一样。但是如果遇到旧消费者加入组就有点怪了,因为正常情况下到了等待同步状态时,消费组里面的消费者都发送了加入组后协调者才会完成延迟操作对象,并更改状态为等待同步。但是如果消费者由于原因没有收到加入组响应,它就会重新发送加入组请求,这种情况下协调者处理已知的消费者的加入组请求时并不会更改消费组状态,也不会执行新增或者更新消费者元数据的方法,只需要直接返回加入组响应给此消费者。
消费组状态为稳定
如果消费者发送了加入组请求,但是没有收到加入组请求的响应,协调者的状态会是等待同步;如果消费者发送了同步组请求,但是没有收到同步组请求的响应,协调者的状态会是稳定。协调者给每个消费者发送响应,并不保证每个消费者就一定能收到响应结果。如果消费者自己的问题,就需要重新发送请求。
消费组达到稳定状态后,表示协调者处理了主消费者发送的同步组请求,但并不意味着其他的消费者都发送了同步组请求。
非主消费者发送了加入组请求,但没有收到加入组响应,需要重新发送加入组请求,协调者处理这个消费者重新发送的加入组请求时,因为现在消费组的状态是稳定状态,所以协调者会立即返回加入组请求的响应给消费者。
步骤:
1,三个消费者发送加入组请求给协调者,消费者状态从准备再平衡到等待同步。
2,协调者返回加入组响应给三个消费者,前两个消费者收到了响应,第三个消费者没有收到响应。
3,第二个消费者是普通消费者,收到加入组响应后,立即发送同步请求,协调者处理请求只会设置其消费者元数据中的回调方法。
4,第一个消费者是主消费者,它完成了分区分配,发送同步组请求,协调者改变消费组的状态为稳定。
5,协调者返回同步组响应给前两个消费者,因为第三个消费者没有收到加入组响应,也就没有发送同步组请求,所以没有设置回调方法。
6,第三个消费者没有收到加入组响应,重新发送加入组请求。
7,协调者处理第三个消费者的加入组请求,消费组的状态是稳定,立即返回加入组响应。
8,第三个消费者不是主消费者,收到加入组响应,会立即发送同步组请求。
9,协调者处理第三个消费者的同步组请求,消费组的状态是稳定,立即返回同步组响应。
以上是非主消费者没有收到加入组响应,如果是主消费没有收到加入组响应。由于主消费者没有收到加入组响应,它就不会执行分区分配,也不会发送同步组请求。协调者也不会把消费组的状态改为稳定,依然是等待同步。只能等待主消费者重新的发送加入组请求。
步骤:
1,三个消费者发送加入组请求给协调者,消费者状态从准备再平衡到等待同步。
2,协调者返回加入组响应给三个消费者,前两个消费者收到了响应,第三个消费者没有收到响应。
3,前两个消费者是普通消费者,收到加入组响应后,立即发送同步请求,协调者处理请求只会设置其消费者元数据中的回调方法。
4,主消费者没有收到加入组响应,重新发送加入组请求。
5,协调者处理主消费者的加入组请求,消费组的状态是等待同步,立即返回加入组响应。
6,主消费者收到加入组响应,执行分区分配,并发送同步组请求。
7,协调者处理主消费者的同步组请求,返回给所有消费者同步组响应,并改变消费组的状态为稳定。
协调者先改变消费组状态为等待同步再发送加入组响应,而后是先返回同步组响应,再改变消费组状态为稳定。为什么?
我们现在不防这样来看看,如果协调者先发送加入组响应,再改变消费组的状态为等待同步:在这样的情况下,消费组的状态是准备再平衡,能够发送加入组响应就代表延迟操作可以完成,那么表示消费组里面的消费者都发送了加入组请求。如果此时新的一个消费者发送了加入组请求,现在的状态本来就是准备再平衡,所以首先不会创建延迟操作,再者会执行新消费者元数据的创建和添加到消费组元数据中。这时消费组中就会多出一个消费者,收到加入组响应的主消费者再执行分区分配是没有意义的也是不对的,并且更严重的是协调者并没有办法告知消费者,并且后面也会改变消费组的状态为等待同步,消费者也会发送同步组请求。但是新的消费者收不到加入组响应,也就不会发送同步组请求。随即消费组的状态变为了稳定。只有等到新的那个消费者发现还没有收到加入组响应,重新发送加入组请求到协调者。又执行一遍再平衡。这样的情况就增加消费者和协调者之间网络通信次数。所以协调者先改变消费组状态为等待同步再发送加入组响应可以减少消费者和协调者之间的冗余通信。
再来看看,如果协调者先把消费组改为稳定,再发送同步组响应给客户端,会发生什么?其实这个是要看协调者什么时候才会把消费组的状态变为稳定。协调者是在收到主消费者的同步组请求后,才会去更改消费组的状态为稳定。设想如果只有一个消费者的情况下,那么那个消费者就是主消费者,在这种情况下,是先修改状态为稳定,还是先发送同步组响应给客户端都没什么关系。那么如果存在多个消费者的情况下,在没有收到主消费者的同步组请求之前,先改状态和先发送同步组响应都是无意义的。因为本身没有收到主消费者的同步组请求,就没有收到分区的分配结果。只有收到了主消费者的同步组请求后,就可以发送同步组响应和修改状态为稳定了,这个时候先执行哪个动作都没什么影响。
参考资料:
Kafka技术内幕:图文详解Kafka源码设计与实现