RxJava的基本使用

2018-03-06  本文已影响17人  谢尔顿

引言

该篇文章主要是关于RxJava的基本使用的代码讲解。

1.两种使用方式

RxJava的两种使用方式分别为:

2. 第一种方式:分步骤使用

     /**
     * RxJava的使用方式1:分步骤实现
     */
    private void useMethod1() {
        //1. 创建被观察者对象
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            // create() 是 RxJava 最基本的创造事件序列的方法
            // 此处传入了一个 OnSubscribe 对象参数
            // 当 Observable 被订阅时,OnSubscribe 的 call() 方法会自动被调用,即事件序列就会依照设定依次被触发
            // 即观察者会依次调用对应事件的复写方法从而响应事件
            // 从而实现被观察者调用了观察者的回调方法 & 由被观察者向观察者的事件传递,即观察者模式

            // 2. 在复写的subscribe()里定义需要发送的事件
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();
            }
        });

        //创建观察者
        //方式1:采用Observer接口
        Observer<Integer> observer = new Observer<Integer>() {

            //观察者接收事件前,默认最先调用复写onSubscribe()
            @Override
            public void onSubscribe(Disposable d) {
                Logger.d("开始采用subscribe连接");
            }

            //当被观察者产生Next事件&观察者接受到时,会调用该复写方法进行相应
            @Override
            public void onNext(Integer value) {
                Logger.d("对Next事件作出响应" + value);
            }

            //当被观察者产生Error事件&观察者接受到时,会调用该复写方法进行相应
            @Override
            public void onError(Throwable e) {
                Logger.d("对Error事件作出响应");
            }

            //当被观察者产生Complete事件&观察者接受到时,会调用该复写方法进行相应
            @Override
            public void onComplete() {
                Logger.d("对Complete事件作出响应");
            }
        };

        observable.subscribe(observer);
    }

3. 第二种方式:基于事件流的链式调用

   /**
     * RxJava的使用方式2:基于事件流的链式调用
     */
    private void useMethod2(){
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();
            }
        }).subscribe(new Observer<Integer>() {

            //观察者接收事件前,默认最先调用复写onSubscribe()
            @Override
            public void onSubscribe(Disposable d) {
                Logger.d("开始采用subscribe连接");
            }

            //当被观察者产生Next事件&观察者接受到时,会调用该复写方法进行相应
            @Override
            public void onNext(Integer value) {
                Logger.d("对Next事件作出响应" + value);
            }

            //当被观察者产生Error事件&观察者接受到时,会调用该复写方法进行相应
            @Override
            public void onError(Throwable e) {
                Logger.d("对Error事件作出响应");
            }

            //当被观察者产生Complete事件&观察者接受到时,会调用该复写方法进行相应
            @Override
            public void onComplete() {
                Logger.d("对Complete事件作出响应");
            }
        });
    }
03-04 20:44:37.893 26201-26201/com.gjj.frame E/gjj: 开始采用subscribe连接
03-04 20:44:37.893 26201-26201/com.gjj.frame E/gjj: 对Next事件作出响应1
03-04 20:44:37.893 26201-26201/com.gjj.frame E/gjj: 对Next事件作出响应2
03-04 20:44:37.893 26201-26201/com.gjj.frame E/gjj: 对Next事件作出响应3
03-04 20:44:37.893 26201-26201/com.gjj.frame E/gjj: 对Complete事件作出响应

接下来我们看看观察者Observer的重写方法onSubscribe方法中的Disposable的作用,我们可以利用Disposable断开观察者和被观察者的连接,修改Observer的重写方法如下所示:

        //创建观察者
        //方式1:采用Observer接口
        Observer<Integer> observer = new Observer<Integer>() {

            //观察者接收事件前,默认最先调用复写onSubscribe()
            @Override
            public void onSubscribe(Disposable d) {
                Logger.e("开始采用subscribe连接");
                mDisposable = d;
            }

            //当被观察者产生Next事件&观察者接受到时,会调用该复写方法进行相应
            @Override
            public void onNext(Integer value) {
                Logger.e("对Next事件作出响应" + value);
                if (value == 2) {
                    // 设置在接收到第二个事件后切断观察者和被观察者的连接
                    mDisposable.dispose();
                    Logger.e("已经切断了连接:" + mDisposable.isDisposed());
                }
            }

            //当被观察者产生Error事件&观察者接受到时,会调用该复写方法进行相应
            @Override
            public void onError(Throwable e) {
                Logger.e("对Error事件作出响应");
            }

            //当被观察者产生Complete事件&观察者接受到时,会调用该复写方法进行相应
            @Override
            public void onComplete() {
                Logger.e("对Complete事件作出响应");
            }
        };

        observable.subscribe(observer);
    }

对应的log日志:

03-04 20:49:02.103 30444-30444/com.gjj.frame E/gjj: 开始采用subscribe连接
03-04 20:49:02.103 30444-30444/com.gjj.frame E/gjj: 对Next事件作出响应1
03-04 20:49:02.103 30444-30444/com.gjj.frame E/gjj: 对Next事件作出响应2
03-04 20:49:02.103 30444-30444/com.gjj.frame E/gjj: 已经切断了连接:true

完整demo

参考文章

上一篇 下一篇

猜你喜欢

热点阅读