Android CommunityRxJava系列专题(Android方向)Android开发

RxJava 2.x知识笔记

2017-12-01  本文已影响183人  正规程序员

观察者模式的运用

传统的Java观察者模式可以参考此篇博客:Java观察者模式案例简析

RxJava 是基于Java的观察者模式开展的。构建被观察者(Observable/Flowable)、观察者(Observer/Subscriber),并通过建立两者的订阅关系实现观察,在事件的传递过程中可以对事件进行各种处理。

在rxjava 1.x、rxjava 2.x里,Observable是被观察者,Observer是观察者,正常逻辑是观察者通过subscribe订阅Observable的事件处理,当Observable发射事件时Observer接收数据。但为了保持流式API风格,观察者订阅被观察者的代码顺序设计有一些调整。
如:

Observable.subscribe(Observer);

RxJava 1.x 和RxJava 2.x的主要区别

在RxJava 2.x中的观察者模式有两种。而Flowable作为被观察者是专门支持背压的。这也是RxJava 1.x 和RxJava 2.x的主要区别。当然还有一些区别是操作符、接口的不兼容更新。

RxJava1.x 平滑升级到RxJava2.x

由于RxJava2.0变化较大无法直接升级,幸运的是,官方提供了RxJava2Interop这个库,可以方便地将RxJava1.x升级到RxJava2.x,或者将RxJava2.x转回RxJava1.x。
RxJava2Interop

经典流式API调用风格

  Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        })
//                ...省略很多在发射过程中的流式处理代码
                .subscribe(new Observer<Integer>() {
                    private Disposable mDisposable;

                    @Override
                    public void onSubscribe(Disposable d) {
                        mDisposable = d;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d("onNext", "" + integer);
                        //新增的Disposable可以做到切断的操作,让Observer观察者不再接收上游事件。
                        if (integer == 3) {
                            mDisposable.dispose();
                            Log.d("onNext", "已停止接收事件");
                        }
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

打印结果:

11-28 13:00:42.195 29930-29930/? D/onNext: 1
11-28 13:00:42.195 29930-29930/? D/onNext: 2
11-28 13:00:42.195 29930-29930/? D/onNext: 3
11-28 13:00:42.195 29930-29930/? D/onNext: 已停止接收事件

Rxjava 线程调度

subscribeOn() 指定的就是发射事件的线程,observerOn 指定的就是订阅者接收事件的线程。

多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用 subscribeOn() 只有第一次的有效,其余的会被忽略。
但多次指定订阅者接收线程是可以的,也就是说每调用一次 observerOn(),下游的线程就会切换一次。

Rxjava 2.x常用操作符

1,Function<T, R> ——将输入的value类型T转换成输出的value类型R。通常结合Map操作符。

/**
 * A functional interface that takes a value and returns another value, possibly with a
 * different type and allows throwing a checked exception.
 *
 * @param <T> the input value type
 * @param <R> the output value type
 */
public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    @NonNull
    R apply(@NonNull T t) throws Exception;
}

2,Map——将一个Observable被观察者通过特定函数的执行,转换成另一种Observable被观察者。
在 2.x 中和 1.x 中作用几乎一致,不同点在于:2.x 将 1.x 中的 Func1 和 Func2 改为了 Function 和 BiFunction。

  /**
     * Returns an Observable that applies a specified function to each item emitted by the source ObservableSource and emits the results of these function applications.
     * 
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

3,Consumer——接收一个单独的数据,类似于一个简化版的观察者observer。

/**
 * A functional interface (callback) that accepts a single value.
 * @param <T> the value type
 */
public interface Consumer<T> {
    /**
     * Consume the given value.
     * @param t the value
     * @throws Exception on error
     */
    void accept(@NonNull T t) throws Exception;
}

4,distinct——去重操作符。即先有的数字保留,重复的数字去除并保留原先顺序的方式输出。

  Observable.just(2, 1, 2, 3, 4, 2, 3)
                .distinct()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.d("accept", "" + integer);
                    }
                });

