一、初识RxJava2

2018-02-05  本文已影响0人  含笑小基石

前言

来随手记实习之前并没有接触过RxJava这个强大的异步处理库,都使用Handler和AsyncTask进行异步任务的处理。但是实习期间跟随我的导师做的主题换肤需求里,基本所有的异步处理都使用了RxJava,并且都已升级到了RxJava2,所以直接从RxJava2.X系列开始对这个强大的异步处理库进行学习。


RxJava的基本工作原理

先上一张“水管”图!!!


RxJava

很多的RxJava文章以观察者模式来进行介绍,但其实有时候太多的专有名词反而会增加我们对一个知识的学习成本。

所以,我们从上图来解释什么是RxJava。

上面的事件流我们称为上游,即Observable

下面的事件流我们称为下游,即Observer

上游和下游的连接就靠subscribe()来建立

上游发出一个事件,下游就要接受并处理这个事件,而且处理的顺序是按上游发出的顺序执行。

实战演练:
讲完“大道理”,我们来“实战”一下:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("Hello");
            e.onNext("World");
            e.onComplete();
        }
    });
 
 
    Observer<String> observer = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "onSubscribe");
        }
 
        @Override
        public void onNext(String s) {
            Log.d(TAG, s);
        }
 
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "onError");
        }
 
        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };
 
    observable.subscribe(observer); // 建立连接后才开始发送事件
}
这个的运行结果就是: 运行结果

把上面那段代码连起来写就是RxJava引以为傲的链式调用了:

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("Hello");
        e.onNext("World");
        e.onComplete();
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "onSubscribe");
    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, s);
    }
 
    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "onError");
    }
 
    @Override
    public void onComplete() {
        Log.d(TAG, "onComplete");
    }
});

解释一下里面的两个名词:

ObservableEmitter:
用来发出事件的,它可以发出三种类型的事件,通过调用emitter的:

  1. onNext(T value)发出next事件、
  2. onComplete()发出complete事件
  3. onError(Throwable error)发出error事件。

事件发送的原则:

  1. 上游可以发送无限个onNext事件, 下游也可以接收无限个onNext事件.
  2. 当上游发送了一个onComplete事件后, 上游可以继续发送事件, 而下游收到onComplete事件之后将不再继续接收上游的事件.
  3. 当上游发送了一个onError事件后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收上游的事件.
  4. 上游可以不发送onComplete或onError事件.
    最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个5. 5. onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然。
发送的关键事件 示意图(队列顺序发送)
只发送onNext事件 onNext
发送onComplete事件 onComplete
发送onError事件 onError

Disposable:
dispose()可以理解成上游和下游的连接断了,当调用dispose()后,下游就会接收不到上游发出的事件。


我们写个小Demo检验一下,上游依次发送1,2,3,4的事件,其中3的任务类型为onComplete()类型,所以下游应该接收不到事件4,可是我们在下游接收到事件1,之后就调用了dispose(),所以不管上游之后发了什么事件,下游都不会收到。
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        Log.d(TAG, "1 send");
        e.onNext("1");
         
        Log.d(TAG, "2 send");
        e.onNext("2");
         
        Log.d(TAG, "3 send");
        e.onComplete();
         
        Log.d(TAG, "4 send");
        e.onNext("4");
    }
}).subscribe(new Observer<String>() {
 
    private Disposable disposable;
 
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "onSubscribe");
        disposable = d;
    }
 
    @Override
    public void onNext(String s) {
        Log.d(TAG, s + " received");
        if (s.equals("1")) {
            disposable.dispose();
        }
    }
 
    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "onError received");
    }
 
    @Override
    public void onComplete() {
        Log.d(TAG, "onComplete received");
    }
});
让我们来看看执行结果是否符合预期: 输出结果

很明显,在调用了下游在接收到事件1就切断了上游和下游的联系,上游是发送了事件1,2,3,4,可下游只收到了事件1。

其实subscribe()有很多个重载方法:

public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}

我们之前的用例,全都是用了最后一个方法。
可是有时候我们只关心上游发的onNext事件,不理会其他类型的事件,所以我们可以使用上述第二个重载方法:

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        Log.d(TAG, "1 send");
        e.onNext("1");
 
        Log.d(TAG, "2 send");
        e.onNext("2");
 
        Log.d(TAG, "3 send");
        e.onComplete();
 
        Log.d(TAG, "4 send");
        e.onNext("4");
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, s + " received");
    }
});
执行结果如下: 输出结果

我们可以看到,下游收到了上游在发出onComplete事件之前的onNext事件。
对于其他几个重载的方法其实大同小异,大家可以自己去动手试试效果。



参考文档:

https://www.jianshu.com/p/464fa025229e

上一篇下一篇

猜你喜欢

热点阅读