dubbo

Dubbo——时间轮(Time Wheel)算法应用

2021-03-24  本文已影响0人  小波同学

定时任务

Netty、Quartz、Kafka 以及 Linux 都有定时任务功能。

JDK 自带的 java.util.TimerDelayedQueue 可实现简单的定时任务,底层用的是堆,存取复杂度都是 O(nlog(n)),但无法支撑海量定时任务。

在任务量大、性能要求高的场景,为了将任务存取及取消操作时间复杂度降为 O(1),会采用时间轮算法。

什么是时间轮

时间轮模型及其应用

一种高效批量管理定时任务的调度模型。一般会实现成一个环形结构,类似一个时钟,分为很多槽,一个槽代表一个时间间隔,每个槽使用双向链表存储定时任务。

指针周期性跳动,跳动到一个槽位,就执行该槽位的定时任务。

Hashed Timing Wheel 结构示意图

时间轮应用:

计时器维护代价高,如果

需要高效的定时器算法以减少总体中断的开销。
单层时间轮的容量和精度都是有限的,对于精度要求特别高、时间跨度特别大或是海量定时任务需要调度的场景,通常会使用多级时间轮以及持久化存储与时间轮结合的方案。

Dubbo的时间轮结构

Dubbo 时间轮实现位于 dubbo-common 模块的 org.apache.dubbo.common.timer 包,下面我们就来分析时间轮涉及的核心接口和实现。

TimerTask

在 Dubbo 中,所有定时任务都要实现 TimerTask 接口。只定义了一个 run() 方法,入参是一个 Timeout 接口对象。

public interface TimerTask {

    void run(Timeout timeout) throws Exception;
}

Timeout

Timeout 对象与 TimerTask 对象一一对应,类似线程池返回的 Future 对象与提交到线程池中的任务对象之间的关系。
通过 Timeout 对象,不仅可以查看定时任务的状态,还可以操作定时任务(例如取消关联的定时任务)。

/**
 * Timeout 对象与 TimerTask 对象一一对应,两者的关系类似于线程池返回的 Future 对象与提交到线程池中的任务对象之间的关系。
 * 通过 Timeout 对象,我们不仅可以查看定时任务的状态,还可以操作定时任务
 */
public interface Timeout {

    /**
     * 返回创建自己的定时器
     */
    Timer timer();

    /**
     * 返回关联的定时任务
     */
    TimerTask task();

    /**
     * 返回定时任务是否到期
     */
    boolean isExpired();

    /**
     * 返回定时任务是否被取消
     */
    boolean isCancelled();

    /**
     * 尝试取消定时任务,如果任务已经被执行或已经取消,方法正常返回.
     *
     * @return True if the cancellation completed successfully, otherwise false
     */
    boolean cancel();
}

Timer

Timer 接口定义了定时器的基本行为,核心是 newTimeout() :提交一个定时任务(TimerTask)并返回关联的 Timeout 对象,类似于向线程池提交任务。

public interface Timer {

    /**  
     * 提交一个定时任务(TimerTask),类似于向线程池提交任务
     * @return 返回关联的 Timeout 对象
     * @throws IllegalStateException      if this timer has been {@linkplain #stop() stopped} already
     * @throws RejectedExecutionException if the pending timeouts are too many and creating new timeout
     *                                    can cause instability in the system.
     */
    Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);

    /**
     * @return 方法返回被取消的任务对应的Timeout集合
     */
    Set<Timeout> stop();

    /**
     * 判断定时器是否停止
     *
     * @return true for stop
     */
    boolean isStop();
}

HashedWheelTimeout

HashedWheelTimeout 是 Timeout 接口的唯一实现,是 HashedWheelTimer 的内部类。HashedWheelTimeout 扮演了两个角色:

HashedWheelTimeout 中的核心字段如下:

HashedWheelTimeout 中的核心方法有:

/**
 * HashedWheelTimeout 是 Timeout 接口的唯一实现
 * 1. 时间轮中双向链表的节点,即定时任务 TimerTask 在 HashedWheelTimer 中的容器
 * 2. 定时任务 TimerTask 提交到 HashedWheelTimer 之后返回的句柄,用于在时间轮外部查看和控制定时任务
 */
private static final class HashedWheelTimeout implements Timeout {

    /** 状态机 */
    private static final int ST_INIT = 0;
    private static final int ST_CANCELLED = 1;
    private static final int ST_EXPIRED = 2;
    
