EventBus源码解读下篇:事件的发送与分发

2019-01-03  本文已影响14人  一线游骑兵

目的:分析post事件的发送与分发

从post开始:

      /** Posts the given event to the event bus. */
    public void post(Object event) {
        PostingThreadState postingState = currentPostingThreadState.get();      //获取到当前线程中的PostingThreadState
        List<Object> eventQueue = postingState.eventQueue;  //获取该线程中事件的队列
        eventQueue.add(event);  //将新post的事件入队

        if (!postingState.isPosting) {  //如果没有正在发送
            postingState.isMainThread = isMainThread();     //判断当前线程是否在主线程运行,如果在非安卓环境,总是返回true
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                while (!eventQueue.isEmpty()) {     //循环遍历队列所有事件,然后发送
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

特别说明:

  1. currentPostingThreadState.get();返回的是当前线程中ThreadLocal中存储的PostingThreadState对象,ThreadLocal是线程内部的存储类,线程私有的,每个线程对同一个key都可以有不同的一个副本。具体解析:ThreadLocal源码解析
  2. PostingThreadState对象是一个静态类:存储了该线程中要发送的事件队列,发送事件是否为UI线程,该事件的订阅者等信息
    final static class PostingThreadState {
        final List<Object> eventQueue = new ArrayList<>();
        boolean isPosting;
        boolean isMainThread;
        Subscription subscription;
        Object event;
        boolean canceled;
    }
  1. 将该事件加入该线程的事件队列,然后循环发送单个事件。

发送单个事件:

    //发送某个事件【包含该事件的父类或接口事件】
    private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        if (eventInheritance) {     //是否也向订阅了该事件的父类的方法发送,默认为true
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);        //查询到订阅了该事件类型的所有的父类或接口类型
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);       //发送具体的事件类型
            }
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        if (!subscriptionFound) {       //没有找到接受者
            if (logNoSubscriberMessages) {
                logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) { //发送一个默认的没有找到监听者的事件
                post(new NoSubscriberEvent(this, event));
            }
        }
    }

方法说明:

  1. 获取到所有订阅了该事件类型的父类型,然后发送该类型及其所有父类型。
  2. 如果没有找到订阅者,则系统会默认发送给一个事件类型为 NoSubscriberEvent的通知,外部可通过该类型进行事件发送失败的监听。

