EventBus简析
概述
EventBus 是适用于Android与Java的一款发布/订阅式事件总线。官方给出的演示图如下:
EventBus-Publish-Subscribe.pngEventBus有如下特点(翻译自官方文档,语言水平不足,请见谅):
- 简化各组件之间的通信:
- 解藕事件发送者和接收者
- 在
Activities
、Fragments
以及后台线程中表现良好 - 避免了复杂和易于出错的依赖及生命周期问题
- 简化代码编写
- 更快
- 更小(~50k jar)
- 通过100,000,000+已安装应用的实践证明
- 拥有诸如线程交付、订阅者优先级等高级功能
使用示例
添加依赖库
在app的build.gradle中添加依赖库:
compile 'org.greenrobot:eventbus:3.1.1'
定义事件类
定义需要传输的事件,通常包含get()
方法,用以获取传输的对象。
public class MessageEvent {
MessageBean mMessage;
public MessageEvent(MessageBean message) {
this.mMessage = message;
}
public MessageBean getMessage() {
return mMessage;
}
}
public class MessageBean {
public String title;
public String message;
}
订阅
在需要接收事件的组件中,需要注册订阅者,这里我们在Activity
的onCreate()
中注册:
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
...
EventBus.getDefault().register(this);
}
通常有注册的地方,都不要忘了解注册,我们在onCreate()
中注册,相应的就应该在onDestroy()
中解注册:
@Override
protected void onDestroy() {
super.onDestroy();
EventBus.getDefault().unregister(this);
}
注册了之后,需要添加对事件的处理,EventBus 3.0之前是通过约定的方法名来实现的,订阅者需要实现如下方法:
public void onEvent(MessageBean message) {
...
}
3.0之后,EventBus改用注解的方式:
@Subscribe(threadMode = ThreadMode.MAIN)
public void onEvent(MessageBean message) {
mTextView.setText(message.title + "\n" + message.message);
}
发布事件
发布事件十分简单,构建发布对象,调用EventBus
的post()
方法即可:
MessageBean message = new MessageBean();
message.title = "MessageBean";
message.message = "HelloWorld";
EventBus.getDefault().post(message);
如此,便完成了一个简单的事件发布、订阅流程,更多详细操作请参阅官方文档。
接下来我们便对EventBus的代码进行解析。
解析
首先看下注册订阅者做了哪些操作。
订阅
EventBus
先来看下EventBus维护的几个映射表:
-
Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType
:该表维护了一个事件类型到Subscription对象的映射表,事件类型即订阅方法(@Subscribe
标识的方法)中的参数类型,Subscription
类封装了一个订阅者与一个订阅方法。 -
Map<Object, List<Class<?>>> typesBySubscriber
:该表维护了一个订阅者到事件类型的映射表。 -
Map<Class<?>, Object> stickyEvents
:粘性事件类型到事件的映射表(调用postSticky()
时用到)。
订阅/发布的时候,我们发现都是通过EventBus.getDefault()
方法获取到EventBus
实例,再进行相应操作的,可见EventBus
使用了单例模式,并且是线程安全的双重锁模式:
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
获取到EventBus
实例后,组件需要调用register()
方法将自己注册为订阅者,我们来看下register()
做了什么:
private void register(Object subscriber) {
// 获取订阅者Class对象
Class<?> subscriberClass = subscriber.getClass();
// 获取订阅者的订阅方法
List<SubscriberMethod subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
// 对订阅者中的每个订阅方法进行订阅
subscribe(subscriber, subscriberMethod):
}
}
}
register()
方法主要是获取订阅者的订阅方法,然后调用subscirbe()
方法进行订阅操作。
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
// Step 1:构建Subscription对象并添加到subscriptionsByEventType表中
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
// 按订阅优先级高低插入到队列中
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
// Step 2:将事件类型添加到typesBySubscriber表中
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
// Step 3: 如果是粘性事件(@Subscribe(sticky = true))
// 且stickyEvents表中存在该事件,则发布粘性事件
if (subscriberMethod.sticky) {
...
}
}
看下来发现, 其实注册订阅者就是把订阅者的Class
对象以及查找到的订阅方法添加到subscriptionsByEventType
表以及typesBySubscriber
表中。
接下来看下是怎么找到订阅方法的。
SubscriberMethodFinder
SubscriberMethodFinder
,顾名思义,订阅方法查找器,通过该类的一些方法调用可以找到订阅者的订阅方法,我们主要看下findSubscriberMethods()
方法:
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
// 先查看缓存中是否有订阅方法,如果有的话直接返回,不再查找
List<SubscriberMethod> subscriberMEthods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
// ignoreGeneratedIndex默认为false
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingRefelction(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
·SubscriberMethodFinder·维护了一个METHOD_CACHE
映射表,用来存放订阅者到订阅方法的映射。查找订阅方法时,首先会看下缓存中是否有该方法,如果有的话,就不再查找,直接返回缓存的方法列表。
这里需要注意的是,注册了订阅者的类必须实现订阅方法,否则运行时会抛出异常。
查找方法时根据ignoreGeneratedIndex
的值决定是调用findUsingReflection()
方法还是使用findUsingInfo()
方法。ignoreGeneratedIndex
默认为false
,初始化EventBus
时可通过Builder
自定义。findUsingReflection()
与findUsingInfo()
最终调用的都是findUsingReflectionInSingleClass()
方法,通过反射来查找订阅方法:
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// 比getMethods()更快,尤其当订阅者是Activitity之类的fat classes(?)
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
...
}
首先通过订阅者的类对象获取订阅者的方法列表,接下去就对这些方法进行过滤:
private void findUsingReflectionInSingleClass(FindState findState) {
...
for (Method method : methods) {
// 获取方法的修饰符public/private/abstract/...
int modifiers = method.getModifiers();
// 只有以public修饰,且非abstract、static或编译器合成的方法
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
// 获取@Subscribe注解的参数
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
// 满足这些条件之后,将该方法作为@Subscriber方法添加到列表里
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
// @Subscribe 方法有且仅有1个参数
else if (strictMethodVerification & method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
}
// @Subscribe 方法不合法,必须是public、非static、非abstract
else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
由上可以看到我们注册订阅时必须满足以下条件:
- 订阅者必须定义订阅方法
- 订阅方法必须是public、非static、非abstract的
- 订阅方法必须有且仅有一个参数,该参数的类型即为事件类型
- 订阅方法由@Subscribe注解标识
事件分发
接下来我们来了解下事件是如何分发的
post()
先来看下post()
方法
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
// 事件队列
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
// 是否主线程
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
// 循环分发事件
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
可以看到,EventBus
首先会将事件加入到一个eventQueue
列表中,然后循环分发事件队列。
private void postSingleEvent(Object event, PostingThreadState postingState) {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
// 如果支持事件继承,则层层往上查找它的父类或接口,
// 只要其中一个父类有被注册即可
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
...
}
}
首先判断是否支持事件继承(默认不支持),如果支持的话,会层层往上获取父类或接口,并对每个父类进行post操作,具体的操作位于postSingleEventForEventType()
方法中:
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
// 获取该事件类型对应的Subscription对象
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
// 将事件发送到每个订阅者的订阅方法中
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscripiton = null;
postingState.canceled = false;
}
// 如果事件取消了,则中断发送
if (aborted) {
break;
}
}
// 找到了事件的订阅者,返回true
return true;
}
// 没有事件相关订阅者,返回false
return false;
}
postSingleEventForEventType()
方法主要是从subscriptionsByEventType
映射表中取出事件对应的订阅列表,一一发送事件,发送事件的具体实现在postToSubscription()
方法中:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
这里根据订阅方法@Subscribe注解中threadMode
的参数来决定发送的方式,threadMode
有以下几种类型:
-
POSTING
:默认值,订阅者与发送者在同一进程,直接调用订阅方法。 -
MAIN
:订阅者将在主线程调用,如果发送者也在主线程,则直接调用订阅方法;否则将事件加入队列中等待分发。 -
MAIN_ORDERED
:订阅者在主线程调用,与MAIN
模式的区别在于,事件总是被加入队列中等待分发。 -
BACKGROUND
:订阅者将在后台线程中调用。如果发送线程不是主线程,则直接分发事件,否则加到队列中等待。 -
ASYNC
:订阅者将在一个单独的线程中被调用,并且该线程独立于主线程和分发线程。如果订阅者处理耗时长的事件时,建议使用这种模式,例如网络访问。为了防止触发大量异步线程,EventBus
采用线程池的方式来管理这些订阅线程。
根据以上的代码可以轻松地理解这几种模式的特点。
接下来我们分别看下几种分发方式。
直接调用
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscripiton.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
直接调用很简单,就是根据之前记录的类名、方法名,通过反射方法直接调用。
主线程分发器
主线程分发器 -- mainThreadPoster
,主要用于ThreadMode.MAIN
与ThreadMode.MAIN_ORDERED
模式,它的类型是HandlerPoster
,我们来看下它的实现:
public class HandlerPoster extends Handler implements Poster {
// 自定义的阻塞队列
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
// 构建挂起事件
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
// 添加到阻塞队列
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
// 发送消息
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
// 开始时间
long started = SystemClock.uptimeMillis();
// 开始轮循
while (true) {
// 取出队首,如果队列为空,则阻塞
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
// 调用订阅方法
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
// 当轮循时间等于或大于maxMillisInsideHandleMessage时
// 重新发送消息
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
可以看出来,mainThreadPoster
就是使用Handler
机制,利用主线程的looper
来分发事件。
后台分发器
backgroundPoster
用于ThreadMode.BACKGROUND
模式,BackgroundPoster
类型。与HandlerPoster
相似,它也维护了一个PendingPostQueue
队列,我们主要看下它的enqueue()
与run()
方法。
final class BackgroundPoster implements Runnable, Poster {
...
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
// 如果线程已在执行,不再添加
if (!executorRunning) {
executorRunning = true;
// 调用EventBus的线程池来处理该Runnable
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
while (true) {
// 取出队首,如果队列为空,则阻塞
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
// 调用订阅方法
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
由BackgroundPoster
的实现可以看出,它是在后台线程调用订阅方法的,并由EventBus
的线程池来管理,默认使用的是CachedThreadPool
:
private final static ExecutorService DEFAULT_EXECUTOR_SERVICE =
Executors.newCachedThreadPool();
异步分发器
asyncPoster
用于处理ThreadMode.ASYNC
模式的订阅方法,它是AsyncPoster
对象,这里也直接看enqueu()
与run()
方法。
class AsyncPoster implements Runnable, Poster {
...
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
可以看到,AsyncPoster
虽然也实现了Runnable
,但它与BackgroundPoster
最明显的区别在于,调用enqueue()
时,backgroundPoster
将事件添加到队列中后,会判断当前线程是否已在运行,如果是的话,将不再加入线程池。而AsyncPoster
则是每一次调用enqueue()
时,都会往线程池添加一个对象,来将当前事件分发到对应的订阅方法中。
解注册
最后再说一下解注册。
我们前面说到,注册订阅者就是把订阅者的Class
对象以及查找到的订阅方法添加到subscriptionsByEventType
表以及typesBySubscriber
表中。那么,很显然,解注册就是把订阅者相关信息从表中删除。
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
// 从subscriptionsByEventType中移除相关信息
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}