我爱编程Kafka程序员

无镜--kafka之延迟操作

2018-11-27  本文已影响4人  绍圣

服务端在处理客户端的请求,针对不同的请求,可能不会立即返回响应结果给客户端。在处理这类请求时,服务端会为这类请求创建延迟操作对象放入延迟缓存队列中。延迟缓存的数据结构类似MAP,延迟操作对象从延迟缓存队列中完成并移除有两种方式:

1,延迟操作对应的外部事件发生时,外部事件会尝试完成延迟缓存中的延迟操作 。

2,如果外部事件仍然没有完成延迟操作,超时时间达到后,会强制完成延迟的操作 。

延迟操作接口

DelayedOperation接口表示延迟的操作对象。此接口的实现类包括延迟加入,延迟心跳,延迟生产,延迟拉取。延迟接口相关的方法:

tryComplete:尝试完成,外部事件发生时会尝试完成延迟的操作。该方法返回值为true,表示可以完成延迟操作,会调用强制完成的方法(forceComplete)。返回值为false,表示不可以完成延迟操作。

forceComplete:强制完成,两个地方调用,尝试完成方法(tryComplete)返回true时;延迟操作超时时。

run:线程运行,延迟操作超时后,会调用线程的运行方法,只会调用一次,因为超时就会发生一次。超时后会调用强制完成方法(forceComplete),如果返回true,会调用超时的回调方法。

onComplete:完成的回调方法。

onExpiration:超时的回调方法。

外部事件触发完成和超时完成都会调用forceComplete(),并调用onComplete()。forceComplete和onComplete只会调用一次。多线程下用原子变量来控制只有一个线程会调用onComplete和forceComplete。

延迟生产和延迟拉取完成时的回调方法,尝试完成的延迟操作

副本管理器在创建延迟操作时,会把回调方法传给延迟操作对象。当延迟操作完成时,在onComplete方法中会调用回调方法,返回响应结果给客户端。

创建延迟操作对象需要提供请求对应的元数据。延迟生产元数据是分区的生产结果;延迟拉取元数据是分区的拉取信息。

创建延迟的生产对象之前,将消息集写入分区的主副本中,每个分区的生产结果会作为延迟生产的元数据。创建延迟的拉取对象之前,从分区的主副本中读取消息集,但并不会使用分区的拉取结果作为延迟拉取的元数据,因为延迟生产返回给客户端的响应结果可以直接从分区的生产结果中获取,而延迟的拉取返回给客户端的响应结果不能直接从分区的拉取结果中获取。

元数据包含返回结果的条件是:从创建延迟操作对象到完成延迟操作对象,元数据的含义不变。对于延迟的生产,服务端写入消息集到主副本返回的结果是确定的。是因为ISR中的备份副本还没有全部发送应答给主副本,才会需要创建延迟的生产。服务端在处理备份副本的拉取请求时,不会改变分区的生产结果。最后在完成延迟生产的操作对象时,服务端就可以把 “创建延迟操作对象” 时传递给它的分区生产结果直接返回给生产者 。对应延迟的拉取,读取了主副本的本地日志,但是因为消息数量不够,才会需要创建延迟的拉取,而不用分区的拉取结果而是用分区的拉取信息作为延迟拉取的元数据,是因为在尝试完成延迟拉取操作对象时,会再次读取主副本的本地日志,这次的读取有可能会让消息数量达到足够或者超时,从而完成延迟拉取操作对象。这样创建前和完成时延迟拉取操作对象的返回结果是不同的。但是拉取信息不管读取多少次都是一样的。

延迟的生产的外部事件是:ISR的所有备份副本发送了拉取请求;备份副本的延迟拉取的外部事件是:追加消息集到主副本;消费者的延迟拉取的外部事件是:增加主副本的最高水位。

尝试完成延迟的生产

服务端处理生产者客户端的生产请求,将消息集追加到对应主副本的本地日志后,会等待ISR中所有的备份刚本都向主副本发送应答 。生产请求包括多个分区的消息集,每个分区都有对应的ISR集合。当所有分区的ISR副本都向对应分区的主副本发送了应答,生产请求才能算完成。生产请求中虽然有多个分区,但是延迟的生产操作对象只会创建一个。

判断分区的ISR副本是否都已经向主副本发送了应答,需要检查ISR中所有备份副本的偏移量是否到了延迟生产元数据的指定偏移量(延迟生产的元数据是分区的生产结果中包含有追加消息集到本地日志返回下一个偏移量)。所以ISR所有副本的偏移量只要等于元数据的偏移量,就表示备份副本向主副本发送了应答。由于当备份副本向主副本发送拉取请求,服务端读取日志后,会更新对应备份副本的偏移量数据。所以在具体的实现上,备份副本并不需要真正发送应答给主副本,因为主副本所在消息代理节点的分区对象已经记录了所有副本的信息,所以尝试完成延迟的生产时,根据副本的偏移量就可以判断备份副本是否发送了应答。进而检查分区是否有足够的副本赶上指定偏移量,只需要判断主副本的最高水位是否等于指定偏移量(最高水位的值会选择ISR中所有备份副本中最小的偏移量来设置,最小的值都等于了指定偏移量,那么就代表所有的ISR都发送了应答)。

