EventBus无法连续发送粘性事件问题改造日记

2019-08-26  本文已影响0人  SeptemberWei

在EventBus中可以使用粘性事件功能,在目标注册之前先发送消息,这样目标注册后仍然可以接收到之前发送的消息。但是存在一个问题:如果之前连续发送了多条同一类型的消息,目标注册后只能接收到最后一个粘性事件的消息。
现在项目遇到的问题是:需要在新的页面打开之前就发送许多消息出去,而新打开的页面需要把这些消息按顺序全部接收,因此这里做了个简易版本的改造来完成需求。具体代码如下所示,其中涉及到的相关类在之前的文章 《EventBus撸代码》 https://www.jianshu.com/p/4ba245331cd7 中有提及,在此记录下笔记。

/**
 * A minimal event bus that buffers every message posted before a subscriber is registered and
 * replays all of them, in posting order, as soon as a subscriber registers.
 *
 * <p>Once at least one subscriber has registered, messages are dispatched immediately to every
 * registered subscriber instead of being buffered.
 *
 * @param <T> type of the messages that are posted
 * @param <C> type of the subscriber objects that are registered
 * @author wilson
 * @since 2019-08-26 10:33:06
 */
public class MessageQueen<T, C> {
    /** Messages posted while no subscriber was registered, in posting order. */
    private final ConcurrentLinkedQueue<T> linkedQueue;
    /** Delivers {@code MAIN} callbacks when the poster is not on the main thread. */
    private final Handler handler;
    /** Runs {@code BACKGROUND} callbacks. */
    private final ExecutorService cacheExecutorService;
    /** Registered subscribers mapped to their annotated callback methods. */
    private final HashMap<C, List<SubscribleMethod>> methodCache;
    /** Becomes {@code true} on the first registration; read by posting threads, hence volatile. */
    private volatile boolean isRegist;

    private MessageQueen() {
        methodCache = new HashMap<>();
        linkedQueue = new ConcurrentLinkedQueue<>();
        // Bind explicitly to the main looper. A no-arg Handler attaches to whichever thread first
        // touches the singleton, which breaks MAIN delivery (or throws on a thread with no Looper).
        handler = new Handler(Looper.getMainLooper());
        cacheExecutorService = Executors.newCachedThreadPool();
    }

    /** Holder class so the singleton is created lazily on first access. */
    private final static class Create {
        private static final MessageQueen instance = new MessageQueen();
    }

    /**
     * Returns the shared instance.
     *
     * <p>The raw return type is kept so that existing callers keep compiling unchanged.
     *
     * @return the process-wide {@code MessageQueen}
     */
    public static MessageQueen getDefault() {
        return MessageQueen.Create.instance;
    }


    /**
     * Posts a message. It is dispatched immediately if a subscriber has already registered,
     * otherwise it is buffered until the first registration.
     *
     * @param bean the message to deliver
     */
    public void post(T bean) {
        if (isRegist) {
            postPriv(bean);
        } else {
            linkedQueue.offer(bean);
        }
    }

    /**
     * Registers a subscriber and replays every buffered message to it.
     *
     * <p>Each buffered message is offered to every annotated method of the subscriber; a method
     * only receives it when its parameter type accepts the message.
     *
     * @param clz the subscriber object whose annotated methods should receive messages
     */
    public void regist(C clz) {
        List<SubscribleMethod> subscribleMethods;
        synchronized (MessageQueen.class) {
            subscribleMethods = methodCache.get(clz);
            if (subscribleMethods == null) {
                subscribleMethods = findSubscribleMethods(clz);
                methodCache.put(clz, subscribleMethods);
            }
            // Flip the flag only after the subscriber is in the cache, so a concurrent post()
            // that sees the flag also sees the subscriber.
            isRegist = true;
        }

        // Poll exactly one message per iteration and fan it out to all methods. Polling inside the
        // per-method loop would hand each message to a single method only, and would spin forever
        // when the subscriber has no annotated methods.
        T pending;
        while ((pending = linkedQueue.poll()) != null) {
            for (SubscribleMethod method : subscribleMethods) {
                postThread(clz, method, pending);
            }
        }
    }

