RocketMQ Runtime ServiceThread的设

2020-04-19  本文已影响0人  affe

最近正好在看Java Concurrency In Practice(以下简称JCIP), 里面的很多思想都在rocketmq runtime的源码中有所体现,因此就尝试着从ServiceThread这个类出发,看看我能从这本书里悟到多少东西。

我首先要做的是猜ServiceThread是用来解决什么问题的。在我看来,ServiceThread主要有如下几个特性

一个典型的使用场景是某个Service 直接集成ServiceThread,run()方法重写如下

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            this.waitForRunning(10000);
             this.rebalanceImpl.doRebalance();
        }

        log.info(this.getServiceName() + " service end");
    }

这段代码就是告诉线程,每隔十秒钟跑一次rebalanceImpl.doRebalance()(当然这个函数根据业务不同而异)
然后其他地方可以显式调用wakeup(),在还没到十秒的时候后就执行rebalanceImpl.doRebalance(),比如当有worker变化的时候,我们要立刻触发一次doRebalance()

    class WorkerStatusListenerImpl implements ClusterManagementService.WorkerStatusListener {

        /**
         * When alive workers change.
         */
        @Override
        public void onWorkerChange() {
            log.info("Wake up rebalance service");
            RebalanceService.this.wakeup();
        }
    }

ServiceThread 实现了Runnable接口,但是同时也持有一个Thread对象,至于为什么要这么设计我还不是很懂。

成员变量

ServiceThread的主要成员变量如下

    protected final Thread thread;
    protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
    protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
    protected volatile boolean stopped = false;
    private static final long JOIN_TIME = 90 * 1000;

问题:如果有两个线程同时调用waitForRunning(),第三个线程调用wakeup(),那此时两个线程都被允许继续执行,那么这是expected behaviour嘛

核心方法

最重要的方法就如下所示:

    protected void waitForRunning(long interval) {
        // if hasNotified == True, means either wakeup() or stop() 
        // has been executed by another thread
        if (hasNotified.compareAndSet(true, false)) {
            this.onWaitEnd();
            return;
        }

        //entry to wait
        waitPoint.reset();

        try {
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
            // TODO why we swallowed the exceptions here
        } catch (InterruptedException e) {
        } finally {
            hasNotified.set(false);
            this.onWaitEnd();
        }
    }

那我们先看第一个CAS操作,首先如果hasNotified是false的话,意味着要么没有其他线程执行了waitForRunning(), 要么有其他线程执行了waitForRunnning()只是没有任何的wakeup()。那这时会继续执行到waitPoint.reset(),也就是即使之前有人已经调用过waitForRunning()了,我们现在也给它reset()掉,重新开始计时。如果是True的话,意味着已经有一个wakeup()或者stop()被执行了,于是此次调用把hasNotified设置为false,相当于清空标志位,下一次waitForRunning就可以正常进行了。这里有个值得注意的点在于,假设hasNotified是True,但是wakeup()里的waitPoint.countDown();仍然没有被执行,所以理论上正在执行waitPoint.await(...)的前一个线程没有被释放,而立刻调用waitForRunning()的线程因为看到hasNotified是true之后,将hasNotified设置为false后就退出了。

一开始我会想,这可能是个不公平的实现,因为最开始调用waitForRunning()的线程需要等到waitPoint.await()才能执行,而当前因为提前return而继续执行的线程之则完全不用经过waitPoint.await()。当然这样也不会有太大的差别,尤其是考虑到几乎不会有多个线程同时调用waitForRunning()的情况下()

这样做应该没有问题的,因为我们认为waitPoint.countDown()立即就会执行,所以最终waitPoint.await()也会很快被释放:要么因为timeout或者

问题:waitPoint.countDown() 以及 waitPoint.await()之间能保证不插入其他任何指令吗,也就是说,waitPoint.countDown()只要变成0之后,一定会让当时正在等待的waitPoint.await()得到释放吗?我觉得应该要这样才对,否则如果在waitPoint.countDown() 中有另外一个线程调用了waitPoint.reset(),那么await()就不会被立即释放了,之后可以去CountDownLatch2里仔细看看是不是有这样的保证。

    public void wakeup() {
        // TODO end the waiting before interval happens
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown(); // notify
        }
    }

在wakeup()里,首先使用CAS将hasNotified设置为true,然后调用waitPoint.countDown(),这两步如前所述不是原子性的,但我们证明了即使这样写也没有什么问题。(至少我现在还看不出来)

一些其他的思考

注意这一段

        try {
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
            // TODO why we swallowed the exceptions here
        } catch (InterruptedException e) {
        } finally {
            hasNotified.set(false);
            this.onWaitEnd();
        }

我们catch住了InterruptedException e但是却没有做任何处理(也就是swallow吞掉了这个Exception),按照JCIP里的说法,interrupt通常的语义是cancellation,只有最顶层的调用代码应该对InterruptedException做处理,而所有不拥有这个Thread的代码要么:继续抛出InterruptedException, 要么将interrupted标志位重新设置为true。

基于这个原则,我们就应该判断,ServiceThread是不是应该来swallow这个InterruptedException。通常情况下waitPoint.await() 会跑在调用者的线程内,而会调用waitForRunning()的线程就是ServiceThread所持有的那个Thread object

    public ServiceThread() {
        this.thread = new Thread(this, this.getServiceName());
    }

而一般来说waitForRunning只会在这个Runnable的run()方法中被调用,因run()方法才是最上层的调用者,看起来说waitForRunning()不应该去swallow这个InterruptedException。当然这只是理论,实际上,我们知道即使这个InterruptedException被吞掉了也无伤大雅:如果被interrupt,那就当成被wakeup(),直接提前结束。

不过现在又有个问题了,什么情况下,waitPoint.await()会被Interrupt呢?是Crtl + C嘛?我们又要去看JVM的Interrupt是怎么传播的,以及为什么我按了Ctrl+C之后java程序就结束了。

当然这里还有一些publish相关的问题,如果waitForRunning只应该在run()中使用的话,是不是waitForRunning就不应该是public的呢?

上一篇 下一篇

猜你喜欢

热点阅读