三方库-EventBus源码解析(二)
EventBus源码解析(二)
源码版本:
- EventBus:3.3.1
导航:
- 三方库-EventBus源码解析(一)
- 三方库-EventBus源码解析(二)
- 更多的文章看这里:主页
EventBus的post
EventBus --> post()
public void post(Object event) {
// 1、获取当前线程的PostingThreadState,currentPostingThreadState是一个ThreadLocal对象,保证每个线程都有一个PostingThreadState对象,保证了线程安全。
PostingThreadState postingState = currentPostingThreadState.get();
// 2、获取当前线程的事件队列
List<Object> eventQueue = postingState.eventQueue;
// 3、将要发送的事件加入到当前线程的事件队列中
eventQueue.add(event);
// 4、判断是否正在发送事件,如果正在发送中,则不再执行里面的逻辑,因为事件已经添加到eventQueue中,它会继续走里面while循环的判断逻辑。
if (!postingState.isPosting) {
// 5、事件未被发送中,则初始化PostingThreadState,遍历事件队列发送。
// 6、判断当前线程是否是主线程
postingState.isMainThread = isMainThread();
// 7、标记当前线程正在发送中
postingState.isPosting = true;
// 8、如果当前线程已经取消,则抛出异常。
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {eventQueue.remove(0)
// 9、遍历事件队列,只要有事件,就一直发送。
while (!eventQueue.isEmpty()) {
// 10、发送事件,eventQueue.remove(0)说明post进来的事件,会依次发送。
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
// 11、当前线程的所有事件完成,还原状态。
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
post()
方法,为事件的发送,兼容了多线程多发送多事件(多事件后面介绍)的情况。它使用ThreadLocal
对象,保证每个线程都有一个PostingThreadState
对象,保证了线程安全,它通过postingState.isMainThread
记录了发送线程是否是主线程,然后它会依次进行发送。
说明:
postingState.isMainThread
,记录了发送线程是否是主线程。- 步骤10,
eventQueue.remove(0)
,说明post()
进来的事件,会依次发送。
我们再来看一下currentPostingThreadState
属性和PostingThreadState
类。
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>(); // 发送线程的事件队列
boolean isPosting; // 发送线程是否正在发送中
boolean isMainThread; // 发送线程是否在主线程
Subscription subscription; // 发送线程正在发布的Subscription
Object event; // 发送线程正在发布的Event
boolean canceled; // 发送线程是否已取消
}
PostingThreadState
类为发送线程的状态类,状态信息看上面注释。currentPostingThreadState
是一个ThreadLocal
对象,保证每个线程都有一个PostingThreadState
对象,保证了线程安全。
EventBus --> postSingleEvent
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
// 11、获取发送事件的Class
Class<?> eventClass = event.getClass();
// 12、记录是否找到Subscription
boolean subscriptionFound = false;
if (eventInheritance) { // eventInheritance:事件是否有继承性,默认为true。
// 13、事件有继承性,则发送[发送Event的所有父类以及所有父接口]的Event。
// 14、获取到[发送Event的所有父类以及所有父接口]。
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
// 15、遍历集合,发送单个事件。
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
// 16、发送单个事件,并判断是否找到Subscription(只要有一个有,就代表找到)。
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
// 17、事件没有继承性,则发送[发送Event]的Event。
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
// 18、没有找到,判断是否打印Log,或者发送NoSubscriberEvent。
if (logNoSubscriberMessages) {
// 19、没有订阅者时,打印异常信息。
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
// 20、没有订阅者时,发送NoSubscriberEvent事件。
post(new NoSubscriberEvent(this, event));
}
}
}
postSingleEvent()
方法,为事件的发送,兼容了多线程单发送多事件的情况。如果事件有继承性,则发送[发送Event的所有父类以及所有父接口]的Event
(此为多事件),否则则发送[发送Event]的Event
,并判断没有找到,是否打印Log
,或者发送NoSubscriberEvent
。
我们再来看一下lookupAllEventTypes
方法。
EventBus --> lookupAllEventTypes
private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
// 使用了同步并且锁唯一,保证了线程有序执行。
synchronized (eventTypesCache) {
// 从缓存中获取当前Event的已维护的所有事件(父类、父接口)集合。
List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
if (eventTypes == null) {
// 集合为空,说明是第一次获取,则创建集合,并维护集合,并将其添加到缓存中。
eventTypes = new ArrayList<>();
// 当前clazz默认为eventClass
Class<?> clazz = eventClass;
// 遍历当前clazz,只要不为空,就继续遍历。
while (clazz != null) {
// 添加当前clazz,第一次会把eventClass添加进去。
eventTypes.add(clazz);
// 添加当前clazz的所有接口class
addInterfaces(eventTypes, clazz.getInterfaces());
// 获取当前clazz的父类,以继续添加。
clazz = clazz.getSuperclass();
}
// 将维护好的集合,添加到缓存。
eventTypesCache.put(eventClass, eventTypes);
}
// 最后返回此维护好的所有事件(父类、父接口)集合。
return eventTypes;
}
}
// 循环递归添加当前类的所有的接口class到eventTypes。
// 举例:当前类,实现A、B两个接口,A实现C、D接口,B实现E接口,C、D、E未实现任何接口,最终会把A、B、C、D、E接口全部添加到eventTypes中。
static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
// 遍历所有接口
for (Class<?> interfaceClass : interfaces) {
if (!eventTypes.contains(interfaceClass)) {
// 此接口未被包含,则进行添加。
eventTypes.add(interfaceClass);
// 继续递归增加父接口的所有接口,使用递归以完成其所有父接口。
addInterfaces(eventTypes, interfaceClass.getInterfaces());
}
}
}
lookupAllEventTypes()
方法,为获取到[发送Event的所有父类以及所有父接口]。
EventBus --> postSingleEventForEventType
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
// 21、根据事件获取所有订阅它的订阅者,在同步方法内执行,保证了HashMap的get方法线程安全。
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
// 22、有订阅者,遍历集合进行发送,返回true(代表已经找到)。
// 23、遍历所有此类型的订阅者进行发送
for (Subscription subscription : subscriptions) {
// 24、记录发送线程正在发布的Event
postingState.event = event;
// 25、记录发送线程正在发布的Subscription
postingState.subscription = subscription;
// 26、是否是中断的
boolean aborted;
try {
// 27、将事件发送给订阅者,postingState.isMainThread为post方法记录的。
postToSubscription(subscription, event, postingState.isMainThread);
// 28、事件是否被取消
aborted = postingState.canceled;
} finally {
// 29、重置postingState
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
// 30、事件被取消,则取消此类型的全部发布。
break;
}
}
return true;
}
// 31、无订阅者,不进行发送,返回false(代表没有找到)。
return false;
}
postSingleEventForEventType()
方法,为事件的发送,兼容了多线程单发送单事件的情况。根据事件获取所有订阅它的Subscription
(持有订阅者对象、订阅方法对象),然后进行遍历依次发送。
EventBus --> postToSubscription
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
// 32、根据订阅方法ThreadMode调用订阅方法
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
// 33、发布线程(默认)
// 34、调用反射直接通知将事件
invokeSubscriber(subscription, event);
break;
case MAIN:
// 35、主线程
if (isMainThread) {
// 36、在Android上并且是主线程,或者不在Android上,调用反射直接通知。
invokeSubscriber(subscription, event);
} else {
// 37、在Android上并且不是主线程,将事件添加到主线程队列中,在主线程调用。
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
// 38、主线程(有序)
if (mainThreadPoster != null) {
// 39、在Android上,始终将事件添加到主线程队列中,在主线程调用。
mainThreadPoster.enqueue(subscription, event);
} else {
// 40、不在Android上,直接通过反射调用。
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
// 41、后台线程
if (isMainThread) {
// 42、在Android上并且是主线程,或者不在Android上,则将事件添加到后台线程队列中,在后台线程调用。
backgroundPoster.enqueue(subscription, event);
} else {
// 43、在Android上并且不是主线程,直接通过反射调用。
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
// 44、异步线程
// 45、将事件添加到异步线程队列中,在异步线程调用。
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
postToSubscription()
方法,为事件的通知,兼容了多线程单发送单事件的情况。根据订阅方法的ThreadMode
调用订阅方法,如果需要线程切换,则切换线程进行调用;否则,直接调用。
说明:
mainThreadPoster
(即HandlerPoster
)、backgroundPoster
、asyncPoster
这3个Poster
,最终都会调用invokeSubscriber()
方法进行通知,详细介绍看后面的-Poster
。EventBus
的注册,如果是粘性方法并且已经发送了此事件,也会调用postToSubscription()
方法进行通知。
EventBus --> invokeSubscriber
void invokeSubscriber(Subscription subscription, Object event) {
try {
// 46、反射调用订阅者对象的订阅方法,并传入event参数。
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
// 47、处理调用订阅方法异常
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
private void handleSubscriberException(Subscription subscription, Object event, Throwable cause) {
if (event instanceof SubscriberExceptionEvent) {
// SubscriberExceptionEvent事件,判断调用订阅方法异常时是否打印Log(不发送事件防止递归)。
if (logSubscriberExceptions) {
// 调用订阅方法异常时,打印异常信息。
// Don't send another SubscriberExceptionEvent to avoid infinite event recursion, just log
logger.log(Level.SEVERE, "SubscriberExceptionEvent subscriber " + subscription.subscriber.getClass()
+ " threw an exception", cause);
SubscriberExceptionEvent exEvent = (SubscriberExceptionEvent) event;
logger.log(Level.SEVERE, "Initial event " + exEvent.causingEvent + " caused exception in "
+ exEvent.causingSubscriber, exEvent.throwable);
}
} else {
// 普通事件,判断调用订阅方法异常时是否抛出异常、打印Log、发送SubscriberExceptionEvent。
if (throwSubscriberException) {
// 调用订阅方法异常时,抛出SubscriberException异常。
throw new EventBusException("Invoking subscriber failed", cause);
}
if (logSubscriberExceptions) {
// 调用订阅方法异常时,打印异常信息。
logger.log(Level.SEVERE, "Could not dispatch event: " + event.getClass() + " to subscribing class "
+ subscription.subscriber.getClass(), cause);
}
if (sendSubscriberExceptionEvent) {
// 调用订阅方法异常时,发送SubscriberExceptionEvent事件。
SubscriberExceptionEvent exEvent = new SubscriberExceptionEvent(this, cause, event,
subscription.subscriber);
post(exEvent);
}
}
}
invokeSubscriber()
方法,为反射调用订阅者对象的订阅方法,并传入event
参数。handleSubscriberException()
方法,为处理调用订阅方法异常,判断是否抛出异常、打印Log
、发送SubscriberExceptionEvent
。
线程模型
POSTING(发布线程)
事件的订阅和事件的发布处于同一线程(步骤34
)。这是默认设置。事件传递意味着最少的开销,因为它完全避免了线程切换。因此,对于已知在非常短的时间内完成而不需要主线程的简单任务,这是推荐的模式。使用此模式的事件处理程序必须快速返回,以避免阻塞可能是主线程的发布线程。
说明:
- 事件的发布(可能会阻塞Android主线程)
- 事件的接收和发布在同一个线程,由于没有使用线程池,所以发布一定会阻塞线程。
- 事件的接收(可能会阻塞Android主线程)
- 事件的接收和发布在同一个线程,由于可能是主线程发送,所以接收可能会阻塞主线程,所以接收里不推荐执行耗时操作。
MAIN(主线程)
在Android
上,订阅者将在Android
的主线程(UI线程)中被调用。如果发布线程是主线程,则将直接调用订阅方法,从而阻塞发布线程(步骤36
)。否则,事件将排队等待传递(非阻塞)(步骤37
)。使用此模式的订阅者必须快速返回以避免阻塞主线程。如果不是在Android
上,则行为与POSTING
相同(步骤36
)。
说明:
- 事件的发布(一定会阻塞Android主线程)
- 在
Android
上并且是主线程
,或者不在Android
上,则行为与POSTING
相同(步骤36
),所以发布一定会阻塞线程。- 在
Android
上并且不是主线程
,将事件插入到主线程队列中(步骤37
),由于使用了线程池,所以发布一定不会阻塞线程。
- 事件的接收(一定会阻塞Android主线程)
- 事件的接收在主线程,所以接收一定会阻塞主线程,所以接收里不能执行耗时操作。
- 事件的有序性(可能不是有序的)
- 在
Android
上,如果先非主线程发布(排队等待,可能会等待很长时间才会执行到自己),后主线程发布(直接调用),所以在Android
上事件它可能是不是有序的。- 在非
Android
上,由于是直接调用,所以事件一定是有序的。
MAIN_ORDERED(主线程-有序)
在Android
上,订阅者将在Android
的主线程(UI线程)中被调用。与MAIN
不同的是,事件总是排队等待传递(步骤39
)。这确保了post
调用是非阻塞的。
说明:
- 事件的发布(一定不会阻塞Android主线程)
- 在
Android
上,始终将事件添加到主线程队列中,在主线程调用,由于使用Handler
发布,所以发布一定不会阻塞线程。- 不在
Android
上,则行为与POSTING
相同,所以发布一定会阻塞线程。
- 事件的接收(一定会阻塞Android主线程)
- 事件的接收在主线程,所以接收一定会阻塞主线程,所以接收里不能执行耗时操作(同
MAIN
)。
- 事件的有序性(一定是有序的)
- 在
Android
上,始终添加到主线程队列,所以事件一定是有序的(步骤39
)。- 在非
Android
上,由于是直接调用,所以事件一定是有序的(步骤40
)。
BACKGROUND(后台)
在Android
上,订阅者将在后台线程中被调用。如果发布线程不是主线程,则将在发布线程中直接调用订阅方法(步骤43
)。如果发布线程是主线程,则EventBus
使用一个单独的后台线程,它将按顺序传递其所有事件(步骤42
)。使用此模式的订阅者应该尝试快速返回,以避免阻塞后台线程。如果不在Android
上,则始终使用后台线程(步骤42
)。
说明:
- 事件的发布(一定不会阻塞Android主线程)
- 在
Android
上并且是主线程
,或者不在Android
上,将事件插入到后台线程队列中(步骤42
),由于使用了线程池,所以发布一定不会阻塞线程。- 在
Android
上并且不是主线程
,则行为与POSTING
相同(步骤43
),所以发布一定会阻塞线程。
- 事件的接收(可能会阻塞线程池线程)
- 在
Android
上并且是主线程
,使用backgroundPoster
执行(步骤42
),由于backgroundPoster
里判断了如果正在执行中,则不再创建线程执行,而是用上次线程一块执行(详细看下面的-Poster
-backgroundPoster
),所以接收可能会阻塞后台线程,所以接收里不推荐执行耗时操作。
ASYNC(异步)
订阅者将在单独的线程中调用。这始终独立于发布线程和主线程。使用此模式发布事件从不等待订阅方法。如果订阅方法的执行可能需要一些时间(例如,用于网络访问),则应使用此模式。避免同时触发大量长时间运行的异步订阅方法以限制并发线程的数量。EventBus
使用线程池高效地重用已完成异步订阅者通知中的线程。
说明:
- 事件的发布(一定不会阻塞Android主线程)
- 订阅者将在单独的线程中调用(
步骤45
),由于使用了线程池,所以发布一定不会阻塞线程。
- 事件的接收(不会阻塞线程池线程)
- 订阅者将在单独的线程中调用,使用
asyncPoster
执行(步骤45
),由于asyncPoster
里执行每次都在线程池里面使用新的线程执行(详细看下面的-Poster
-asyncPoster
),所以接收里能执行耗时操作。
小结
EventBus
的post()
,获取当前线程的事件队列,将要发送的事件加入到其中。- 遍历事件队列,只要有事件,就一直发送。
- 判断事件是否有继承性,如果有则根据事件类型,找到当前
Event
的所有父类以及所有父接口的class
集合,遍历这个集合,依次发送单个事件;否则,则只发送当前Event
一个事件。- 根据事件获取所有订阅它的
Subscription
集合,遍历集合,将事件发送给订阅者的订阅方法。- 发送给订阅者时,根据订阅方法的线程模型调用订阅方法,如果需要线程切换,则切换线程进行调用;否则,直接调用。
EventBus的postSticky
EventBus --> postSticky()
public void postSticky(Object event) {
synchronized (stickyEvents) {
// 1、将事件添加到粘性事件集合中
stickyEvents.put(event.getClass(), event);
}
// 2、发送事件
post(event);
}
说明:
EventBus
的post()
,只会发送给已经注册的订阅者。EventBus
的postSticky()
,会发送给已经注册的订阅者、之后要注册的订阅者的粘性订阅方法。
小结
EventBus
的postSticky()
,将事件加入到stickyEvents
集合中,当有新的订阅者注册后,如果该订阅者订阅了该事件的粘性订阅方法,则会发送给此方法。- 将事件发送给已经注册的订阅者。
EventBus的注销
EventBus --> unregister()
public synchronized void unregister(Object subscriber) {
// 1、获取订阅者订阅的所有事件
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
// 2、遍历此订阅者订阅的所有事件集合
for (Class<?> eventType : subscribedTypes) {
// 3、通过eventType,将此事件的Subscription集合中含有该订阅者的Subscription,从集合中移除。
unsubscribeByEventType(subscriber, eventType);
}
// 4、将此订阅者从typesBySubscriber中移除
typesBySubscriber.remove(subscriber);
} else {
// 5、未注册,调用了注销,Log打印警告。
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
unregister()
方法,为注销订阅者。从typesBySubscriber
中移除自己,从subscriptionsByEventType
中移除subscriber
内的多个事件类型。
EventBus --> unsubscribeByEventType()
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
// 6、获取该事件的所有订阅者
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
// 7、遍历上面的集合,找到此订阅者的Subscription,然后从此集合中移除。
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
// 8、找到,从此集合中移除。
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
unsubscribeByEventType()
方法,为从subscriptionsByEventType
中移除subscriber
内的单个事件类型。
说明:
- 可以先遍历
subscriptionsByEventType
,然后再遍历subscriptionsByEventType
的值,最后匹配subscriber
,通过这种方式可以找到要移除的Subscription
,但是这种实现要比上面实现效率低。
小结
EventBus
的注销,获取订阅者订阅的所有事件,遍历所有事件,通过事件从subscriptionsByEventType
中,获取该事件的所有Subscription
,遍历Subscription
集合,找到此订阅者的Subscription
,然后从此集合中移除。- 将此订阅者从
typesBySubscriber
中移除。
Poster
Poster
public interface Poster {
void enqueue(Subscription subscription, Object event);
}
Poster
接口,它有一个enqueue
方法,将subscription
、event
入队执行。它有3个实现类,为HandlerPoster
、BackgroundPoster
、AsyncPoster
,我们来分别看一下。
mainThreadPoster
HandlerPoster
public class HandlerPoster extends Handler implements Poster {
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
public HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
// 这里传入的是主线程的Looper对象,所以这个Handler对象是主线程的Handler
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
// 创建一个事件队列
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
// 根据传入的参数封装一个PendingPost对象
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
// 将PendingPost加入到队列中
// 发一个,执行一个,再发送,如果已经正在执行中,则不再立即执行,而是和上次一块执行
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
// 调用Handler的sendMessage,发送事件回到主线程,最终会调用下面的handleMessage方法
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
// 无限循环
while (true) {
PendingPost pendingPost = queue.poll(); // 获取并移除第一个
// 进行两次检查,确保pendingPost不为null,如果为null直接跳出循环
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
// 一直循环到queue结束,才返回
handlerActive = false;
return;
}
}
}
// 使用反射的方法调用订阅者的订阅方法。
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
HandlerPoster
类继承Handler
,并实现Poster
接口,enqueue
方法将PendingPost
加入到queue
队列,然后发送空消息,最终走到handleMessage
方法内,其内部判断,只要queue
队列有数据就执行,最后使用反射调用订阅者的订阅方法。
backgroundPoster
BackgroundPoster
final class BackgroundPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
// 初始化队列
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
// 封装PendingPost对象
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
// 将PendingPost对象加入到队列中
// 发一个,执行一个,再发送,如果已经正在执行中,则不再立即执行,而是和上次一块执行
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
// 这里使用到之前初始化的线程池
eventBus.getExecutorService().execute(this);
}
}
}
// 线程池的执行回调
@Override
public void run() {
// 实现同HandlerPoster
try {
try {
// 无限循环
while (true) {
// 获取队列中的PendingPost,进行双重检查,如果为null直接返回,结束循环
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
// 一直循环到queue结束,才返回
executorRunning = false;
return;
}
}
}
//使用反射的方式调用订阅者的订阅方法
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
BackgroundPoster
类实现Runnable
接口,并实现Poster
接口,enqueue
方法将PendingPost
加入到queue
队列,然后使用线程池执行此Runnable
,最终走到run
方法内,其内部判断,只要queue
队列有数据就执行,最后使用反射调用订阅者的订阅方法。
asyncPoster
AsyncPoster
class AsyncPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
// 发一个,执行一个
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
AsyncPoster
类实现Runnable
接口,并实现Poster
接口,enqueue
方法将PendingPost
加入到queue
队列,然后使用线程池执行此Runnable
,最终走到run
方法内,其内部,获取queue
队列最新数据然后执行,最后使用反射调用订阅者的订阅方法。
说明:
BackgroundPoster
类和AsyncPoster
类线程执行区别:BackgroundPoster
类,如果正在执行中,则用上一次的线程执行,否则用新线程执行。AsyncPoster
类,始终用新的线程执行。
小结
Poster
接口,它有3个实现类,为HandlerPoster
、BackgroundPoster
、AsyncPoster
。- 这几个类,
enqueue
方法将PendingPost
(持有subscription
对象、event
对象)加入到queue
队列,然后切换到指定的线程执行,最后使用反射调用订阅者的订阅方法。
SubscriberMethodFinder
SubscriberMethodFinder
class SubscriberMethodFinder {
private static final int BRIDGE = 0x40;
private static final int SYNTHETIC = 0x1000;
private static final int MODIFIERS_IGNORE = Modifier.ABSTRACT | Modifier.STATIC | BRIDGE | SYNTHETIC;
private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
private List<SubscriberInfoIndex> subscriberInfoIndexes;
private final boolean strictMethodVerification;
private final boolean ignoreGeneratedIndex;
private static final int POOL_SIZE = 4;
private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];
SubscriberMethodFinder(List<SubscriberInfoIndex> subscriberInfoIndexes, boolean strictMethodVerification,
boolean ignoreGeneratedIndex) {
this.subscriberInfoIndexes = subscriberInfoIndexes;
this.strictMethodVerification = strictMethodVerification;
this.ignoreGeneratedIndex = ignoreGeneratedIndex;
}
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
// 1、先从之前缓存的集合中获取
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
// 2、如果之前缓存了,直接返回
return subscriberMethods;
}
if (ignoreGeneratedIndex) {
// 忽略注解处理器生成的索引,所以直接用反射获取,ignoreGeneratedIndex默认为false。
subscriberMethods = findUsingReflection(subscriberClass);
} else {
// 3、获取所有订阅方法集合
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
// 4、放入缓存集合中
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
// 5、从数组中获取FindState对象,如果有直接返回,如果没有创建一个新的FindState对象。
FindState findState = prepareFindState();
// 6、根据事件订阅者初始化findState
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
// 7、获取subscriberInfo,初始化为null。
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
// 通过注解处理器处理
// 获取到所有订阅者方法
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
// 通过检查,给findState.subscriberMethods增加订阅者方法
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
// 8、通过反射的方式获取订阅者中的Method
findUsingReflectionInSingleClass(findState);
}
// 移动到父类,以便后续操作。
findState.moveToSuperclass();
}
// 释放
return getMethodsAndRelease(findState);
}
// 从findState中获取订阅者所有方法并释放
private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
// 获取订阅者所有订阅方法集合
List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
// findState进行回收
findState.recycle();
// 将回收后的findState,恢复到FIND_STATE_POOL中。
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
if (FIND_STATE_POOL[i] == null) {
FIND_STATE_POOL[i] = findState;
break;
}
}
}
// 返回集合
return subscriberMethods;
}
// 准备FindState对象,先从缓存中获取,如果有则获取并在缓存中移除;如果没有则进行创建并返回。
private FindState prepareFindState() {
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
FindState state = FIND_STATE_POOL[i];
if (state != null) {
FIND_STATE_POOL[i] = null;
return state;
}
}
}
return new FindState();
}
// 获取订阅者信息
private SubscriberInfo getSubscriberInfo(FindState findState) {
// 处理SubscriberInfoIndex添加手动创建的AbstractSubscriberInfo(一般不会手动创建)。
// -findState.subscriberInfo != null:说明调用者调用findState.subscriberInfo = getSubscriberInfo(findState)的getSubscriberInfo(findState)返回的不为null,即subscriberInfoIndexes有值。
// -findState.subscriberInfo.getSuperSubscriberInfo() != null:说明手动创建的AbstractSubscriberInfo,而不是注解处理器生成添加的SimpleSubscriberInfo。
if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
if (findState.clazz == superclassInfo.getSubscriberClass()) {
// 要获取的Class等于父类的Class,则返回父类的Class。
return superclassInfo;
}
}
// 处理addIndex()添加的SubscriberInfoIndex。
if (subscriberInfoIndexes != null) {
for (SubscriberInfoIndex index : subscriberInfoIndexes) {
SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
if (info != null) {
return info;
}
}
}
return null;
}
// 通过反射获取订阅者方法
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
// 获取FindState,有复用。
FindState findState = prepareFindState();
// 初始化FindState,因为有复用,所以得初始化。
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
// 反射获取,如果找到则存到findState.subscriberMethods里。
findUsingReflectionInSingleClass(findState);
// 移动到父类,会更改findState.clazz
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// 9、订阅者中所有声明的方法,放入数组中。
// getDeclaredMethods,获取的是当前类的所有方法。
// getMethods,获取的是当前类及其所有父类的所有公共方法。
methods = findState.clazz.getDeclaredMethods();
// 因为getDeclaredMethods,获取的是当前类的所有方法,所以findState.skipSuperClasses为false,为不跳过父类,即继续查找父类。
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
try {
// 10、获取订阅者中声明的public方法,设置跳过父类
// 因为getMethods,获取的是当前类及其所有父类的所有公共方法,所以findState.skipSuperClasses为true,为跳过父类,即不查找父类。
methods = findState.clazz.getMethods();
} catch (LinkageError error) { // super class of NoClassDefFoundError to be a bit more broad...
String msg = "Could not inspect methods of " + findState.clazz.getName();
if (ignoreGeneratedIndex) {
// 忽略注解生成器生成的索引,提示:请考虑使用EventBus注释处理器来避免反射。
msg += ". Please consider using EventBus annotation processor to avoid reflection.";
} else {
// 忽略注解生成器生成的索引,提示:请使这个类对EventBus注释处理器可见,以避免反射。
msg += ". Please make this class visible to EventBus annotation processor to avoid reflection.";
}
throw new EventBusException(msg, error);
}
findState.skipSuperClasses = true;
}
// 遍历这些方法
for (Method method : methods) {
// 11、获取方法的修饰符:public、private等等
int modifiers = method.getModifiers();
// 12、订阅方法为public同时不是abstract、static、bridge、synthetic。
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
// 13、方法参数类型数组
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
// 14、获取方法的注解
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
// 15、如果有注解
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
// 16、将method和eventType放入到findState进行检查
if (findState.checkAdd(method, eventType)) {
// 17、获取注解中的threadMode对象
ThreadMode threadMode = subscribeAnnotation.threadMode();
// 18、新建一个SubscriberMethod对象,同时加入到findState中
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
// 非1个参数,并且是严格模式,并且方法上有Subscribe注解,则抛异常警告。
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
// 方法的修饰符不符合,并且是严格模式,并且方法上有Subscribe注解,则抛异常警告。
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
}
SubscriberMethodFinder
类findSubscriberMethods
方法,找到此订阅者的所有订阅方法。它先从缓存中获取,如果缓存中有,直接返回;如果缓存中没有,通过反射或注解处理器的方式获取到此订阅者的所有订阅方法,并放入到集合中进行返回。
总结
以上就是全面的EventBus
源码了!之后会出其它三方库
源码系列,请及时关注。如果你有什么问题,大家评论区见!
最后推荐一下我的网站,开发者的技术博客: devbolg.cn ,目前包含android相关的技术,之后会面向全部开发者,欢迎大家来体验!