EventBus3.0源码分析

2018-03-05  本文已影响18人  龙儿筝

本文分为以下几个部分:创建、注册、发送事件、粘性事件来讲解它的实现原理,本文使用Eventbus版本为3.1.1。

注册

在使用EventBus时第一步得注册一下
EventBus.getDefault().register(this);
我们先看getDefault()的源码,EventBus#getDefault()

getDefault
public static EventBus getDefault() {
        if (defaultInstance == null) {
            synchronized (EventBus.class) {
                if (defaultInstance == null) {
                    defaultInstance = new EventBus();
                }
            }
        }
        return defaultInstance;
    }

这里使用了双重检锁加同步的方式实现单例对象,确保在不同线程中只有一个实例。
除了使用单例的方式创建对象外,我们发现Eventbus还提供了一个静态的builder()来创建实例对象,通过建造者方式来创建具有不同功能的Eventbus实例。先看一下EventBusBuilder源码中的属性

EventBusBuilder
public class EventBusBuilder {
    private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    boolean logSubscriberExceptions = true;
    boolean logNoSubscriberMessages = true;
    boolean sendSubscriberExceptionEvent = true;
    boolean sendNoSubscriberEvent = true;
    boolean throwSubscriberException;
    boolean eventInheritance = true;
    boolean ignoreGeneratedIndex;
    boolean strictMethodVerification;
    ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
    List<Class<?>> skipMethodVerificationForClasses;
    List<SubscriberInfoIndex> subscriberInfoIndexes;
    Logger logger;
    MainThreadSupport mainThreadSupport;
}

通过建造者方式来配置各种日志打印,消息事件的处理。我们可能通过具体事件单独创建一个实例来发送消息,这样可以避免一些不必要的处理判断。创建EventBus实例可能通过这两种方式来创建,再看一下EventBus构造方法,对属性做了一系列的初始化,我们以部分属性来分析。

EventBus
EventBus(EventBusBuilder builder) {
        subscriptionsByEventType = new HashMap<>();
        typesBySubscriber = new HashMap<>();
        stickyEvents = new ConcurrentHashMap<>();
        subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
                builder.strictMethodVerification, builder.ignoreGeneratedIndex);
        eventInheritance = builder.eventInheritance;
    }

subscriptionsByEventType:以事件类为key,以订阅列表为value,支持多个订阅方法。事件发送后,在这里寻找订阅者,而Subscription是CopyOnWriteArrayList,线程安全的容器,封装了订阅者,订阅方法。
typesBySubscriber:这是一个用HashMap实现的订阅管理类,负责register与unregister
stickyEvents:使用ConcurrentHashMa来保存粘性事件
subscriberMethodFinder:用于查找订阅类中的Subscribe注解方法
eventInheritance:Eventbus默认会考虑事件的父类,如果事件继承自父类,那么该父类也会作为事件发送给订阅者,设为false则不考虑父类

注册

接下来再看一下注册方法EventBus#register()

register
public void register(Object subscriber) {
        Class<?> subscriberClass = subscriber.getClass();
        List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
        synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }

首先是获取订阅类的class,接着是查找订阅类中的注解方法,并保存在List集合中,再分析SubscriberMethodFinder#findSubscriberMethods方法

findSubscriberMethods
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }

        if (ignoreGeneratedIndex) {
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }

首先从缓存中获取,若缓存中有,则直接返回。我们在初始化时一般也不设置ignoreGeneratedIndex的值,findUsingReflection()方法是通过反射获取注解方法,所以我们直接分析findUsingInfo()方法

findUsingInfo
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
        FindState findState = prepareFindState();
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
            findState.subscriberInfo = getSubscriberInfo(findState);
            if (findState.subscriberInfo != null) {
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                        findState.subscriberMethods.add(subscriberMethod);
                    }
                }
            } else {
                findUsingReflectionInSingleClass(findState);
            }
            findState.moveToSuperclass();
        }
        return getMethodsAndRelease(findState);
    }

这里有个FindState类,Eventbus会将注册后的订阅信息保存在其中,接着再分析initForSubscriber()方法

