EventBus源码解析

2020-03-24  本文已影响0人  慕北人

EventBus源码阅读

一、几个简单类

1. Subscribe注解

对于该注解没啥好说的:

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
    ThreadMode threadMode() default ThreadMode.POSTING;
    boolean sticky() default false;
    int priority() default 0;
}  

三个属性的作用:

2. ThreadMode

这是一个枚举类:

 public enum ThreadMode {
    POSTING,
    MAIN,
    MAIN_ORDERED,
    BACKGROUND,
    ASYNC
}  

3. Subscription

我们先不看别的,只看他的普通成员:

就知道该类代表一个订阅,该订阅包含了:订阅者和订阅方法,EventBus通过这种简单粗暴的方式将这两个东西绑定到一起。

其构造器异常简单:

Subscription(Object subscriber, SubscriberMethod subscriberMethod) {
    this.subscriber = subscriber;
    this.subscriberMethod = subscriberMethod;
    active = true;
} 

唯一值得注意的是,该类的equals方法实现

public boolean equals(Object other) {
    if (other instanceof Subscription) {
        Subscription otherSubscription = (Subscription) other;
        return subscriber == otherSubscription.subscriber
                && subscriberMethod.equals(otherSubscription.subscriberMethod);
    } else {
        return false;
    }
}  

该类的equals方法返回为true的要求是:内部的订阅者是同一个对象(这里使用==判定),且内部的订阅方法是同一个方法(使用订阅方法的equals方法判定)

4. SubscriberMethod

SubscriberMethod该类代表一个订阅方法.

他有三个属性是我们的老朋友了:

没错,就是自定义注解Subscribe中声明的三个;另外,SubscriberMethod的成员中还有三个新面孔:

而其构造器依然简单:

public SubscriberMethod(Method method, Class<?> eventType, ThreadMode threadMode, int priority, boolean sticky) {
    this.method = method;
    this.threadMode = threadMode;
    this.eventType = eventType;
    this.priority = priority;
    this.sticky = sticky;
}  

该类和Subscription一样,只是用来表示一个订阅方法,其属性也都是简单的定义了方法的一些信息。

同样,这个类注意其equals方法:

public boolean equals(Object other) {
    if (other == this) {
        return true;
    } else if (other instanceof SubscriberMethod) {
        checkMethodString();
        SubscriberMethod otherSubscriberMethod = (SubscriberMethod)other;
        otherSubscriberMethod.checkMethodString();
        return methodString.equals(otherSubscriberMethod.methodString);
    } else {
        return false;
    }
}    

SubscriberMethod的equals判断非常直观:methodString内容相等即可,实际上这同将method、eventType等挨个进行比较的效果意义是一样的,但是这么通过字符串一转换,就大大提高了效率

其中checkMethodString方法用来为成员变量methodString初始化

private synchronized void checkMethodString() {
    if (methodString == null) {
        // Method.toString has more overhead, just take relevant parts of the method
        StringBuilder builder = new StringBuilder(64);
        builder.append(method.getDeclaringClass().getName());
        builder.append('#').append(method.getName());
        builder.append('(').append(eventType.getName());
        methodString = builder.toString();
    }
}

可见,methodString组成为:声明该具体Method的类名、该具体Method方法名、eventtype的类名的组合

5. PendingPost

PendingPost表示一个待处理的Post,其有四个成员:

只看这四个成员,首先event和subscription这两个可以看出EventBus需要把该event发送到subscription来使得订阅者能够通过订阅方法处理这个Event,这符合我们使用EventBus的情形。pendingPostPool可能会表示所有待处理的post,而next看起来想要把PendingPost构造成一个链表的样子。

其构造器依旧简单:

private PendingPost(Object event, Subscription subscription) {
    this.event = event;
    this.subscription = subscription;
}  

毫无营养。我们来看看其obtainPendingPost方法:

