Rxjava2实现Rxbus
2016-11-09 本文已影响2603人
Lightofrain
Rxjava2在Api方面有不少变化,基于Rxjava1.x的Rxbus实现不太适用,网上关于2.0的资料很少,既然没有现成的,那就自己写一个
1.API变化
1.x中是使用PublishSubject来现实管理消息的,并使用SerializedSubject保证现成安全,在2.0中,
Subject被Processer代替,通过PublishProcesser管理消息,并通过toSerialized()方法保证现成安全
public class RxBus {
//相当于Rxjava1.x中的Subject
private final FlowableProcessor<Object> mBus;
private static volatile RxBus sRxBus = null;
private RxBus() {
//调用toSerialized()方法,保证线程安全
mBus = PublishProcessor.create().toSerialized();
}
public static synchronized RxBus getDefault() {
if (sRxBus == null) {
synchronized (RxBus.class) {
if (sRxBus == null) {
sRxBus = new RxBus();
}
}
}
return sRxBus;
}
/**
* 发送消息
* @param o
*/
public void post(Object o) {
new SerializedSubscriber<>(mBus).onNext(o);
}
/**
* 确定接收消息的类型
* @param aClass
* @param <T>
* @return
*/
public <T> Flowable<T> toFlowable(Class<T> aClass) {
return mBus.ofType(aClass);
}
/**
* 判断是否有订阅者
* @return
*/
public boolean hasSubscribers() {
return mBus.hasSubscribers();
}
}
2.简单封装
public class RxBusHelper {
/**
* 发布消息
*
* @param o
*/
public static void post(Object o) {
RxBus.getDefault().post(o);
}
/**
* 接收消息,并在主线程处理
*
* @param aClass
* @param disposables 用于存放消息
* @param listener
* @param <T>
*/
public static <T> void doOnMainThread(Class<T> aClass, CompositeDisposable disposables, OnEventListener<T> listener) {
disposables.add(RxBus.getDefault().toFlowable(aClass).observeOn(AndroidSchedulers.mainThread()).subscribe(listener::onEvent, throwable -> listener.onError(new ErrorBean(ErrorCode.ERROR_CODE_RXBUS, ErrorCode.ERROR_DESC_RXBUS))));
}
public static <T> void doOnMainThread(Class<T> aClass, OnEventListener<T> listener) {
RxBus.getDefault().toFlowable(aClass).observeOn(AndroidSchedulers.mainThread()).subscribe(listener::onEvent, throwable -> listener.onError(new ErrorBean(ErrorCode.ERROR_CODE_RXBUS, ErrorCode.ERROR_DESC_RXBUS)));
}
/**
* 接收消息,并在子线程处理
*
* @param aClass
* @param disposables
* @param listener
* @param <T>
*/
public static <T> void doOnChildThread(Class<T> aClass, CompositeDisposable disposables, OnEventListener<T> listener) {
disposables.add(RxBus.getDefault().toFlowable(aClass).subscribeOn(Schedulers.newThread()).subscribe(listener::onEvent, throwable -> listener.onError(new ErrorBean(ErrorCode.ERROR_CODE_RXBUS, ErrorCode.ERROR_DESC_RXBUS))));
}
public static <T> void doOnChildThread(Class<T> aClass, OnEventListener<T> listener) {
RxBus.getDefault().toFlowable(aClass).subscribeOn(Schedulers.newThread()).subscribe(listener::onEvent, throwable -> listener.onError(new ErrorBean(ErrorCode.ERROR_CODE_RXBUS, ErrorCode.ERROR_DESC_RXBUS)));
}
public interface OnEventListener<T> {
void onEvent(T t);
void onError(ErrorBean errorBean);
}
}