Rxjava

Rxjava基础知识

2019-12-18  本文已影响0人  覆水无言

目录

1. Rx简介
2. Rxjava基础知识
3. Rxjava创建操作符
4. Rxjava的线程操作
5. Rxjava变换过滤操作符
6. Rxjava合并、链接操作符
7. Rxjava条件和布尔操作符

Rxjava基础知识

Observable:

一:Rxjava的Observable使用通常需要三步。
<font color=blue>1:创建Observable(被观察者):</font>
Observable是Rxjava中的被观察者,Rxjava使用时需要创建一个被观察者,
他决定什么时候触发事件, 也就是被观察者发布信息.
<font color=blue>2:创建Observer(观察者):</font>
Observer是观察者,用于接受Observable发布的数据,它可以在不同的线程执行任务,
这种模式极大简化并发操作,
<font color=blue>3:使用subscribe()进行订阅,</font>
这个操作类似传统观察者的注入操作,将Observable与Observer链接起来,

上篇博客的hello world

    Observable.just("hello world").subscribe(new Consumer<String> (){
        @Override
        public void accept(@NonNull String s) throws Exception {
            System.out.println(s);
        }
    })

<font color=red>代码注解:</font>
1:just:rxjava中Observable的创建操作符,创建操作符有多种create、from等,后续会详细介绍。
2:Consumer:消费者,用于接受单个值,
3:subscribe:订阅,他有多种重载方法,

subscribe(onNext)
subscribe(onNext,onError)
subscribe(onNext,onError,onComplete)
subscribe(onNext,onError,onComplete,onSubscribe); //onSubscribe:订阅事件,


eg:
Observable.just("hello world")
    .subscribe(new Consumer<String>(){  //订阅onNext事件
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    },new Consumer<Throwable>(){   //订阅错误事件
         @Override
         public void accept(Throwable throwable) throws Exception {
             System.out.println(throwable);
         }
    }, new Action(){
        @Override
        public void run() throws Exception {   //订阅完成事件
            System.out.println(" onComplete");
        }
    });
    
注:Action与Consumer是一个意思都是事件的观察者,只是action无参数,Consumer:单一参数类型。

执行顺序:

  1. onSubscribe() 订阅事件,用于取消订阅,表示观察者已将开始观察被观察者。
  2. onNext()
  3. onComplete()
  4. onError(); 在程序运行异常时发布该事件。

<font color=red>注:</font>

Rxjava中被观察者、观察者、subscribe三者缺一不可,只有使用了subscribe(),observable才会发布数据。
Rxjava中5中包含的观察者模式图:


观察模式图

observable:能发射0或N个数据,并以成功或错误事件终止。
Flowable:能发射0或N个数据,并以成功或错误事件终止。支持背压,可以控制发布数据的速度。
Single:只发送单个数据或错误事件。
Completable:不发送数据,只处理onComplete,onError事件,
Maybe:发射0或1个数据,要么成功,要么失败。

do操作符

Rxjava包含许多操作符,do操作符可以给Observable的生命周期的各个阶段加上一系列的回调监听,Rxjava中有许多doXXX操作符。
根据生命周期顺序,doXXX操作符的用途。

操作符 用途
doOnSubscribe 监听订阅事件,一旦观察者订阅了Observable,它就会被调用
doOnLifecycle 可以在观察者订阅后,设置是否取消订阅,参数为Disposable.isDisposed()
doOnNext Observable每次调用OnNext事件发送数据之前调用一次,只包含onNext事件,它的参数为发射的数据
doOnEach Observable每发射一次数据就调用一次,包含onNext, onError, onCompleted
doAfterNext 与doOnNext属性一致,只是doOnNext发生在onNext事件之前,它发生在之后
doOnComplete Observable正常终止时调用该事件。
doFinally Observable终止之后调用,无论是正常终止还是异常终止,并且它执行在doAfterTerminate之前
doAfterTerminate 向Observable注入一个action,当Observable调用onComplete或onError时触发
Observable.just("hello")
    .doOnNext(new Consumer<String>(){
        @Override
        public void accept(@NonNull String s) throws Exception{
            System.out.println("doOnNext : " + s);
        }
    })
    .doOnComplete(new Action(){
        @Override
        public void run() throws Exception{
            System.out.println("OnComplete");
        }
    })
    .doFinally(new Action(){
        @Override
        public void run() throws Exception{
            System.out.println(" doFinally");
        }
    })
    .doOnLifecycle(new Consumer<Disposable>(){
        @Override
        public void accept(@NonNull Disposable disposable) throws Exception{
            System.out.println( "doOnLifecycle : + " disposable.isDisposed());
        }
    });

