RxJava2系列第一篇---基本使用

2018-08-31  本文已影响0人  sofarsogoo_932d

目录

第一篇---基本使用
第二篇---异步
第一篇---操作符

什么是RxJava

异步编程框架,链式编程风格
github
参考链接

依赖

在安卓中,除了依赖RxJava外,还需要依赖一下RxAndroid

compile 'io.reactivex.rxjava2:rxandroid:2.1.0'
compile 'io.reactivex.rxjava2:rxjava:2.1.0'

鉴于RxJava源码比较多,本系列不会分析源码,主要介绍一些常用到的东西

1. RxJava简要介绍

RxJava中有三个关键点
Observable,Observer,subscribe

观察者模式说
Observable,被观察者
Observer,观察者
subscribe,建立订阅关系

在观察者模式中,一般都是观察者订阅被观察者
但在RxJava中,是Observable.subscribe(Observer)

水管说

RxJava事件流向.png

Observable,上游水管
Observer,下游水管
subscribe,连接上下游的水管

2. 基本使用

废话不多说,先来一段代码感受一下

 //关注点1
    Observable<String> observable=Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            Log.d("TAG","subscribe");
            emitter.onNext("1");
            emitter.onNext("2");
            emitter.onNext("3");
            emitter.onComplete();
        }
    });

    //关注点2
    Observer<String> observer=new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d("TAG","onSubscribe");

        }

        @Override
        public void onNext(String s) {
            Log.d("TAG",s);
        }

        @Override
        public void onError(Throwable e) {
            Log.d("TAG","onError");
        }

        @Override
        public void onComplete() {
            Log.d("TAG","onComplete");
        }
    };

    //关注点3
    observable.subscribe(observer);

这段代码的执行结果

D/TAG: onSubscribe
D/TAG: subscribe
D/TAG: 1
D/TAG: 2
D/TAG: 3
D/TAG: onComplete

站在观察者模式的角度
关注点1的Observable是被观察者,被观察者用来发布事件
关注点2的Observer是观察者,观察者用来接收事件
关注点3用来关联观察者与被观察者

站在水管的角度
把关注点1比喻成一根水管,处在上游
把关注点2也比喻成一根水管,处在下游
关注点3就是连接这两根水管的一个媒介,通过该媒介关注点1和关注点2就连通了。

我们在来看看剩下的两个比较陌生的类
ObservableEmitter和Disposable
ObservableEmitter
这是一个接口,继承自Emitter接口,翻译过来就是发射器的意思,而它就是用来发出事件的,通过调用emitter的onNext(T value),
onError(Throwable t)和onComplete()分别用来发生next事件,error事件和complete事件。

下面来总结一下事件的发送接收规则

Disposable
Disposable是一个接口,里面就两个方法

 void dispose();
 boolean isDisposed();

修改关注点1和关注点2的代码

  //关注点1
    Observable<String> observable=Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            Log.d("TAG","subscribe");
            emitter.onNext("1");
            emitter.onNext("2");
            emitter.onComplete();
            emitter.onNext("3");
            Log.d("TAG","emitter 3");
        }
    });

    //关注点2
    Observer<String> observer=new Observer<String>() {
        Disposable disposable;
        @Override
        public void onSubscribe(Disposable d) {
            Log.d("TAG","onSubscribe");
            disposable=d;

        }

        @Override
        public void onNext(String s) {
            Log.d("TAG",s);
            if(s.equals("1")){
                disposable.dispose();
                Log.d("TAG", "isDisposed : " + disposable.isDisposed());
            }
        }

        @Override
        public void onError(Throwable e) {
            Log.d("TAG","onError:"+e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.d("TAG","onComplete");
        }
    };

输出结果

D/TAG: onSubscribe
D/TAG: subscribe
D/TAG: 1
D/TAG: isDisposed : true
D/TAG: emitter 3

结果分析
当在下游调用disposable.dispose()方法时,下游将不会在接收后续的事件,但上游还是会将全部事件发送完毕。
这里的dispose就相当于将水管切断了,因此接收不了后续事件。

链式调用
我们来稍微修改一下之前的写法,就变成了RxJava流行的链式调用写法了。

 Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {

        }
    }).subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            
        }

        @Override
        public void onNext(String s) {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });

通过源码发现(CreateEmitter)
当我们调用onComplete或者onError时,会触发dispose方法

  @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onError(Throwable t) {
        if (t == null) {
            t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        }
        if (!isDisposed()) {
            try {
                observer.onError(t);
            } finally {
                dispose();
            }
        } else {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }
上一篇下一篇

猜你喜欢

热点阅读