    /** 实现 state 状态变更的原子性 */
    private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");

    // 所在的时间轮,定时器
    private final HashedWheelTimer timer;
    // 关联的定时任务
    private final TimerTask task;
    private final long deadline;

    @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization"})
    private volatile int state = ST_INIT;

    /**
     * 当前任务剩余的时钟周期数,由Worker计算.
     * 时间轮所能表示的时间长度是有限的,在任务到期时间与当前时刻的时间差,超过时间轮单圈能表示的时长,就出现了套圈的情况,需要该字段值表示剩余的时钟周期
     * transferTimeoutsToBuckets() before the HashedWheelTimeout will be added to the correct HashedWheelBucket.
     */
    long remainingRounds;

    /**
     * 在 HashedWheelTimerBucket构建双向链表
     * 只要worker线程操作,不需要同步原语进行同步
     */
    HashedWheelTimeout next;
    HashedWheelTimeout prev;

    /**
     * 所添加的桶
     */
    HashedWheelBucket bucket;

    /**
     * 构造函数,HashedWheelTimer的newTimeout方法中调用
     */
    HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
        this.timer = timer;
        this.task = task;
        this.deadline = deadline;
    }

    @Override
    public Timer timer() {
        return timer;
    }

    @Override
    public TimerTask task() {
        return task;
    }

    @Override
    public boolean cancel() {
        // only update the state it will be removed from HashedWheelBucket on next tick.
        if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
            return false;
        }
        // If a task should be canceled we put this to another queue which will be processed on each tick.
        // So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
        // we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
        timer.cancelledTimeouts.add(this);
        return true;
    }

    void remove() {
        HashedWheelBucket bucket = this.bucket;
        if (bucket != null) {
            bucket.remove(this);
        } else {
            timer.pendingTimeouts.decrementAndGet();
        }
    }

    public boolean compareAndSetState(int expected, int state) {
        return STATE_UPDATER.compareAndSet(this, expected, state);
    }

    public int state() {
        return state;
    }

    @Override
    public boolean isCancelled() {
        return state() == ST_CANCELLED;
    }

    @Override
    public boolean isExpired() {
        return state() == ST_EXPIRED;
    }

    /**
     * 设置到期,并调用TimeTask的run方法
     */
    public void expire() {
        if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
            return;
        }

        try {
            task.run(this);
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
            }
        }
    }

    @Override
    public String toString() {
        final long currentTime = System.nanoTime();
        long remaining = deadline - currentTime + timer.startTime;
        String simpleClassName = ClassUtils.simpleClassName(this.getClass());

        StringBuilder buf = new StringBuilder(192)
                .append(simpleClassName)
                .append('(')
                .append("deadline: ");
        if (remaining > 0) {
            buf.append(remaining)
                    .append(" ns later");
        } else if (remaining < 0) {
            buf.append(-remaining)
                    .append(" ns ago");
        } else {
            buf.append("now");
        }

        if (isCancelled()) {
            buf.append(", cancelled");
        }

        return buf.append(", task: ")
                .append(task())
                .append(')')
                .toString();
    }
}

HashedWheelBucket

HashedWheelBucket 是时间轮中的一个槽,时间轮中的槽实际上就是一个用于缓存和管理双向链表的容器,双向链表中的每一个节点就是一个 HashedWheelTimeout 对象,也就关联了一个 TimerTask 定时任务。

HashedWheelBucket 持有双向链表的首尾两个节点,分别是 head 和 tail 两个字段,再加上每个 HashedWheelTimeout 节点均持有前驱和后继的引用,这样就可以正向或是逆向遍历整个双向链表了。

HashedWheelBucket 中的核心方法:

/**
 * HashedWheelBucket 是时间轮中的一个桶
 * 时间轮中的桶实际上就是一个用于缓存和管理双向链表的容器,
 * 双向链表中的每一个节点就是一个 HashedWheelTimeout 对象,也就关联了一个 TimerTask 定时任务。
 */
private static final class HashedWheelBucket {

    /** 双向链表结构 */
    private HashedWheelTimeout head;
    private HashedWheelTimeout tail;

    /**
     * 尾插Timeout
     */
    void addTimeout(HashedWheelTimeout timeout) {
        assert timeout.bucket == null;
        timeout.bucket = this;
        if (head == null) {
            head = tail = timeout;
        } else {
            tail.next = timeout;
            timeout.prev = tail;
            tail = timeout;
        }
    }

