Android开发Android开发经验谈Android技术知识

有关EventBus的发送异步消息时的线程切换

2019-12-19  本文已影响0人  zl_adams

概述: EventBus用于不同的Activity之间或者Activity与Service之间进行通信,非常的方便,即使是不同线程之间的数据发送,我们定义的数据接收方法也能收到。

简单使用

public class MainActivity extend Activity{

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        EventBus.getDefault().register(this)
    }


    @Override
    protected void onDestroy() {
        super.onDestroy();
        EventBus.getDefault().unregister(this)
    }

    @Subscribe(threadMode = ThreadMode.MAIN)
    public void onEvent(Event event){//自定义Event实体类
        Log.i("onEvent", "接收到消息");
    }

}

//在其他地方直接发送Event对象
EventBus.getDefault().post(new Event());

只要我们页面内注册了好了EventBus,并且定义好了接收方法,不管我们从哪个线程发送消息都可以接收到。
onEvent()方法添加了@Subscribe注解;

Subscribe注解

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Subscribe {
    ThreadMode threadMode() default ThreadMode.POSTING;

    /**
     * If true, delivers the most recent sticky event (posted with
     * {@link EventBus#postSticky(Object)}) to this subscriber (if event available).
     */
    boolean sticky() default false;

    /** Subscriber priority to influence the order of event delivery.
     * Within the same delivery thread ({@link ThreadMode}), higher priority subscribers will receive events before
     * others with a lower priority. The default priority is 0. Note: the priority does *NOT* affect the order of
     * delivery among subscribers with different {@link ThreadMode}s! */
    int priority() default 0;
}

ThreadMode

public enum ThreadMode {
    
    //哪个线程发送的消息,就在哪个线程回调接收方法;
    //接收方法不能过于耗时,会阻塞发送线程(可能为主线程)
    POSTING,

    //不管哪个线程发送,都在主线程回调接收方法;
    MAIN,

    //在主线程回调接收方法,消息被发送直接回放入队列,不一定会马上回调接收方法;
    MAIN_ORDERED,

    //在子线程回调接收方法,如果发送消息是在子线程,直接回调接收方法;
    BACKGROUND,

   //在一个独立的线程回调接收方法,不在主线程,也不在发送消息的线程;
    ASYNC
}

接下来查看post方法是如何执行的:

1、post(Object event)
public class EventBus {

    /** Posts the given event to the event bus. */
    public void post(Object event) {
        PostingThreadState postingState = currentPostingThreadState.get();//往下看分析1和分析2
        List<Object> eventQueue = postingState.eventQueue;//获取消息队列
        eventQueue.add(event);//把事件添加到队列中

        if (!postingState.isPosting) {
            postingState.isMainThread = isMainThread();//记录是否在主线程
            postingState.isPosting = true;//设置为正在发送中的状态
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                //循环发送消息队里中的消息
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

    //分析1
    //ThreadLocal 是一个线程内部的数据存储类,存储线程私有数据;
    private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
        @Override
        protected PostingThreadState initialValue() {
            return new PostingThreadState();
        }
    };


    //分析2
    //记录发送线程的信息,如发送的消息队里、是否主线程、是否正在发送,是否取消发送等
    final static class PostingThreadState {
        final List<Object> eventQueue = new ArrayList<>();
        boolean isPosting;
        boolean isMainThread;
        Subscription subscription;
        Object event;
        boolean canceled;
    }

}

分析:

  1. post方法中,先从线程中获取PostingThreadState数据,PostingThreadState用于保存消息队列,发送状态,是否为主线程;
  2. 把消息添加到线程所在的消息队列中;
  3. 如果现在还没开始发送消息,循环发送消息队列中的消息,设置发送状态为true;
  4. 调用postSingleEvent(eventQueue.remove(0), postingState)发送单个消息;
2、postSingleEvent(eventQueue.remove(0), postingState)
    private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();//获取消息实体类对应的Class对象
        boolean subscriptionFound = false;
        if (eventInheritance) {//默认为true, 判断是否要查找消息实体类的父类或实现的接口类
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);//分析3,获取消息实体类的父类或者接口类
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {//遍历所有的event类对象
                Class<?> clazz = eventTypes.get(h);
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }

    //分析3
    /** Looks up all Class objects including super classes and interfaces. Should also work for interfaces. */
    private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
        synchronized (eventTypesCache) {
            List<Class<?>> eventTypes = eventTypesCache.get(eventClass);//获取缓存
            if (eventTypes == null) {//如果缓存为空
                eventTypes = new ArrayList<>();
                Class<?> clazz = eventClass;
                while (clazz != null) {
                    eventTypes.add(clazz);
                    addInterfaces(eventTypes, clazz.getInterfaces());//添加class的接口类
                    clazz = clazz.getSuperclass();//获取class的父类
                }
                eventTypesCache.put(eventClass, eventTypes);//添加进集合中
            }
            return eventTypes;
        }
    }