static PendingPost obtainPendingPost(Subscription subscription, Object event) {
    synchronized (pendingPostPool) {
        int size = pendingPostPool.size();
        if (size > 0) {
            PendingPost pendingPost = pendingPostPool.remove(size - 1);
            pendingPost.event = event;
            pendingPost.subscription = subscription;
            pendingPost.next = null;
            return pendingPost;
        }
    }
    return new PendingPost(event, subscription);
}  

其流程为:将pendingPostPool池中最后一个待处理Post取出来,将其event和subscription用参数列表中的值覆盖掉,然后返回这个新的PendingPost。可以理解,该方法的目的是从待处理Post池中最后一个取出,并修改其内容后返回,可以理解为消费了一个事件,但是为何要先修改其内容,我们暂且不去理会,后面会有答案。

PendingPost还有另一个方法,releasePendingPost:

static void releasePendingPost(PendingPost pendingPost) {
    pendingPost.event = null;
    pendingPost.subscription = null;
    pendingPost.next = null;
    synchronized (pendingPostPool) {
        // Don't let the pool grow indefinitely
        if (pendingPostPool.size() < 10000) {
            pendingPostPool.add(pendingPost);
        }
    }
}  

这个方法初看让人很疑惑,怎么先将其内容都复制为null之后才往pendingPostPool中添加?我们可以大胆的猜想这个方法是同上一个方法obtainPendingPost搭配使用的,该方法添加一个值全部为null的待处理Post;而obtain的时候取出的就是一个值全部为null的PendingPost,所以在obtian中修改其内容就显得合理了。我们先做出这样一个假设,答案我们接着看。

6. PendingPostQueue

PendingPostQueue让我们猜中了开头,可惜没有猜中结尾,原来作者是想要通过上文中提到的PendingPost中的next成员构建一个队列,该类只定义了队列的头尾两个成员:

自然而然,也就定了了两种方法,进队和出队

synchronized void enqueue(PendingPost pendingPost) {
    if (pendingPost == null) {
        throw new NullPointerException("null cannot be enqueued");
    }
    if (tail != null) {
        tail.next = pendingPost;
        tail = pendingPost;
    } else if (head == null) {
        head = tail = pendingPost;
    } else {
        throw new IllegalStateException("Head present, but no tail");
    }
    notifyAll();
}  

synchronized PendingPost poll() {
    PendingPost pendingPost = head;
    if (head != null) {
        head = head.next;
        if (head == null) {
            tail = null;
        }
    }
    return pendingPost;
}

synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
    if (head == null) {
        wait(maxMillisToWait);
    }
    return poll();
}  

作者也很直接,不整那些花里胡哨的,poll(int toWait)方法直接给我们整一个wait(toWait)就完事儿了。

7. 小结

到这里我们先小结一下这几个重要的实体类:

二、Poster

各种event都需要Poster发送到正确的地方,而EventBus为各种ThreadMode都实现了一个Poster,在分析各种Poster之前,我们先看看祖宗Poster

1. Poster

Poster代表了一次推送事件

Poster是一个接口,只定义了一个方法:

void enqueue(Subscription subscription, Object event);  

哇,只有一个方法,对于我们阅读者太友好了。

2. BackgroundPoster

BackgroundPoster看名字就能猜出来,这个Poster一定时在ThreadMode为BACKGROUND情况下使用的。

final class BackgroundPoster implements Runnable, Poster  

可以看出,这个类实现了Runnable接口和Poster接口,也就是说可以直接把它作为线程的参数来跑。

这个类指定义了三个成员:

其构造器简单依旧:

BackgroundPoster(EventBus eventBus) {
    this.eventBus = eventBus;
    queue = new PendingPostQueue();
}  

注意,这里的queue是自己初始化的,不是从别的地方传过来的,我们从这里也可以大胆猜测,不同ThreadMode下的Poster各自维护一个PendingPostQueue,它们之间老死不相往来