    /**
     * 调用所有到期的HashedWheelTimeout的expire,移除cancel的Timeout
     */
    void expireTimeouts(long deadline) {
        HashedWheelTimeout timeout = head;

        // process all timeouts
        while (timeout != null) {
            HashedWheelTimeout next = timeout.next;
            if (timeout.remainingRounds <= 0) {
                next = remove(timeout);
                if (timeout.deadline <= deadline) {
                    timeout.expire();
                } else {
                    // The timeout was placed into a wrong slot. This should never happen.
                    throw new IllegalStateException(String.format(
                            "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                }
            } else if (timeout.isCancelled()) {
                // timeout是取消状态,直接从桶中干掉
                next = remove(timeout);
            } else {
                // 否则减1轮表盘
                timeout.remainingRounds--;
            }
            timeout = next;
        }
    }

    /**
     * 从桶中移除timeout,并返回下一个
     * @param timeout
     * @return
     */
    public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
        HashedWheelTimeout next = timeout.next;
        // remove timeout that was either processed or cancelled by updating the linked-list
        if (timeout.prev != null) {
            timeout.prev.next = next;
        }
        if (timeout.next != null) {
            timeout.next.prev = timeout.prev;
        }

        if (timeout == head) {
            // if timeout is also the tail we need to adjust the entry too
            if (timeout == tail) {
                tail = null;
                head = null;
            } else {
                head = next;
            }
        } else if (timeout == tail) {
            // if the timeout is the tail modify the tail to be the prev node.
            tail = timeout.prev;
        }
        // null out prev, next and bucket to allow for GC.
        timeout.prev = null;
        timeout.next = null;
        timeout.bucket = null;
        timeout.timer.pendingTimeouts.decrementAndGet();
        return next;
    }

    /**
     * 清空桶,并返回所有没到期或没取消的Timeouts.
     */
    void clearTimeouts(Set<Timeout> set) {
        for (; ; ) {
            HashedWheelTimeout timeout = pollTimeout();
            if (timeout == null) {
                return;
            }
            if (timeout.isExpired() || timeout.isCancelled()) {
                continue;
            }
            set.add(timeout);
        }
    }

    /**
     * 取出头结点
     * @return
     */
    private HashedWheelTimeout pollTimeout() {
        HashedWheelTimeout head = this.head;
        if (head == null) {
            return null;
        }
        HashedWheelTimeout next = head.next;
        if (next == null) {
            tail = this.head = null;
        } else {
            this.head = next;
            next.prev = null;
        }

        // null out prev and next to allow for GC.
        head.next = null;
        head.prev = null;
        head.bucket = null;
        return head;
    }
}

HashedWheelTimer

HashedWheelTimer 是 Timer 接口的实现,它通过时间轮算法实现了一个定时器。HashedWheelTimer 会根据当前时间轮指针选定对应的槽(HashedWheelBucket),从双向链表的头部开始迭代,对每个定时任务(HashedWheelTimeout)进行计算,属于当前时钟周期则取出运行,不属于则将其剩余的时钟周期数减一操作。

HashedWheelTimer 的核心属性:

时间轮对外提供了一个 newTimeout() 接口用于提交定时任务,在定时任务进入到 timeouts 队列之前会先调用 start() 方法启动时间轮,其中会完成下面两个关键步骤:

之后根据 startTime 计算该定时任务的 deadline 字段,最后才能将定时任务封装成 HashedWheelTimeout 并添加到 timeouts 队列。

下面分析时间轮指针一次转动的全流程:

上述核心逻辑在 HashedWheelTimer$Worker.run() 方法中,

Worker

private final class Worker implements Runnable {
    private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
    
    /** cnt滴答数 */
    private long tick;

    /**
     * 时间轮指针一次转动的全流程。
     *
     * 1. 时间轮指针转动,时间轮周期开始。
     * 2. 清理用户主动取消的定时任务,这些定时任务在用户取消时,会记录到 cancelledTimeouts 队列中。
     *      在每次指针转动的时候,时间轮都会清理该队列。
     * 3. 将缓存在 timeouts 队列中的定时任务转移到时间轮中对应的槽中。
     * 4. 根据当前指针定位对应槽,处理该槽位的双向链表中的定时任务。
     * 5. 检测时间轮的状态。如果时间轮处于运行状态,则循环执行上述步骤,不断执行定时任务。
     *      如果时间轮处于停止状态,则执行下面的步骤获取到未被执行的定时任务并加入 unprocessedTimeouts 队列:
     *      遍历时间轮中每个槽位,并调用 clearTimeouts() 方法;对 timeouts 队列中未被加入槽中循环调用 poll()。
     * 6. 最后再次清理 cancelledTimeouts 队列中用户主动取消的定时任务。
     */
    @Override
    public void run() {
        // Initialize the startTime.
        startTime = System.nanoTime();
        if (startTime == 0) {
            // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
            startTime = 1;
        }

        // Notify the other threads waiting for the initialization at start().
        startTimeInitialized.countDown();

        // Worker启动时循环执行,相当于时间轮不停地转动
        do {
            final long deadline = waitForNextTick();
            if (deadline > 0) {
                // 计算指针指向的桶
                int idx = (int) (tick & mask);
                // 先处理被取消的任务
                processCancelledTasks();
                HashedWheelBucket bucket = wheel[idx];
                // 遍历 timeouts 队列中的定时任务添加到桶中
                transferTimeoutsToBuckets();
                // 调用所有到期的HashedWheelTimeout的expire,移除cancel的Timeout
                bucket.expireTimeouts(deadline);
                tick++;
            }
        } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

        // 上面的循环结束后,遍历清空所有的桶,将未过期或未取消的任务保存至unprocessedTimeouts集合,便于返回给stop()方法
        for (HashedWheelBucket bucket : wheel) {
            bucket.clearTimeouts(unprocessedTimeouts);
        }
        
        // 将timeouts缓冲队列中未取消的任务也添加到unprocessedTimeouts中
        for (; ; ) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                break;
            }
            if (!timeout.isCancelled()) {
                unprocessedTimeouts.add(timeout);
            }
        }
        
        // remove所有cancelledTimeouts队列中的任务
        processCancelledTasks();
    }

