Android收藏集

Rxjava 2.x

2019-03-26  本文已影响20人  Android_冯星

官方网站
github

在ReactiveX中,一个观察者(Observer)订阅一个可观察对象(Observable)。观察者对Observable发射的数据或数据序列作出响应。这种模式可以极大地简化并发操作,因为它创建了一个处于待命状态的观察者哨兵,在未来某个时刻响应Observable的通知,不需要阻塞等待Observable发射数据。

image.png
  1. 这是Observable的时间轴,从左到右。
  2. 这些是Obserable发出的事件
  3. 此垂直线表示Observable已成功发送处事件,事件发送完毕
  4. these dotted lines and this box indicate that a transformation is being applied to the Observable The text inside the box shows the nature of the transformation
    这些虚线和此框表示正在对Observable数据转换。
    框内的文本显示转换的性质
  5. this Observable is the result of the transformation
    对Observable转换的结果
  6. is for some reason the Observable terinates abnormally,with an error the vertical line is replaced by an x
    由于某种原因,Observable异常中断,垂直线x代替错误。

创建Observables

image.png

创建一个默认的Observable

image.png

直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable

Defer操作符会一直等待直到有观察者订阅它,然后它使用Observable工厂方法生成一个Observable。它对每个观察者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个Observable,事实上每个订阅者获取的是它们自己的单独的数据序列。

Empty 创建一个不发射任何数据但是正常终止的Observable

never 创建一个不发射数据也不终止的Observable

Throw 创建一个不发射数据以一个错误终止的Observable

一对多的关系

将一个Iterable, 一个Future, 或者一个数组转换成一个Observable
产生的Observable会发射Iterable或数组的每一项数据。

image.png

创建一个按固定时间间隔发射整数序列的Observable

image.png

创建一个发射指定值的Observable

Just类似于From,但是From会将数组或Iterable的数据取出然后逐个发射,而Just只是简单的原样发射,将数组或Iterable当做单个数据。

注意:如果你传递null给Just,它会返回一个发射null值的Observable。不要误认为它会返回一个空Observable(完全不发射任何数据的Observable),如果需要空Observable你应该使用Empty操作符。

创建一个发射指定范围的整数序列的Observable

Range操作符发射一个范围内的有序整数序列,你可以指定范围的起始和长度。

RxJava将这个操作符实现为range函数,它接受两个参数,一个是范围的起始值,一个是范围的数据的数目。如果你将第二个参数设为0,将导致Observable不发射任何数据(如果设置为负数,会抛异常)。

创建一个重复发射指定数据或数据序列的Observable

RxJava将这个操作符实现为repeat方法。它不是创建一个Observable,而是重复发射原始Observable的数据序列,这个序列或者是无限的,或者通过repeat(n)指定重复次数。

image.png

创建一个Observable,它发出类似函数的指令的返回值

创建一个Observable,它在一个给定的延迟后发射一个特殊的值。

image.png

最基本使用

//创建 被观察者
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                Log.d(TAG, "subscribe: 1");

                emitter.onNext("2");
                Log.d(TAG, "subscribe: 2");

                emitter.onNext("3");
                Log.d(TAG, "subscribe: 3");

                emitter.onNext("4");
                Log.d(TAG, "subscribe: 4");

                emitter.onComplete();
            }
        }).subscribe(//订阅
                new Observer<String>() {//观察者

                    private Disposable disposable;


                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: " + d.toString());
                        this.disposable = d;
                    }

                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "onNext: " + s);

                        Log.d(TAG, "onNext: disposed 前" + disposable.isDisposed());
                        if (s.equals("2")) {
                            disposable.dispose();
                        }
                        Log.d(TAG, "onNext: disposed 后" + disposable.isDisposed());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: ");
                    }
                });

在 Observer观察者 中,多了一个回调方法:onSubscribe,传递参数为Disposable,Disposable 相当于 RxJava 1.x 中的 Subscription, 用于解除订阅。当s等于2时,解除订阅,观察者不会在接收到被观察者发出的消息,也包裹 onComplete事件。

在请求的过程中Activity已经退出了, 这个时候如果回到主线程去更新UI, 那么APP肯定就崩溃了, 怎么办呢, 上一节我们说到了Disposable , 说它是个开关, 调用它的dispose()方法时就会切断水管, 使得下游收不到事件, 既然收不到事件, 那么也就不会再去更新UI了. 因此我们可以在Activity中将这个Disposable 保存起来, 当Activity退出时, 切断它即可.

那如果有多个Disposable 该怎么办呢, RxJava中已经内置了一个容器CompositeDisposable, 每当我们得到一个Disposable时就调用CompositeDisposable.add()将它添加到容器中, 在退出的时候, 调用CompositeDisposable.clear() 即可切断所有的水管.

上游可以发送无限个onNext, 下游也可以接收无限个onNext.
当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件.
当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件.
上游可以不发送onComplete或onError.
最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然

下面是log信息。

    onNext: 1
    onNext: disposed 前false
    onNext: disposed 后false
    subscribe: 1
    onNext: 2
    onNext: disposed 前false
    onNext: disposed 后true
    subscribe: 2
    subscribe: 3
    subscribe: 4

form just 输出一样

Observable<Integer> form = Observable.fromArray(items);

Observable<Integer> just = Observable.just(1, 2, 3, 4, 5);
Observable<Integer> range = Observable.range(1, 10);

    onNext: 1
    onNext: 2
    onNext: 3
    onNext: 4
    onNext: 5
    onNext: 6
    onNext: 7
    onNext: 8
    onNext: 9
    onNext: 10
    onComplete: 

subscribe 观察者

    public final Disposable subscribe() {}
    // 表示观察者不对被观察者发送的事件作出任何响应(但被观察者还是可以继续发送事件)

    public final Disposable subscribe(Consumer<? super T> onNext) {}
    // 表示观察者只对被观察者发送的Next事件作出响应
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
    // 表示观察者只对被观察者发送的Next事件 & Error事件作出响应

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
    // 表示观察者只对被观察者发送的Next事件、Error事件 & Complete事件作出响应

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
    // 表示观察者只对被观察者发送的Next事件、Error事件 、Complete事件 & onSubscribe事件作出响应

    public final void subscribe(Observer<? super T> observer) {}
    // 表示观察者对被观察者发送的任何事件都作出响应

调度器 Scheduler

简单的来说, subscribeOn() 指定的是上游发送事件的线程, observeOn() 指定的是下游接收事件的线程.
多次指定上游的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略.
多次指定下游的线程是可以的, 也就是说每调用一次observeOn() , 下游的线程就会切换一次.

上一篇 下一篇

猜你喜欢

热点阅读