而作者再一次展现了体恤万千码农的贴心,该类的方法就两个,分别是Runnable接口的run方法和Poster接口的enqueue方法。

public void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    synchronized (this) {
        queue.enqueue(pendingPost);
        if (!executorRunning) {
            executorRunning = true;
            eventBus.getExecutorService().execute(this);
        }
    }
}  

来了来了,这里看到了PendingPost的obtainPendingPost方法的调用,该方法在PendingPost池不为空的时候取出来最后一个修改内容之后返回,或者池为空时直接新建一个PendingPost返回。这里,直接将该PendingPost加入到了pendingPostQueue中,之后调用了:

eventBus.getExecutorService().execute(this);  

嗯,感觉最终Event能够传递到订阅者那里是依靠这里,暂且记下。

public void run() {
    try {
        try {
            while (true) {
                PendingPost pendingPost = queue.poll(1000);
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            executorRunning = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
            }
        } catch (InterruptedException e) {
            eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
        }
    } finally {
        executorRunning = false;
    }
}

这里有一个无限循环,一直从queue中取PendingPost,如果队列为空了,那么就直接return了(这里不同于Looper的loop,这里为空是直接return了),对于一个Runnable来说,主动return就意味着它的生命结束了。如果不为空,那么将取到的PendingPost交给

eventBus.invokeSubscriber(pendingPost);  

看名字,好像这里也像是将PendingPost发送给订阅者的方法,我们也暂且记下。

Q:这里从队列中取出PendingPost时首先调用的是带等待时间的poll,作者必有深意?

3. AsyncPoster

AsyncPoster的名字也暴露了它,显然这是为ThreadMode为ASYNC的情况下准备的Poster。

该类也同BackgroundPoster一样,同样实现了Runnable和Poster接口,只有两个成员:

构造器简单依旧:

AsyncPoster(EventBus eventBus) {
    this.eventBus = eventBus;
    queue = new PendingPostQueue();
}  

其run方法和enqueue更加简单,没有任何新意:

public void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    queue.enqueue(pendingPost);
    eventBus.getExecutorService().execute(this);
}

@Override
public void run() {
    PendingPost pendingPost = queue.poll();
    if(pendingPost == null) {
        throw new IllegalStateException("No pending post available");
    }
    eventBus.invokeSubscriber(pendingPost);
}  

4. HandlerPoster

HandlerPoster就显得很特立独行,从名字无法知道其用于那种ThreadMode。

其有四个成员:

从成员来看,其更像BackgroundPoster一些。

public class HandlerPoster extends Handler implements Poster  

其继承了Handler,实现了Poster接口。

protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
    super(looper);
    this.eventBus = eventBus;
    this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
    queue = new PendingPostQueue();
}  

倒是构造器,简单不减当年。

public void enqueue(Subscription subscription, Object event) {
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    synchronized (this) {
        queue.enqueue(pendingPost);
        if (!handlerActive) {
            handlerActive = true;
            if (!sendMessage(obtainMessage())) {
                throw new EventBusException("Could not send handler message");
            }
        }
    }
}  

别的没啥说的,鸡贼的作者吧发送message放到了if条件中。

public void handleMessage(Message msg) {
    boolean rescheduled = false;
    try {
        long started = SystemClock.uptimeMillis();
        while (true) {
            PendingPost pendingPost = queue.poll();
            if (pendingPost == null) {
                synchronized (this) {
                    // Check again, this time in synchronized
                    pendingPost = queue.poll();
                    if (pendingPost == null) {
                        handlerActive = false;
                        return;
                    }
                }
            }
            eventBus.invokeSubscriber(pendingPost);
            long timeInMethod = SystemClock.uptimeMillis() - started;
            if (timeInMethod >= maxMillisInsideHandleMessage) {
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
                rescheduled = true;
                return;
            }
        }
    } finally {
        handlerActive = rescheduled;
    }
}  

