EventBus3.0源码解析
前言
EventBus是一个优秀的事件订阅/发布框架,充分解耦了事件的发布者和订阅者,简化了代码。 记得刚来公司,老大让我看的第一个框架就是EventBus, 用了这么久的EventBus,居然没看过源码,实在惭愧。。。虽然网上已经有很多解析EventBus的文章了,还是决定自己写一篇,这样可以有更深层次、印象更深的理解。
EventBus的使用
这篇文章详细讲解了EventBus的三要素、四种线程模型,以及普通事件和粘性事件的订阅和分发,这里不再多说。
EventBus3.0源码解析
通过EventBus的使用可以看到EventBus主要分为两部分,事件的订阅(注册)和分发。
看源码之前先来熟悉几个概念:
- 事件: 可以是任意类型的对象, 包括自定义的事件. 通过EventBus.getDefault().post(event)分发出去, 在订阅方法的参数中接收事件.
- 订阅者: EventBus.getDefault().register(MainActivity.this), MainActivity就是订阅者.
- 订阅方法: 被@Subscribe修饰的方法, SubscriberMethod描述了订阅方法相关信息
final Method method;//订阅方法
final ThreadMode threadMode;//线程模型
final Class<?> eventType;//传递进来的事件的class对象
final int priority;//订阅方法的优先级
final boolean sticky;//是否是粘性事件
String methodString;//方法的名称
- 订阅关系Subscription:关联订阅者和订阅方法
final Object subscriber;//订阅者
final SubscriberMethod subscriberMethod;//订阅方法
volatile boolean active;//是否有效, 初始化为true, unregister时置为false
- 注解Subscribe: 用来标识订阅者的订阅方法
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
//线程模型
ThreadMode threadMode() default ThreadMode.POSTING;
//是否是粘性事件,默认false
boolean sticky() default false;
//优先级, 影响同一个分发线程中事件分发的顺序
int priority() default 0;
}
- 线程模型ThreadMode: 决定了事件将在哪一个线程中执行,分以下四种:
POSTING(默认): 在哪个线程分发事件,就在哪个线程处理事件,不需要切换线程.避免执行耗时操作, 有可能会阻塞主线程.
适用于那些不需要主线程就能快速完成的简单任务
MAIN: 在主线程中处理事件, 如果在主线程中分发事件,就直接处理事件, 如果不在主线程中分发事件,就通过Handler post到主线程进行处理. 避免进行耗时操作,阻塞主线程.
适用于更新UI等必须在主线程处理的事件.
BACKGROUND:在后台进程中处理事件. 如果不在主线程中分发事件, 就在同一个线程中处理事件. 如果在主线程中分发事件, 就启用唯一的后台进程, 依次执行队列中所有的事件. 避免执行长时间耗时操作, 影响队列中其他事件的处理;
适用于一般的耗时操作;
ASYNC: 单独开一个线程处理事件, 独立于主线程和分发事件的线程. 因为并发线程数量有限, 应避免同一时间触发大量长时间耗时操作的异步处理方法. EventBus会使用一个线程池来有效回收空闲的异步处理线程.
适用于访问网络等长时间耗时操作.
事件注册流程
OK, 基本概念讲清楚了, 下面来看下事件注册的入口EventBus.getDefault().register(this). 一般通过EventBus.getDefault()获取EventBus的实例, 来看下创建及初始化的过程.
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
public static EventBus getDefault() {
//单例
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
public EventBus() {
this(DEFAULT_BUILDER);
}
//初始化了所有订阅/发布过程中用到的参数
EventBus(EventBusBuilder builder) {
//键: 事件的class对象 值: 所有订阅关系集合CopyOnWriteArrayList<Subscription>
subscriptionsByEventType = new HashMap<>();
//键: 订阅者 值: 事件class对象的集合
typesBySubscriber = new HashMap<>();
//键:事件class对象 值: 粘性事件
stickyEvents = new ConcurrentHashMap<>();
//主线程处理器
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
//后台线程处理器
backgroundPoster = new BackgroundPoster(this);
//异步线程处理器
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
//查找缓存订阅方法的工具类
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
//是否继承事件, true的话, 发送子事件时, 也会发送父类事件.默认为true
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}
下面来看注册的方法:
public void register(Object subscriber) {
//获取到订阅者对象
Class<?> subscriberClass = subscriber.getClass();
//查找订阅者中所有的订阅方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
//在同步代码块中对订阅方法逐一进行订阅
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
SubscriberMethodFinder是查找和缓存订阅方法的工具类, 看下findSubscriberMethods方法:
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//METHOD_CACHE缓存订阅者和订阅者中所有订阅方法的map
//键: 订阅者对象, 值: List<SubscriberMethod>订阅方法的集合
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
//直接使用缓存中的订阅方法结合
if (subscriberMethods != null) {
return subscriberMethods;
}
//是否忽略注解器EventBusAnnotationProcessor生成的MyEventBusIndex
if (ignoreGeneratedIndex) {
//通过反射查找订阅者中的订阅方法集合
subscriberMethods = findUsingReflection(subscriberClass);
} else {
//在MyEventBusIndex类中查找订阅者的订阅方法集合
subscriberMethods = findUsingInfo(subscriberClass);
}
//搜索完为空,抛异常, 不为空, 放入METHOD_CACHE缓存
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
注解器EventBusAnnotationProcessor是什么鬼, 还能自动生成MyEventBusIndex来保存所有订阅者与订阅方法集合, 后面再详细讲解. 先看通过反射查找订阅方法findUsingReflection.
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
//内部类FindState, 用来保存和校验订阅方法
FindState findState = prepareFindState();
//初始化FindState, 把订阅者class对象传进去
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
//查找当前订阅者对象中的订阅方法
findUsingReflectionInSingleClass(findState);
//查找父类中的订阅方法, clazz变为父类对象,以此循环查找父类中的订阅方法
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
private FindState prepareFindState() {
synchronized (FIND_STATE_POOL) {
从FindState池中找到不为空的FindState,返回,达到复用的目的
for (int i = 0; i < POOL_SIZE; i++) {
FindState state = FIND_STATE_POOL[i];
if (state != null) {
FIND_STATE_POOL[i] = null;
return state;
}
}
}
//池中都为空, new一个FindState
return new FindState();
}
再来看如何根据FindState查找所有订阅方法findUsingReflectionInSingleClass.
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// 拿到该订阅者的所有方法
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
for (Method method : methods) {
int modifiers = method.getModifiers();
//方法的修饰符必须是public,不是abstract,不是static
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
//方法必须只有一个参数
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
//方法被Subscribe注解
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
//检查findstate是否添加了该方法,没有返回true
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
//将订阅方法的所有信息保存到findState.subscriberMethods
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
循环保存订阅者及父类中所有订阅方法之后, 返回getMethodsAndRelease(findState)
private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
//获取到之前保存的findState.subscriberMethods,最终返回
List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
//清空findState的状态
findState.recycle();
synchronized (FIND_STATE_POOL) {
//把findState放到池中空闲的位置, 以复用
for (int i = 0; i < POOL_SIZE; i++) {
if (FIND_STATE_POOL[i] == null) {
FIND_STATE_POOL[i] = findState;
break;
}
}
}
return subscriberMethods;
}
这是通过反射查找订阅者中订阅方法集合, 通过注解器EventBusAnnotationProcessor生成的MyEventBusIndex查找的方法findUsingInfo也放在后面介绍.
至此, 查找订阅者及父类中所有订阅方法结束了, 再看下逐一订阅的方法subscribe.
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
//获取到事件
Class<?> eventType = subscriberMethod.eventType;
//创建这种订阅关系
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//从缓存中根据事件class对象获取订阅关系集合
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
//缓存中没有, new一个订阅关系集合,保存
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
//缓存中订阅关系集合,且已经包含了这种订阅关系,抛异常
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
//如果当前订阅关系优先级大于集合中其他订阅关系,插在前面
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
//缓存中根据订阅者查找事件对象的集合
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
//缓存没有, new一个加入缓存
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
//如果是粘性事件,直接分发, 因此先分发再订阅也能处理事件
if (subscriberMethod.sticky) {
//继承事件的话, 从缓存中获取所有键是该事件对象或其父类的事件, 然后分发
if (eventInheritance) {
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
//获取键: 事件对象
Class<?> candidateEventType = entry.getKey();
//键是当前订阅方法的事件对象或其父类
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
//分发粘性事件
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
//只从缓存中获取该事件对象对应的事件
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
盗个图总结一下整个注册流程: 图来源
注册流程看完了, 在看一下取消注册的代码:
public synchronized void unregister(Object subscriber) {
//根据订阅者获取所有事件对象
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
//删除订阅者中事件是eventType 的订阅方法
unsubscribeByEventType(subscriber, eventType);
}
//typesBySubscriber中删除订阅者相关信息
typesBySubscriber.remove(subscriber);
} else {
Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
//根据事件对象获取所有该事件的订阅方法
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
//遍历所有订阅方法
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
//是当前要取消注册的订阅者, 就删除该订阅方法, 并把active状态置为false
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
事件分发流程
事件分普通事件和粘性事件, 分发也分两种:
普通事件EventBus.getDefault().post(event);
粘性事件:EventBus.getDefault().postSticky(event);
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
post(event);
}
由此可以看出粘性事件就是在分发之前保存到stickyEvents中了, 其他与普通事件相同, 看普通事件如何分发;
public void post(Object event) {
//当前分发线程的状态
PostingThreadState postingState = currentPostingThreadState.get();
//当前分发线程的事件队列
List<Object> eventQueue = postingState.eventQueue;
//把当前事件加入队列
eventQueue.add(event);
if (!postingState.isPosting) {
//是否在主线程分发
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
//循环从队列中移除一个事件并分发
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
currentPostingThreadState是一个包含PostingThreadState的ThreadLocal对象.
PostingThreadState记录了当前分发线程的具体信息,包括事件队列,分发状态, 正在分发的事件及订阅关系等.
ThreadLocal是一个线程内部的数据存储类, 通过它可以在不同的线程之中互不干扰地存储并提供数据, 具体介绍戳这里.
下面看如何分发队列中的事件postSingleEvent
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
//继承事件的话, 要分发当前事件及其所有父类及接口
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
//只要有一个分发成功就返回true
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
//不继承的话, 只分发当前事件
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
//如果没有订阅该事件
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
Log.d(TAG, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
//发送一个NoSubscriberEvent
post(new NoSubscriberEvent(this, event));
}
}
}
private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
synchronized (eventTypesCache) {
//键: 事件class对象 值: 事件及其父类或父接口的class对象集合
List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
if (eventTypes == null) {
eventTypes = new ArrayList<>();
Class<?> clazz = eventClass;
while (clazz != null) {
eventTypes.add(clazz);//加入当前事件class对象
//如果是接口, 加入所有父接口对象
addInterfaces(eventTypes, clazz.getInterfaces());
clazz = clazz.getSuperclass();//循环遍历父类
}
eventTypesCache.put(eventClass, eventTypes);
}
return eventTypes;
}
}
postSingleEvent判断了一下是否要继承事件, 继承的话再对每一个事件(包括父类及父接口)分发, 不继承的话只分发当前事件, 真正分发事件是在postSingleEventForEventType.
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
//根据事件对象,获取订阅关系的集合
subscriptions = subscriptionsByEventType.get(eventClass);
}
//有订阅者订阅了该事件, 就一一处理, 最后返回true
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
//把事件及订阅关系保存到分发线程中
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
//处理事件
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
//分发完清空分发线程状态
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
//没有订阅者订阅该事件,返回false
return false;
}
private final HandlerPoster mainThreadPoster;
private final BackgroundPoster backgroundPoster;
private final AsyncPoster asyncPoster;
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
//不管当前在哪个线程, 直接处理事件
invokeSubscriber(subscription, event);
break;
case MAIN:
//如果当前在主线程, 则直接处理事件, 不在主线程则转到mainThreadPoster中处理事件
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
//如果当前在主线程, 则转到backgroundPoster中处理, 如果不在主线程中,就直接处理
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
//直接转到asyncPoster中处理
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
invokeSubscriber就是利用反射调用订阅者的订阅方法.
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
盗个图来总结一下整个Post流程就是:
post.pngmainThreadPoster, backgroundPoster, asyncPoster是如何实现线程切换的呢?
线程切换
看三大Poster之前先看两个类: PendingPost和PendingPostQueue.
PendingPost是一个待发送事件, 里面保存了将要发送的事件及订阅方法, 并且维护了一个静态待发送事件池,达到复用的目的.
final class PendingPost {
//静态待发送事件池
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
Object event;//将要发送的事件
Subscription subscription;//将要发送的订阅关系
PendingPost next;//下一个待发送事件
private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}
//从池中获取一个待发送事件
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
//从末尾获取待发送事件, 保存传递进来的事件和订阅关系,返回
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null;
return pendingPost;
}
}
//如果池中数量为空, new一个PendingPost返回
return new PendingPost(event, subscription);
}
//释放当前pendingPost
static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
//先把pendingPost中数据置空, 在保存在池中, 以复用
synchronized (pendingPostPool) {
//保证池中数量不超过10000
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}
}
PendingPostQueue相当于一个待发送事件队列, 维护了一个头和一个尾.
final class PendingPostQueue {
private PendingPost head;
private PendingPost tail;
//把传进来的pendingPost放到队列尾部
synchronized void enqueue(PendingPost pendingPost) {
if (pendingPost == null) {
throw new NullPointerException("null cannot be enqueued");
}
if (tail != null) {
tail.next = pendingPost;
tail = pendingPost;
} else if (head == null) {
head = tail = pendingPost;
} else {
throw new IllegalStateException("Head present, but no tail");
}
notifyAll();
}
//取出队列的头部
synchronized PendingPost poll() {
PendingPost pendingPost = head;
if (head != null) {
head = head.next;
if (head == null) {
tail = null;
}
}
return pendingPost;
}
//队列为空的话,等maxMillisToWait毫秒再取出头部
synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
if (head == null) {
wait(maxMillisToWait);
}
return poll();
}
}
OK, 理解了这两个类再来看三大Poster, 就好理解了.
- HandlerPoster主线程处理器
final class HandlerPoster extends Handler {
private final PendingPostQueue queue;//待发送事件队列
private final int maxMillisInsideHandleMessage;//处理消息超时时间
private final EventBus eventBus;
private boolean handlerActive;//是否正在处理消息
HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();//初始化队列
}
void enqueue(Subscription subscription, Object event) {
//从静态池中取出一个PendingPost, 保存当前事件和订阅关系
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//加入队列
queue.enqueue(pendingPost);
//如果当前没有正在处理消息, 则发送一个空消息, 触发handleMessage去处理消息
if (!handlerActive) {
handlerActive = true;
//如果发送失败, 抛异常
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;//是否重新处理
try {
long started = SystemClock.uptimeMillis();
//循环从队列中取出一个待发送事件处理
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
pendingPost = queue.poll();
//确保队列中没有待发送事件之后, handlerActive置为false, 退出循环
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
//交给eventBus处理待发送的事件
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
//如果消息处理超时, 发送一个空消息, 重新出发消息处理
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
可以看到EventBus处理事件的时候, 通过mainThreadPoster.enqueue(subscription, event)把事件加入了mainThreadPoster的队列中, Handler的消息处理机制不断循环从队列中取出待发送事件, 通过eventBus.invokeSubscriber(pendingPost)又传给EventBus处理, 这就实现了线程的切换, 因为EventBus初始化mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10)时指定了Looper.getMainLooper(), 也就指定了mainThreadPoster 是主线程中的Handler.
然后来看下EventBus的invokeSubscriber(pendingPost)
void invokeSubscriber(PendingPost pendingPost) {
//取出待发送事件中的事件及订阅关系
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
//把pendingPost置空,并加入到静态待发送事件池中
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
//通过反射调用订阅者的订阅方法
invokeSubscriber(subscription, event);
}
}
- BackgroundPoster后台线程处理器
final class BackgroundPoster implements Runnable {
private final PendingPostQueue queue;//待发送事件队列
private final EventBus eventBus;
private volatile boolean executorRunning;//是否正在处理
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();//初始化队列
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
//当前线程没有运行的话, 从线程池中new一个线程或取一个空闲线程来执行run()方法
if (!executorRunning) {
executorRunning = true;
//eventBus.getExecutorService()默认为Executors.newCachedThreadPool();
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
//循环从队列中取出待发送事件,交给EventBus处理
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
//确保队列中没有事件, executorRunning置为false, 退出循环
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
//在当前子线程中把待发送事件交给EventBus处理
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
BackgroundPoster 原理类似HandlerPoster, 把事件加入BackgroundPoster 的队列中, 然后启用了线程池中的线程来取出待发送事件, 并交给EventBus, 通过反射来处理事件.
- AsyncPoster 异步线程处理器
class AsyncPoster implements Runnable {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
是不是跟BackgroundPoster 几乎一样, 不同点就在于每次enqueue加入队列之后都调用了eventBus.getExecutorService()线程池中的一个新线程或空闲线程来执行操作( 取出一个待发送事件, 交给EventBus处理), 及每个事件都是在独立的线程中处理的. 而BackgroundPoster加入队列之后, 只有在未运行的时候才会从线程池中取一个新线程或空闲线程来执行操作(循环从队列中取出待发送事件并处理), 如果正在运行, 就在当前线程中执行操作.
EventBusAnnotationProcessor
EventBus源码中的EventBusAnnotationProcessor, 继承了AbstractProcessor, 是的, 就是应用APT技术, 在编译时扫描所有@Subscribe注解, 自动生成MyEventBusIndex来保存所有订阅者中的订阅信息, 当然MyEventBusIndex名称可以自己配置, 具体配置方法可参考这篇文章.
首先看下生成的MyEventBusIndex的结构:
public class MyEventBusIndex implements SubscriberInfoIndex {
//键: 订阅者对象 值: 订阅者的订阅信息
private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;
static {
SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();
//扫描所有订阅者中被@Subscribe注解的所有方法, 以SimpleSubscriberInfo对象的形式保存在SUBSCRIBER_INDEX中
putIndex(new SimpleSubscriberInfo(TestRunnerActivity.class, true, new SubscriberMethodInfo[] {
new SubscriberMethodInfo("onEventMainThread", TestFinishedEvent.class, ThreadMode.MAIN),
}));
putIndex(new SimpleSubscriberInfo(org.greenrobot.eventbusperf.testsubject.PerfTestEventBus.SubscribeClassEventBusMain.class,
true, new SubscriberMethodInfo[] {
new SubscriberMethodInfo("onEventMainThread", TestEvent.class, ThreadMode.MAIN),
}));
putIndex(new SimpleSubscriberInfo(org.greenrobot.eventbusperf.testsubject.SubscribeClassEventBusDefault.class,
true, new SubscriberMethodInfo[] {
new SubscriberMethodInfo("onEvent", TestEvent.class),
}));
putIndex(new SimpleSubscriberInfo(org.greenrobot.eventbusperf.testsubject.PerfTestEventBus.SubscriberClassEventBusAsync.class,
true, new SubscriberMethodInfo[] {
new SubscriberMethodInfo("onEventAsync", TestEvent.class, ThreadMode.ASYNC),
}));
putIndex(new SimpleSubscriberInfo(org.greenrobot.eventbusperf.testsubject.PerfTestEventBus.SubscribeClassEventBusBackground.class,
true, new SubscriberMethodInfo[] {
new SubscriberMethodInfo("onEventBackgroundThread", TestEvent.class, ThreadMode.BACKGROUND),
}));
}
private static void putIndex(SubscriberInfo info) {
SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);
}
//通过订阅者对象, 取出所有订阅信息
@Override
public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {
SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);
if (info != null) {
return info;
} else {
return null;
}
}
}
如何利用自动生成的MyEventBusIndex呢?
EventBus.builder().addIndex(new MyEventBusIndex()).installDefaultEventBus();
public EventBusBuilder addIndex(SubscriberInfoIndex index) {
if(subscriberInfoIndexes == null) {
subscriberInfoIndexes = new ArrayList<>();
}
//把传进来的MyEventBusIndex保存到subscriberInfoIndexes中
subscriberInfoIndexes.add(index);
return this;
}
public EventBus installDefaultEventBus() {
synchronized (EventBus.class) {
if (EventBus.defaultInstance != null) {
throw new EventBusException("Default instance already exists." +
" It may be only set once before it's used the first time to ensure consistent behavior.");
}
EventBus.defaultInstance = build();
return EventBus.defaultInstance;
}
}
public EventBus build() {
return new EventBus(this);
}
MyEventBusIndex继承了SubscriberInfoIndex, 把MyEventBusIndex加入到EventBusBuilder中, 再通过installDefaultEventBus生成EventBus实例, 传到EventBus静态实例defaultInstance中.到此就把MyEventBusIndex中的订阅信息导入EventBus实例了.
还记得EventBus构造方法中订阅方法扫描器subscriberMethodFinder 的初始化吗, 就是把EventBusBuilder的subscriberInfoIndexes信息传给了扫描器.
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
然后来看通过MyEventBusIndex查找订阅者的订阅方法findUsingInfo.
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
//查找订阅信息
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
//获取到该订阅者的所有订阅方法
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
//校验是否有重复的事件相同的订阅方法, 没有返回true
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
//没有重复, 把这个订阅方法存到findState.subscriberMethods中
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
//没有找到在通过发射查找
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
private SubscriberInfo getSubscriberInfo(FindState findState) {
//如果已经找到订阅者相关信息
if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
if (findState.clazz == superclassInfo.getSubscriberClass()) {
return superclassInfo;
}
}
//没有订阅者相关信息, 遍历所有通过EventBusAnnotationProcessor生成的SubscriberInfoIndex 类, 查找对应订阅者的订阅信息
if (subscriberInfoIndexes != null) {
for (SubscriberInfoIndex index : subscriberInfoIndexes) {
SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
if (info != null) {
return info;
}
}
}
return null;
}
关于EventBusAnnotationProcessor是如何在编译期间扫描注解生成MyEventBusIndex的, 我也没看太懂(尴尬...), 感兴趣的可以自己去看源码.
最后感谢:
http://www.jianshu.com/p/f057c460c77e
http://blog.csdn.net/lsyz0021/article/details/51985307