关于RxBus的Stick特性探究

2017-06-02  本文已影响37人  boboyuwu

使用RxBus有一段时间了,记得刚使用的使用我在ActivityB中去注册RxBus,这个时候ActivityB还没有启动,然后我在ActivityA中post事件后启动ActivityB,然而ActivityB并没有接收到任何事件,当时还郁闷了很久。

ActivityB接收不到事件的主要原因在于我们的ActivityB create之前发送了事件这个时候由于我们ActivityB并没有创建,所以也没有进行事件注册,当然收不到任何事件了,我当时的做法是调用handler.postDelay(500)进行延迟一下发送事件,这个时间是我测试ActivityB完全创建后的时间,这个时候由于ActivityB已经创建并且注册了事件,ActivityA发送的事件也可以被接收了,但是这种做法并不优雅靠谱,因为Activity创建时间多少完全不可预料,导致我们没办法去估计一个准确的延迟事件,并且这种做法实在不优雅。。。。
所以我一直在想难道事件只能在Activity创建后才能发送吗?可以不可在它创建前就发送,创建成功后接受到呢?

我们看下面实现代码

/**
 * Created by wubo on 2017/6/2.
 */

public class RxBus<T> {


    private static RxBus mRxBus;


    private PublishProcessor<T> mPublishProcessor;
    private ConcurrentHashMap<Class, T> mConcurrentHashMap;

    private RxBus() {
        mPublishProcessor = PublishProcessor.create();
        mConcurrentHashMap = new ConcurrentHashMap();

    }

    public static RxBus getInstance() {
        if (mRxBus == null) {
            synchronized (RxBus.class) {
                if (mRxBus == null) {
                    mRxBus = new RxBus();
                }
            }
        }
        return mRxBus;
    }

    //register
    public Flowable<T> registerEvent(Class<T> clazz) {
        Flowable <T>tFlowable = mPublishProcessor.ofType(clazz);
        return tFlowable;
    }

    public void post(T event) {
        mPublishProcessor.onNext(event);
    }

    public Flowable registerStickEvent(final Class<T> clazz) {

        final T t = mConcurrentHashMap.get(clazz);
        if (t != null) {
            Flowable<T> tFlowable = mPublishProcessor.ofType(clazz);
            return tFlowable.mergeWith(Flowable.create(new FlowableOnSubscribe<T>() {
                @Override
                public void subscribe(FlowableEmitter<T> e) throws Exception {
                        e.onNext(t);
                }
            }, BackpressureStrategy.BUFFER));
        }

        return mPublishProcessor;

    }

    public void postStick(T event) {
        T t = mConcurrentHashMap.get(event.getClass());
        if (t == null) {
            mConcurrentHashMap.put(event.getClass(), event);
        }
        post(event);
    }

    public void removeAllStickEvent(){
        mConcurrentHashMap.clear();
    }

}

我们可以这么写,为什么Stick事件注册的时候需要合并一个流呢,因为如果我们这里首先需要返回一个Flowable提供订阅的,那么如果我们直接返回mPublishProcessor的话我们怎么接收到Stick事件呢? 所以我们merge一个流当我们注册时就发送我们map中存入的stick事件。

这里还有个小问题,当我们订阅一个被观察者我们销毁的时候需要解除订阅,否则会继续持有引用对象,导致内存泄露.

Flowable<String> flowable = RxBus.getInstance().registerStickEvent(String.class);
        flowable.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.e("wwwSecondActivity",s);
            }
        });

比如这段代码,我们在SecondActivity中使用RxBus注册一个Stick事件,然后在MainActivity中开开心心的调用

RxBus.getInstance().postStick("MainActivity");

发送一个Stick事件
好我们看看Log打印日志
点击按钮跳到第二个界面

06-03 16:15:06.051 22060-22060/com.example.boboyuwu.rxbusstick E/wwwSecondActivity: MainActivity

吆西貌似效果不错哦~~等等我们按返回键退出这个界面然后再进来试一试

06-03 16:16:46.714 22060-22060/com.example.boboyuwu.rxbusstick E/wwwSecondActivity: MainActivity
06-03 16:16:46.749 22060-22060/com.example.boboyuwu.rxbusstick E/wwwSecondActivity: MainActivity

这。。这怎么回事,我只发送一次,怎么收到二个事件抱着大大的❓我返回这个界面再进来一次看看,

06-03 16:18:00.767 22060-22060/com.example.boboyuwu.rxbusstick E/wwwSecondActivity: MainActivity
06-03 16:18:00.767 22060-22060/com.example.boboyuwu.rxbusstick E/wwwSecondActivity: MainActivity
06-03 16:18:00.814 22060-22060/com.example.boboyuwu.rxbusstick E/wwwSecondActivity: MainActivity

可以看到情况很不妙每次我们退出界面再进来的时候这个事件就会多接收一次一直累加,我们分析一下问题可能出现在哪。

在上一篇关于RxBus实现思考我们分析到当我们使用subject onNext()去发送事件时候我们会发送到我们所有用subject 订阅
的里面,而我们第一次接收一次说明我们订阅了一次,而退出来再进来接收到二次说明我们订阅了二次,可你会疑惑我界面都退出了啊,不错,Rx是不会主动帮你去解除你的订阅关系的,所以当你第一次订阅的时候虽然你退出了但是被订阅者引用还在RxBus的Subject里,每订阅并退出一次,这个引用会增加一个,所以当你发送事件的时候满足类型会接收之前所有订阅过的事件。

哪这多坑啊怎么办呢?没事rx可能也想到一点帮我提供一个CompositeDisposable类,它可以组装我们所有的订阅关系,然后统一进行解绑,我们可以创建一个方法

    public void addDispose(Disposable disposable){
        mCompositeDisposable.add(disposable);
    }

然后在onDesdory()中进行统一解绑

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if(mCompositeDisposable!=null&&!mCompositeDisposable.isDisposed()){
            mCompositeDisposable.dispose();
        }
        RxBus.getInstance().removeAllStickEvent();
    }

这样不管我们怎么退出界面都没有关系,我们当前注册事件的界面只会一对一的接收发送过来的事件。
isDisposed()是2.0新方法判断CompositeDisposable有没有解绑
如果已经解绑就会返回true如果没有的话就返回false
我们看看源码是不是这样

    @Override
    public void dispose() {
        if (disposed) {
            return;
        }
        OpenHashSet<Disposable> set;
        synchronized (this) {
            if (disposed) {
                return;
            }
            disposed = true;
            set = resources;
            resources = null;
        }

        dispose(set);
    }

    @Override
    public boolean isDisposed() {
        return disposed;
    }

isDisposed()返回disposed变量而当我们调用dispose()丢弃我们绑定关系后 disposed = true;会被赋值为true

    @Override
    public boolean add(Disposable d) {
        ObjectHelper.requireNonNull(d, "d is null");
        if (!disposed) {
            synchronized (this) {
                if (!disposed) {
                    OpenHashSet<Disposable> set = resources;
                    if (set == null) {
                        set = new OpenHashSet<Disposable>();
                        resources = set;
                    }
                    set.add(d);
                    return true;
                }
            }
        }
        d.dispose();
        return false;
    }

add方法也是如此如果disposed是true的话说明已经解绑就不添加到集合里,直接把参数解绑然后返回false说明添加失败.

上一篇 下一篇

猜你喜欢

热点阅读