Hot Observable与Cold Obsevable

1:区别

\color{#ea4235}{hot Observable:}
无论有没有观察者,事件都会发生,而且与订阅这的关系是一对多,多个Observer可以订阅它。常用与某些事件不确定什么时候发生或不确定发射的元素数量,常用与UI交互,网络环境变化,服务器推送消息的到达等。
\color{#ea4235}{cold Observable:}
只有观察者订阅了,才开始发射数据。并且与订阅者关系是一对一。Observable的普通创建操作符just、create, range,fromXXX等都是生成的cold Observable。
\color{#ea4235}{注意:}当一个cold Observable被两个Observer同时订阅时,这两个是完全相互独立的程序,相当于两个Observable与两个Observer。

2:Cold Observable装换成Hot Observable

  1. 使用publish生成ConnectableObservable.
    publish可以让Cold Observable转换成Hot Observable,将原来的Observable装换成ConnectableObservable.
  2. 使用Connect进行执行。
    生成ConnectableObservable后调用Connect(),ConnectableObaservable才能真正开始执行,不管是否有订阅。
//代码使用了lambda表达式,这个自己学习。
Consumer<Long> observer1 = aLong -> System.out.println("observer1 : " + aLong);
Consumer<Long> observer2 = aLong -> System.out.println("observer1 : " + aLong);
Consumer<Long> observer3 = aLong -> System.out.println("observer1 : " + aLong);
ConnectableObservable<Long> observable = Observable.create((ObservableOnSubscribe<Long>)
     emitter -> Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation())
     .take(Integer.MAX_VALUE).subscribe(emitter::onNext)).observeOn(Schedulers.newThread())
     .publish(); //这里将cold 转换为了hot
observable.connect();  //这里启动了observable, 之后已经开始发送数据,无论有没有订阅
observable.subscribe(observer1);
observable.subscribe(observer2);
observable.subscribe(observer3);

注意:
Hot Observable是多个订阅者共享同一事件。Connectableobservable是线程安全的。

3:Hot Observable转Cold Observable

ConnectableObservable的refCount操作符。refCount操作符可以把一个可连接的Observable链接和断开的过程自动化,它操作一个可链接的Observable返回一个普通的Observable,当观察者(订阅者)订阅这个Observable时,RefCount链接到下层的可连接Observable。refconut跟踪多个观察者,直到最后一个观察者取消订阅,才断开与下层可连接的OBservable的链接。
\color{#ea4235}{注意:}

1:如果所有观察者都取消订阅,则Observable的数据流停止,重新订阅,重新开始数据流。
2:部分观察者取消后,如果又有观察者订阅,则数据不会重新开发送,会按原来的书序发送。
Consumer<Long> observer1 = aLong -> System.out.println("observer1 : " + aLong);
Consumer<Long> observer2 = aLong -> System.out.println("observer1 : " + aLong);
Consumer<Long> observer3 = aLong -> System.out.println("observer1 : " + aLong);

ConnectableObservable<Long> observable = Observable.create((ObservableOnSubscribe<Long>)
        emitter -> Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation())
        .take(Integer.MAX_VALUE).subscribe(emitter::onNext)).observeOn(Schedulers.newThread()).publish();
        observable.connect();
        
Observable obs = observable.refCount();    //使用refcount生成一个需要的observable.
Disposable disposable = obs.subscribe(observer1);
Disposable disposable1 = obs.subscribe(observer2);
obs.subscribe(observer3);
try {
    Thread.sleep(20L);
} catch (InterruptedException e) {
    e.printStackTrace();
}
//取消部分订阅。
disposable.dispose(); //取消observer1
disposable1.dispose(); //取消Observer2
//重新开始订阅
disposable = obs.subscribe(observer1);
disposable1 = obs.subscribe(observer2);

Flowable

Rxjava2中Observable不再支持背压,而是Flowable来支持背压,Flowable是Rxjava2中新增的被观察者。Flowable可以看做
Flowable新的实现,它支持背压。同时实现Reactive streams的Publisher接口。

使用场景

Observable一般处理不超过1000套数据,几乎不会出现内存溢出。
Flowable较好的使用场景

1:处理某种产生超过10k的元素
2:文件的读取和分析
3:读取数据库记录,也是一个阻塞和基于拉取模式
4:IO流
5:创建响应式非阻塞接口。
上一篇下一篇

猜你喜欢

热点阅读