EventBus Architecture (Including the Hermes Cross-Process Architecture)

2020-05-28  Coder_Sven

EventBus is a publish/subscribe event bus framework for Android development, based on the observer pattern.


EventBus is simple to use. After adding the dependency, it takes just four steps:

dependencies {
    ...

    implementation 'org.greenrobot:eventbus:3.1.1'
}

1. Register

 EventBus.getDefault().register(this);

2. Annotate

    @Subscribe(threadMode = ThreadMode.MAIN)
    public void receive(Friend friend){
        Toast.makeText(this, friend.toString(), Toast.LENGTH_SHORT).show();
    }

3. Post (publish)

EventBus.getDefault().post(new Friend("Sven", 18));

4. Unregister

 EventBus.getDefault().unregister(this);

Using EventBus really is just those four steps. Now let's look at how the source code implements them.

1. register

public void register(Object subscriber) {
    Class<?> subscriberClass = subscriber.getClass();
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod);
        }
    }
}
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    Class<?> eventType = subscriberMethod.eventType;
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                    + eventType);
        }
    }

    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }

    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    subscribedEvents.add(eventType);

    if (subscriberMethod.sticky) {
        if (eventInheritance) {          
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
}

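In short, register finds the methods of the registering object's class that qualify (those carrying the @Subscribe annotation) and stores their details in the subscriptionsByEventType cache, ordered by priority.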
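The priority loop in subscribe() is easy to skim past, so here is a minimal, standalone sketch of just that insertion rule. PrioritySubscription and PriorityInsertDemo are hypothetical stand-ins invented for this illustration; they are not EventBus classes.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for EventBus's Subscription; only the fields needed here.
final class PrioritySubscription {
    final String name;
    final int priority;

    PrioritySubscription(String name, int priority) {
        this.name = name;
        this.priority = priority;
    }
}

final class PriorityInsertDemo {
    // Same rule as the loop in subscribe(): insert before the first entry with a
    // strictly lower priority, or append when no such entry exists.
    static void insertByPriority(List<PrioritySubscription> subscriptions, PrioritySubscription newSub) {
        int size = subscriptions.size();
        for (int i = 0; i <= size; i++) {
            if (i == size || newSub.priority > subscriptions.get(i).priority) {
                subscriptions.add(i, newSub);
                break;
            }
        }
    }

    // Joins the subscription names in list order, e.g. "b,d,a,c".
    static String order(List<PrioritySubscription> subscriptions) {
        StringBuilder sb = new StringBuilder();
        for (PrioritySubscription s : subscriptions) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(s.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<PrioritySubscription> subs = new CopyOnWriteArrayList<>();
        insertByPriority(subs, new PrioritySubscription("a", 0));
        insertByPriority(subs, new PrioritySubscription("b", 5));
        insertByPriority(subs, new PrioritySubscription("c", 0));
        insertByPriority(subs, new PrioritySubscription("d", 5));
        System.out.println(order(subs)); // prints "b,d,a,c"
    }
}
```

Because the comparison is strict (>), subscribers with equal priority keep their registration order.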

2. @Subscribe(threadMode = ThreadMode.MAIN) (the annotation)

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
    ThreadMode threadMode() default ThreadMode.POSTING;

    /**
     * If true, delivers the most recent sticky event (posted with
     * {@link EventBus#postSticky(Object)}) to this subscriber (if event available).
     */
    boolean sticky() default false;

    /** Subscriber priority to influence the order of event delivery.
     * Within the same delivery thread ({@link ThreadMode}), higher priority subscribers will receive events before
     * others with a lower priority. The default priority is 0. Note: the priority does *NOT* affect the order of
     * delivery among subscribers with different {@link ThreadMode}s! */
    int priority() default 0;
}

The annotation applies to methods. Its main purpose is to mark which methods should handle events.

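ThreadMode: the thread mode (specifies the thread the subscriber is called on)

POSTING: the default mode and the one with the least overhead. The subscriber is called on the same thread that posts the event: if the publisher is on the main thread, so is the subscriber, and vice versa, so no thread switch is needed. Use it with care when the handler might do slow work, because the event may have been posted from the main thread.

MAIN: the subscriber is called on the main thread; what happens depends on the posting thread. If the publisher is on the main thread, the subscriber is called directly and post blocks until it returns. If the publisher is not on the main thread, the event is queued for delivery on the main thread and post does not block.

MAIN_ORDERED: similar to MAIN, the subscriber is called on the main thread. Unlike MAIN, the event is always queued (the path MAIN takes when the publisher is off the main thread), so post never blocks.

BACKGROUND: the subscriber is called on a background thread. If the event is posted from a background thread, the subscriber is called directly on that posting thread. If it is posted from the main thread, EventBus uses a single background thread that delivers all events in order, so this mode is not suited to slow work either: every event shares that one thread.

ASYNC: the subscriber is always called on a thread separate from both the posting thread and the main thread, taken from a thread pool. It suits slow work such as database access and network requests. Avoid triggering many long-running ASYNC handlers at once, since that drives up the number of concurrent threads.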
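The five modes boil down to one decision: given the mode and whether the publisher is on the main thread, is the subscriber invoked directly or handed to another thread? The sketch below encodes that decision as a pure function. ThreadModeDecision, Mode and Delivery are names invented for this illustration (EventBus's own dispatch lives inside the library and is not reproduced here), and it assumes an Android environment where a main-thread poster is available.

```java
final class ThreadModeDecision {
    // Mirrors the ThreadMode names described above.
    enum Mode { POSTING, MAIN, MAIN_ORDERED, BACKGROUND, ASYNC }

    // Hypothetical labels for how an event reaches the subscriber.
    enum Delivery { DIRECT, QUEUE_TO_MAIN, QUEUE_TO_BACKGROUND, QUEUE_TO_POOL }

    static Delivery decide(Mode mode, boolean posterOnMainThread) {
        switch (mode) {
            case POSTING:
                // Always the posting thread, no thread switch.
                return Delivery.DIRECT;
            case MAIN:
                // Direct (blocking) on the main thread, queued otherwise.
                return posterOnMainThread ? Delivery.DIRECT : Delivery.QUEUE_TO_MAIN;
            case MAIN_ORDERED:
                // Always queued, so post never blocks.
                return Delivery.QUEUE_TO_MAIN;
            case BACKGROUND:
                // Direct when already off the main thread, otherwise one shared background thread.
                return posterOnMainThread ? Delivery.QUEUE_TO_BACKGROUND : Delivery.DIRECT;
            case ASYNC:
                // Always a separate pooled thread.
                return Delivery.QUEUE_TO_POOL;
            default:
                throw new IllegalStateException("Unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            System.out.println(mode + ": main=" + decide(mode, true) + ", worker=" + decide(mode, false));
        }
    }
}
```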

3. post

public void post(Object event) {
    PostingThreadState postingState = currentPostingThreadState.get();
    List<Object> eventQueue = postingState.eventQueue;
    eventQueue.add(event);

    if (!postingState.isPosting) {
        postingState.isMainThread = isMainThread();
        postingState.isPosting = true;
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        try {
            while (!eventQueue.isEmpty()) {
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}

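post adds the event to the current thread's queue. Then, for each queued event, it looks up the matching subscriber methods that register stored in subscriptionsByEventType (the methods found through the @Subscribe annotation) and invokes them via reflection.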
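The isPosting flag is what keeps nested posts in order: an event posted from inside a handler is only queued, and the outer loop delivers it after the current handler returns. Here is a minimal sketch of that mechanism with a single subscriber. PostingQueueDemo and its fields are hypothetical names for this illustration, not part of EventBus.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class PostingQueueDemo {
    // Per-thread state, modeled on PostingThreadState in the listing above.
    private static final class PostingState {
        final List<Object> eventQueue = new ArrayList<>();
        boolean isPosting;
    }

    private final ThreadLocal<PostingState> currentState = ThreadLocal.withInitial(PostingState::new);

    // Records the delivery order so the behavior can be inspected.
    final List<String> log = new ArrayList<>();

    // The single subscriber of this sketch; a real bus would look subscribers up by event type.
    Consumer<Object> subscriber = event -> { };

    void post(Object event) {
        PostingState state = currentState.get();
        state.eventQueue.add(event);
        if (!state.isPosting) {
            state.isPosting = true;
            try {
                // Drain the queue; nested post() calls only append to it.
                while (!state.eventQueue.isEmpty()) {
                    Object next = state.eventQueue.remove(0);
                    log.add("begin " + next);
                    subscriber.accept(next);
                    log.add("end " + next);
                }
            } finally {
                state.isPosting = false;
            }
        }
    }

    // Posts "first", whose handler posts "second" from inside the callback.
    static List<String> run() {
        PostingQueueDemo bus = new PostingQueueDemo();
        bus.subscriber = event -> {
            if ("first".equals(event)) {
                bus.post("second");
            }
        };
        bus.post("first");
        return bus.log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [begin first, end first, begin second, end second]
    }
}
```

Without the flag, "second" would be delivered in the middle of handling "first".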

4. unregister

Removes the registered object from the cache.

public synchronized void unregister(Object subscriber) {
    List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
    if (subscribedTypes != null) {
        for (Class<?> eventType : subscribedTypes) {
            unsubscribeByEventType(subscriber, eventType);
        }
        typesBySubscriber.remove(subscriber);
    } else {
        logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
    }
}
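unregister is cheap because EventBus keeps two indexes: one from event type to subscriptions, and one from subscriber to the event types it registered for. The listing does not show unsubscribeByEventType, so the following is only a simplified sketch of the two-index idea. TwoIndexRegistryDemo is a hypothetical class that stores bare subscriber objects instead of Subscription instances.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

final class TwoIndexRegistryDemo {
    // Event type -> subscribers interested in it.
    private final Map<Class<?>, CopyOnWriteArrayList<Object>> subscribersByEventType = new HashMap<>();
    // Subscriber -> event types it registered for; lets unregister avoid a full scan.
    private final Map<Object, List<Class<?>>> typesBySubscriber = new HashMap<>();

    synchronized void register(Object subscriber, Class<?> eventType) {
        subscribersByEventType.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>()).add(subscriber);
        typesBySubscriber.computeIfAbsent(subscriber, k -> new ArrayList<>()).add(eventType);
    }

    // Returns false when the subscriber was never registered.
    synchronized boolean unregister(Object subscriber) {
        List<Class<?>> types = typesBySubscriber.remove(subscriber);
        if (types == null) {
            return false;
        }
        for (Class<?> eventType : types) {
            List<Object> subscribers = subscribersByEventType.get(eventType);
            if (subscribers != null) {
                // Compare by identity so an equal-but-different object is not removed.
                subscribers.removeIf(s -> s == subscriber);
            }
        }
        return true;
    }

    synchronized int subscriberCount(Class<?> eventType) {
        List<Object> subscribers = subscribersByEventType.get(eventType);
        return subscribers == null ? 0 : subscribers.size();
    }
}
```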

Diagram analysis

eventbus_wps.jpg

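Based on this source analysis, we can hand-write our own EventBus architecture. It is a simplified version that keeps the core register/post/unregister behavior and omits features such as sticky events and priorities.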

    // Register
    public void register(Object subscriber) {
        Class<?> aClass = subscriber.getClass();
        List<SubscribleMethod> subscribleMethods = cacheMap.get(subscriber);
        // Only scan and cache the subscriber if it is not registered yet
        if (subscribleMethods == null) {
            subscribleMethods = getSubscribleMethods(subscriber);
            cacheMap.put(subscriber, subscribleMethods);
        }
    }

    // Collect the methods that can receive events
    private List<SubscribleMethod> getSubscribleMethods(Object subscriber) {
        List<SubscribleMethod> list = new ArrayList<>();
        Class<?> aClass = subscriber.getClass();
        // Walk up the hierarchy: subscriber -> BaseActivity -> Activity, checking each superclass

        while (aClass != null) {
            // Check which package the class is in; stop once we reach system classes
            String name = aClass.getName();
            if (name.startsWith("java.") ||
                    name.startsWith("javax.") ||
                    name.startsWith("android.") ||
                    name.startsWith("androidx.")) {
                break;
            }
            Method[] declaredMethods = aClass.getDeclaredMethods();
            // Scan every declared method for annotated methods that can receive events
            for (Method method : declaredMethods) {
                HGSubscribe annotation = method.getAnnotation(HGSubscribe.class);
                if (annotation == null) {
                    continue;
                }

                // Validate the method signature
                Class<?>[] parameterTypes = method.getParameterTypes();
                if (parameterTypes.length != 1) {
                    throw new RuntimeException("An EventBus subscriber method must take exactly one parameter");
                }

                // The method qualifies
                HGThreadMode dnThreadMode = annotation.threadMode();
                SubscribleMethod subscribleMethod = new SubscribleMethod(method, dnThreadMode, parameterTypes[0]);
                list.add(subscribleMethod);
            }
            // Move on to the superclass and keep looking for methods that can receive events
            aClass = aClass.getSuperclass();
        }
        return list;
    }
    
    // Post an event
    public void post(final Object obj) {
        Set<Object> set = cacheMap.keySet();
        Iterator<Object> iterator = set.iterator();
        while (iterator.hasNext()) {
            // The registered subscriber object
            final Object next = iterator.next();

            // All annotated methods of that subscriber
            List<SubscribleMethod> list = cacheMap.get(next);
            for (final SubscribleMethod subscribleMethod : list) {
                // Check whether this method should receive the event
                if (subscribleMethod.getEventType().isAssignableFrom(obj.getClass())) {
                    switch (subscribleMethod.getThreadMode()) {
                        case MAIN:
                            // The subscriber must run on the main thread; if we are already there, invoke directly
                            if(Looper.myLooper() == Looper.getMainLooper()){
                                invoke(subscribleMethod, next, obj);
                            } else {
                                // post was called on a worker thread; hand delivery over to the main thread
                                handler.post(new Runnable() {
                                    @Override
                                    public void run() {
                                        invoke(subscribleMethod, next, obj);
                                    }
                                });
                            }
                            break;
                        // The subscriber must run on a worker thread
                        case ASYNC:
                            // post was called on the main thread
                            if(Looper.myLooper() == Looper.getMainLooper()){
                                executorService.execute(new Runnable() {
                                    @Override
                                    public void run() {
                                        invoke(subscribleMethod, next, obj);
                                    }
                                });
                            } else {
                                // post was called on a worker thread
                                invoke(subscribleMethod, next, obj);
                            }
                            break;

                        case POSTING:
                            // Deliver directly on the posting thread
                            invoke(subscribleMethod, next, obj);
                            break;
                    }
                }
            }
        }
    }

    // Invoke the subscriber method via reflection
    private void invoke(SubscribleMethod subscribleMethod, Object next, Object obj) {
        Method method = subscribleMethod.getMethod();
        try {
            method.invoke(next, obj);
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        }
    }
    
    // Unregister
    public void unregister(Object subscriber) {
        Class<?> aClass = subscriber.getClass();
        List<SubscribleMethod> list = cacheMap.get(subscriber);
        // Remove the subscriber if it was registered
        if (list != null) {
            cacheMap.remove(subscriber);
        }
    }
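The annotation scan is the heart of this hand-written bus, and it can be tried outside Android. The sketch below repeats the same walk up the class hierarchy using only the JDK. DemoSubscribe, BaseListener, ChildListener and SubscriberScanDemo are hypothetical names for this illustration; the project's own HGSubscribe and SubscribleMethod types are not reproduced.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical marker annotation; RUNTIME retention is required for reflection to see it.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoSubscribe { }

class BaseListener {
    @DemoSubscribe
    public void onText(String text) { }
}

class ChildListener extends BaseListener {
    @DemoSubscribe
    public void onNumber(Integer number) { }

    // Not annotated, so the scan ignores it.
    public void ignored(String text) { }
}

final class SubscriberScanDemo {
    // Returns "methodName:EventType" for every annotated method, including inherited ones.
    static List<String> findSubscriberMethods(Class<?> type) {
        List<String> found = new ArrayList<>();
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            String name = c.getName();
            // Stop at JDK classes, as the listing above stops at system packages.
            if (name.startsWith("java.") || name.startsWith("javax.")) {
                break;
            }
            for (Method method : c.getDeclaredMethods()) {
                if (method.getAnnotation(DemoSubscribe.class) == null) {
                    continue;
                }
                Class<?>[] parameterTypes = method.getParameterTypes();
                if (parameterTypes.length != 1) {
                    throw new IllegalArgumentException(method + " must take exactly one parameter");
                }
                found.add(method.getName() + ":" + parameterTypes[0].getSimpleName());
            }
        }
        // getDeclaredMethods() has no guaranteed order, so sort for a stable result.
        Collections.sort(found);
        return found;
    }

    public static void main(String[] args) {
        System.out.println(findSubscriberMethods(ChildListener.class)); // prints [onNumber:Integer, onText:String]
    }
}
```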

Project repository: https://github.com/games2sven/HG_EventBus

Hand-writing the HermesEventBus architecture

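The project above is only a simple EventBus architecture. If events now need to cross process boundaries, we can bring in Hermes, the cross-process communication framework from Ele.me.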

On Android, cross-process communication is commonly implemented with AIDL, which is what this implementation uses.

Step 1: define the AIDL files

image-20200528163602766.png image-20200528163705440.png image-20200528163732140.png

Step 2: write the code

1. Server-side registration

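// Initialize
Hermes.getDefault().init(this);
// Register: classes in process A that process B will call must be registered in advance
Hermes.getDefault().register(HgUserManager.class);
// Create the singleton instance of the class
HgUserManager.getInstance().setFriend(new Friend("Sven",18));

Hermes.class

    public void init(Context context){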
        this.mContext = context.getApplicationContext();
    }

    //---------- Server side ------ process A -----------
    public void register(Class<HgUserManager> clazz) {
        typeCenter.register(clazz);
    }

TypeCenter.class

public void register(Class<HgUserManager> clazz) {
    // Register the class, then register its methods
    registerClass(clazz);
    registerMethod(clazz);
}

    private void registerMethod(Class<HgUserManager> clazz) {
        Method[] methods = clazz.getMethods();
        for (Method method : methods) {
            mRawMethods.putIfAbsent(clazz,new ConcurrentHashMap<String, Method>());
            ConcurrentHashMap<String,Method> map = mRawMethods.get(clazz);
            String methodId = TypeUtils.getMethodId(method);
            map.put(methodId,method);
        }
    }

    private void registerClass(Class<HgUserManager> clazz) {
        String name = clazz.getName();
        mAnnotatedClasses.putIfAbsent(name,clazz);
    }
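registerMethod keys each method by TypeUtils.getMethodId, which this article does not show. The point of such an ID is to tell overloads apart, so a plausible shape is the method name plus its parameter types. The sketch below is an assumption made for illustration only: MethodIdDemo and its exact ID format are hypothetical and may differ from the project's TypeUtils.

```java
import java.lang.reflect.Method;

final class MethodIdDemo {
    // Builds an ID such as "indexOf(java.lang.String,int)" (assumed format).
    static String methodId(Method method) {
        StringBuilder id = new StringBuilder(method.getName()).append('(');
        Class<?>[] parameterTypes = method.getParameterTypes();
        for (int i = 0; i < parameterTypes.length; i++) {
            if (i > 0) {
                id.append(',');
            }
            id.append(parameterTypes[i].getName());
        }
        return id.append(')').toString();
    }

    // Convenience wrapper that looks the method up and hides the checked exception.
    static String idOf(Class<?> owner, String name, Class<?>... parameterTypes) {
        try {
            return methodId(owner.getMethod(name, parameterTypes));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        // Two overloads of String.indexOf get two distinct IDs.
        System.out.println(idOf(String.class, "indexOf", int.class));
        System.out.println(idOf(String.class, "indexOf", String.class, int.class));
    }
}
```

With an ID like this, the per-class map can hold setFriend(Friend) and any other setFriend overload side by side.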
    

HgUserManager is annotated with @ClassId and implements IUserManager, exposing getFriend and setFriend so that process B can later call them through AIDL.

@ClassId("com.highgreat.sven.myapplication.manager.HgUserManager")
public class HgUserManager implements IUserManager{

    Friend friend;
    private static HgUserManager sInstance = null;
    private HgUserManager(){

    }

    // Convention: process A exposes its singleton object through getInstance()
    public static synchronized HgUserManager getInstance(){
        if(sInstance == null){
            sInstance = new HgUserManager();
        }
        return  sInstance;
    }


    @Override
    public Friend getFriend() {
        return friend;
    }

    @Override
    public void setFriend(Friend friend) {
        this.friend = friend;
    }
}

Define the runtime annotation ClassId

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ClassId {
    String value();
}
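The annotation matters because the client only knows the interface, while the server needs the name of the implementing class. Reading the annotation at runtime, with the class's own name as a fallback, can be sketched as follows. DemoClassId, AnnotatedApi, PlainApi, ClassIdLookupDemo and the com.example name are hypothetical stand-ins for this illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Same shape as ClassId above, renamed so the sketch is self-contained.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface DemoClassId {
    String value();
}

// The client-side interface points at the server-side implementation by name.
@DemoClassId("com.example.server.UserManagerImpl")
interface AnnotatedApi { }

// No annotation: the lookup falls back to the interface's own name.
interface PlainApi { }

final class ClassIdLookupDemo {
    static String resolveClassName(Class<?> type) {
        DemoClassId id = type.getAnnotation(DemoClassId.class);
        return id == null ? type.getName() : id.value();
    }

    public static void main(String[] args) {
        System.out.println(resolveClassName(AnnotatedApi.class)); // prints com.example.server.UserManagerImpl
        System.out.println(resolveClassName(PlainApi.class));
    }
}
```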

2. Client connection

Hermes.getDefault().connect(this, HermesService.class);

Hermes.class

    //---------- Client side -------- process B -----------
    public void connect(Context context, Class< ? extends HermesService> hermesServiceClass) {
        connectApp(context,null,hermesServiceClass);
    }

    private void connectApp(Context context, String packageName, Class<? extends HermesService> hermesServiceClass) {
        init(context);
        serviceConnectionManager.bind(context.getApplicationContext(),packageName,hermesServiceClass);
    }

ServiceConnectionManager.class

public void bind(Context context, String packageName, Class<? extends  HermesService> hermesServiceClass){
    HermesServiceConnection hermesServiceConnection = new HermesServiceConnection(hermesServiceClass);
    mHermesServiceConnection.put(hermesServiceClass,hermesServiceConnection);
    Intent intent;
    if(TextUtils.isEmpty(packageName)){
        intent = new Intent(context,hermesServiceClass);
    }else{
        intent = new Intent();
        intent.setClassName(packageName,hermesServiceClass.getName());
    }
    
    // Bind to the AIDL service
    context.bindService(intent,hermesServiceConnection,Context.BIND_AUTO_CREATE);
}


    private class HermesServiceConnection implements ServiceConnection{
        private Class<? extends HermesService> mClass;

        HermesServiceConnection(Class<? extends HermesService> service){this.mClass = service;};
        @Override
        public void onServiceConnected(ComponentName name, IBinder service) {
            MyEventBusService myEventBusService = MyEventBusService.Stub.asInterface(service);
            // The AIDL service is connected: cache it
            mHermesServices.put(mClass,myEventBusService);
        }

        @Override
        public void onServiceDisconnected(ComponentName name) {
            // The AIDL connection was lost: remove it from the cache
            mHermesServices.remove(mClass);
        }
    }

HermesService.class

public class HermesService extends Service{
    @Override
    public IBinder onBind(Intent intent) {
        return mBinder;
    }

    // AIDL interface: the client reaches the server-side object through the Binder mechanism
    private MyEventBusService.Stub mBinder = new MyEventBusService.Stub() {
        @Override
        public Responce send(Request request) throws RemoteException {
            // Process the request and build the Responce to return
            ResponceMake responceMake = null;
            switch (request.getType()){
                case Hermes.TYPE_GET:// obtain the singleton
                    responceMake = new InstanceResponceMake();
                    break;
                case Hermes.TYPE_NEW:
                    responceMake = new ObjectResponceMake();
                    break;
            }
            return responceMake.makeResponce(request);
        }
    };
}

3. Communication

// The client obtains the server's singleton object
userManager = Hermes.getDefault().getInstance(IUserManager.class);
Toast.makeText(this,"-----> "+userManager.getFriend().toString(), Toast.LENGTH_SHORT).show();

Hermes.class

// Obtains the singleton object; the varargs parameters are there mainly to cope with method overloading
public <T> T getInstance(Class<T> clazz,Object... parameters){
    Responce responce = sendRequest(HermesService.class,clazz,null,parameters);
    return getProxy(HermesService.class,clazz);
}

private <T> T getProxy(Class<? extends HermesService> service, Class clazz) {
    ClassLoader classLoader = service.getClassLoader();
    T proxy = (T) Proxy.newProxyInstance(classLoader,new Class<?>[]{clazz},new HermesInvocationHandler(service,clazz));
    return proxy;
}

    private <T> Responce sendRequest(Class<HermesService> hermesServiceClass,
                                 Class<T> clazz, Method method,Object[] parameters){
        RequestBean requestBean = new RequestBean();

        if(clazz.getAnnotation(ClassId.class) == null){
            requestBean.setClassName(clazz.getName());
            requestBean.setResultClassName(clazz.getName());
        }else{
            // Fully qualified class name of the return type
            requestBean.setClassName(clazz.getAnnotation(ClassId.class).value());
            requestBean.setResultClassName(clazz.getAnnotation(ClassId.class).value());
        }

        if(method != null){
            // The method name is always sent as method name plus parameters
            requestBean.setMethodName(TypeUtils.getMethodId(method));
        }

        RequestParameter[] requestParameters = null;
        if(parameters != null && parameters.length >0){
            requestParameters = new RequestParameter[parameters.length];
            for(int i = 0;i<parameters.length;i++){
                Object parameter = parameters[i];
                String parameterClassName = parameter.getClass().getName();
                String parameterValue = GSON.toJson(parameter);

                RequestParameter requestParameter = new RequestParameter(parameterClassName,parameterValue);
                requestParameters[i] = requestParameter;
            }
        }

        if(requestParameters != null){
            requestBean.setRequestParameters(requestParameters);
        }

        // Request the singleton (methods are later called through its proxy): process B starts communicating with process A
        Request request = new Request(GSON.toJson(requestBean),TYPE_GET);
        return serviceConnectionManager.request(hermesServiceClass,request);
    }

sendRequest is called when the singleton object is obtained (that is, by Hermes.getDefault().getInstance(IUserManager.class)); it goes through AIDL to communicate across processes.

ServiceConnectionManager.java

public Responce request(Class<HermesService> hermesServiceClass, Request request){
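    // Get the Binder proxy object from the cache and send the request
    MyEventBusService eventBusService = mHermesServices.get(hermesServiceClass);
    if(eventBusService != null){
        try {
            // Process B sends data to process A through the Binder proxy and receives the reply
            // This calls the AIDL interface method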
            Responce responce = eventBusService.send(request);
            return responce;
        } catch (RemoteException e) {
            e.printStackTrace();
        }
    }
    return null;
}

HermesInvocationHandler.class

public class HermesInvocationHandler implements InvocationHandler {
    private static final String TAG = "Sven";
    private Class clazz;
    private Class hermeService;
    private static final Gson GSON = new Gson();

    public HermesInvocationHandler(Class<? extends HermesService> service,Class clazz){
        this.clazz = clazz;
        this.hermeService = service;
    }

    // Calls to methods of the proxy (for example getFriend()) end up here
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Log.i(TAG, "invoke:-------> " + method.getName());
        Responce responce = Hermes.getDefault().sendObjectRequest(hermeService,clazz,method,args);
        if(!TextUtils.isEmpty(responce.getData())){
            ResponceBean responceBean = GSON.fromJson(responce.getData(),ResponceBean.class);
            if(responceBean.getData() != null){
                Object getUserResult = responceBean.getData();
                String data = GSON.toJson(getUserResult);

                Class stringgetUser = method.getReturnType();
                Object o = GSON.fromJson(data,stringgetUser);
                return o;
            }
        }

        return null;
    }
}
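The trick that makes userManager.getFriend() look like a local call is java.lang.reflect.Proxy: every method on the returned object is routed to one InvocationHandler. The sketch below shows that routing with the JDK alone. GreetingService and ProxyDemo are hypothetical names, and the handler returns a string where the real handler would serialize the call and send it over Binder.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical API interface; only interfaces can be proxied this way.
interface GreetingService {
    String greet(String name);
}

final class ProxyDemo {
    static <T> T createProxy(Class<T> api) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Methods inherited from Object also arrive here, so answer them locally.
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "toString":
                        return "Proxy(" + api.getSimpleName() + ")";
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    default: // equals
                        return proxy == args[0];
                }
            }
            // Stand-in for the remote round trip: just describe the call.
            String arguments = args == null ? "[]" : Arrays.toString(args);
            return "remote:" + method.getName() + arguments;
        };
        Object proxy = Proxy.newProxyInstance(api.getClassLoader(), new Class<?>[]{api}, handler);
        return api.cast(proxy);
    }

    public static void main(String[] args) {
        GreetingService service = createProxy(GreetingService.class);
        System.out.println(service.greet("Sven")); // prints remote:greet[Sven]
    }
}
```

Since the handler receives the Method object, it has everything needed to build a request: the interface, the method ID and the arguments.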

Hermes.java

public <T> Responce sendObjectRequest(Class<HermesService> hermesServiceClass
        , Class<T> clazz, Method method, Object[] parameters) {

    RequestBean requestBean = new RequestBean();
    String className = null;
    if(clazz.getAnnotation(ClassId.class) == null){
        requestBean.setClassName(clazz.getName());
        requestBean.setResultClassName(clazz.getName());
    }else{
        requestBean.setClassName(clazz.getAnnotation(ClassId.class).value());
        requestBean.setResultClassName(clazz.getAnnotation(ClassId.class).value());
    }

    if(method != null){
        requestBean.setMethodName(TypeUtils.getMethodId(method));
    }

    RequestParameter[] requestParameters = null;
    if (parameters != null && parameters.length > 0) {
        requestParameters = new RequestParameter[parameters.length];
        for (int i = 0; i < parameters.length; i++) {
            Object parameter = parameters[i];
            String parameterClassName = parameter.getClass().getName();
            String parameterValue = GSON.toJson(parameter);

            RequestParameter requestParameter = new RequestParameter(parameterClassName, parameterValue);
            requestParameters[i] = requestParameter;
        }
    }

    if (requestParameters != null) {
        requestBean.setRequestParameter(requestParameters);
    }

    // Request the singleton -> the object -> call a method on that object
    Request request = new Request(GSON.toJson(requestBean),TYPE_NEW);
    return serviceConnectionManager.request(hermesServiceClass, request);
}

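sendObjectRequest runs when a method is called on the proxy object: a call such as userManager.getFriend() goes through the AIDL interface method to communicate across processes.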
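The article does not show what the server does with such a request (the ResponceMake classes), so the following is only a guess at the general shape: look the method up by its ID in a map filled at registration time, then invoke it on the singleton via reflection. DemoUserManager, ServerDispatchDemo and the ID format are hypothetical, and JSON handling is left out.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical server-side object standing in for HgUserManager.
final class DemoUserManager {
    private String friend = "nobody";

    public String getFriend() {
        return friend;
    }

    public void setFriend(String friend) {
        this.friend = friend;
    }
}

final class ServerDispatchDemo {
    private final Map<String, Method> methodsById = new ConcurrentHashMap<>();
    private final Object target;

    ServerDispatchDemo(Object target) {
        this.target = target;
        // Registration: index every public method by its ID, as registerMethod does per class.
        for (Method method : target.getClass().getMethods()) {
            methodsById.put(idOf(method), method);
        }
    }

    // Assumed ID format: name plus parameter type names, e.g. "setFriend(java.lang.String)".
    static String idOf(Method method) {
        StringBuilder id = new StringBuilder(method.getName()).append('(');
        Class<?>[] parameterTypes = method.getParameterTypes();
        for (int i = 0; i < parameterTypes.length; i++) {
            if (i > 0) {
                id.append(',');
            }
            id.append(parameterTypes[i].getName());
        }
        return id.append(')').toString();
    }

    // Handles one incoming call: find the method by ID and invoke it on the target.
    Object dispatch(String methodId, Object... args) {
        Method method = methodsById.get(methodId);
        if (method == null) {
            throw new IllegalArgumentException("Unknown method: " + methodId);
        }
        try {
            return method.invoke(target, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ServerDispatchDemo server = new ServerDispatchDemo(new DemoUserManager());
        server.dispatch("setFriend(java.lang.String)", "Sven");
        System.out.println(server.dispatch("getFriend()")); // prints Sven
    }
}
```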

Request.java

package com.highgreat.sven.myapplication;

import android.os.Parcel;
import android.os.Parcelable;

public class Request implements Parcelable{

    // The request payload: the JSON string of the RequestBean
    private String data;
    // The request type
    private int type;

    // Deserialization (in process A)
    protected Request(Parcel in) {
        data = in.readString();
        type = in.readInt();
    }

    public Request(String data, int type) {
        this.data = data;
        this.type = type;
    }

    public String getData() {
        return data;
    }

    public int getType() {
        return type;
    }

    public static final Creator<Request> CREATOR = new Creator<Request>() {
        @Override
        public Request createFromParcel(Parcel in) {
            return new Request(in);
        }

        @Override
        public Request[] newArray(int size) {
            return new Request[size];
        }
    };

    @Override
    public int describeContents() {
        return 0;
    }

    // Serialization
    @Override
    public void writeToParcel(Parcel dest, int flags) {
        dest.writeString(data);
        dest.writeInt(type);
    }
}

Responce.java

package com.highgreat.sven.myapplication;

import android.os.Parcel;
import android.os.Parcelable;

public class Responce implements Parcelable {
    private String data;

    public String getData() {
        return data;
    }

    protected Responce(Parcel in) {
        data = in.readString();
    }

    public Responce(String data) {
        this.data = data;
    }

    public static final Creator<Responce> CREATOR = new Creator<Responce>() {
        @Override
        public Responce createFromParcel(Parcel in) {
            return new Responce(in);
        }

        @Override
        public Responce[] newArray(int size) {
            return new Responce[size];
        }
    };

    @Override
    public int describeContents() {
        return 0;
    }

    @Override
    public void writeToParcel(Parcel dest, int flags) {
        dest.writeString(data);
    }
}

Flow chart:

hermes跨进程调用.png (Hermes cross-process call flow)

Code repository: https://github.com/games2sven/HermesEventBus