输出

11-28 11:59:17.511 7052-7052/com.zjrb.sjzsw D/accept: 2
11-28 11:59:17.511 7052-7052/com.zjrb.sjzsw D/accept: 1
11-28 11:59:17.511 7052-7052/com.zjrb.sjzsw D/accept: 3
11-28 11:59:17.511 7052-7052/com.zjrb.sjzsw D/accept: 4

5,concat—— 可以做到不交错的发射两个甚至多个 Observable 的发射事件,并且只有前一个 Observable 终止(onComplete) 后才会订阅下一个 Observable。比如可以采用 concat 操作符先读取缓存再通过网络请求获取数据。

案例说明:

  Observable observable = Observable.just(1, 2, 3, 4, 5, 6)
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(@NonNull Integer integer) throws Exception {
                        return integer + 1;
                    }
                });
        Observable.concat(Observable.just(-1, -2, -3, -4, -5, -6), observable)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.d("accept", "" + integer);
                    }
                });

打印输出:看吧,两个Observable是按照顺序依次无交错执行的。

11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -1
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -2
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -3
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -4
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -5
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -6
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: 2
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: 3
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: 4
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: 5
11-29 01:48:50.431 31564-31564/com.zjrb.sjzsw D/accept: 6
11-29 01:48:50.431 31564-31564/com.zjrb.sjzsw D/accept: 7

注:熟悉操作符的目的在于,不同场景中都能随时想到有对应的工具可用。

背压

背压:指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。
因为事件产生的速度远远快于事件消费的速度,最终导致数据积累越来越多,从而导致OOM等异常。这就是背压产生的必要性。

RxJava2.0中,Flowable是能够支持Backpressure的Observable,是对Observable的补充(而不是替代)。所以Observable被观察者支持的API,Flowable也都支持,并且Flowable的API里也都强制支持背压。

背压经典代码

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                if (e.requested() != 0) {
                    for (int i = 0; i < 10; i++) {
                        e.onNext(i + 1);
                        Log.d(TAG, "已发送" + (i + 1) + "个——剩下" + e.requested());
                    }
                    e.onComplete();
                }
            }
        }, BackpressureStrategy.LATEST)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        subscription = s;
                    }

                    @Override
                    public void onNext(Integer s) {
                        Log.d(TAG, "接收 ——" + s);
                        if (s == 9){
                            subscription.cancel();
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.d(TAG, "接收错误——" + t);
                    }

                    @Override
                    public void onComplete() {
                    }
                });

外部调用subscription请求配合配额11个。

  if (subscription != null) {
      subscription.request(11);
  }

在BackpressureStrategy.LATEST背压策略下,上游发射10个事件,下游由外部调用请求发布配额指令,当下游接收到第9个事件时暂停上游发布(此操作会清空上游事件源)。

背压策略

/**
 * Represents the options for applying backpressure to a source sequence.
 */
public enum BackpressureStrategy {
    /**
     * OnNext events are written without any buffering or dropping.
     * Downstream has to deal with any overflow.
     * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
     */
    MISSING,
    /**
     * Signals a MissingBackpressureException in case the downstream can't keep up.
     */
    ERROR,
    /**
     * Buffers <em>all</em> onNext values until the downstream consumes it.
     */
    BUFFER,
    /**
     * Drops the most recent onNext value if the downstream can't keep up.
     */
    DROP,
    /**
     * Keeps only the latest onNext value, overwriting any previous value if the
     * downstream can't keep up.
     */
    LATEST
}

Flowable案例代码

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();
                e.onNext(4);
            }
        }, BackpressureStrategy.ERROR)
                //下面两行代码执行线程切换,达到异步效果
//                .subscribeOn(Schedulers.io())
//                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        subscription = s;
//                        subscription.request(1);
                    }

                    @Override
                    public void onNext(Integer s) {
//                        subscription.request(1);
                        Log.d(TAG, "接收——" + s);
//                        subscription.cancel();
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.d(TAG, "接收错误——" + t);
                    }

                    @Override
                    public void onComplete() {
                    }
                });