总结:服务端创建的延迟生产操作对象,在尝试完成时根据主副本的最高水位是否等于延迟生产操作对象中元数据的指定偏移量来判断。具体步骤:

1,服务端处理生产者的生产请求,写入消息集到主副本的本地日志。

2,服务端返回追加消息集的下一个偏移量,并且创建一个延迟生产操作对象。元数据为分区的生产结果(其中就包含下一个偏移量的值)。

3,服务端处理备份副本的拉取请求,首先读取主副本的本地日志。

4,服务端返回给备份副本读取消息集,并更新备份副本的偏移量。

5,选择ISR备份副本中最小的偏移量更新主副本的最高水位。

6,如果主副本的最高水位等于指定的下一个偏移量的值,就完成延迟的生产。

尝试完成延迟的拉取

服务端处理消费者或备份副本的拉取请求,如果创建了延迟的拉取操作对象,一般都是客户端的消费进度能够一直赶上主副本。比如备份副本同步主副本的数据,备份副本如果一直能赶上主副本,那么主副本有新消息写入,备份副本就会马上同步。但是针对备份副本已经消费到主副本的最新位置,而主副本并没有新消息写入时:服务端没有立即返回空的拉取结果给备份副本,这时会创建一个延迟的拉取操作对象,如果有新的消息写入,服务端会等到收集足够的消息集后,才返回拉取结果给备份副本,有新的消息写入,但是还没有收集到足够的消息集,等到延迟操作对象超时后,服务端会读取新写入主副本的消息后,返回拉取结果给备份副本(完成延迟的拉取时,服务端还会再读取一次主副本的本地日志,返回新读取出来的消息集)。

客户端的拉取请求包含多个分区,服务端判断拉取的消息大小时,会收集拉取请求涉及的所有分区。只要消息的总大小超过拉取请求设置的最少字节数,就会调用forceComplete()方法完成延迟的拉取

外部事件尝试完成延迟的生产和拉取操作时的判断条件:

来之《Kafka技术内幕:图文详解Kafka源码设计与实现》

拉取偏移量是指拉取到消息大小。对于备份副本的延迟拉取,主副本的结束偏移量是它的最新偏移量(LEO)。对于消费者的拉取延迟,主副本的结束偏移量是它的最高水位(HW)。备份副本要时刻与主副本同步,消费者只能消费到主副本的最高水位。


生产请求和拉取请求的延迟缓存

客户端的一个请求包括多个分区,服务端为每个请求都会创建一个延迟操作对象。而不是为每个分区创建一个延迟操作对象。服务端的“延迟操作缓存”管理了所有的“延迟操作对象”,缓存的键是每一个分区,缓存的值是分区对应的延迟操作列表。

一个客户端请求对应一个延迟操作,一个延迟操作对应多个分区。在延迟缓存中,一个分区对应多个延迟操作。延迟缓存中保存了分区到延迟操作的映射关系。

根据分区尝试完成延迟的操作,因为生产者和消费者是以分区为最小单位来追加消息和消费消息。虽然延迟操作的创建是针对一个请求,但是一个请求中会有多个分区,在生产者追加消息时,一个生产请求总的不同分区包含的消息是不一样的。这样追加到分区对应的主副本的本地日志中,有的分区就可以去完成延迟的拉取,但是有的分区有可能还达不到完成延迟拉取操作的条件。同样完成延迟的生产也一样。所以在延迟缓存中要以分区为键来存储各个延迟操作。

由于一个请求创建一个延迟操作,一个请求又会包含多个分区,所以不同的延迟操作可能会有相同的分区。在加入到延迟缓存时,每个分区都对应相同的延迟操作。外部事件发生时,服务端会以分区为粒度,尝试完成这个分区中的所有延迟操作 。 如果指定分区对应的某个延迟操作可以被完成,那么延迟操作会从这个分区的延迟操作列表中移除。但这个延迟操作还有其他分区,其他分区中已经被完成的延迟操作也需要从延迟缓存中删除。但是不会立即被删除,因为分区作为延迟缓存的键,在服务端的数量会很多。只要分区对应的延迟操作完成了一个,就要立即检查所有分区,对服务端的性能影响比较大。所以采用一个清理器,会负责定时地清理所有分区中已经完成的延迟操作。

