RxJava 基本使用

2018-05-30  本文已影响0人  翻滚吧王咸鱼

对RxJava 的学习,做一个记录,为以后的面试复习用. 我看一些RxJava 的介绍,个人感觉
RxJava 这个作者讲的简单明了.
通过水管来讲

image.png

老规矩 先放依赖
要在Android中使用RxJava2, 先添加Gradle配置:

  //解释一下  implementation 跟compile 的区别
  //compile依赖的确实可以做到依赖传递,但是AS 3.0开始推荐使用implementation取代了compile,
//依赖传递失效了.  而 ##implement## 的意思是将该依赖隐藏在内部,而不对外部公开. 在 app //mudule 中//使用 implement 依赖的第三方库, 在其他 mudule 是无法调用的,## compile ##android //studio 3.0 版本后废弃该指令 改用 api 代替, api 完全等同于之前的 compile 指令,
// 也就是普通的依赖, //第三方库在 mudule 中依赖后其他 mudule 都可以使用该库.官方推荐在不影响的前提下优先使用 //implement 指令依赖.
    implementation 'io.reactivex.rxjava2:rxjava:2.1.14'
    implementation 'io.reactivex.rxjava2:rxandroid:2.0.2' 

下面就简单使用的例子

  //Observable 被观察者  subscribe订阅  需要做的事
        // create() 是 RxJava 最基本的创造事件序列的方法
        Observable.create(new ObservableOnSubscribe<Integer>() {
            // 此处传入了一个 OnSubscribe 对象参数
            // 当 Observable 被订阅时,OnSubscribe 的 call() 方法会自动被调用,即事件序列就会依照设定依次被触发
            // 即观察者会依次调用对应事件的复写方法从而响应事件
            // 从而实现被观察者调用了观察者的回调方法 & 由被观察者向观察者的事件传递,即观察者模式

            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
               // ObservableEmitter类对象产生事件并通知观察者
                // ObservableEmitter类介绍
                // a. 定义:事件发射器
                // b. 作用:定义需要发送的事件 & 向观察者发送事件
                emitter.onNext(1);  //发射第一个事件
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        })
                //观察者Observer subscribe是建立联系
                .subscribe(new Observer<Integer>() {  //事件的响应拿到结果

                  //  观察者接收事件前,默认最先调用复写 onSubscribe()
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e("----->", "subscribe");
                    }
                    // 当被观察者生产Next事件 & 观察者接收到时,会调用该复写方法 进行响应
                    @Override
                    public void onNext(Integer integer) {
                        Log.d("----->", "" + integer);
                    }
                    // 当被观察者生产Error事件& 观察者接收到时,会调用该复写方法 进行响应
                    @Override
                    public void onError(Throwable e) {
                        Log.d("----->", "异常");
                    }
                    // 当被观察者生产Complete事件& 观察者接收到时,会调用该复写方法 进行响应
                    @Override
                    public void onComplete() {
                        Log.d("----->", "complete");
                    }
                });

        //  说明:Subscriber类 = RxJava 内置的一个实现了 Observer 的抽象类,对 Observer 接口进行了扩展
<-- Observable.subscribe(Subscriber) 的内部实现 -->

public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    // 步骤1中 观察者  subscriber抽象类复写的方法,用于初始化工作
    onSubscribe.call(subscriber);
    // 通过该调用,从而回调观察者中的对应方法从而响应被观察者生产的事件
    // 从而实现被观察者调用了观察者的回调方法 & 由被观察者向观察者的事件传递,即观察者模式
    // 同时也看出:Observable只是生产事件,真正的发送事件是在它被订阅的时候,即当 subscribe() 方法执行时
}
image.png

注意: 只有当上游和下游建立连接之后, 上游才会开始发送事件. 也就是调用了subscribe() 方法之后才开始发送事件.

ObservableEmitter:

Emitter是发射器的意思,那就很好猜了,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。

但是,请注意,并不意味着你可以随意乱七八糟发射事件,需要满足一定的规则:

当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件.
当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件.
上游可以不发送onComplete或onError.
最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然


注: 关于onComplete和onError唯一并且互斥这一点, 是需要自行在代码中进行控制, 如果你的代码逻辑中违背了这个规则, **并不一定会导致程序崩溃. ** 比如发送多个onComplete是可以正常运行的, 依然是收到第一个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃.
Disposable:

这个单词的字面意思是一次性用品,用完即可丢弃的. 那么在RxJava中怎么去理解它呢, 对应于上面的水管的例子, 我们可以把它理解成两根管道之间的一个机关, 当调用它的dispose()方法时, 它就会将两根管道切断, 从而导致下游收不到事件.注意: 调用dispose()并不会导致上游不再继续发送事件, 上游会继续发送剩余的事件.

被观察者 Observable的subscribe()具备多个重载的方法

 * public final Disposable subscribe() {}
         // 表示观察者不对被观察者发送的事件作出任何响应(但被观察者还是可以继续发送事件)

         public final Disposable subscribe(Consumer<? super T> onNext) {}
         // 表示观察者只对被观察者发送的Next事件作出响应
         public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
         // 表示观察者只对被观察者发送的Next事件 & Error事件作出响应

         public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
         // 表示观察者只对被观察者发送的Next事件、Error事件 & Complete事件作出响应

         public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
         // 表示观察者只对被观察者发送的Next事件、Error事件 、Complete事件 & onSubscribe事件作出响应

         public final void subscribe(Observer<? super T> observer) {}
         // 表示观察者对被观察者发送的任何事件都作出响应

可采用 Disposable.dispose() 切断观察者 与 被观察者 之间的连接

/**
     * 可采用 Disposable.dispose() 切断观察者 与 被观察者 之间的连接
     * 即观察者 无法继续 接收 被观察者的事件,但被观察者还是可以继续发送事件
     */
    private void Dome2() {
        Observable.create(new ObservableOnSubscribe<Integer>(){
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d("----->", "开始发射:");
                emitter.onNext(1);  //发射第一个事件
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onComplete();

            }
        }).subscribe(new Observer<Integer>() {
            // 1\. 定义Disposable类变量
            private Disposable mDisposable;
            @Override
            public void onSubscribe(Disposable d) {
                Log.d("----->", "接收到事件开始了"  );
                // 2\. 对Disposable类变量赋值
                mDisposable = d;
            }

            @Override
            public void onNext(Integer integer) {
                Log.d("----->", "接收到事件"+integer  );
                if (integer == 2) {
                    // 设置在接收到第二个事件后切断观察者和被观察者的连接
                    mDisposable.dispose();
                    Log.d("----->", "已经切断了连接:" + mDisposable.isDisposed());
                }


            }

            @Override
            public void onError(Throwable e) {
                Log.d("---->", "对Error事件作出响应");
            }

            @Override
            public void onComplete() {
                Log.d("---->", "对Complete事件作出响应");
            }
        });
    }

简单学习到这里

上一篇下一篇

猜你喜欢

热点阅读