EventBus源码分析-总览全局
EventBus系列解析(基于3.2.0)
在上一篇的《EventBus学习之基础使用》中,主要对照着官方文档介绍了EventBus的一些用法,本文将基于这些用法探究一下各个用法的实现细节。在使用EventBus时,EventBus
是一个必须使用的入口类,注册/注销订阅者、发送事件都需要用到EventBus.getDefault()
,本篇文章主要关注EventBus这个类本身,分析以下三个问题:
-
EventBus
是什么? - 消息怎么来?
- 消息到哪去?
EventBus是什么?
首先总览EventBus这个类的声明,并大概了解各个全局变量的作用:
public class EventBus {
/** Log tag, apps may override it. */
public static String TAG = "EventBus";
// 全局静态单例,即事件总线中的 总线
static volatile EventBus defaultInstance;
// 默认的建造器
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
// 事件类型缓存,key为某一事件的类型,value为该事件的所有父类及组件类型列表,用于事件可继承时快速查找父类
private static final Map<Class<?>, List<Class<?>>> eventTypesCache = new HashMap<>();//
// 订阅者集合,key为事件类型,value为所有订阅了该事件的订阅者信息list
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
// 当前订阅者订阅的所有事件集合,key为订阅者对象,value为当前订阅者订阅的所有事件集合
private final Map<Object, List<Class<?>>> typesBySubscriber;
// 粘滞事件集合
private final Map<Class<?>, Object> stickyEvents;
// 当前执行线程的状态 **ThreadLocal**
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
// android主线程兼容配置,在非Android平台为null
// @Nullable
private final MainThreadSupport mainThreadSupport;
// android主线程事件执行器,当threadMode指定为ThreadMode.MAIN时会使用此执行器调用订阅者方法
// @Nullable
private final Poster mainThreadPoster;
// 后台执行器,当threadMode的值为 ThreadMode.BACKGROUND时会使用此执行器调用订阅者方法
private final BackgroundPoster backgroundPoster;
// 异步执行器,当threadMode的值为ThreadMode.ASYNC时会使用此执行器调用订阅者方法
private final AsyncPoster asyncPoster;
// 订阅者方法查找器
private final SubscriberMethodFinder subscriberMethodFinder;
// 线程池,AsyncPoster 和 BackgroundPoster会使用此线程池调用订阅者方法,默认会new一个Executors.newCachedThreadPool(),
private final ExecutorService executorService;
// 是否抛出订阅者方法调用异常,默认为 false
private final boolean throwSubscriberException;
// 是否打印订阅者方法调用异常信息,默认为 true
private final boolean logSubscriberExceptions;
// 是否打印未找到订阅者方法异常信息,默认为 true
private final boolean logNoSubscriberMessages;
// 是否发送订阅者方法调用异常事件,默认为true,即当发生订阅者方法调用异常时,会自动发送一个 SubscriberExceptionEvent
private final boolean sendSubscriberExceptionEvent;
// 是否发送未找到订阅者方法异常事件,默认为true,即当发生订阅者方法调用异常时,会自动发送一个 NoSubscriberEvent
private final boolean sendNoSubscriberEvent;
// 事件是否可继承,默认为true,即如果事件A继承了事件B,那么发送事件A时,所有B事件的订阅者也会被调用
private final boolean eventInheritance;
// 当前总线已有的编译时索引的个数
private final int indexCount;
private final Logger logger;
// 方法省略。。。
}
所有的全局变量大概可以分为以下4部分
- 静态单例,整个框架就是通过这个静态单例实现了全局的消息发送与接收
- 订阅者/粘滞消息 集合类,存储了当前总线所有的订阅者和事件 的相关信息
- 执行器,用于在不同的线程调用订阅者方法
- 各种布尔变量标志位
通过对EventBus所有的全局变量的了解分析,不难总结出它大概的运行机制:
通过全局单例存储了所有事件和订阅者的集合,当发送某个消息事件时,通过查找这些集合找到所有的该事件的订阅者,然后通过不同的执行器在指定的线程调用订阅者方法
接下来我们可以一步步的来验证上述的运行机制,首先是全局单例
/** Convenience singleton for apps using a process-wide EventBus instance. */
public static EventBus getDefault() {
EventBus instance = defaultInstance;
if (instance == null) {
synchronized (EventBus.class) {
instance = EventBus.defaultInstance;
if (instance == null) {
instance = EventBus.defaultInstance = new EventBus();
}
}
}
return instance;
}
public EventBus() {
this(DEFAULT_BUILDER);
}
由代码可见,该全局单例采用双重校验锁实现,但又不是严格意义上的单例模式,因为他的构造方法是用public
修饰的,这就意味着我们可以自己构造出新的EventBus对象,显而易见,不同的EventBus对象之间是隔离的,即EventBus实例A发送的消息,并不能被EventBus实例B接收。
在对象的构造上,为了简化构造方法,采用了builder模式,同时通过这个EventBusBuilder
我们也可以对EventBus的行为做一些定制,例如在上一篇EventBus学习之基础使用中,在使用编译时注解解释器时我们就层生成过一个自己的EventBus对象,并将其设置为了默认的总线对象
EventBus.builder().addIndex(new MyEventBusIndex()).installDefaultEventBus();
事实上通过builder我们还可以做更多的自定义行为:
image.png
从上面的方法中我们可以看出,EventBus的大部分属性,都可以通过builder进行自定义,在实际使用时可以根据需求进行配置,不过需要强调的是,如果确认需要做自定义配置,并且全局是通过EventBus.getDefault()
使用,那么必须要在第一次调用getDefault方法之前调用EventBusBuilder的installDefaultEventBus()
方法完成自定义配置,如果已经存在了默认实例再调用此方法,会直接抛出异常
public EventBus installDefaultEventBus() {
synchronized (EventBus.class) {
if (EventBus.defaultInstance != null) {
throw new EventBusException("Default instance already exists." +
" It may be only set once before it's used the first time to ensure consistent behavior.");
}
EventBus.defaultInstance = build();
return EventBus.defaultInstance;
}
}
消息从哪来?
在使用时,我们通常会调用以下代码发送一个事件
EventBus.getDefault().post(new MessageEvent("Hello everyone!"));
那么post(event)
究竟做了什么呢?
/** Posts the given event to the event bus. */
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
代码十分简单,首先通过ThreadLocal
类型的对象currentPostingThreadState
获取了当前线程的事件发送状态,这个状态中存储了以下信息
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>(); // 待发送事件队列
boolean isPosting; // 是否正在发送事件
boolean isMainThread;// 是否主线程
Subscription subscription;// 当前正在被调用的订阅者相关信息
Object event;// 当前正在发送的事件对象
boolean canceled;// 是否已取消发送
}
在将当前事件对象加入到了待发送事件队列中后,便通过while
循环一直发送队列中的事件
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
在postSingleEvent
方法中,首先拿到了被发送事件的Class类型,然后我们看到了一个熟悉的变量eventInheritance
,这个在前面了解EventBus的全局变量时有说明了它代表的是 事件是否可继承,在这我们可以看到,当它是true
时,会直接通过lookupAllEventTypes()
去查找当前事件的所有父类,具体的查找逻辑为:优先看当前事件类的父类缓存是否存在于全局变量eventTypesCache
中,如果不存在则通过clazz.getSuperclass()
循环查找所有父类,通过clazz.getInterfaces()
递归查找所有实现的接口,一直找到所有的父类和接口之后,刷新eventTypesCache
并返回,然后从事件的本身到父类依次调用postSingleEventForEventType
发送事件。
如果事件配置为不可继承,逻辑更为简单,直接调用postSingleEventForEventType
发送事件即可,postSingleEventForEventType
的实现逻辑较为简单,直接找到了所有订了了待发送事件的订阅者,然后通过postToSubscription
方法完成一次发送,
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
由于篇幅所限,事件的发送流程暂时告一段落,后续的postToSubscription
目前可以直接理解为在指定的线程中通过反射的方式调用了订阅者的订阅方法即可,一次完整的事件发送流程总结如下:
消息到哪去?
在上一节我们分析了事件的整个发送流程,本节主要分析事件发送之后,订阅者是怎么接收到事件的,在使用EventBus时,所有的订阅者类都需要进行2个操作,一个是 注册/注销,另一个是使用@Subscribe
方法标记响应事件的方法,接下来首先来看一下注册时发生了什么
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
代码逻辑很简单,直接通过subscriberMethodFinder
找到当前对象的所有订阅方法信息,具体的查找方式在使用篇里有介绍,默认为采用反射的方式,如果配置了apt/kapt,则采用编译时注解处理的方式生成订阅者方法索引,在得到所有的订阅者方法之后,又针对每个方法调用了subscribe
方法
// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
if (subscriberMethod.sticky) {
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
代码比较长,不过逻辑很简单,共做了以下几件事
- 维护全局的
subscriptionsByEventType
key-value集合,存储所有订阅者信息,key为事件类型,value为所有订阅了该事件的订阅者信息list,在向订阅者信息list加入新的订阅者时,是按照method.priority
属性进行排序的,以此实现了优先级事件,保证高优先级的订阅者先收到事件 - 维护全局的
typesBySubscriber
key-value集合,这个集合存储了订阅者订阅的所有事件集合,key为订阅者对象,value为当前订阅者订阅的所有事件集合 - 维护全局的
stickyEvents
集合,这个集合存储了所有的粘滞事件,此处有个小细节,如果当前订阅者订阅的是一个已存在的粘滞事件,那么需要立即响应已存在的事件
总结一下,整个的订阅者register的过程,其实就是对总线中几个关键集合的数据维护,在这几个集合中存储了所有订阅者和事件的信息,通过这些信息,我们在post事件时才能将事件准确的送到每一个订阅者。
注销时所作的操作与注册时类似,只不过所有的操作都是相反的同样也是维护了那几个集合,只是都是做的相反的操作,将需要订阅者的信息从集合中移除,特别的是stickyEvents
集合需要另外调用removeStickyEvent
进行维护。