EventBus源码分析
本文基于EventBus 3.1.1
基本使用
1,定义Event:
public class MessageEvent {
public String message;
public MessageEvent(String message) {
this.message = message;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
2,生命订阅方法,并且在适当的位置注册和解除注册
@Subscribe(threadMode = ThreadMode.MAIN)
public void getMessage( MessageEvent messageEvent){
tv.setText(messageEvent.getMessage());
}
@Override
protected void onStart() {
super.onStart();
EventBus.getDefault().register(this);
}
@Override
protected void onStop() {
super.onStop();
EventBus.getDefault().unregister(this);
}
3,发送事件
MessageEvent messageEvent = new MessageEvent("eventBus");
EventBus.getDefault().post(messageEvent);
本文主要分析EventBus的注册和发布流程,解除注册比较简单不做分析,不具体分析粘性事件。
一,注册流程
上图可简单概括EventBus的注册过程,下面我们根据源码具体分析:
EventBus.getDefault().register(this);
getDefault()方法是通过单例的方式获取一个 EventBus对象
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
EventBus对象的具体创建是通过建造者模式
public EventBus() {
this(DEFAULT_BUILDER);
}
EventBus(EventBusBuilder builder) {
logger = builder.getLogger();
//这三个Map是核心
subscriptionsByEventType = new HashMap<>();//Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType CopyOnWriteArrayList线程安全的list集合 用来存储eventType(订阅方法的参数) 和 订阅者和订阅方法的关系
typesBySubscriber = new HashMap<>();//Map<Object, List<Class<?>>> typesBySubscriber 维护的是订阅者和订阅方法之间的对应关系
stickyEvents = new ConcurrentHashMap<>();//Map<Class<?>, Object> stickyEvents
//eventBus线程间的调度
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
//记录事件总数
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
//查找添加@subscribe注解的方法
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
//处理事件方法异常时是否需要打印log default true
logSubscriberExceptions = builder.logSubscriberExceptions;
//没有订阅者订阅此消息时是否打印log default true
logNoSubscriberMessages = builder.logNoSubscriberMessages;
//处理方法有异常,是否发送这个event default true
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
//没有处理方法时,是否需要发送sendNoSubscriberEvent这个标签 default true
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
//是否需要发送 throwSubscriberException 标签 default true
throwSubscriberException = builder.throwSubscriberException;
//与接收者有继承关系的类是否都需要发送 default true
eventInheritance = builder.eventInheritance;
//线程池
executorService = builder.executorService;
}
这里尤其要注意两个Map集合
Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType = new HashMap<>();
Map<Object, List<Class<?>>> typesBySubscriber = new HashMap<>();
CopyOnWriteArrayList是一个线程安全的arrayList, Subscription这个对象中封装了订阅者和谋一个订阅方法之间的关系
subscriptionsByEventType 这个map中维护的是EventType(订阅方法参数)和Subscription的关系
typesBySubscriber 这个map中维护的是订阅者和订阅方法之间的关系,我们注册过程的最终目的就是把订阅者和订阅方法填充到这个集合中。
register(this) 过程:
public void register(Object subscriber) {
//获取注册对象的class对象信息
Class<?> subscriberClass = subscriber.getClass();
//获取class对象的订阅方法,添加@Subscribe 注解的方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
//对添加@Subscribe 注解的方法进行注册
subscribe(subscriber, subscriberMethod);
}
}
}
我们重点关注如何获取添加@Subscribe 注解的方法:
SubscriberMethod类对订阅方法进行了一定的封装,封装重要信息如下
public class SubscriberMethod {
final Method method;//订阅方法
final ThreadMode threadMode;//线程模型
final Class<?> eventType;//订阅者class
final int priority;//优先级
final boolean sticky;//是否粘性
....
}
findSubscriberMethods(subscriberClass);方法 目的是获取订阅方法
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
//若缓存中有则直接从缓存中获取
if (subscriberMethods != null) {
return subscriberMethods;
}
//缓存中没有则直接通过findUsingReflection() 方法 或者 findUsingInfo() 方法获取订阅方法
if (ignoreGeneratedIndex) {//是否强制使用反射,即使生成了索引,默认false
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
//放入缓存
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
ignoreGeneratedIndex表示是否强制使用发射获取订阅方法,true 通过反射来获取订阅方法,false则是在编译期间生成SubscriberInfo,然后在运行时使用SubscriberInfo中保存的事件,减少反射的内存消耗。
1,通过findUsingReflection(Class<?> subscriberClass) 方法 获取订阅方法列表
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
//获取findState对象
FindState findState = prepareFindState();
//初始化findState对象
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findUsingReflectionInSingleClass(findState);
//递归查找父类
findState.moveToSuperclass();
}
//获取订阅方法
return getMethodsAndRelease(findState);
}
findUsingReflectionInSingleClass(findState);方法:
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
//反射获取该类中声明的所有方法 包含public protected private
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
//获取类中的所有public方法
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
//遍历所有的方法并且找到符合要求的方法,并且封装成SubscriberMethod 保存在findState.subscriberMethods 这个list中
for (Method method : methods) {
int modifiers = method.getModifiers();//获取方法参数修饰符
//只获取public修饰的方法
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();//获取方法参数数组
if (parameterTypes.length == 1) {//注解方法只能有一个参数
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);//获取Subscribe 注解
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];//将方法参数做eventType
if (findState.checkAdd(method, eventType)) {//检查方法是否合法
ThreadMode threadMode = subscribeAnnotation.threadMode();//获取方法执行线程模式(注解中获取)
//最终将合法的方法保存在findState.subscriberMethods中
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
//方法参数只能有一个
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
//注解方法必须是public 非static 非abstract
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
EventBus的注解方法只能是public修饰并且只能有一个参数。
2,通过findUsingInfo(Class<?> subscriberClass)方法获取订阅方法列表
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
// 获取订阅者信息
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
//subscriberInfo 为空则通过反射再获取一次
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);//取出FindState对象中的订阅方法列表
}
通过这种方式获取订阅方法列表,是通过注解在编译期动态生成一个MyEventBusIndex.java类,并将订阅方法保存在subscriberInfo 中,具体细节不再赘述。
获取到订阅方法列表之后,则继续执行订阅逻辑
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
subscribe(subscriber, subscriberMethod);方法具体细节如下:
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
//将订阅者和订阅方法封装成一个Subscription对象
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//根据事件类型从缓存中取出subscriptions
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {//没有订阅列表,则创建一个
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {//该订阅列表已经包含了此订阅方法则提示已经注册过了
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
//根据优先级将方法添加到订阅列表
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
//通过订阅者获取订阅方法的class列表
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {订阅方法的class列表为空则新建一个并放入typesBySubscriber这个map中
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
//将订阅方法的class依次放入typesBySubscriber这个map中
subscribedEvents.add(eventType);
//如果是粘性事件的话需要做如下处理 粘性事件 就是在发送事件之后再订阅该事件也能收到该事件
//如果是粘性消息则将缓存中的消息发送给订阅
if (subscriberMethod.sticky) {
if (eventInheritance) {
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
注册的过程主要是将订阅者subscriber(注册时传入的this)和订阅方法(添加@Subscribe的方法)的所有class类型存入typesBySubscriber这个map集合中。
二,post过程
post过程图解如下:
post(Object event) 方法:
public void post(Object event) {
//currentPostingThreadState 是一个ThreadLocal对象,保存了当前线程的PostingThreadState对象
PostingThreadState postingState = currentPostingThreadState.get();
//eventQueue为postingState中保存的一个list
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
//是否处于posting状态
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();//是否在主线程中执行
postingState.isPosting = true;
if (postingState.canceled) {//posting过程被取消
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
//eventQueue中的所有消息依次发送
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
//发送结束状态重置
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
postSingleEvent(eventQueue.remove(0), postingState);方法:
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
//获取当前发送的event的class对象
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {// 没有找到此事件类型的订阅者逻辑
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
eventInheritance变量表示是否发送当前类的superClass的事件 默认为true,此变量若为true则会调用lookupAllEventTypes方法去查找父类。
//找出所有的类对象包括超类和接口
private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
synchronized (eventTypesCache) {
List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
if (eventTypes == null) {
eventTypes = new ArrayList<>();
Class<?> clazz = eventClass;
while (clazz != null) {//递归寻找
eventTypes.add(clazz);
addInterfaces(eventTypes, clazz.getInterfaces());
clazz = clazz.getSuperclass();
}
eventTypesCache.put(eventClass, eventTypes);
}
return eventTypes;
}
}
之后会接着调用 postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass):
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
最终调用postToSubscription(Subscription subscription, Object event, boolean isMainThread) 方法,此方法中最终会通过反射调用添加了@subscrube注解的方法。
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
根据ThreadMode去判断在那个线程中执行,最终都会听过invokeSubscriber(subscription, event);方法反射调用订阅方法。
ThreadMode的说明:
-
POSTING:默认的 ThreadMode,不管是不是主线程直接调用订阅方法。
-
MAIN:主线程中执行订阅方法,如果发布线程是主线程则直接调用订阅方法否则通过Handler回调到主线程。
-
MAIN_ORDERED:与MAIN类似但是是顺序执行。
-
BACKGROUND:在后台线程中执行响应方法。如果发布线程不是主线程,则直接调用订阅者的事件响应函数,否则启动唯一的后台线程去处理。
-
Async:不论发布线程是否为主线程,都使用一个空闲线程来处理。
本篇分析就到这里,此外推荐一篇EventBus源码分析的文章,写的特别好,地址如下:
https://www.jianshu.com/p/af61682450d1?mType=Group