initForSubscriber
void initForSubscriber(Class<?> subscriberClass) {
            this.subscriberClass = clazz = subscriberClass;
            skipSuperClasses = false;
            subscriberInfo = null;
        }

这里是对FindState部分属性赋值,其中subscriberInfo初始化为null,再看findState.subscriberInfo = getSubscriberInfo(findState);这一步是查找当前类以及父类中的subscriberInfo的值,由此可知,若有多个子类需要订阅处理消息,可以直接在父类中进行注册。由前面可知findState.subscriberInfo的值为null,我们再接着看findUsingReflectionInSingleClass()方法

findUsingReflectionInSingleClass
private void findUsingReflectionInSingleClass(FindState findState) {
        Method[] methods;
        try {
            // This is faster than getMethods, especially when subscribers are fat classes like Activities
            methods = findState.clazz.getDeclaredMethods();
        } catch (Throwable th) {
            // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
            methods = findState.clazz.getMethods();
            findState.skipSuperClasses = true;
        }
        for (Method method : methods) {
            int modifiers = method.getModifiers();
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                Class<?>[] parameterTypes = method.getParameterTypes();
                if (parameterTypes.length == 1) {
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    if (subscribeAnnotation != null) {
                        Class<?> eventType = parameterTypes[0];
                        if (findState.checkAdd(method, eventType)) {
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException("@Subscribe method " + methodName +
                            "must have exactly 1 parameter but has " + parameterTypes.length);
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException(methodName +
                        " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
            }
        }
    }

这里利用反射的方式,对订阅类进行扫描,找出符合要求的订阅方法,并用Map进行保存。订阅方法需要是public,参数为1,并且用Subscribe注解修饰。这里会将相关数据保存在findState中,数据包括符合要求的方法,事件类型,线程,优先级以及sticky事件等等。在保存前还做了一些检测,我们接着分析checkAdd()方法

checkAdd
boolean checkAdd(Method method, Class<?> eventType) {
            // 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required.
            // Usually a subscriber doesn't have methods listening to the same event type.
            Object existing = anyMethodByEventType.put(eventType, method);
            if (existing == null) {
                return true;
            } else {
                if (existing instanceof Method) {
                    if (!checkAddWithMethodSignature((Method) existing, eventType)) {
                        // Paranoia check
                        throw new IllegalStateException();
                    }
                    // Put any non-Method object to "consume" the existing Method
                    anyMethodByEventType.put(eventType, this);
                }
                return checkAddWithMethodSignature(method, eventType);
            }
        }

这里做了双重检测,第一次是判断eventType的类型,而第二次检验是判断方法的完整签名。首先通过anyMethodByEventType.put(eventType, method) 将eventType以及method放进anyMethodByEventType这个Map中,同时该put方法会返回同一个key的上一个value值,所以如果之前没有别的方法订阅了该事件,那么existing应该为null,可以直接返回true;否则为某一个订阅方法的实例,要进行下一步的判断。接着分析checkAddWithMethodSignature()方法

checkAddWithMethodSignature
private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
            methodKeyBuilder.setLength(0);
            methodKeyBuilder.append(method.getName());
            methodKeyBuilder.append('>').append(eventType.getName());

            String methodKey = methodKeyBuilder.toString();
            Class<?> methodClass = method.getDeclaringClass();
            Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
            if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
                // Only add if not already found in a sub class
                return true;
            } else {
                // Revert the put, old class is further down the class hierarchy
                subscriberClassByMethodKey.put(methodKey, methodClassOld);
                return false;
            }
        }

从上面的代码看出,该方法首先获取了当前方法的methodKey、methodClass等,并赋值给subscriberClassByMethodKey,如果方法签名相同,那么返回旧值给methodClassOld,接着是if判断,判断methodClassOld是否为空,由于第一次调用该方法的时候methodClassOld肯定是null,此时就可以直接返回true了。但是,后面还有一个判断即methodClassOld.isAssignableFrom(methodClass),这个的意思是:methodClassOld是否是methodClass的父类或者同一个类。如果这两个条件都不满足,则会返回false,那么当前方法就不会添加为订阅方法了。

那么这两个方法到底有什么作用呢?从这两个方法的逻辑来看,第一层判断根据eventType来判断是否有多个方法订阅该事件,而第二层判断根据完整的方法签名来判断。

第一种情况:比如一个类有多个订阅方法,方法名不同,但它们的参数类型都是相同的,那么遍历这些方法的时候,会多次调用到checkAdd方法,由于existing不为null,那么会进而调用checkAddWithMethodSignature方法,但是由于每个方法的名字都不同,因此methodClassOld会一直为null,因此都会返回true。也就是说,允许一个类有多个参数相同的订阅方法。

第二种情况:类B继承自类A,而每个类都是有相同订阅方法,它们都有着一样的方法签名。方法的遍历会从子类开始,即B类,在checkAddWithMethodSignature方法中,methodClassOld为null,那么B类的订阅方法会被添加到列表中。接着,向上找到类A的订阅方法,由于methodClassOld不为null而且显然类B不是类A的父类,methodClassOld.isAssignableFrom(methodClass)也会返回false,那么会返回false。也就是说,子类继承并重写了父类的订阅方法,那么只会把子类的订阅方法添加到订阅者列表,父类的方法会忽略。
分析完findSubscriberMethods()逻辑,我们再接着分析subscribe()方法

subscribe
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        Class<?> eventType = subscriberMethod.eventType;
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else {
            if (subscriptions.contains(newSubscription)) {
                throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                        + eventType);
            }
        }

        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
                subscriptions.add(i, newSubscription);
                break;
            }
        }

        List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
        if (subscribedEvents == null) {
            subscribedEvents = new ArrayList<>();
            typesBySubscriber.put(subscriber, subscribedEvents);
        }
        subscribedEvents.add(eventType);
        //处理粘性事件
        ...
    }