注意,这里两次调用的poll都是没有等待的重载方法。这里有个判断,如果该事件被处理的时间超过了maxMillisInsideHandleMessage就会尝试发送一个空的Message,怀疑这里是用来判断HandlerPoster死了没

三、EventBus

终于要到EventBus了。

几个Map成员

EventBus下有几个Map映射需要提前了解:

1. 一切的开始--EventBus.register(Object)

该方法用来注册,否则的话,EventBus没法用了。

 public void register(Object subscriber) {
    Class<?> subscriberClass = subscriber.getClass();
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod);
        }
    }
}  

首先,EventBus根据我们注册时传递的对象获取到了其Class对象,这样一来,EventBus就知道订阅发生在该Class里面了;接下来,EventBus通过SubscriberMethodFinder这个工具从这个Class中找到(具体怎么找我们后面分析)所有使用了Subscribe注解的方法,这些就是所有的订阅方法;最后,EventBus为每一个方法执行了subscribe方法

subscribe(Object subscriber, SubscriberMethod subscriberMethod)

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    Class<?> eventType = subscriberMethod.eventType;
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);

    -------------- 重点 1 ----------------
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                    + eventType);
        }
    }

    -------------- 重点 2 ---------------
    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }

    ------------- 重点 3 -------------
    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    subscribedEvents.add(eventType);

    ---------------- 重点 4 ------------------
    if (subscriberMethod.sticky) {
        if (eventInheritance) {
            // Existing sticky events of all subclasses of eventType have to be considered.
            // Note: Iterating over all events may be inefficient with lots of sticky events,
            // thus data structure should be changed to allow a more efficient lookup
            // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
}  

好长一方法,不慌,我们慢慢看,只看思路,细节不管:

2. 干活儿的地方--post(Object event)

public void post(Object event) {  
    ----------- 重点 1 --------
    PostingThreadState postingState = currentPostingThreadState.get();
    List<Object> eventQueue = postingState.eventQueue;
    eventQueue.add(event);

    if (!postingState.isPosting) {
        postingState.isMainThread = isMainThread();
        postingState.isPosting = true;
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        try {
            ----------- 重点 2 ----------
            while (!eventQueue.isEmpty()) {
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}  

postSticky

该方法用来post一个黏性事件:

public void postSticky(Object event) {
    synchronized (stickyEvents) {
        stickyEvents.put(event.getClass(), event);
    }
    // Should be posted after it is putted, in case the subscriber wants to remove immediately
    post(event);
}  

可见,就是将其先放入stickyEvents中,然后再尝试调用post方法看看有没有人能够消费掉

postSingleEvent(Object event, PostingThreadState postingState)

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;
    if (eventInheritance) {
        ---------- 重点 1 -----------
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            Class<?> clazz = eventTypes.get(h);
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
        }
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}  

postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass)

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {  
        ----------- 重点 1 ---------------
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    if (subscriptions != null && !subscriptions.isEmpty()) {  
        ---------- 重点 2 -----------
        for (Subscription subscription : subscriptions) {
            postingState.event = event;
            postingState.subscription = subscription;
            boolean aborted;
            try {
                postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {
                break;
            }
        }
        return true;
    }
    return false;
}  

postToSubscription(Subscription subscription, Object event, boolean isMainThread)

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case MAIN_ORDERED:
            if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                // temporary: technically not correct as poster not decoupled from subscriber
                invokeSubscriber(subscription, event);
            }
            break;
        case BACKGROUND:
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}  

哇偶,好兴奋啊,我们在这个方法里看到了对不同ThreadMode的处理

各个Poster的enqueue操作都会调起EventBus里的线程池来执行消费事件

invokeSubscriber(Subscription subscription, Object event)

void invokeSubscriber(Subscription subscription, Object event) {
    try {
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}  

最终,会通过反射的机制来调用订阅方法消费掉post事件

3. 小结

让我们重新开始捋一捋,从register到post发生了什么:

未完待续~

上一篇下一篇

猜你喜欢

热点阅读