Rxjava(一)之流程分析与基本操作

2021-01-25  本文已影响0人  梦星夜雨

前言

Rxjava由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎。
Rxjava是一个基于时间流,实现异步操作的库。
定义:Rxjava简单来说就是采用的观察者内模式来定义的,被观察者(Observable)通过订阅(Subscribe)按照一定顺序将事件发送给观察者(Observer),观察者按顺序接收事件并做出相应的响应动作。
首先我们引入Rxjava库

implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'io.reactivex.rxjava2:rxjava:2.0.7'

基本使用

Observable<Integer> integerObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {

            }
        });

        Observer<Integer> observer = new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };

        integerObservable.subscribe(observer);

首先创建一个被观察者integerObservable 做为上游,接受创建一个观察者observer 做为下游接受integerObservable 传来的消息,最后调用被观察者的订阅方法subscribe()进行订阅。
当然,Rxjava最经典的还是如下的基于事件流的链式调用结构。有着逻辑简洁,代码优雅,使用简单等优点。

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG,"Observable onSubscribe");
                emitter.onNext(1);
                emitter.onNext(2);
                Log.d(TAG,"start send onComplete");
                emitter.onComplete();
                Log.d(TAG,"end send onComplete");
                emitter.onNext(3);
                emitter.onNext(4);
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG,"Observer onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG,"Observer onNext: "+ integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG,"Observer onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG,"Observer onComplete");
            }
        });

在上述代码中我们加入了一些逻辑和日志,运行上述代码会得到下面结果:

Observer onSubscribe
Observable onSubscribe
Observer onNext: 1
Observer onNext: 2
start send onComplete
Observer onComplete
end send onComplete

通过分析上面的结果,我们不难发现,Rxjava的流程是从被观察者的subscribe()方法开始,然后到观察者的onSubscribe()方法,通过发射器(emitter)发送事件,最终在发射完onComplete()事件后结束,这里我们注意到,最后发射的两个事件,下游并没有收到,这是因为发送完onComplete()事件后,观察者的任务就完成了。
如果我们这里在发送onError()事件,会抛出UndeliverableException异常,但是先调用onError()再调用onComplete()事件是可以的。由于篇幅关系,这里我就不贴源码了。

这里我们介绍一个简单的观察者Consumer。

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
            }
        }).subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG,"accept: "+integer);
                    }
                });
accept: 1
accept: 2

我们可以看到,简易的观察者也能接收到被观察者发送的消息。

下面我们介绍一下Disposable对象,它可以通过disposable()方法切断观察者和被观察者之间的联系,虽然被观察者还可以继续发送事件,但是观察者已经接收不到。具体示例代码和结果如下:

Disposable disposable;
    public void r01(View view) {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG,"Observable onSubscribe");
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                disposable =d;
                Log.d(TAG,"Observer onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                if (integer == 2) {
                    disposable.dispose();
                }
                Log.d(TAG,"Observer onNext: "+ integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG,"Observer onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG,"Observer onComplete");
            }
        });
    }
Observer onSubscribe
Observable onSubscribe
Observer onNext: 1
Observer onNext: 2

本文对RxJava的进行了基本介绍,对基本用法进行了讲解。

上一篇下一篇

猜你喜欢

热点阅读