这里指定背压策略是BackpressureStrategy.ERROR,这种策略下执行此段代码会报如下错误。

D/rxjava2: 接收错误——io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests

因为,上下游是同步的。上游发射了事件但是下游没有接收,就会造成阻塞(即便上游的事件队列长度只有3个 < 128)。为了避免ANR,就要提示MissingBackpressureException异常。

如果恢复第12、13行处理线程切换的代码,表示上下游位于不同线程,是异步状态。此种情形下,上游发射数据后就不会报MissingBackpressureException异常,但虽然上游能正常发射数据,下游同样接收不到数据。

这里涉及到一个知识点:

Flowable默认事件队列大小为128。BackpressureStrategy.BUFFER策略下事件队列无限大,和没有采取背压的Observable ( 被观察者 ) / Observer ( 观察者 )类似了。

注:在处理同一组数据时,Observable ( 被观察者 ) / Observer ( 观察者 )比BackpressureStrategy.BUFFER策略下的Flowable (被观察者)/ Subscriber (观察者)性能更优,内存消耗更少。

在上下游异步的情况下,上游会先把事件发送到长度为128的事件队列中,待下游发送请求数据指令后从事件队列中拉取数据。这种“响应式拉取”的思想用于解决上下游流速不均衡的情况。

上述代码中,第19、24行代码是表示下游接收前、接收后发送请求配额指令给上游。也可以通过subscription.request(n);在外围调用发送n个请求配额给上游以获取数据。

API自带的被观察者的背压策略

API内的其他被观察者,API也为我们提供了背压策略方法:

   Flowable.interval(1, TimeUnit.MICROSECONDS)
                .onBackpressureDrop()  //加上背压策略
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        subscription = s;
                        s.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.d(TAG, "onNext: " + aLong);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

如果不加背压策略,则会报错:

D/rxjava2: onSubscribe
W/rxjava2: onError: 
           io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests
               at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:87)
               at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:428)
               at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:278)
               at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:273)
               at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
               at java.lang.Thread.run(Thread.java:761)

响应式编程

当上下游在同一个线程中的时候,在下游调用request(n)就会直接改变上游中的requested的值,多次调用便会叠加这个值,而上游每发送一个事件之后便会去减少这个值,当这个值减少至0的时候,上游若继续发送事件便会抛异常了。

案例情景一:同步环境下,在下游发出10个请求配额情况下,上游发射130个事件。

      Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                    for (int i = 0; i < 130; i++) {
                        e.onNext(i + 1);
                        Log.d(TAG, "requested—left—" + e.requested());
                    }
//                    e.onComplete();
            }
        }, BackpressureStrategy.ERROR)
                //下面两行代码执行线程切换,达到异步效果
//                .subscribeOn(Schedulers.io())
//                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        subscription = s;
                        subscription.request(10);
                    }

                    @Override
                    public void onNext(Integer s) {
                        Log.d(TAG, "接收 ——" + s);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.d(TAG, "接收错误——" + t);
                    }

                    @Override
                    public void onComplete() {
                    }
                });

打印LOG日志:

D/rxjava2: 接收 ——1
D/rxjava2: requested—left—9
D/rxjava2: 接收 ——2
D/rxjava2: requested—left—8
D/rxjava2: 接收 ——3
D/rxjava2: requested—left—7
D/rxjava2: 接收 ——4
D/rxjava2: requested—left—6
D/rxjava2: 接收 ——5
D/rxjava2: requested—left—5
D/rxjava2: 接收 ——6
D/rxjava2: requested—left—4
D/rxjava2: 接收 ——7
D/rxjava2: requested—left—3
D/rxjava2: 接收 ——8
D/rxjava2: requested—left—2
D/rxjava2: 接收 ——9
D/rxjava2: requested—left—1
D/rxjava2: 接收 ——10
D/rxjava2: requested—left—0
D/rxjava2: 接收错误——io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
D/rxjava2: requested—left—0