副本管理器针对生产请求和拉取请求都分别有一个全局的延迟缓存。生产请求对应延迟缓存中存储了延迟的生产。拉取请求对应延迟缓存中存储了延迟的拉取。

延迟缓存提供了两个方法:

tryCompleteElseWatch():尝试完成延迟的操作,如果不能完成,将延迟操作加入延迟缓存中。一旦将延迟操作加入延迟缓存的监控,延迟操作的每个分区都会监视该延迟操作。换句话说就是每个分区发生了外部事件后,都会去尝试完成延迟操作。

checkAndComplete():参数是延迟缓存的键,外部事件调用该方法,根据指定的键尝试完成延迟缓存中的延迟操作。

延迟缓存在调用tryCompleteElseWatch方法将延迟操作加入延迟缓存之前,会先尝试一次完成延迟的操作,如果不能完成,会调用方法将延迟操作加入到分区对应的监视器,之后还会尝试完成一次延迟操作,如果还不能完成,会将延迟操作加入定时器。如果前面的加入过程中,可以完成延迟操作后,那么就可以不用加入到其他分区的延迟缓存了。

延迟操作不仅存在于延迟缓存中,还会被定时器监控。定时器的目的是在延迟操作超时后,服务端可以强制完成延迟操作返回结果给客户端。延迟缓存的目的是让外部事件去尝试完成延迟操作。

监视器

延迟缓存的每个键都有一个监视器(类似每个分区有一个监视器),以链表结构来管理延迟操作。当外部事件发生时,会根据给定的键,调用这个键的对应监视器的tryCompleteWatch()方法,尝试完成监视器中所有的延迟操作。监视器尝试完成所有延迟操作的过程中,会调用每个延迟操作的tryComplete()方法,判断能否完成延迟的操作。如果能够完成,就从链表中删除对应的延迟操作。

清理线程

清理线程的作用是清理所有监视器中已经完成的延迟操作。

定时器

服务端创建的延迟操作会作为一个定时任务,加入定时器的延迟队列中。当延迟操作超时后,定时器会将延迟操作从延迟队列中弹出,并调用延迟操作的运行方法,强制完成延迟的操作。

定时器使用延迟队列管理服务端创建的所有延迟操作,延迟队列的每个元素是定时任务列表,一个定时任务列表可以存放多个定时任务条目。服务端创建的延迟操作对象,会先包装成定时任务条目,然后加入延迟队列指定的一个定时任务列表。延迟队列是定时器中保存定时任务列表的全局数据结构,服务端创建的延迟操作不是直接加入定时任务列表,而是加入时间轮。

时间轮和延迟队列的关系:

1,定时器拥有一个全局的延迟队列和时间轮,所有时间轮公用一个计数器。

2,时间轮持有延迟队列的引用。

3,定时任务条目添加到时间轮对应的时间格(槽)(槽中是定时任务列表)中,并且把该槽表也会加入到延迟队列中。

3,一个线程会将超时的定时任务列表会从延迟队列的poll方法弹出。定时任务列表超时并不一定代表定时任务超时,将定时任务重新加入时间轮,如果加入失败,说明定时任务确实超时,提交给线程池执行。

4,延迟队列的poll方法只会弹出超时的定时任务列表,队列中的每个元素(定时任务列表)按照超时时间排序,如果第一个定时任务列表都没有过期,那么其他定时任务列表也一定不会超时。

延迟操作本身的失效时间是客户端请求设置的,延迟队列的元素(每个定时任务列表)也有失效时间,当定时任务列表中的getDelay()方法返回值小于等于0,就表示定时任务列表已经过期,需要立即执行。

如果当前的时间轮放不下加入的时间时,就会创建一个更高层的时间轮。定时器只持有第一层的时间轮的引用,并不会持有更高层的时间轮。因为第一层的时间轮会持有第二层的时间轮的引用,第二层会持有第三层的时间轮的引用。定时器将定时任务加入到当前时间轮,要判断定时任务的失效时间首是否在当前时间轮的范围内,如果不在当前时间轮的范围内,则要将定时任务上升到更高一层的时间轮中。时间轮包含了定时器全局的延迟队列。

时间轮中的变量:tickMs=1:表示一格的长度是1毫秒;wheelSize=20表示一共20格,时间轮的范围就是20毫秒,定时任务的失效时间小于等于20毫秒的都会加入到这一层的时间轮中;interval=tickMs*wheelSize=20,如果需要创建更高一层的时间轮,那么低一层的时间轮的interval的值作为高一层数据轮的tickMs值;currentTime当前时间轮的当前时间,往前移动时间轮,主要就是更新当前时间轮的当前时间,更新后重新加入定时任务条目。

关于时间轮可以查看:无镜--kafka之服务端--时间轮 - 简书

参考资料:

Kafka技术内幕:图文详解Kafka源码设计与实现

上一篇下一篇

猜你喜欢

热点阅读