文章总结(9)— 时间轮算法
- 【方案思索】如果一台机器上有10w个定时任务,如何做到高效触发?
最简单的方案:
- 用户线程:负责定时任务的注册;
- 定时任务队列轮询线程:负责扫描任务队列上符合要求的任务,如果任务的时间戳达到规定的时刻,首先从队列中取走此任务,然后将其交给异步线程池来处理;
- 异步线程池:负责定时任务的执行;
缺点:如果有 1k 个任务,那么定时任务队列轮询线程每次都需要扫描 1k 个任务来确定哪一个任务达到规定时刻,这种轮询效率非常差,尤其是在大部分任务并没有达到规定执行时刻的情况下。
改进措施1:使用有序的队列(例如最小堆)这样可以显著提升遍历效率。插入一个定时任务的事件复杂度为 O(nlogn),普通任务队列的插入仅仅是 O(1)。
改进措施2:任务分类+多队列+并发线程,这种方式主要是试图利用现代 CPU 的多核并发性来解决遍历效率低的问题。例如我们将 10w 大小的任务队列分为 10 个任务队列,此时每一个任务队列的大小仅仅是 1w。在线程完全并发执行的情况下,将 10w 规模的问题简化为 1w 规模的问题。多并发线程轮询的副作用非常大:线程是一种宝贵资源,如果一个系统有大量的定时调度任务,那么CPU 会因为多条并发轮询线程而有着非常低的执行效率。
- 【时间轮算法】如果一台机器上有10w个定时任务,如何做到高效触发?
对于海量数据的处理,核心的处理方式是对问题分治:按照时间刻度对任务进行划分。
时间轮算法的核心是:轮询线程不再负责遍历所有任务,而是仅仅遍历时间刻度。时间轮算法好比指针不断在时钟上旋转、遍历,如果一个发现某一时刻上有任务(任务队列),那么就会将任务队列上的所有任务都执行一遍。
优点:时间轮算法可以解决遍历效率低的问题。时间轮算法中,轮询线程遍历到某一个时间刻度后,总是执行对应刻度上任务队列中的所有任务(通常是将任务扔给异步线程池来处理),而不再需要遍历检查所有任务的时间戳是否达到要求。
- 【时间刻度增多】任务有毫秒、分钟、小时、天等等时,单纯增加时间刻度会导致轮询效率低,如何去处理?
-
带有round的时间轮算法:时间轮的精度设置为秒,时间刻度个数固定为 60。每一个任务拥有一个 round 字段。
轮询线程的执行逻辑是:每隔一秒处理一个时间刻度上任务队列中的所有任务,任务的 round 字段减 1,接着判断如果 round 字段的值变为 0,那么将任务移出任务队列,交给异步线程池来执行对应任务。如果是重复执行任务,那么再将任务添加到任务队列中。
缺点:这种方式虽然简化了时间轮的刻度个数,但是并没有简化轮询线程运行效率不高的问题。时间轮每次处理一个时间刻度,就需要处理其上任务队列的所有任务。其运行效率甚至与基于普通任务队列实现的定时任务框架没有区别。
- 分层时间轮算法:类似现实的钟表,由多个环形数组组成,将任务散列到多个环形数组中。(例如:每个环形数组包含20个时间单位,表示一个时间维度(一轮),如:第一层时间轮,数组中的每个元素代表1ms,一圈就是20ms,当延迟时间大于20ms时,就“进位”到第二层时间轮,第二层中,每“一格”表示20ms,依此类推…)
- 分层时间轮的降级方式?
当时间“推进”到某个bucket时,说明该bucket中的任务在当前时间轮中的时间已经走完,需要进行“降级”,即进入更小粒度的时间轮中。
- 如何减少空推进(时间是如何推进的)?
一种直观的想法是,像现实中的钟表一样,“一格一格”地走,这样就需要有一个线程一直不停的执行,而大多数情况下,时间轮中的bucket(桶)大部分是空的,指针的“推进”就没有实质作用,因此,为了减少这种“空推进”,Kafka引入了DelayQueue,以bucket为单位入队,每当有bucket到期,即queue.poll能拿到结果时,才进行时间的“推进”,减少了 ExpiredOperationReaper 线程空转的开销。
image.png
- 有一个APP实时消息通道系统,对每个用户会维护一个APP到服务器的TCP连接,用来实时收发消息,对这个TCP连接,有这样一个需求:“如果连续30s没有请求包(例如登录,消息,keepalive包),服务端就要将这个用户的状态置为离线”。其中,单机TCP同时在线量约在10w级别,keepalive请求包较分散大概30s一次,吞吐量约在3000qps。
结构:
- 30s超时,就创建一个index从0-30的环形队列(本质是数组)
- 环上每一个slot是一个Set<uid>,任务集合
- 同时还有一个Map<uid,index>,记录uid落在环上哪个slot中。
请求达到时:
- 从Map结构中,查找这个uid存储在哪一个slot里面;
- 从这个slot的Set结构中,删除这个uid;
- 将这个uid重新加到 Current Index指针所指向的上一个slot,因为这个slot,会被timer在30s之后扫描到。
- 更新Map,这个uid对应slot的index值。
- Current Index 每秒种移动一个 slot,这个 slot 对应的 Set<uid> 中所有 uid 都应该被集体超时。