下游不再发送请求配额时,上游的配额令牌就为0。此时上游还有事件强行发个的话,就会出现异常。这里可以验证上面说的结论。

案例情景二:异步环境,在下游发出10个请求配额情况下,上游发射128个事件。

只是多了切换线程的代码:

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

打印LOG日志:

D/rxjava2: requested—left—127
D/rxjava2: requested—left—126
D/rxjava2: requested—left—125
...
D/rxjava2: requested—left—2
D/rxjava2: requested—left—1
D/rxjava2: requested—left—0
D/rxjava2: 接收 ——1
D/rxjava2: 接收 ——2
D/rxjava2: 接收 ——3
D/rxjava2: 接收 ——4
D/rxjava2: 接收 ——5
D/rxjava2: 接收 ——6
D/rxjava2: 接收 ——7
D/rxjava2: 接收 ——8
D/rxjava2: 接收 ——9
D/rxjava2: 接收 ——10

异步的情况下,上游先把事件序列内的事件发射完毕,下游才开始接收。如果上游的时间序列超过默认的128个,则上游事件发射到第129个就会报MissingBackpressureException异常,下游就接收不到事件了。这就涉及到下一个知识点了。

当上下游工作在不同的线程里时,每一个线程里都有一个requested,而我们下游调用request(1000)时,实际上改变的是下游线程中的requested,而上游中的requested的值是由RxJava内部调用request(n)去设置的,这个调用会在合适的时候自动触发。

何时自动触发呢?我们一起看下:

 Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                Log.d(TAG, "e.requested() == "+e.requested());
                    for (int i = 0; ; i++) {
                        boolean flag = false;
                        //这里做了一个循环,使发射循环处于保活状态,并在适当时机继续发射事件
                        while (e.requested() == 0){
                            if (!flag){
                                Log.d(TAG, "e.requested() == "+e.requested());
                                flag = true;
                            }
                        }
                        e.onNext(i + 1);
                        Log.d(TAG, "已发送" + (i + 1) + "个——剩下" + e.requested());
                    }
            }
        }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        subscription = s;
                    }

                    @Override
                    public void onNext(Integer s) {
                        Log.d(TAG, "接收 ——" + s);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.d(TAG, "接收错误——" + t);
                    }

                    @Override
                    public void onComplete() {
                    }
                });

外部手动调用request向上游请求发射配额

 if (subscription != null) {
     subscription.request(96);
     Log.d(TAG,"下游请求了96个");
  }

打印Log日志:

D/rxjava2: e.requested() == 128
D/rxjava2: 已发送1个——剩下127
D/rxjava2: 已发送2个——剩下126
...
D/rxjava2: 已发送127个——剩下1
D/rxjava2: 已发送128个——剩下0
D/rxjava2: e.requested() == 0
D/rxjava2: 下游请求了96个
D/rxjava2: 接收 ——1
D/rxjava2: 接收 ——2
...
D/rxjava2: 接收 ——95
D/rxjava2: 接收 ——96
D/rxjava2: 已发送129个——剩下95
...
D/rxjava2: 已发送223个——剩下1
D/rxjava2: 已发送224个——剩下0
D/rxjava2: e.requested() == 0

1,2,3,4行表示启动之后,上游自动想上游的事件序列缓存区发射128个事件。
8,9...13行表示下游手动请求96个发射配额时接收的事件。此时会自动触发上游继续发送事件,如14,15...17,18行,上游会自动再次发射96个事件(95-0+1=96)。

如果你下游请求95个发射配额的话,上游不会自动触发事件发射的(这个应该是底层设置的触发阀吧)。

因此得出结论:当下游每消费96个事件便会自动触发内部的request()去设置上游的requested的值。

在某一些场景下,可以在发送事件前先判断当前的requested的值是否大于0,若等于0则说明下游处理不过来了,则需要等待,

注意:是在onNext事件里,onComplete和onError事件不会消耗requested。

本章完~

上一篇下一篇

猜你喜欢

热点阅读