消息总线那些事儿
项目到了一定阶段会出现一种甜蜜的负担:业务的不断发展与人员的流动性越来越大,代码维护与测试回归流程越来越繁琐。这个时候需要对项目进行两方面的重构:
1.分层操作,方便复用
2.模块解耦,减少影响
小英团队经过多次调研之后有如下成果:
分层操作
下图是项目的分层模型:
ProgectUI:界面展示层,包括各个Activity,Fragment页面,以及相对复杂的一些UI组件等
Bussiness:实际业务层,比如:用户点击登录按钮,去执行登录的操作
BussinessService:业务服务层,对外提供数据服务。比如:用户信息模块对外提供用户相关的所有信息
Interface层:网络请求与数据缓存层;将用户的网络接口单独作为一层,并根据实际需要设置是否进行缓存。
DbCache层:数据库与数据模型转换层;所有数据库操作都使用DbCache;
CoreService层:功能同FrameWork层,但是较重,故拆出;CoreService与FrameWork层具备业务无关性、通用性;主要有:分发器,Hybrid,热修复以及埋点等
FrameWork层:基础的技术组件(网络库,图片库等)、三方服务封装以及通用UI等;
以用户信息模块为例,介绍一下具体的实现过程:
ProgectUI对应NewLoginActivity:界面展示层,登录页面,通过ChrLoginView与业务逻辑解耦
Bussiness对应QuickLoginPresenter:实际业务层,用户注册、登录、忘记密码等操作;依赖View与Module,处理实际的业务逻辑
BussinessService对应UserInstance:业务服务层.存储账号信息与用户信息。通过实现接口UserInfoInterface,向外提供用户信息服务。
Interface对应ApiService:登录相关的后台接口。
FrameWork对应ApiUtils:封装网络库Retrofit,代理网络请求
模块解耦
总体采用依赖注入的方式将服务的实现与使用分离:
UserInstance维护用户模块所有的信息,并通过UserInfoInterface与其他模块进行隔离
ServiceManager负责维护各个模块服务的注册并提供访问的接口
UserBean实体bean作为数据通信的格式,负责统计用户模块所有的信息
EventBus消息总线负责向外提供本模块的方法调用
总体思想:依赖注入负责对外暴露数据;EventBus负责对外暴露回调方法。
接下来就来介绍本文的主体:EventBus
简单介绍
EventBus-Android端事件发布/订阅框架,特点如下:
简化组件之间的通信。Android常用的通信方式:Broadcast、Listener、静态变量以及通过Handler进行线程之间的通信等。都可以统一使用EventBus。
简化代码。不同于使用Listener通信方式,层层传递,模块之间耦合严重。EventBus使用非常简单并且无耦合
高性能。框架中采用缓存、池化技术、细粒度锁、索引加速等方式使开发者无需关注安全性能方面的问题
依赖包不足50k
高级属性:线程模型、优先级以及是否接收粘性事件等
总线的工作机制如图:
订阅者通过EventBus订阅相关事件,并准备好回调方法
发布者将事件发送给post出去,EventBus负责通知订阅者
极简使用
分为五步:导入依赖、初始化总线、定义事件、注册订阅者、发送事件
导入依赖
项目中Module的build.gradle中添加依赖:
compile 'org.greenrobot:eventbus:3.0.0’
如果不需要索引加速,直接跳到第二步。
索引加速使用到编译时注解,所以需要在项目gradle中添加apt编译插件:
classpath 'com.neenbedankt.gradle.plugins:android-apt:1.8’
在Module中申请apt插件生成索引的完整类名
apply plugin: 'com.neenbedankt.android-apt'
apt {
arguments {
eventBusIndex “com.chinahrMyEventBusIndex"
}
}
Module中引入EventBusAnnotationProcessor:
apt 'org.greenrobot:eventbus-annotation-processor:3.0.1’
编译项目之后,就可以在\app\build\generate\source\apt下看到生成的索引类
初始化总线
EventBus可以通过getDefault()方法获取单例
EventBus.getDefault().register(this);
也可以通过EventBus.builder()去构造自定义的EventBus。另外还可以通过Bulder.installDefaultEventBus()修改默认的单例
EventBus.builder().eventInheritance(true).installDefaultEventBus();
如果需要索引加速,将编译时生成的Index通过Builder添加进去
EventBus.getDefault().builder().addIndex(new MyEventBusIndex());
定义事件
所有可以被实例化成object的类都可以作为事件
public class MyEvent {
}
注册订阅者
订阅事件的类中执行register()
EventBus.getDefault().register(this);
并在监听事件的回调方法上添加注解@SubScribe,可配置属性:方法执行的线程模型,分发的优先级,是否接收粘性事件
@Subscribe(threadMode = ThreadMode.MAIN,priority = 0,sticky = true)
public void onEventLogin(MyEvent myEvent){
Toast.makeText(MainActivity.this,"登录成功",Toast.LENGTH_LONG).show();
}
为了防止内存泄露,在总线中对订阅者进行注销,比如在Activity的OnDestroy()中:
EventBus.getDefault().unregister(this);
发送事件
调用post(myEvent)或postSticky(myEvent)
EventBus.getDefault().post(new MyEvent());
以上完成了消息订阅/发布的整个流程。
接下来将会说明框架内部的结构:
整体框架
四部分组成,如图:
数据元素
框架主要涉及这些数据元素:订阅者Subscriber,方法主体Method,事件event,事件类型eventType,线程模型ThreadMode,方法优先级priority,是否接收粘性事件sticky
SubscriberMethod与订阅者subscriber组合成Subscription,即订阅方法。
一个方法的执行需要方法主体(Method),调用方法的对象(Subscriber),参数(事件event)
post(event)之后结合Subscription就可以完成Method的调用。
EventBus
框架的门面,维护三个Map,并负责分发事件与执行订阅方法。
Map, CopyOnWriteArrayList>subscriptionsByEventType.事件类型与订阅方法列表的对应关系。注册事件,发送事件都是在操作此map.
Map>>typesBySubscriber.订阅者与订阅事件类型的对应关系。为了方便注销订阅者。
Map, Object>stickyEvents.粘性事件列表
调度器
回调方法通过四种方式进行分发,即线程模型ThreadMode:
POSTING:默认模式,直接在当前线程执行
MAIN:如果当前是主线程,直接执行;如果不是主线程,通过handler发送到主线程执行。
BACKGROUND:如果是主线程,交给backgroundPoster去调度;如果不是主线程就直接执行。
ASYN:交给asyncPoster调度。asyncPoster会直接在线程池当中开启一个线程执行。
索引加速
编译时,apt插件通过EventBusAnnotationProcessor提取注解@Subscribe生成索引MyEventBusIndex.索引内部维护一个Map,辅助EventBus查找方法信息
Map, SubscriberInfo>SUBSCRIBER_INDEX.订阅者的类型与回调方法列表对应关系
整个流程一句话总结:在订阅者准备好处理事件的回调方法之后,EventBus根据订阅者对象经过反射或者索引加速获取回调方法的信息,接收发布的事件event,按照指定的线程模型执行回调方法。
EventBus内部设计十分精致,对于编程技能的提高有非常大的帮助。接下来介绍:
设计思想
作为一个框架主要有四方面的设计:门面、调度器、线程安全、性能调优等
门面
1.EventBus类,门面模式中的门面类对外提供集中化和简化的沟通管道。总线的所有操作(注册订阅者,发送事件,调度执行,注销订阅者等)被封装到EventBus中,有效地屏蔽实现细节,使用和维护起来非常方便。
public classEventBus {
/**
* Registers the given subscriber to receive events. Subscribers must call {@link#unregister(Object)} once they
* are no longer interested in receiving events.
*
* Subscribers have event handling methods that must be annotated by {@linkSubscribe}.
* The {@linkSubscribe} annotation also allows configuration like {@link
* ThreadMode} and priority.
*/
public void register(Object subscriber) {Class subscriberClass = subscriber.getClass();
List subscriberMethods =subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized(this) {
for(SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
/** Posts the given event to the event bus. */
public voidpost(Object event) {
PostingThreadState postingState =currentPostingThreadState.get();
List eventQueue = postingState.eventQueue;
eventQueue.add(event);
if(!postingState.isPosting) {
postingState.isMainThread= Looper.getMainLooper() == Looper.myLooper();
postingState.isPosting=true;
if(postingState.canceled) {
throw newEventBusException("Internal error. Abort state was not reset");
}
try{
while(!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
}finally{
postingState.isPosting=false;
postingState.isMainThread=false;
}
}
}
voidinvokeSubscriber(Subscription subscription, Object event) {
try{
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
}catch(InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
}catch(IllegalAccessException e) {
throw newIllegalStateException("Unexpected exception", e);
}
}
/** Unregisters the given subscriber from all event classes. */
public synchronized void unregister(Object subscriber) {List> subscribedTypes =typesBySubscriber.get(subscriber);
if(subscribedTypes !=null) {
for(Class eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
}else{
Log.w(TAG,"Subscriber to unregister was not registered before: "+ subscriber.getClass());
}
}
}
2.EventBus对象的构建,采用volatile实例与双重检查加锁方式保证线程安全。
static volatileEventBusdefaultInstance;
/** Convenience singleton for apps using a process-wide EventBus instance. */
public staticEventBus getDefault() {
if(defaultInstance==null) {
synchronized(EventBus.class) {
if(defaultInstance==null) {
defaultInstance=newEventBus();
}
}
}
returndefaultInstance;
}
3.采用Builder模式辅助构建实例,避免因构造参数多带来的构造器繁多,即避免重复重叠构造器模式;也避免了采用javaBean封装参数带来的修改一致性问题。
public classEventBus {
private static finalEventBusBuilderDEFAULT_BUILDER=newEventBusBuilder();
EventBus(EventBusBuilder builder) {
subscriptionsByEventType=newHashMap<>();
typesBySubscriber=newHashMap<>();
stickyEvents=newConcurrentHashMap<>();
mainThreadPoster=newHandlerPoster(this, Looper.getMainLooper(),10);
backgroundPoster=newBackgroundPoster(this);
asyncPoster=newAsyncPoster(this);
indexCount= builder.subscriberInfoIndexes!=null? builder.subscriberInfoIndexes.size() :0;
subscriberMethodFinder=newSubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions= builder.logSubscriberExceptions;
logNoSubscriberMessages= builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent= builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent= builder.sendNoSubscriberEvent;
throwSubscriberException= builder.throwSubscriberException;
eventInheritance= builder.eventInheritance;
executorService= builder.executorService;
}
public staticEventBusBuilder builder() {
return newEventBusBuilder();
}
}
public classEventBusBuilder {
/** Default: true */
publicEventBusBuilder logSubscriberExceptions(booleanlogSubscriberExceptions) {
this.logSubscriberExceptions= logSubscriberExceptions;
return this;
}
/** Builds an EventBus based on the current configuration. */
publicEventBus build() {
return newEventBus(this);
}
}
4.构造方法采用public,可以构建在项目中对不同的子模块创建消息总线。Builder中提供修改默认单例的方法installDefaultEventBus(),是全局单例更加灵活。
public classEventBus {
/**
* Creates a new EventBus instance; each instance is a separate scope in which events are delivered. To use a
* central bus, consider {@link#getDefault()}.
*/
publicEventBus() {
this(DEFAULT_BUILDER);
}
}
public classEventBusBuilder {
* Installs the default EventBus returned by {@linkEventBus#getDefault()} using this builders' values. Must be
* done only once before the first usage of the default EventBus.
*
*@throwsEventBusException if there's already a default EventBus instance in place
*/
publicEventBus installDefaultEventBus() {
synchronized(EventBus.class) {
if(EventBus.defaultInstance!=null) {
throw newEventBusException("Default instance already exists."+
" It may be only set once before it's used the first time to ensure consistent behavior.");
}
EventBus.defaultInstance= build();returnEventBus.defaultInstance;
}
}
}
总结一句话:EvntBus作为框架的门面,采用双检锁,builder模式,构造参数public,封装流程细节,使得总线可以灵活配置与统一管理。
数据结构Map化
利用对象的class属性作为Map的key值可以使代码更加简洁。这也是只需要传入一个对象就可以驱动整个流程运行起来的关键。
private finalMap, CopyOnWriteArrayList>subscriptionsByEventType;
private finalMap>>typesBySubscriber;
private finalMap, Object>stickyEvents;
// Must be called in synchronized block
private voidsubscribe(Object subscriber, SubscriberMethod subscriberMethod) {
…………
…………
…………
if(subscriberMethod.sticky) {
if(eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List).
Set, Object>> entries =stickyEvents.entrySet();
for(Map.Entry, Object> entry : entries) {
Class candidateEventType = entry.getKey();
if(eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}else{
Object stickyEvent =stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
public voidpostSticky(Object event) {
synchronized(stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}
这个可以借鉴到组件化设计当中的服务管理器ServiceManager,维护一个以服务接口类为key,实现类作为value的Map。简化注册调用方法。
public classServiceManager {
Map serviceMap=new HashMap<>();static volatileServiceManagerdefaultInstance;
public staticServiceManager getInstance(){
if(defaultInstance==null){
synchronized(ServiceManager.class){
if(defaultInstance==null){
defaultInstance=newServiceManager();
}
}
}
returndefaultInstance;
}
public voidregister(Object serviceInstance){
serviceMap.put(serviceInstance.getClass().getInterfaces()[0],serviceInstance);
}
publicTgetService(Class serviceType){
return(T)serviceMap.get(serviceType);
}
}
public interfaceUserinfoInterface {
publicUserBean getUserInfo();
}
注册服务
public classUserInstanceimplementsUserinfoInterface {
UserInstance(){
ServiceManager.getInstance().register(this);
}
@Override
publicUserBean getUserInfo() {
return newUserBean();
}
}
其他模块调用用户信息:
public classResumeInstance {
voiddoSomeThing(){
UserinfoInterface userinfoInterface=ServiceManager.getInstance().getService(UserinfoInterface.class);
if(userinfoInterface!=null)
userinfoInterface.getUserInfo();
}
}
调度器
Android的线程有个特点:主线程不能被阻塞,UI的更新位于主线程,耗时操作如网络处理在后台线程
框架采用下面的方式进行调度:
每个调度器都维护一个待处理方法队列PendingPostQueue。值得一提的是:poll(intmaxMillisToWait)出队时如果当前队列为空,会释放当前对象的锁,等待队列填充。这个功能将在下面性能分析时解释
final classPendingPostQueue {
privatePendingPosthead;
privatePendingPosttail;
synchronized voidenqueue(PendingPost pendingPost) {
if(pendingPost ==null) {
throw newNullPointerException("null cannot be enqueued");
}
if(tail!=null) {
tail.next= pendingPost;
tail= pendingPost;
}else if(head==null) {
head=tail= pendingPost;
}else{
throw newIllegalStateException("Head present, but no tail");
}
notifyAll();
}
synchronizedPendingPost poll() {
PendingPost pendingPost =head;
if(head!=null) {
head=head.next;
if(head==null) {
tail=null;
}
}
returnpendingPost;
}
synchronizedPendingPost poll(intmaxMillisToWait)throwsInterruptedException {
if(head==null) {
wait(maxMillisToWait);
}
returnpoll();
}
}
HandlerPoster负责在主线程中处理事件,显然它是Handler的子类
final classHandlerPosterextendsHandler {
private finalPendingPostQueuequeue;
private final intmaxMillisInsideHandleMessage;
private finalEventBuseventBus;
private booleanhandlerActive;
HandlerPoster(EventBus eventBus, Looper looper,intmaxMillisInsideHandleMessage) {
super(looper);
this.eventBus= eventBus;
this.maxMillisInsideHandleMessage= maxMillisInsideHandleMessage;
queue=newPendingPostQueue();
}
voidenqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized(this) {
queue.enqueue(pendingPost);
if(!handlerActive) {
handlerActive=true;
if(!sendMessage(obtainMessage())) {
throw newEventBusException("Could not send handler message");
}
}
}
}
@Override
public voidhandleMessage(Message msg) {
booleanrescheduled =false;
try{
longstarted = SystemClock.uptimeMillis();
while(true) {
PendingPost pendingPost =queue.poll();
if(pendingPost ==null) {
synchronized(this) {
// Check again, this time in synchronized
pendingPost =queue.poll();
if(pendingPost ==null) {
handlerActive=false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
longtimeInMethod = SystemClock.uptimeMillis() - started;
if(timeInMethod >=maxMillisInsideHandleMessage) {
if(!sendMessage(obtainMessage())) {
throw newEventBusException("Could not send handler message");
}
rescheduled =true;
return;
}
}
}finally{
handlerActive= rescheduled;
}
}
}
BackgroundPoster继承自Runnable,某一时段内待处理都会在BackgroundPoster的run方法中排队处理。BackgroundPoster正在被线程池执行时,executorRunning==true,此时发布的事件只会进队列,不会再次调用线程池的execute方法。事件全部处理后才退出死循环,设置executorRunning=fasle,此后再发布事件才会在线程池中开辟一个新线程。
上文提到(PendingPostQueue.poll(int))在队列为空的时候等待队列中添加元素。以及executorRunning标示位的使用,都是为了重用当前对象。
final classBackgroundPosterimplementsRunnable {
private finalPendingPostQueuequeue;
private finalEventBuseventBus;
private volatile booleanexecutorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus= eventBus;
queue=newPendingPostQueue();
}
public voidenqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized(this) {
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public voidrun() {
try{
try{
while(true) {
PendingPost pendingPost = queue.poll(1000);
if(pendingPost ==null) {
synchronized(this) {
// Check again, this time in synchronized
pendingPost =queue.poll();
if(pendingPost ==null) {
executorRunning=false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
}
}catch(InterruptedException e) {
Log.w("Event", Thread.currentThread().getName() +" was interruppted", e);
}
}finally{
executorRunning=false;
}
}
}
AyncPoster很简单,来了事件就在线程池中开辟线程执行。
classAsyncPosterimplementsRunnable {
private finalPendingPostQueuequeue;
private finalEventBuseventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus= eventBus;
queue=newPendingPostQueue();
}
public voidenqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public voidrun() {
PendingPost pendingPost =queue.poll();
if(pendingPost ==null) {
throw newIllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
总体一句话:三种方式的线程模型与一个方法链表组成一个非常标准Android版线程调度器。
线程安全
框架在应对多线程方面做了很多设计
1.粒度锁
public classEventBus {
private finalMap, CopyOnWriteArrayList>subscriptionsByEventType;
private finalMap>>typesBySubscriber;
private finalMap, Object>stickyEvents;
public voidregister(Object subscriber) {
Class subscriberClass = subscriber.getClass();
List subscriberMethods =subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
private booleanpostSingleEventForEventType(Object event, PostingThreadState postingState, Class eventClass) {
CopyOnWriteArrayList subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
…………
}
public booleanhasSubscriberForEvent(Class eventClass) {
List> eventTypes =lookupAllEventTypes(eventClass);
if(eventTypes !=null) {
intcountTypes = eventTypes.size();
for(inth =0; h < countTypes; h++) {
Class clazz = eventTypes.get(h);
CopyOnWriteArrayList subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(clazz);
}
if(subscriptions !=null&& !subscriptions.isEmpty()) {
return true;
}
}
}
return false;
}
}
只在使用到需要同步的数据结构时,才去synchronized(this),获取当前对象锁。另外,对于在部分流程才会使用的数据结构并不会占用当前对象锁,比如:stickyEvents,但问题就出在这里,stickyEvents是ConcurrentHashMap线程安全的,并且粒度比当前对象小。
public classEventBus {
private finalMap, Object>stickyEvents;
public voidpostSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}
publicTgetStickyEvent(Class eventType) {
synchronized (stickyEvents) {
return eventType.cast(stickyEvents.get(eventType));
}
}
EventBus(EventBusBuilder builder) {
subscriptionsByEventType=newHashMap<>();
typesBySubscriber=newHashMap<>();
stickyEvents = new ConcurrentHashMap<>();
mainThreadPoster=newHandlerPoster(this, Looper.getMainLooper(),10);
backgroundPoster=newBackgroundPoster(this);
asyncPoster=newAsyncPoster(this);
indexCount= builder.subscriberInfoIndexes!=null? builder.subscriberInfoIndexes.size() :0;
subscriberMethodFinder=newSubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions= builder.logSubscriberExceptions;
logNoSubscriberMessages= builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent= builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent= builder.sendNoSubscriberEvent;
throwSubscriberException= builder.throwSubscriberException;
eventInheritance= builder.eventInheritance;
executorService= builder.executorService;
}
2.数据结构的合理使用。
方法列表采用CopyOnWriteArrayList,CopyOnWrite容器即写时复制的容器。通俗的理解是当往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。缺点就是会造成内存占用双份,以及数据一致性问题(CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。)考虑到方法只会在注册的时候进行添加,因此采用偏缓存性质的数据结构更适合于当前的应用场景。
线程状态信息采用ThreadLocal类型进行保存,ThreadLocal会为每一个线程提供一个独立的变量副本,从而隔离了多个线程对数据的访问冲突。因为每一个线程都拥有自己的变量副本,从而也就没有必要对该变量进行同步了。
public classEventBus {
private finalMap,CopyOnWriteArrayList>subscriptionsByEventType;
private finalThreadLocalcurrentPostingThreadState=newThreadLocal() {
@Override
protectedPostingThreadState initialValue() {
return newPostingThreadState();
}
};
总结一句话:通过synchronized与ThreadLocal的数据类型,构建出线程安全以及简洁优美的代码。
高性能
1.索引加速
总线注册时只传入通过注解标记回调方法的订阅者对象。EventBus把获取订阅方法的过程放在编译时,避免运行时反射带来的性能问题
@SupportedAnnotationTypes("org.greenrobot.eventbus.Subscribe")
@SupportedOptions(value = {"eventBusIndex", "verbose"})
public class EventBusAnnotationProcessor extends AbstractProcessor {
/** Found subscriber methods for a class (without superclasses). 被注解表示的方法信息 */
private final ListMap methodsByClass = new ListMap<>();
private final Set classesToSkip = new HashSet<>(); // checkHasErrors检查出来的异常方法
@Override
public boolean process(Set annotations, RoundEnvironment env) {
Messager messager = processingEnv.getMessager();
try {
String index = processingEnv.getOptions().get(OPTION_EVENT_BUS_INDEX);
if (index == null) { // 如果没有在gradle中配置apt的argument,编译就会在这里报错
messager.printMessage(Diagnostic.Kind.ERROR, "No option " + OPTION_EVENT_BUS_INDEX +
" passed to annotation processor");
return false;
}
/** ... */collectSubscribers(annotations, env, messager);// 根据注解拿到所有订阅者的回调方法信息checkForSubscribersToSkip(messager, indexPackage);// 筛掉不符合规则的订阅者
if (!methodsByClass.isEmpty()) {createInfoIndexFile(index);// 生成索引类
}
/** 打印错误 */
}
/** 下面这些方法就不再贴出具体实现了,我们了解它们的功能就行 */
private void collectSubscribers // 遍历annotations,找出所有被注解标识的方法,以初始化methodsByClass
private boolean checkHasNoErrors // 过滤掉static,非public和参数大于1的方法
private void checkForSubscribersToSkip // 检查methodsByClass中的各个类,是否存在非public的父类和方法参数
/** 下面这三个方法会把methodsByClass中的信息写到相应的类中 */
private void writeCreateSubscriberMethods
private void createInfoIndexFile
private void writeIndexLines
}
获取所有订阅者的回调方法信息之后,生成Index。运行时就可直接使用Map去查找回调方法。
packagecom.example.wangbinlong.myapplication;
importorg.greenrobot.eventbus.meta.SimpleSubscriberInfo;
importorg.greenrobot.eventbus.meta.SubscriberMethodInfo;
importorg.greenrobot.eventbus.meta.SubscriberInfo;
importorg.greenrobot.eventbus.meta.SubscriberInfoIndex;
importorg.greenrobot.eventbus.ThreadMode;
importjava.util.HashMap;
importjava.util.Map;
/** This class is generated by EventBus, do not edit. */
public classMyEventBusIndeximplementsSubscriberInfoIndex {
private static final Map, SubscriberInfo>SUBSCRIBER_INDEX;
static {
SUBSCRIBER_INDEX= new HashMap, SubscriberInfo>();
putIndex(new SimpleSubscriberInfo(MainActivity.class, true, new SubscriberMethodInfo[] {
new SubscriberMethodInfo("onLogin", EventBusManager.LoginEvent.class, ThreadMode.MAIN),
}));
putIndex(new SimpleSubscriberInfo(SecondActivity.Inner.class, true, new SubscriberMethodInfo[] {
new SubscriberMethodInfo("onStickyEvent", EventBusManager.StickEvent.class, ThreadMode.MAIN, 0, true),
}));
}
private static voidputIndex(SubscriberInfo info) {
SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);
}
@Override
publicSubscriberInfo getSubscriberInfo(Class subscriberClass) {
SubscriberInfo info =SUBSCRIBER_INDEX.get(subscriberClass);
if(info !=null) {
returninfo;
}else{
return null;
}
}
}
总结一句话:如果项目中通过反射可以获取的对应关系,可以通过获取编译时注解的方式进行索引加速
2.池化技术
EventBus的设计非常精致,但是有一个明显的缺陷:产生很多中间对象。为了最大限度地减少影响,项目中多处使用缓存,对象池。
METHOD_CACHE缓存订阅者类型与回调方法的列表,避免重复查找
classSubscriberMethodFinder {
List findSubscriberMethods(Class subscriberClass) {
List subscriberMethods =METHOD_CACHE.get(subscriberClass);
if(subscriberMethods !=null) {
returnsubscriberMethods;
}
if(ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
}else{
subscriberMethods = findUsingInfo(subscriberClass);
}
if(subscriberMethods.isEmpty()) {
throw newEventBusException("Subscriber "+ subscriberClass
+" and its super classes have no public methods with the @Subscribe annotation");
}else{
METHOD_CACHE.put(subscriberClass, subscriberMethods);
returnsubscriberMethods;
}
}
}
private static finalMap, List>METHOD_CACHE=newConcurrentHashMap<>();
eventTypesCache缓存事件类型与(事件类型父类和接口)的关系
private static finalMap, List>>eventTypesCache=newHashMap<>();
/** Looks up all Class objects including super classes and interfaces. Should also work for interfaces. */
private staticList> lookupAllEventTypes(Class eventClass) {
synchronized(eventTypesCache) {
List> eventTypes =eventTypesCache.get(eventClass);
if(eventTypes ==null) {
eventTypes =newArrayList<>();
Class clazz = eventClass;
while(clazz !=null) {
eventTypes.add(clazz);
addInterfaces(eventTypes, clazz.getInterfaces());
clazz = clazz.getSuperclass();
}
eventTypesCache.put(eventClass, eventTypes);
}
returneventTypes;
}
}
通过反射查找订阅方法时,将所有中间对象封装成FindState.全局维护一个FindState的对象池。使用完对象之后通过recycle()擦除痕迹。
classSubscriberMethodFinder {
privateList findUsingInfo(Class subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while(findState.clazz!=null) {
findState.subscriberInfo= getSubscriberInfo(findState);
if(findState.subscriberInfo!=null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for(SubscriberMethod subscriberMethod : array) {
if(findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
}else{
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
returngetMethodsAndRelease(findState);
}
private static finalFindState[]FIND_STATE_POOL=newFindState[POOL_SIZE];
privateList getMethodsAndRelease(FindState findState) {
List subscriberMethods =newArrayList<>(findState.subscriberMethods);
findState.recycle();
synchronized(FIND_STATE_POOL) {
for(inti =0; i
if(FIND_STATE_POOL[i] ==null) {
FIND_STATE_POOL[i] = findState;
break;
}
}
}
returnsubscriberMethods;
}
privateFindState prepareFindState() {
synchronized(FIND_STATE_POOL) {
for(inti = 0; i
FindState state =FIND_STATE_POOL[i];
if(state !=null) {
FIND_STATE_POOL[i] =null;
returnstate;
}
}
}
return newFindState();
}
static classFindState{
final List subscriberMethods = new ArrayList<>();
finalMapanyMethodByEventType=newHashMap<>();
finalMapsubscriberClassByMethodKey=newHashMap<>();
finalStringBuildermethodKeyBuilder=newStringBuilder(128);
ClasssubscriberClass;
Classclazz;
booleanskipSuperClasses;
SubscriberInfosubscriberInfo;
voidinitForSubscriber(Class subscriberClass) {
this.subscriberClass=clazz= subscriberClass;
skipSuperClasses=false;
subscriberInfo=null;
}
void recycle() {
subscriberMethods.clear();
anyMethodByEventType.clear();
subscriberClassByMethodKey.clear();
methodKeyBuilder.setLength(0);
subscriberClass = null;
clazz = null;
skipSuperClasses = false;
subscriberInfo = null;
}
}
回调方法队列PendPostQueue中的元素PendingPost本身就是对象池。构造器为私有,从而只能通过obtainPendingPost从对象池获取对象,对象池中有保存的对象则获取对象(ArrayList尾部获取),没有就通过new创建。releasePendingPost则将使用后的对象归还给对象池,归还的时候要将对象的使用痕迹擦除,同时要限制对象池大小为10000,防止对象池无限增大。
final classPendingPost {
private final static ListpendingPostPool= new ArrayList();
Objectevent;
Subscriptionsubscription;
PendingPostnext;
privatePendingPost(Object event, Subscription subscription) {
this.event= event;
this.subscription= subscription;
}
staticPendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized(pendingPostPool) {
intsize =pendingPostPool.size();
if(size >0) {
PendingPost pendingPost =pendingPostPool.remove(size -1);
pendingPost.event= event;
pendingPost.subscription= subscription;
pendingPost.next=null;
returnpendingPost;
}
}
return newPendingPost(event, subscription);
}
static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
synchronized (pendingPostPool) {
// Don't let the pool grow indefinitely
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}
}
总结一句话:通过编译时注解解决反射问题,缓存与对象池解决内存开销问题。
EventBus是通过可配置的门面来维护事件,订阅者与回调方法之间的关系。然后按照指定线程模型调度执行。的使用简洁,线程安全,高性能的消息总线。最重要的是,它带给大家一些编程上面的思考。