    private void transferTimeoutsToBuckets() {
        // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
        // adds new timeouts in a loop.
        for (int i = 0; i < 100000; i++) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                // all processed
                break;
            }
            if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                // Was cancelled in the meantime.
                continue;
            }

            long calculated = timeout.deadline / tickDuration;
            timeout.remainingRounds = (calculated - tick) / wheel.length;

            // 计算落入的桶index,确保不会是过去时间
            final long ticks = Math.max(calculated, tick);
            int stopIndex = (int) (ticks & mask);

            HashedWheelBucket bucket = wheel[stopIndex];
            bucket.addTimeout(timeout);
        }
    }

    private void processCancelledTasks() {
        for (; ; ) {
            HashedWheelTimeout timeout = cancelledTimeouts.poll();
            if (timeout == null) {
                // all processed
                break;
            }
            try {
                timeout.remove();
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("An exception was thrown while process a cancellation task", t);
                }
            }
        }
    }

    /**
     * 根据startTime和tick数计算目标ns,然后等待到目标ns
     *
     * @return Long.MIN_VALUE if received a shutdown request,
     * current time otherwise (with Long.MIN_VALUE changed by +1)
     */
    private long waitForNextTick() {
        // 下一次滴答时间
        long deadline = tickDuration * (tick + 1);

        for (; ; ) {
            final long currentTime = System.nanoTime() - startTime;
            long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

            if (sleepTimeMs <= 0) {
                if (currentTime == Long.MIN_VALUE) {
                    return -Long.MAX_VALUE;
                } else {
                    return currentTime;
                }
            }
            if (isWindows()) {
                sleepTimeMs = sleepTimeMs / 10 * 10;
            }

            try {
                Thread.sleep(sleepTimeMs);
            } catch (InterruptedException ignored) {
                if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                    return Long.MIN_VALUE;
                }
            }
        }
    }

    Set<Timeout> unprocessedTimeouts() {
        return Collections.unmodifiableSet(unprocessedTimeouts);
    }
}

HashedWheelTimer

/**
 * 每个滴答耗时  通常 100ms
 * 时间轮大小 通常512
 * 保持单例模式使用
 * 
 */
public class HashedWheelTimer implements Timer {

    /**
     * may be in spi?
     */
    public static final String NAME = "hased";

    private static final Logger logger = LoggerFactory.getLogger(HashedWheelTimer.class);

    private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
    private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
    private static final int INSTANCE_COUNT_LIMIT = 64;
    private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");