分析:

  1. 获取消息实体类的类对象,判断是否需要获取这个类对象的父类和接口类的类对象;
  2. 获取这个类对象的所有父类和接口类的类对象, 然后遍历这些类对象,调用postSingleEventForEventType(event, postingState, clazz);
  3. 不需要获取消息类对象的父类和接口,直接调用postSingleEventForEventType(event, postingState, eventClass);
3、postSingleEventForEventType(event, postingState, clazz)

    //subscriptionsByEventType记录了消息实体类的类对象,与消息订阅类和消息订阅方法的映射关系
    //subscriptionsByEventType在EventBus初始化时被赋值为HashMap;
    private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;


    private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;//Subscription记录了消息订阅类和订阅方法
        synchronized (this) {
            subscriptions = subscriptionsByEventType.get(eventClass);//获取消息的所有订阅者信息
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
            for (Subscription subscription : subscriptions) {//遍历所有的消息订阅者
                postingState.event = event;//配置发送线程配置的消息时间
                postingState.subscription = subscription;//配置发送线程配置的消息订阅者
                boolean aborted = false;
                try {
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }

//消息订阅者
final class Subscription {
    final Object subscriber;//消息订阅类
    final SubscriberMethod subscriberMethod;//消息订阅方法
}

分析:
1.首先获取消息实体类对应的消息订阅者集合;这个集合是在我们调用EventBus的注册方法时,遍历注册方法所在类的所有方法,获取添加了@Subscribe的方法所有方法,再获取这些方法的参数,然后以参数的类对象为key,Subscription的集合为value,保存消息参数和消息订阅者的映射关系,消息和订阅类以及订阅方法的映射关系为1:N:N;有兴趣的童鞋可以自己查看register()方法的相关代码;

  1. 获取到消息订阅者的集合后,遍历这个集合,调用postToSubscription(subscription, event, postingState.isMainThread);
3、postToSubscription(subscription, event, postingState.isMainThread)
    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {//判断消息订阅方法定义了要在回调线程模式
            case POSTING:
                invokeSubscriber(subscription, event);//直接在发送消息的线程回调
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);//发送消息是主线程,直接回调
                } else {
                    mainThreadPoster.enqueue(subscription, event);//保存到主线程handler的消息队列
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    //保存到主线程handler的消息队列,然后发送handler消息
                    mainThreadPoster.enqueue(subscription, event);//分析4
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    //backgroundPoster是一个Runnable, 把消息保存到子线程的消息队列,然后调用线程池执行runnable
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                 //asyncPoster是一个Runnable, 把消息保存到子线程的消息队列,然后调用线程池执行runnable
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

    //分析4, mainThreadPoster变量的初始化
    private final Poster mainThreadPoster;

     //消息发送接口,
    interface Poster {
        void enqueue(Subscription subscription, Object event);//消息入队方法
    }

    EventBus(EventBusBuilder builder) {
        mainThreadSupport = builder.getMainThreadSupport();
        mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
    }

   
//mainThreadPoster靠MainThreadSupport来初始化
public interface MainThreadSupport {

    boolean isMainThread();

    Poster createPoster(EventBus eventBus);

    class AndroidHandlerMainThreadSupport implements MainThreadSupport {

        private final Looper looper;

        public AndroidHandlerMainThreadSupport(Looper looper) {
            this.looper = looper;
        }

        @Override
        public boolean isMainThread() {
            return looper == Looper.myLooper();
        }

        @Override
        public Poster createPoster(EventBus eventBus) {
            //返回HandlerPoster,赋值给mainThreadPoster, looper为主线程的looper
            return new HandlerPoster(eventBus, looper, 10);
        }
    }

}

//mainThreadPoster实际上就是一个HandlerPoster,是一个Handler
public class HandlerPoster extends Handler implements Poster {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);//消息入队
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {//然后发送空消息通知handler处理队列中的消息
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }


    //主线程处理消息
    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);//分析5
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}

   //分析5
  void invokeSubscriber(PendingPost pendingPost) {
        Object event = pendingPost.event;
        Subscription subscription = pendingPost.subscription;
        PendingPost.releasePendingPost(pendingPost);
        if (subscription.active) {
            invokeSubscriber(subscription, event);通过反射,调用订阅方法
        }
    }

分析:

总结:
1、调用EventBus发送消息后,会判断发送消息所在的线程是否为主线程;
2、获取消息的类对象,然后查找这个消息对应的所有订阅者;
3、调用消息的订阅方法时,会判断订阅方法设置的线程模式;如果需要在主线程回调,发送消息时不在主线程,那么通过主线程的handler来回调订阅方法;如果需要在子线程回调订阅方法,发送消息时是在主线程,那么通过继承了Runnable的类来调用订阅方法,通过线程池启动这个Runnable;这样就能实现子线程发送,切换成主线程回调订阅方法,以及主线程发送,子线程回调订阅方法。

上一篇 下一篇

猜你喜欢

热点阅读