该方法主要实现了订阅方法与事件直接的关联。以事件为key,方法为value保存在subscriptionsByEventType中。处理订阅事件的优先级,优先级高的会先被通知,最后处理sticky事件

注销

注册完我们还得注销订阅
EventBus.getDefault().unregister(this)
我们再分析一下注销逻辑EventBus#unregister

unregister
public synchronized void unregister(Object subscriber) {
        List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
        if (subscribedTypes != null) {
            for (Class<?> eventType : subscribedTypes) {
                unsubscribeByEventType(subscriber, eventType);
            }
            typesBySubscriber.remove(subscriber);
        } else {
            logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
        }
    }

首先是获取所有订阅事件,再遍历订阅事件进行注销,注销完后移除订阅者,我们再看一下unsubscribeByEventType()方法

unsubscribeByEventType
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
        List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        if (subscriptions != null) {
            int size = subscriptions.size();
            for (int i = 0; i < size; i++) {
                Subscription subscription = subscriptions.get(i);
                if (subscription.subscriber == subscriber) {
                    subscription.active = false;
                    subscriptions.remove(i);
                    i--;
                    size--;
                }
            }
        }
    }

根据事件类型获取订阅信息subscriptions集合,对其进行遍历移除,相比注册简单了很多。

发送事件

注册注销分析完后,我们再看一下发送消息逻辑,以最简单的发送字符串为例
EventBus.getDefault().post("aloe");
查看post()方法

post
public void post(Object event) {
        PostingThreadState postingState = currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);

        if (!postingState.isPosting) {
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

第一行里的PostingThreadState封装了当前线程信息,订阅者以及订阅事件,currentPostingThreadState是ThreadLocal对象,是线程安全的。后面是将事件放入消息队列中。我们再看一下postSingleEvent()方法

postSingleEvent
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        if (eventInheritance) {
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }

对于一个事件,默认地会搜索出它的父类,并把父类也作为事件之一发送给订阅者,我们再看一下postSingleEventForEventType()方法

postSingleEventForEventType
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }

这里获取subscriptions并调用postToSubscription()发送事件

postToSubscription
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