    /** 真正执行定时任务的逻辑封装这个 Runnable 对象中 */
    private final Worker worker = new Worker();
    /** 时间轮内部真正执行定时任务的线程 */
    private final Thread workerThread;

    /** worker状态机 */
    private static final int WORKER_STATE_INIT = 0;
    private static final int WORKER_STATE_STARTED = 1;
    private static final int WORKER_STATE_SHUTDOWN = 2;

    /**
     * 0 - init, 1 - started, 2 - shut down
     */
    @SuppressWarnings({"unused", "FieldMayBeFinal"})
    private volatile int workerState;

    // 每个tick的时间,时间轮精度
    private final long tickDuration;
    
    // 时间轮桶
    private final HashedWheelBucket[] wheel;
    
    // 掩码, mask = wheel.length - 1,执行 ticks & mask 便能定位到对应的时钟槽
    private final int mask;
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    
    /** timeouts 队列用于缓冲外部提交时间轮中的定时任务 */
    private final Queue<HashedWheelTimeout> timeouts = new LinkedBlockingQueue<>();
    
    /** cancelledTimeouts 队列用于暂存取消的定时任务 */
    private final Queue<HashedWheelTimeout> cancelledTimeouts = new LinkedBlockingQueue<>();
    
    // 统计待定的Timeouts数量
    private final AtomicLong pendingTimeouts = new AtomicLong(0);
    private final long maxPendingTimeouts;

    /** 当前时间轮的启动时间,提交到该时间轮的定时任务的 deadline 字段值均以该时间戳为起点进行计算 */
    private volatile long startTime;

    public HashedWheelTimer() {
        this(Executors.defaultThreadFactory());
    }

    public HashedWheelTimer(long tickDuration, TimeUnit unit) {
        this(Executors.defaultThreadFactory(), tickDuration, unit);
    }

    public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
        this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
    }

    public HashedWheelTimer(ThreadFactory threadFactory) {
        this(threadFactory, 100, TimeUnit.MILLISECONDS);
    }

    public HashedWheelTimer(
            ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
        this(threadFactory, tickDuration, unit, 512);
    }

    public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel) {
        this(threadFactory, tickDuration, unit, ticksPerWheel, -1);
    }

    public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel,
            long maxPendingTimeouts) {

        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        if (tickDuration <= 0) {
            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
        }
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }

        // wheel大小处理为2的指数,并创建时间轮——桶数组
        wheel = createWheel(ticksPerWheel);
        mask = wheel.length - 1;

        // Convert tickDuration to nanos.
        this.tickDuration = unit.toNanos(tickDuration);

        // Prevent overflow.
        if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
            throw new IllegalArgumentException(String.format(
                    "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                    tickDuration, Long.MAX_VALUE / wheel.length));
        }
        workerThread = threadFactory.newThread(worker);

        this.maxPendingTimeouts = maxPendingTimeouts;

        if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
                WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
            reportTooManyInstances();
        }
    }

    @Override
    protected void finalize() throws Throwable {
        try {
            super.finalize();
        } finally {
            // This object is going to be GCed and it is assumed the ship has sailed to do a proper shutdown. If
            // we have not yet shutdown then we want to make sure we decrement the active instance count.
            if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
                INSTANCE_COUNTER.decrementAndGet();
            }
        }
    }

    /**
     * 创建时间轮——桶数组
     * @param ticksPerWheel
     * @return
     */
    private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException(
                    "ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }
        if (ticksPerWheel > 1073741824) {
            throw new IllegalArgumentException(
                    "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
        }

        ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
        HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
        for (int i = 0; i < wheel.length; i++) {
            wheel[i] = new HashedWheelBucket();
        }
        return wheel;
    }

    private static int normalizeTicksPerWheel(int ticksPerWheel) {
        int normalizedTicksPerWheel = ticksPerWheel - 1;
        normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 1;
        normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 2;
        normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 4;
        normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 8;
        normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 16;
        return normalizedTicksPerWheel + 1;
    }

    /**
     * 显式启动后台线程
     * 即使未调用此方法,后台线程也将根据需要自动启动。
     * 1. 确定时间轮的 startTime 字段;
     * 2. 启动 workerThread 线程,开始执行 worker 任务
     *
     * @throws IllegalStateException if this timer has been {@linkplain #stop() stopped} already
     */
    public void start() {
        switch (WORKER_STATE_UPDATER.get(this)) {
            case WORKER_STATE_INIT:
                if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                    workerThread.start();
                }
                break;
            case WORKER_STATE_STARTED:
                break;
            case WORKER_STATE_SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid WorkerState");
        }

        // Wait until the startTime is initialized by the worker.
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Ignore - it will be ready very soon.
            }
        }
    }

    @Override
    public Set<Timeout> stop() {
        if (Thread.currentThread() == workerThread) {
            throw new IllegalStateException(
                    HashedWheelTimer.class.getSimpleName() +
                            ".stop() cannot be called from " +
                            TimerTask.class.getSimpleName());
        }

        if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
            // workerState can be 0 or 2 at this moment - let it always be 2.
            if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
                INSTANCE_COUNTER.decrementAndGet();
            }

            return Collections.emptySet();
        }

        try {
            boolean interrupted = false;
            while (workerThread.isAlive()) {
                workerThread.interrupt();
                try {
                    workerThread.join(100);
                } catch (InterruptedException ignored) {
                    interrupted = true;
                }
            }

            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        } finally {
            INSTANCE_COUNTER.decrementAndGet();
        }
        
        // 返回未处理的Timeouts
        return worker.unprocessedTimeouts();
    }

    @Override
    public boolean isStop() {
        return WORKER_STATE_SHUTDOWN == WORKER_STATE_UPDATER.get(this);
    }

    /**
     * 新建Timeout,并加入缓冲队列
     */
    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

        // 超过maxPendingTimeouts,直接拒绝
        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                    + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                    + "timeouts (" + maxPendingTimeouts + ")");
        }

        start();

        // 将该timeout添加至timeouts队列中,下一个tick会进行处理
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // 防止溢出
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }

    /**
     * Returns the number of pending timeouts of this {@link Timer}.
     */
    public long pendingTimeouts() {
        return pendingTimeouts.get();
    }

    private static void reportTooManyInstances() {
        String resourceType = ClassUtils.simpleClassName(HashedWheelTimer.class);
        logger.error("You are creating too many " + resourceType + " instances. " +
                resourceType + " is a shared resource that must be reused across the JVM," +
                "so that only a few instances are created.");
    }
}

