Vector

Android RxBus的使用

2017-06-08  本文已影响557人  Angelicas

RxBus的核心功能是基于Rxjava的,在RxJava中有个Subject类,它继承Observable类,同时实现了Observer接口,因此Subject可以同时担当订阅者和被订阅者的角色,这里我们使用Subject的子类PublishSubject来创建一个Subject对象(PublishSubject只有被订阅后才会把接收到的事件立刻发送给订阅者),在需要接收事件的地方,订阅该Subject对象,之后如果Subject对象接收到事件,则会发射给该订阅者,此时Subject对象充当被订阅者的角色。完成了订阅,在需要发送事件的地方将事件发送给之前被订阅的Subject对象,则此时Subject对象做为订阅者接收事件,然后会立刻将事件转发给订阅该Subject对象的订阅者,以便订阅者处理相应事件,到这里就完成了事件的发送与处理。最后就是取消订阅的操作了,Rxjava中,订阅操作会返回一个Subscription对象,以便在合适的时机取消订阅,防止内存泄漏,如果一个类产生多个Subscription对象,我们可以用一个CompositeSubscription存储起来,以进行批量的取消订阅。

首先添加类库

    // RxAndroid
    compile 'io.reactivex:rxandroid:1.2.1'
    // RxJava
    compile 'io.reactivex:rxjava:1.2.4'

1、接下来结合实现代码再做进一步的解释:

package com.example.rxbus;

import java.util.HashMap;

import rx.Observable;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subscriptions.CompositeSubscription;

/**
 * @author: lijuan
 * @description:
 * @date: 2017-06-07
 * @time: 17:24
 */
public class RxBus {
    private static volatile RxBus mInstance;
    private SerializedSubject<Object, Object> mSubject;
    private HashMap<String, CompositeSubscription> mSubscriptionMap;

    private RxBus() {
        mSubject = new SerializedSubject<>(PublishSubject.create());
    }

    public static RxBus getInstance() {
        if (mInstance == null) {
            synchronized (RxBus.class) {
                if (mInstance == null) {
                    mInstance = new RxBus();
                }
            }
        }
        return mInstance;
    }

    /**
     * 发送事件
     *
     * @param o
     */
    public void post(Object o) {
        mSubject.onNext(o);
    }

    /**
     * 返回指定类型的Observable实例
     *
     * @param type
     * @param <T>
     * @return
     */
    public <T> Observable<T> tObservable(final Class<T> type) {
        return mSubject.ofType(type);
    }

    /**
     * 是否已有观察者订阅
     *
     * @return
     */
    public boolean hasObservers() {
        return mSubject.hasObservers();
    }

    /**
     * 一个默认的订阅方法
     *
     * @param type
     * @param next
     * @param error
     * @param <T>
     * @return
     */
    public <T> Subscription doSubscribe(Class<T> type, Action1<T> next, Action1<Throwable> error) {
        return tObservable(type)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(next, error);
    }

    /**
     * 保存订阅后的subscription
     *
     * @param o
     * @param subscription
     */
    public void addSubscription(Object o, Subscription subscription) {
        if (mSubscriptionMap == null) {
            mSubscriptionMap = new HashMap<>();
        }
        String key = o.getClass().getName();
        if (mSubscriptionMap.get(key) != null) {
            mSubscriptionMap.get(key).add(subscription);
        } else {
            CompositeSubscription compositeSubscription = new CompositeSubscription();
            compositeSubscription.add(subscription);
            mSubscriptionMap.put(key, compositeSubscription);
        }
    }

    /**
     * 取消订阅
     *
     * @param o
     */
    public void unSubscribe(Object o) {
        if (mSubscriptionMap == null) {
            return;
        }

        String key = o.getClass().getName();
        if (!mSubscriptionMap.containsKey(key)) {
            return;
        }
        if (mSubscriptionMap.get(key) != null) {
            mSubscriptionMap.get(key).unsubscribe();
        }

        mSubscriptionMap.remove(key);
    }
}

先看一下这个私有的构造函数:

private RxBus() {
        mSubject = new SerializedSubject<>(PublishSubject.create());
    }

由于Subject类是非线程安全的,所以我们通过它的子类SerializedSubject将PublishSubject转换成一个线程安全的Subject对象。之后可通过单例方法getInstance()进行RxBus的初始化。

在toObservable()根据事件类型,通过mSubject.ofType(type);得到一个Observable对象,让其它订阅者来订阅。其实ofType()方法,会过滤掉不符合条件的事件类型,然后将满足条件的事件类型通过cast()方法,转换成对应类型的Observable对象,这点可通过源码查看。
同时封装了一个简单的订阅方法doSubscribe(),只需要传入事件类型,相应的回调即可。其实可以根据需求在RxBus中扩展满足自己需求的doSubscribe()方法,来简化使用时的代码逻辑。

在需要发送事件的地方调用post()方法,它间接的通过mSubject.onNext(o);将事件发送给订阅者。

同时RxBus提供了addSubscription()、unSubscribe()方法,分别来保存订阅时返回的Subscription对象,以及取消订阅。

接下我们在具体的场景中测试一下:
1、我们在Activity的onCreate()方法中进行进行订阅操作:

public static final int SEND = 0x131;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        doSubscribe();
    }

    /**
     * 订阅事件监听
     */
    public void doSubscribe() {
        Subscription subscription = RxBus.getInstance()
                .tObservable(NewsModel.class)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<NewsModel>() {
                    @Override
                    public void call(NewsModel model) {
                        switch (model.getStatus()) {
                            case SEND:
                                Log.e("rxbus", model.getMsg());
                                break;
                            default:
                                break;
                        }
                    }
                });
        RxBus.getInstance().addSubscription(this, subscription);
    }

可以看到我们设定事件类型为NewsModel实体类,当然我们可以设定事件类型为为String或者Integer,并且Subscriber的回调发生在主线程,同时保存了Subscription对象。

2、现在通过一个Button发送事件:

RxBus.getInstance().post(new NewsModel(SEND, "发送一条信息"));

3、最后不要忘了在onDestory()中对广播进行取消注册,以及取消订阅

 protected void onDestroy() {
        super.onDestroy();
        RxBus.getInstance().unSubscribe(this);
    }

其它场景有兴趣的可自行测试!本篇文章就这样子啦,存在总结不到位的地方还望指导,感谢_
参考资料:Android 用RxJava模拟一个EventBus ———RxBus

源码下载

上一篇下一篇

猜你喜欢

热点阅读