EventBus3.0源码分析
一、基本用法
- 添加依赖和注解处理器
android {
defaultConfig {
javaCompileOptions {
annotationProcessorOptions {
arguments = [eventBusIndex: 'org.greenrobot.eventbusperf.EventBusIndex']
}
}
}
}
dependencies {
implementation 'org.greenrobot:eventbus:3.1.1'
annotationProcessor 'org.greenrobot:eventbus-annotation-processor:3.0.1'
}
- 在Application添加注解处理器自动生成的类
EventBus.builder().addIndex(new EventBusIndex()).installDefaultEventBus();
- 注册为事件观察者和声明接收事件的方法,注意@Subscribe注解的方法要为public,不然编译报错
@Override
public void onStart() {
super.onStart();
EventBus.getDefault().register(this);
}
@Override
public void onStop() {
super.onStop();
EventBus.getDefault().unregister(this);
}
@Subscribe(threadMode = ThreadMode.MAIN)
public void onEventMsg(String str) {
Log.d(TAG, "onEventMsg: " + str);
}
- 发送事件
EventBus.getDefault().post("发送事件");
二、扩展
看看@Subscribe 注解的定义
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
ThreadMode threadMode() default ThreadMode.POSTING;
boolean sticky() default false;
int priority() default 0;
}
可以看到使用该注解时我们有3个变量可以配置
@Subscribe(threadMode = ThreadMode.MAIN, priority = -2, sticky = true)
public void onEventMsg(String str) {
Log.d(TAG, "onEventMsg: " + str);
}
//发送粘性事件
EventBus.getDefault().postSticky("发送事件");
- threadMode:用来声明接收事件的方法运行的线程,有5种取值
- sticky:是否接收粘性事件
- priority:事件优先级,priority越大优先级越高
ThreadMode有5种不同的取值:
- POSTING:发送事件在哪个线程处理事件就在哪个线程
- MAIN:处理事件在主线程
- MAIN_ORDERED:和MAIN的区别就是即使发送事件是在主线程,也通过Handler发送消息来执行处理事件的方法,可以保证non-blocking
- BACKGROUND:如果发送事件是子线程,那么处理事件就在该子线程,如果发送事件是主线程,那么处理事件在线程池的子线程,值得注意的是,所有事件都在同一子线程中执行
- ASYNC:无论发送事件在哪个线程,处理事件都在线程池的子线程
三、源码分析
一、注解处理器生成类
编译阶段注解处理器会扫描@Subscribe注解,生成.java文件,我们需要在Application初始化时创建这些自动生成的类的对象,添加到EventBus对象中去。
生成的.java文件在build/generated/source/apt路径下,文件名为我们在gradle声明的名称,下面看看生成的文件内容
public class EventBusIndex implements SubscriberInfoIndex {
private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;
static {
SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();
putIndex(new SimpleSubscriberInfo(com.yjx.myapplication.ScrollingActivity.class, true,
new SubscriberMethodInfo[] {
new SubscriberMethodInfo("onEventMsg", String.class, ThreadMode.MAIN),
new SubscriberMethodInfo("onEventMsg2", String.class, ThreadMode.MAIN),
}));
}
private static void putIndex(SubscriberInfo info) {
SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);
}
@Override
public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {
SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);
if (info != null) {
return info;
} else {
return null;
}
}
}
可以看到EventBusIndex类维护了一个HashMap,key为订阅者,value为订阅者接收事件的方法信息。接下去看下在Application的操作:
EventBus.builder().addIndex(new EventBusIndex()).installDefaultEventBus();
public EventBusBuilder addIndex(SubscriberInfoIndex index) {
if (subscriberInfoIndexes == null) {
subscriberInfoIndexes = new ArrayList<>();
}
subscriberInfoIndexes.add(index);
return this;
}
比较简单,不多说,主要是通过构建者模式创建一个EventBus单例对象,将生成的类的对象添加到subscriberInfoIndexes这个集合。
二、注册为观察者和反注册
直接看register方法
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
// 关键方法1 查找该观察者所声明的所有接受事件的方法,即@Subscribe注解的方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
// 关键方法1
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
// 先从缓存找
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
// 3.0以上ignoreGeneratedIndex的值默认为false
if (ignoreGeneratedIndex) {
// 通过反射获取接收事件的方法的信息
subscriberMethods = findUsingReflection(subscriberClass);
} else {
// 通过注解器生成的EventBusIndex类中获得该订阅者的接收事件的方法的信息
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
findSubscriberMethods这个方法可以看到,有两种方式获取该订阅者的接收事件的方法的信息,一种是通过反射,一种是使用注解处理器生成的类。还有一个小细节,如果最后没找到@Subscribe注解的方法,则会抛异常,所以,一旦你调用register方法,你就应该声明接收事件的方法。
先看看从注解处理器寻找的操作
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
// FindState 此类可以理解为一个保存当前寻找状态的类
// 因为在寻找订阅者接收事件的方法时,不仅在当前类,还在其父类找
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
// 关键方法1
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
// 关键方法2
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
// 将接收事件的方法添加到findState对象的List<SubscriberMethod>
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
findUsingReflectionInSingleClass(findState);
}
// 指向父类,重新执行相同的操作
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
// 关键方法1
private SubscriberInfo getSubscriberInfo(FindState findState) {
if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
if (findState.clazz == superclassInfo.getSubscriberClass()) {
return superclassInfo;
}
}
if (subscriberInfoIndexes != null) {
// 遍历我们在Application初始化时添加的EventBusIndex对象
for (SubscriberInfoIndex index : subscriberInfoIndexes) {
// 调用注解处理器帮我们生成的方法拿到接收事件方法的信息
SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
if (info != null) {
return info;
}
}
}
return null;
}
// 关键方法2,根据方法信息构造SubscriberMethod数组
@Override
public synchronized SubscriberMethod[] getSubscriberMethods() {
int length = methodInfos.length;
SubscriberMethod[] methods = new SubscriberMethod[length];
for (int i = 0; i < length; i++) {
SubscriberMethodInfo info = methodInfos[i];
methods[i] = createSubscriberMethod(info.methodName, info.eventType, info.threadMode,
info.priority, info.sticky);
}
return methods;
}
再看看通过反射的方式是怎么获取接收事件的方法信息
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findUsingReflectionInSingleClass(findState);
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
主要是反射获取类里面的public的方法,获取方法的注解,将方法的信息保存到SubscriberMethod对象。
我们再回到register方法,findSubscriberMethods不管通过哪种方式,最终得到都是一个List<SubscriberMethod>集合,下面继续看
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
// 关键方法1 查找该观察者所声明的所有接受事件的方法,即@Subscribe注解的方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
// 关键方法2
subscribe(subscriber, subscriberMethod);
}
}
}
// 关键方法2
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
// 1.获取订阅同一事件类型的所有@Subscribe的方法
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
// 2.根据优先级的顺序插入到列表里
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
// 3.获取该订阅者的接收事件的集合,将当前接收事件方法添加到集合中
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
// 4.是否是粘性事件
if (subscriberMethod.sticky) {
if (eventInheritance) {
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
大致的流程引用EventBus 3.0 源码分析的图片,非常清晰
register流程图.png接下来看反注册unregister方法
public synchronized void unregister(Object subscriber) {
// 获取该订阅者的接收事件的集合
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
// 从订阅同一事件类型的列表中移除该订阅者
unsubscribeByEventType(subscriber, eventType);
}
// 移除该订阅者
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
三、发送事件
public void post(Object event) {
// 1.通过ThreadLocal获取当前线程的PostingThreadState对象,该对象记录着当前线程发送事件的状态
PostingThreadState postingState = currentPostingThreadState.get();
// 2.将事件添加到事件队列里
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
// 3.循环取出队列里的时间进行处理
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
post方法主要做的事情就是上面标注的3步,下面重点看看postSingleEvent方法
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
// 是否要匹配父类和接口,默认为true
if (eventInheritance) {
// 1.寻找当前时间类型的父类和接口,例如你发送String类型,那么接收Object类型的方法也可以接收该事件
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
// 2. 具体发送事件的方法
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
接着看postSingleEventForEventType方法
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
// 1.获取订阅该事件类型的订阅者
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
// 2.根据treadmode发送事件,subscription保存的是接收事件的方法信息
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
// 直接反射调用接收事件的方法,即不切换线程,发送事件和处理事件在同一线程
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
// 如果发送事件是在主线程,那么不用切换线程
invokeSubscriber(subscription, event);
} else {
// 通过Handler发送消息在主线程中处理
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
// 如果是发送事件在主线程,那么在线程池的子线程调用接收事件的方法
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
//
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
我们来看看几个Poster,Poster是一个interface,里面只有一个enqueue方法,它有3个实现类,分别是HandlerPoster、BackgroundPoster、AsyncPoster。下面分别看看这3个类
public class HandlerPoster extends Handler implements Poster {
// 接收事件的方法队列
private final PendingPostQueue queue;
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
// 反射调用处理事件的方法
eventBus.invokeSubscriber(pendingPost);
...
}
} finally {
handlerActive = rescheduled;
}
}
}
HandlerPoster其实就是一个Handler,里面维护了一个队列,一个PendingPost就代表一个接收事件的方法,队列里保存的是所有接收事件的方法。当调用enqueue方法时,就往队列里面添加一个PendingPost,然后发送消息到主线程,处理消息
final class BackgroundPoster implements Runnable, Poster {
private final PendingPostQueue queue;
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
// 线程池执行
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
// 反射调用处理事件方法
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
...
}
} finally {
executorRunning = false;
}
}
}
BackgroundPoster是一个Runnable,里面也维护了一个队列,尤其需要注意的是,队列里的每一个方法只会在线程池里的同一个线程执行,不会同时开启多个子线程。主要由executorRunning 这个标志位来控制,这一点和下面的AsyncPoster不同。
class AsyncPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
AsyncPoster也是一个Runnable,里面也维护了一个队列,它每enqueue一个PendingPost时,都会使用一个新的空闲线程。
post发送事件的流程大致的过程就分析完了,最后我们看看postSticky发送粘性事件
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}
其实就多了把事件放进一个保存粘性事件的Map这一步,接下去还是调用post方法。在上面分析register过程中我们知道,如果是接收事件的方法的sticky为true,会做以下的操作
if (subscriberMethod.sticky) {
if (eventInheritance) {
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
// 匹配和接收事件相同类型的粘性事件
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
// 主要是这个方法
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
if (stickyEvent != null) {
postToSubscription(newSubscription, stickyEvent, isMainThread());
}
}
@Subscribe注解的sticky如果为true,那么在register时,就会取出粘性事件调用发送事件的方法,即postToSubscription