浅谈Rxbus封装(一)

2017-07-26  本文已影响0人  墨染书

最近再看一个项目,但是那个项目里面的Rxjava是1.x版本的,由于最近又有一个项目要开始了,在封装各种基类,所以我准备将项目中的Rxbus用Rxjava2.x来修改一下继续用,由于2.x的Rxjava进行了代码的重构,所以在这里写下我的一些收集与写法

什么是Rxbus


先说说Rxjava

官方解释:
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
RxJava是Reactive Extensions的Java VM实现:用于通过使用observable序列来组合异步和基于事件的程序的库。

关于Rxjava的使用和理解可以参看:https://github.com/ReactiveX/RxJava

那我们的Rxbus呢?

RxBus:处理应用程序间各个组件的通信,或者组件与组建之间的数据传递。

其实说到底,RxBus学的是一种思路,而并不是给你一个现成的库,然后直接去调用。需要的是自己来封装。

Rxbus的简单实现


导包

compile 'io.reactivex.rxjava2:rxjava:2.1.2'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

Rxbus类

public class RxBus {

  // 主题
  private final Subject<Object> bus;

  // PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
private RxBus() {
    bus = PublishSubject.create().toSerialized();
}

// 静态内部类(单例模式的内部类实现方法)
private static class SingletonHolder {
    public final static RxBus sInstance = new RxBus();
}

// 单例RxBus
public static RxBus getDefault() {
    return SingletonHolder.sInstance;
}


/**
 * 提供了一个新的事件
 * 发布
 * @param o
 */
public void post(Object o) {
    bus.onNext(o);
}

// 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
public <T> Observable<T> toObserverable(Class<T> eventType) {
    return bus.ofType(eventType);       //判断接收事件类型
}
}

不侮辱大家智商,配上注释应该大家都懂了,就不解释了

Rxjava1.x -> 2.x问题


如果大家有兴趣的话,最好还是阅读源码。

有两篇官方文档值得阅读:
what's different in 2.0:https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0
文档:http://reactivex.io/RxJava/2.x/javadoc/
参考文档:http://www.jianshu.com/p/2badfbb3a33b

 1.  修改EventThread.class,删除Schedulers.immediate()相关
     因为在2.0中删除了Schedulers.immediate()这个线程的切换

2. //存放订阅者信息
  private Map<Object, CompositeSubscription> subscriptions = new HashMap<>();
  修改为:
  private Map<Object, CompositeDisposable> subscriptions = new HashMap<>();

 3. CompositeSubscription.unsubscribe();
    修改为
    CompositeDisposable.dispose();

 4. 在2.0中增加了Flowable 这样就把 backpressure 的问题放到了Flowable中来处理,
    而Observable 不对backpressure进行处理了。但是使用Flowable还是要注意对backpressure
    的处理,不然还是会出现以前的问题。

5. Subscription subscription = toObservable(sub.tag(), cla);
   修改为:
   Disposable disposable = tObservable(sub.tag(), cla);

6. SerializedSubject 已经变为非public类,可以通过
   bus = PublishSubject.create().toSerialized();
   的方式获取线程安全 的对象。

7. private final PublishSubject<Object> bus = PublishSubject.create();
   final Subject<Object> subject = bus.toSerialized();
   修改为:
   bus = PublishSubject.create().toSerialized();
   toSerialized() 方法中其实就是之前序列化安全对象的写法。而SerializedSubject类已经变成了非public的

 8.//1.X public class AppBaseActivity extends AppCompatActivity {
   ...  
   private CompositeSubscription mCompositeSubscription; 
   protected void addSubscription(Subscription subscription) {
        if (null == mCompositeSubscription) {
                    mCompositeSubscription = new CompositeSubscription();                 
         } 
        mCompositeSubscription.add(subscription);
   }
   @Override 
   protected void onDestroy() { 
      if (null != mCompositeSubscription) { 
           mCompositeSubscription.unsubscribe();
       }
  super.onDestroy();
  }
  ... 
} 
 //2.X
 public class AppBaseActivity extends AppCompatActivity { 
 ... 
   private CompositeDisposable mCompositeDisposable; 
   protected void addDisposable(Disposable disposable) {
        if (null == mCompositeDisposable) { 
                 mCompositeDisposable = new CompositeDisposable(); 
        } 
        mCompositeDisposable.add(disposable);
     }  
   @Override 
   protected void onDestroy() { 
          if (null != mCompositeDisposable) { 
                 mCompositeDisposable.clear();
           }  
     super.onDestroy();
     } 
... 
}
上一篇下一篇

猜你喜欢

热点阅读