EventBus源码解析
1-注册
1.1-核心
注册主要原理就是EventBus的三个map变量的注册:
- subscriptionsByEventType | EventType为key,Subscription列表为Value
- typesBySubscriber | 注册对象subscriber为key,EventType列表为Value
- stickyEvents | EventType为key,Event对象为Value
字段说明:
字段 | 说明 |
---|---|
EventType | 事件类型,即@Subscribe注解的订阅方法的第一个参数对应的Class对象 |
Subscription | 封装了订阅对象subscriber及订阅方法SubscriberMethod |
SubscriberMethod | 封装了订阅对象的订阅方法的相关信息,即@Subscribe注解参数及注解的方法信息 |
subscriber | 注册对象,例如在MainActivty注册即为MainActivity的实例对象 |
Event | 事件对象,即@Subscribe注解的订阅方法的第一个入参实参对象 |
1.2-注册具体源码
从EventBus.getDefault().register(this);
开始,this传入的是当前注册类的Activity对象
/**
* EventBus.register方法
*/
public void register(Object subscriber) {
//@1.获取对象类对应的Class对象
Class<?> subscriberClass = subscriber.getClass();
//@2.通过反射获取Class对象中注解的方法列表
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
//@5.遍历获取的目标方法列表,并注册
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
@2.findSubscriberMethods方法获取Class对象中@Subscribe注解的方法列表。subscriberMethodFinder是EventBus的成员变量具体实现
//SubscriberMethodFinder.findSubscriberMethods方法
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//先读缓存
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
//ignoreGeneratedIndex默认为false,若自定义EventBusBuilder则为ture
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
//@3.通过反射获取Class文件的带注解方法
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
//注册类没有@Subscriber注解,抛出异常
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
//更新缓存
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
@3.通过反射获取Class文件的带注解方法。
//SubscriberMethodFinder.findUsingInfo方法
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
//通过FindState来辅助查找
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findState.subscriberInfo = getSubscriberInfo(findState);
//第一次注册为null
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
//@4.筛选目标方法,并将目标方法相关信息封装到findState中的集合
findUsingReflectionInSingleClass(findState);
}
//将findState.clazz修改为父类,到父类遍历
findState.moveToSuperclass();
}
//返回findState中的SubscriberMethod列表,并释放findState
return getMethodsAndRelease(findState);
}
@4.findUsingReflectionInSingleClass方法
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// 通过反射获取Class对象的所有方法
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
。。。
}
//遍历所有方法,根据条件筛选出目标方法
for (Method method : methods) {
//获取方法修饰符
int modifiers = method.getModifiers();
//方法是public且非abstract、static等
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
//获取方法的形参类型
Class<?>[] parameterTypes = method.getParameterTypes();
//只有一次参数
if (parameterTypes.length == 1) {
//获取Subscribe注解类
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
//获取订阅事件的类型
Class<?> eventType = parameterTypes[0];
//判断findState是否添加过该事件类型为key的键值对,未添加过返回true
if (findState.checkAdd(method, eventType)) {
//获取注解中指定的线程
ThreadMode threadMode = subscribeAnnotation.threadMode();
//将订阅方法、事件类型、线程、优先级、是否支持粘性事件等封装成SubscriberMethod
//并添加到findState的subscriberMethods集合中
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
。。。
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
。。。
}
}
}
@5.遍历获取的目标方法列表,并注册。主要是注册三个map。
//调用EventBus.subscribe方法注册
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
//获取订阅方法的事件类型
Class<?> eventType = subscriberMethod.eventType;
//将方法参数SubscriberMethod及注册类Object封装为Subscription
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//查找注册类是否有对应的Subscription列表
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
//没有则创建集合 subscriptions
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
//根据newSubscription的优先级插入到subscriptions集合中
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
//判断typesBySubscriber map中是否存在当前注册类的事件类型列表
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
//添加事件类型到列表
subscribedEvents.add(eventType);
//如果支持粘性事件
//1.4-粘性事件
if (subscriberMethod.sticky) {
。。。
}
}
总结注册流程:
- 1-根据反射获取注册类Class对象的所有方法
- 2-遍历方法列表根据条件筛选目标方法(public修饰一个形参,Subscribe注解)
- 3-将目标方法信息及指定线程、优先级、粘性等封装到SubscriberMethod。最终返回筛选出来的SubscriberMethod列表
-
4-遍历SubscriberMethod列表,并分别注册到EvnetBus的三个map集合中
图1-注册流程图
2-取消注册
EventBus.getDefault().unregister(this);
//EventBus.unregister方法
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
//遍历注册对象-eventType列表的map
for (Class<?> eventType : subscribedTypes) {
//@1
unsubscribeByEventType(subscriber, eventType);
}
//将该对象从map中移除
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
@1.调用unsbscribeByEventType将eventType-事件列表的map中移除目标对象
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
//通过EventType获取对应的事件列表
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
//依次删除事件列表中与目标对象一致的subscription
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
总结解注册流程:
- 从object-eventType列表的map中遍历eventType
- 获取eventType对应的事件列表subscriptions,依次移除订阅对象关联的事件
- 将订阅对象从object-eventType的map中移除
3-事件发送
EventBus.getDefault().post(new Object());
调用EventBus.post方法。在EventBus中维护一个PostingThreadState。各自线程维护一份,保存了当前线程的事件队列,线程状态,发送状态,是否主线程等信息。
- ThreadLocal<PostingThreadState> currentPostingThreadState
final static class PostingThreadState {
//当前线程的事件队列
final List<Object> eventQueue = new ArrayList<>();
boolean isPosting;//是否发送状态
boolean isMainThread;//是否主线程
Subscription subscription;//辅助记录待通知的订阅者
Object event;//辅助记录待发送的事件
boolean canceled;
}
public void post(Object event) {
//获取当前线程的PostingThreadState
PostingThreadState postingState = currentPostingThreadState.get();
//获取待发送事件队列
List<Object> eventQueue = postingState.eventQueue;
//将当前发送事件入列
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
//遍历待发送事件列表
while (!eventQueue.isEmpty()) {
//@1发送单个事件
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
//重置postingState状态
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
@1.逐一发送事件
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
//拿到该事件所有的父类事件类型
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
//遍历事件类型
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
//@2查找订阅者
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
//没有订阅
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
//发送NosubscriberEvent事件
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
@2.通过eventType-订阅者列表找到对应的订阅者列表,依次发送事件到对应的订阅者
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
//获取eventType-订阅者的map对应的订阅列表
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
//遍历订阅者Subscription
for (Subscription subscription : subscriptions) {
//记录订阅者及事件
postingState.event = event;
postingState.subscription = subscription;
boolean aborted;
try {
//@3发送事件到订阅者
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
//清除PostingState状态
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
@3.发送事件到对应的订阅者
。该模式下涉及到的三个列表:
- HandlerPoster mainThreadPoster;//切换到主线程执行
- BackgroundPoster backgroundPoster;//切换到线程池执行,有同步锁
- AsyncPoster asyncPoster;//切换到线程池执行,无同步锁
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
//根据订阅方法设置的线程模式执行
switch (subscription.subscriberMethod.threadMode) {
//默认线程模式,在发送事件的线程接收事件
case POSTING:
//@4.通过反射执行订阅者的订阅方法
invokeSubscriber(subscription, event);
break;
//指定主线程
case MAIN:
if (isMainThread) {//如果当前是主线程则直接执行
invokeSubscriber(subscription, event);
} else { //@5.否则子线程加入队列,通过Handler切换到主线程执行
mainThreadPoster.enqueue(subscription, event);
}
break;
//无论哪个线程都加入队列,通过handler在主线程执行
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
//后台执行
case BACKGROUND:
if (isMainThread) {//@7.主线程,加入队列,通过线程池执行
backgroundPoster.enqueue(subscription, event);
} else {//子线程,直接执行
invokeSubscriber(subscription, event);
}
break;
//无论哪个线程都加入队列,线程池执行
case ASYNC:
//@8.AsyncPoster与BackgroundPoster不同的是
//没有加同步锁
//每次执行都会开一个子线程执行
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
@4.通过反射执行订阅者类的订阅方法
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
@5.子线程加入队列,通过Handler切换到主线程执行
public class HandlerPoster extends Handler implements Poster {
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
//将subscription和event封装成PendingPost对象
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//加入到队列
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
//主线程Handler发送消息
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
//主线程Handler接收并处理消息
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
//取出队列
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
//@6.执行事件
eventBus.invokeSubscriber(pendingPost);
。。。
}
} finally {
handlerActive = rescheduled;
}
}
}
@6.取出PendingPost并通过反射执行订阅方法
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
//是否PendingPost的引用资源
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
//通过反射调用对应的方法,此时在主线程执行
invokeSubscriber(subscription, event);
}
}
@7.BACKGROUND线程模式下,主线程加入队列,通过线程池切换到子线程执行。BackgroundPoster原理和HandlerPoster类似。
final class BackgroundPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
//用PendingPost封装subscription及event
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//加入队列
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
//调用newCachedThreadPool线程池,执行任务
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
//执行订阅事件
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
@8.ASYNC线程模式下,所有线程进入队列,通过线程池切换到子线程执行。具体由AsyncPoster实现,BackgroundPoster是通过CachedThreadPool来管理线程,而AsyncPoster则是有EventBusBuilder传入的线程池来管理线程。且BackgroundPoster执行时加了synchronized同步锁,而AsyncPoster未加锁。
class AsyncPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
//通过线程池切换到子线程执行
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
事件发送总结:
- 1.将发送的事件保存在postingState中
- 2.遍历postingState的事件列表eventQueue
-
3.遍历eventQueue根据线程模式直接或间接通过反射执行订阅方法
事件发送流程图
4-粘性事件
粘性事件使用:
//@1.订阅处理粘性事件
@Subscribe(sticky = true)
public void testEventBust(Object obj) {
。。。
}
//@2.发送粘性事件
EventBus.getDefault().postSticky(new Object());
@1.订阅处理粘性事件,当sticky=true时,在EventBus.subscribe注册方法中
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
//见1-@5
。。。
if (subscriberMethod.sticky) {
if (eventInheritance) {
//遍历stickyEvents集合
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
//如果方法制成粘性事件,则根据线程模式处理事件 checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
@2.发布粘性事件。
public void postSticky(Object event) {
// 将要发布的粘性事件存stickyEvents中
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// 普通的发布事件
post(event);
}
粘性事件总结:
- 通过postSticky将粘性事件存储到stickyEvents中
- 调用post发送粘性事件,若订阅者活跃则会消耗该事件并从stickyEvents中移除
- 订阅者注册时,遍历stickyEvents集合,通过postToSubscription发送Event给当前订阅者并执行