首先获取订阅方法运行的线程,如果是POSTING,那么直接调用invokeSubscriber()方法即可,如果是MAIN,则要判断当前线程是否是MAIN线程,如果是也是直接调用invokeSubscriber()方法,否则会交给mainThreadPoster来处理,其他情况相类似。最后利用反射的方式来调用订阅方法,将事件发送给订阅者。

粘性事件的发送及接收

粘性事件与一般的事件不同,粘性事件是先发送出去,然后让后面注册的订阅者能够收到该事件。粘性事件的发送是通过EventBus#postSticky方法进行发送的
EventBus.getDefault().postSticky("aloe");
我们看一下postSticky()源码

postSticky
public void postSticky(Object event) {
        synchronized (stickyEvents) {
            stickyEvents.put(event.getClass(), event);
        }
        // Should be posted after it is putted, in case the subscriber wants to remove immediately
        post(event);
    }

把该事件放进了 stickyEvents这个map中,接着调用了post()方法,那么流程和上面分析的一样了,只不过是找不到相应的subscriber来处理这个事件罢了。那么当注册订阅者的时候是怎么匹配的呢?我们再来看一下subscribe()方法

subscribe
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
        ...
        if (subscriberMethod.sticky) {
            if (eventInheritance) {
                // Existing sticky events of all subclasses of eventType have to be considered.
                // Note: Iterating over all events may be inefficient with lots of sticky events,
                // thus data structure should be changed to allow a more efficient lookup
                // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
                Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
                for (Map.Entry<Class<?>, Object> entry : entries) {
                    Class<?> candidateEventType = entry.getKey();
                    if (eventType.isAssignableFrom(candidateEventType)) {
                        Object stickyEvent = entry.getValue();
                        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                    }
                }
            } else {
                Object stickyEvent = stickyEvents.get(eventType);
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    }

EventBus并不知道当前的订阅者对应了哪个粘性事件,因此需要全部遍历一次,找到匹配的粘性事件后,会调用checkPostStickyEventToSubscription()方法,内部又调用了postToSubscription。因此无论对于普通事件还是粘性事件,都会根据threadMode来选择对应的线程来执行订阅方法,而切换线程的关键就是mainThreadPoster、backgroundPoster和asyncPoster。

HandlerPoster

我们先看mainThreadPoster,在EventBus构造方法中初始化了mainThreadSupport,分析createPoster可知mainThreadSupport是HanlderPoster对象

HanlderPoster
public class HandlerPoster extends Handler implements Poster {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;
    ...
}

HandlerPoster内部有一个PendingPostQueue,这是一个队列,保存了PendingPost,即待发送的post,该PendingPost封装了event和subscription,方便在线程中进行信息的交互。在postToSubscription方法中,当前线程如果不是主线程的时候,会调用HandlerPoster#enqueue方法

enqueue

public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

首先会从PendingPostPool中获取一个可用的PendingPost,接着把该PendingPost放进PendingPostQueue,发送消息,那么由于该HandlerPoster在初始化的时候获取了UI线程的Looper,所以它的handleMessage()方法运行在UI线程。

handleMessage

public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                ...
                eventBus.invokeSubscriber(pendingPost);
                ...
            }
        } finally {
            handlerActive = rescheduled;
        }
    }

这里调用了EventBus#invokeSubscriber方法,在这个方法里面,将PendingPost解包,进行正常的事件分发。

BackgroundPoster

BackgroundPoster继承自Runnable,与HandlerPoster相似的,它内部都有PendingPostQueue这个队列,当调用到它的enqueue的时候,会将subscription和event打包成

enqueue
public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                eventBus.getExecutorService().execute(this);
            }
        }
    }

该方法通过Executor来运行run()方法,run()方法内部也是调用到了EventBus#invokeSubscriber方法。

AsyncPoster

与BackgroundPoster类似,它也是一个Runnable,实现原理与BackgroundPoster大致相同,但有一个不同之处,就是它内部不用判断之前是否已经有一条线程已经在运行了,它每次post事件都会使用新的一条线程。

参考链接

EventBus 3.0进阶:源码及其设计模式 完全解析

上一篇下一篇

猜你喜欢

热点阅读