    /**
     * Invokes one subscriber method with the message on the thread requested by its thread model,
     * provided the method's parameter type accepts the message.
     *
     * @param clz    the subscriber object
     * @param method the callback description
     * @param poll   the message to deliver
     */
    private void postThread(C clz, SubscribleMethod method, T poll) {
        if (method.getParam().isAssignableFrom(poll.getClass())) {
            // NOTE(review): only MAIN and BACKGROUND are handled; any other thread model is
            // silently ignored here. Confirm against the ThreadModel declaration.
            switch (method.getmThreadModel()) {
                case MAIN:
                    if (Looper.myLooper() == Looper.getMainLooper()) {
                        invoke(method, clz, poll);
                    } else {
                        handler.post(() -> invoke(method, clz, poll));
                    }
                    break;
                case BACKGROUND:
                    cacheExecutorService.execute(() -> invoke(method, clz, poll));
                    break;
            }
        }
    }


    /**
     * Discards all buffered messages that have not been replayed yet.
     *
     * <p>NOTE(review): registered subscribers stay in the cache and keep receiving posts after
     * this call, which also keeps them reachable. Confirm whether callers expect them to be
     * removed here.
     */
    public void unregist() {
        linkedQueue.clear();
    }

    /**
     * Reflectively calls the subscriber method; failures are logged and not propagated.
     *
     * @param subscribleMethod the callback description
     * @param obj              the subscriber object to invoke on
     * @param param            the message passed as the single argument
     */
    private void invoke(SubscribleMethod subscribleMethod, C obj, T param) {
        Method target = subscribleMethod.getmMethod();
        try {
            target.invoke(obj, param);
        } catch (IllegalAccessException | InvocationTargetException e) {
            Log.e("wilson", "Failed to invoke subscriber method " + target.getName(), e);
        }
    }

    /**
     * Collects the {@code @Subscribe}-annotated, single-parameter methods declared by the
     * object's class and its superclasses, stopping at the first class whose name starts with
     * {@code java.}, {@code javax.} or {@code android.}.
     *
     * @param object the subscriber to inspect
     * @return the callback descriptions found; empty when there are none
     */
    private List<SubscribleMethod> findSubscribleMethods(Object object) {
        List<SubscribleMethod> list = new ArrayList<>();
        Class<?> claz = object.getClass();
        while (claz != null) {
            String name = claz.getName();
            if (name.startsWith("java.") || name.startsWith("javax.") || name.startsWith("android.")) {
                break;
            }
            // Read the declared methods of the class currently being visited; reading them once
            // up front would rescan the subclass for every level and never see inherited ones.
            for (Method method : claz.getDeclaredMethods()) {
                Subscribe annotation = method.getAnnotation(Subscribe.class);
                if (annotation == null) {
                    continue;
                }
                Class<?>[] parameterTypes = method.getParameterTypes();
                if (parameterTypes.length != 1) {
                    Log.e("wilson", "只能处理一个参数");
                    // Skip it: indexing parameterTypes[0] fails for zero parameters, and a
                    // multi-parameter method cannot be invoked with a single message.
                    continue;
                }
                list.add(new SubscribleMethod(method, annotation.threadMode(), parameterTypes[0]));
            }
            claz = claz.getSuperclass();
        }

        return list;
    }

    /**
     * Dispatches a message to every method of every registered subscriber.
     *
     * @param param the message to deliver
     */
    private void postPriv(final T param) {
        for (C subscriber : methodCache.keySet()) {
            for (SubscribleMethod method : methodCache.get(subscriber)) {
                postThread(subscriber, method, param);
            }
        }
    }

}

上一篇 下一篇

猜你喜欢

热点阅读