发送某一具体类型:

    private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            subscriptions = subscriptionsByEventType.get(eventClass);       //获取到该事件的所有订阅者
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {        //如果订阅者不为空
            for (Subscription subscription : subscriptions) {       //循环遍历所有订阅者
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    postToSubscription(subscription, event, postingState.isMainThread); //将事件发送给订阅者
                    aborted = postingState.canceled;
                } finally { //重置事件状态
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }

也很简单,找到该订阅该事件的所有订阅者(如何通过事件类型获取所有订阅者可以看上篇文章),然后将事件发送给订阅者:

    //将事件发送给具体的订阅者,入参isMainThread是指post的线程是否为主线程
   private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {     //首先判断一下订阅者所在的线程
            case POSTING:       //发送者与订阅者在统一线程,则直接反射调用
                invokeSubscriber(subscription, event);
                break;
            case MAIN:      //订阅者在主线程,
                if (isMainThread) {     //发送者也在主线程,则直接反射调用
                    invokeSubscriber(subscription, event);
                } else {        //发送者在非UI线程,则加入主线程发送器的队列等待发送
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:      //订阅者在主线程,且始终有序【不并发】
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:        //订阅者在后台线程,
                if (isMainThread) {     //发送者在UI线程,则添加到后台发送器队列等待发送
                    backgroundPoster.enqueue(subscription, event);
                } else {        //都在后台线程,直接反射调用
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:     //在一个独立于发送线程和Ui线程的线程中发送
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

通过上边的几个方法,我们已经知道了post发送事件所在线程,要发送的对象,订阅者所在的线程,订阅者类,订阅者方法关键信息。因此,下边主要搞明白如何进行线程间通信的·。
首先要明白一个概念:发射器:Poster

interface Poster {

    /**
     * Enqueue an event to be posted for a particular subscription.
     *
     * @param subscription Subscription which will receive the event.
     * @param event        Event that will be posted to subscribers.
     */
    void enqueue(Subscription subscription, Object event);
}

该接口包含一个方法:将订阅者对象以及要发送的信息添加到发送队列等候发送。
该接口共有3个实现类:


image.png

说明:

BackgroundPosterAsnchPoster之间的区别在于:
1. 如果发送者与接受者都在同一后台线程,BackgroundPoster会直接在该线程反射调用。而AsycnPoster则不管发送者与接收者是否在同一线程,始终会开启一个新的后台线程反射调用接受者函数。
2. 在短时间内并发发送大量事件时,BackgroundPoster会保证接受者接收到的事件也是有序的,而由于AsycnPoster每次实在单独线程回调,因此接受者接收的事件是不固定的。

下面先看一下BackgroundPoster的实现:

final class BackgroundPoster implements Runnable, Poster {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    @Override
    public void run() {
        try {
            try {
                while (true) {
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }

}

该方法同时实现了两个接口:RunnablePoster
外部通过调用enqueue将事件与订阅者入队,然后会通过EventBus中的一个线程池来执行自身的run方法。在run方法中开启了一个死循环,会一直取出该队列中的PendingPost对象,然后通过eventBus.invokeSubscriber(pendingPost);来反射调用订阅者方法。
这里对PendingPost类说明一下:

final class PendingPost {
    private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();

    Object event;
    Subscription subscription;
    PendingPost next;

    private PendingPost(Object event, Subscription subscription) {
        this.event = event;
        this.subscription = subscription;
    }

    static PendingPost obtainPendingPost(Subscription subscription, Object event) {
        synchronized (pendingPostPool) {
            int size = pendingPostPool.size();
            if (size > 0) {
                PendingPost pendingPost = pendingPostPool.remove(size - 1);
                pendingPost.event = event;
                pendingPost.subscription = subscription;
                pendingPost.next = null;
                return pendingPost;
            }
        }
        return new PendingPost(event, subscription);
    }

    static void releasePendingPost(PendingPost pendingPost) {
        pendingPost.event = null;
        pendingPost.subscription = null;
        pendingPost.next = null;
        synchronized (pendingPostPool) {
            // Don't let the pool grow indefinitely
            if (pendingPostPool.size() < 10000) {
                pendingPostPool.add(pendingPost);
            }
        }
    }

}
  1. 该类中有一个静态列表作为对象池来存放自身
  2. 除了必要的发送事件对象以及订阅者对象,还有一个next指针指向下一个PendingPost
  3. 在外部通过静态方法obtainPendingPost时,会先从池中获取一个干净的对象然后赋值返回,否则会创建一个新的。
  4. 外部使用完后会通过releasePendingPost来释放抹去数据加入池中【如果池大小<10000】

上边的BackgroundPoster类中持有一个PendingPostQueue,该队列中声明了两个PendingPost对象作为head 和 tail。每次enqueue入队操作会将新入队的PendingPost作为队尾。每次发送回通过poll来获取head队头,然后将头指针后移来达到队列的结构。

最后来看一下真正反射调用的方法:

    void invokeSubscriber(PendingPost pendingPost) {
        Object event = pendingPost.event;
        Subscription subscription = pendingPost.subscription;
        PendingPost.releasePendingPost(pendingPost);  //在pendingPost使命完成后会release入池
        if (subscription.active) {
            invokeSubscriber(subscription, event);
        }
    }

    void invokeSubscriber(Subscription subscription, Object event) {
        try {  //通过反射回调方法。
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

OK,想后台线程发送事件分析完毕,下面看一下如何向主线程发送事件:

public class HandlerPoster extends Handler implements Poster{
    private final PendingPostQueue queue;

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

   @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }

不难猜想,是通过new Handler(Looper.getMainLooper())来获取主线程的handler,在入队操作中通过handler.sendMessage()发送一个空消息,进入handleMessage方法中处理,在handleMessage方法中通过while(true)方法来不断获取队列中将要发送的事件,最后通过反射调用将事件发送到订阅者方法中。


总结
  1. 通过ThreadLocal来存放post到每个线程中的事件队列。
  2. 查找订阅了发送事件类型的父类型的方法,然后加入发送队列。
  3. 通过HandlerPosterBackgroundPosterAsyncPoster来实现不同线程之间的事件发送。
  4. 使用了对象池技术来实现PendingPost对象的获取与释放。
  5. 通过PendingPostQueue实现PendingPost的队列操作。
  6. 通过反射调用订阅者方法将发送事件传递给订阅者。

完。

上一篇下一篇

猜你喜欢

热点阅读