RxJava

RxJava 的初理解和基本使用

2020-09-24  本文已影响0人  tingtingtina

观察者设计模式

提到RxJava 有点了解的就知道这个框架是基于观察者模式的,先来温习下观察者模式。

观察者模式 UML

被观察者(Observable)持有对观察者(Observer) 的引用,当被观察者发生某些变化,调用 观察者的 API 就可以了。

RxJava 正是基于观察者模式的响应式编程框架。

基于事件流编程

啥是响应式?其实就是事件驱动,事件一发生,就会被相应接收处理。就这么简单,一张图就能表达。

有一个起点和一个终点,从起点发送事件,终点接收处理事件。他们之间与一个订阅关系。

Rxjava 使用示例

先看下 Rxjava 的基本使用

在使用的时候需要引入 rxjava 包

// 依赖 RxAndroid 2x 的依赖库,完整的支持在Android 中使用,比如一些线程调度
api 'io.reactivex.rxjava2:rxandroid:2.1.0'
// 增加 Rxjava 2x 的依赖库
api io.reactivex.rxjava:1.3.0"

使用时,注意要引入 io 包中的类

// 起点 可以先不用管,用 create 创建了一个 Observable
Observable.create(new ObservableOnSubscribe<Integer>() {

    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

            }
})
// 订阅
.subscribe(
        // 终点 也可以不用管,new 了一个 Observer
        new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer integer) {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
             }
       });

Rxjava 的上游与下游

事件流向分层:事件流可以分成上游与下游,事件的起点也就是上游,而终点就是事件的下游。

上游可以发送多个消息给下游,像下面这样

拆分实现时,分别创建观察者和被观察者,再订阅

        // 上游 Observable 被观察者
        Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            // 发射器 发射事件
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                log("1: 上游发射事件");
                // 发射事件
                emitter.onNext(1);
                log("2: 上游发射完成");
            }
        });

        // 下游 Observer 观察者 处理事件
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                log("3 下游处理事件 onNext " + integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };

        // 上游 订阅 下游
        observable.subscribe(observer);

当然也可以像最初那样,链式调用

RxJava 调用流程

        // 上游 Observable 被观察者
        Observable.create(new ObservableOnSubscribe<Integer>() {
            // 发射器 发射事件
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                log("2: 开始发射");
                // 发射事件
                emitter.onNext(1);

                emitter.onComplete(); // 4
                log("5:发射完成");
            }
        })
                //订阅
                .subscribe(new Observer<Integer>() {
                    // 下游 Observer 观察者 处理事件
                    @Override
                    public void onSubscribe(Disposable d) {
                        // 弹出加载框 loading……
                        log("1: 订阅成功");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        log("3: 下游接收 onNext " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        log("4: 下游接收 onError");
                    }

                    @Override
                    public void onComplete() {
                        // 隐藏加载 loading……
                        // 只有接收完成之后,上游的 log才会打印
                        log("4: 下游接收完成 onComplete");
                    }
                });
//-----------
结果
1: 订阅成功
2: 开始发射
3: 下游接收 onNext 1
4: 下游接收完成 onComplete
5:发射完成

发射器事件

RxJava 中有个事件发射器,用来发送事件。发射器有Emitter 有三个方法,onNext、onComplete、onError,调用后会对应下游观察者的同名方法。
使用中有下面几个需要注意的地方

  1. onNext/onComplete/onError
emitter.onNext(1);
emitter.onComplete();
// onComplete 之后,以下收不到
emitter.onNext(2)

↑执行 emitter.onComplete 之后,继续发射,下游不再接受上游事件

emitter.onNext(1);
emitter.onError(new IllegalAccessError("error"));
// onError 之后,以下收不到
emitter.onNext(2);

↑执行 emitter.onError 之后,继续发射,下游不再接受上游事件

  1. onError 和 onComplete 顺序测试
emitter.onNext(1);
emitter.onComplete();
emitter.onError(new IllegalAccessError("error"));

↑ onComplete 后再调用 onError 会报错

emitter.onError(new IllegalAccessError("error"));
emitter.onComplete();

↑ 先发射 onError 再发射 onComplete 不会报错,但 onComplete也不会接收。

订阅关系

上游和下游可以有订阅关系,这种关系也可以被切断

// 上游 Observable 被观察者
Observable.create(new ObservableOnSubscribe<Integer>() {
    // 发射器 发射事件
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        log("上游发射事件");
        // 发射事件
        emitter.onNext(1);
        emitter.onNext(2);
        log("上游发射完成");
    }
}).subscribe(new Observer<Integer>() {
    // 下游 Observer 观察者 处理事件
    Disposable disposable;
    @Override
    public void onSubscribe(Disposable d) {
       // 事件被订阅
        disposable = d;
    }
    @Override
    public void onNext(Integer integer) {
        log("下游处理事件 onNext " + integer);
        // 接收上游的一个时间之后,就切断下游,让下游不再接收,但上游可以继续发
        disposable.dispose();
        // 实际用法,可在 onDestroy 中 使用上面方法,切断下游
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});
//---------
结果
上游发射事件
下游处理事件 onNext 1
上游发射完成

可以看到事件2 没有被接收了,但还是会发送的,只是不接收。

总结 Part

写在后面

其实使用 RxJava 比较久了,是配合 Retrofit 使用也好,还是用它来处理线程等等,都没有系统的去学习其架构思想,最近重新学习一下搞一搞。

上一篇 下一篇

猜你喜欢

热点阅读