Dubbo 中如何使用定时任务

在 Dubbo 中,时间轮并不直接用于周期性操作,而是只向时间轮提交执行单次的定时任务,在上一次任务执行完成的时候,调用 newTimeout() 方法再次提交当前任务,这样就会在下个周期执行该任务。即使在任务执行过程中出现了 GC、I/O 阻塞等情况,导致任务延迟或卡住,也不会有同样的任务源源不断地提交进来,导致任务堆积。

Dubbo 中对时间轮的应用主要体现在如下两个方面:

测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class HashedWheelTimerTest {

    private class PrintTask implements TimerTask {

        @Override
        public void run(Timeout timeout) {
            final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            System.out.println("task :" + LocalDateTime.now().format(formatter));
        }
    }

    @Test
    public void newTimeout() throws InterruptedException {
        final Timer timer = newTimer();
        // 每隔1s向时间轮添加任务。定时任务也是1s
        for (int i = 0; i < 10; i++) {
            timer.newTimeout(new PrintTask(), 3, TimeUnit.SECONDS);
            System.out.println("task" + i + "added into the timer");
            Thread.sleep(1000);
        }
        Thread.sleep(5000);
    }

    @Test
    public void stop() throws InterruptedException {
        final Timer timer = newTimer();
        for (int i = 0; i < 10; i++) {
            timer.newTimeout(new PrintTask(), 5, TimeUnit.SECONDS);
            Thread.sleep(100);
        }
        //stop timer
        timer.stop();

        try {
            //this will throw a exception
            timer.newTimeout(new PrintTask(), 5, TimeUnit.SECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private Timer newTimer() {
        // 100ms间隔的时间轮
        return new HashedWheelTimer(
                new NamedThreadFactory("dubbo-future-timeout", true),
                100,
                TimeUnit.MILLISECONDS);
    }
}

参考:
https://developer.51cto.com/art/202010/628734.htm?mobile

https://blog.csdn.net/weixin_38308374/article/details/105862201

https://wangguoping.blog.csdn.net/article/details/108293948

https://blog.csdn.net/weixin_42588665/article/details/81865156

上一篇下一篇

猜你喜欢

热点阅读