初探Rxjava2

2017-11-01  本文已影响0人  beatzcs

Rxjava的基本概念:

1.自我介绍:

a library for composing asynchronous and event-based programs using observable sequences for the Java VM"

一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。

2.特性:

1.异步

2.简洁

ps:随着代码量的增加,Rxjava可以保持逻辑的简洁明了,替代了层层的嵌套。

3.角色:

1.观察者:Observer/Subscriber

2.被观察者:Observable/Flowable

3.订阅/注册:subscribe

ps:实际开发中常用到的两个组合Observable/Observer和Flowable/Subscriber。

Rxjava的基本使用:

1.引入:

compile'io.reactivex.rxjava2:rxjava:2.0.1'

compile'io.reactivex.rxjava2:rxandroid:2.0.1'

2.步骤:

1.创建被观察者,生产事件;

2.根据需求利用操作符来实现事件传递中的变换等加工操作;

3.订阅观察者,实现事件处理。

talk is less

1.创建

Observable Observer 订阅

Observable其他创建方式:

                    just(),from(),fromIterable(),range( ),timer( ),repeat( ),defer()


2.Consumer

没必要使用Observer时,可以根据需要使用Consumer来分别实现

等价于Observer/Subscriber subscribe函数参数

3.ActionX/FuncX

相较于Rxjava1.x的改动如下:

其中,Action0 改名成Action,Action1改名成Consumer,而Action2改名成了BiConsumer,而Action3 - Action9都不再使用了,ActionN变成了Consumer。

同样,Func改名成Function,Func2改名成BiFunction,Func3 - Func9 改名成 Function3 - Function9,FuncN 由Function取代。


4.操作符

map()操作符 flatMap()操作符 filter()操作符 take()操作符 doOnNext()

lift()和compose()等操作符可参考  给 Android 开发者的 RxJava 详解  、给初学者的RxJava2.0教程


5.线程调度

如果不指定线程的话,Rxjava是同步进行的,即在哪个线程调用subscribe()就在那个线程生产事件和消费事件,切换线程使用Schedule调度器。

Schedule线程控制器:

1.Schedulers.immediate():默认的,直接在当前线程执行,相当于不指定线程。

2.Schedulers.newThread():总是启用新线程,在新线程执行操作。

3.Schedulers.io():耗时操作(读写文件、读写数据库、访问网络等),和newThread()的区别在于,io() 的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。注意不要把计算工作放在 io() 中,可以避免创建不必要的线程。

4.Schedulers.computation():CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

5.AndroidSchedulers.mainThread():指定的操作将在 Android 主线程运行。

调用方式:

1.subscribeOn():指定Observable(被观察者)所在线程,或叫做事件产生的线程。

2.observeOn():指定Observer(观察者)所在线程,或叫做事件消费的线程。


6.Disposable/Subscription

在Rxjava中,disposable/subscription用来切断观察者和被观察者之间的连接,当调用.dispose()/.cancel()时,观察者和被观察者连接被切断,观察者(Observer)收不到事件,但是被观察者(Observable)还是会发送事件。

disposable/subscription的获取方式:

1.Observer接口方式 2.subscribe(Consumer接口)返回 3.Subscription

详情可参考 Rxjava结合Retrofit,如何优雅的取消请求 。


7.Flowable

Backpressure(背压):当被观察者发送消息的速度大于观察者响应事件的速度时,造成事件堆积,最后OOM。Flowable就可以解决此类问题。

生产者和消费者在同意线程(同步)不会出现Backpressure问题,但生产者和消费者在不同线程(异步)就有可能出现,Backpressure策略并不会影响Flowable发送事件的速度和方式,主要是改变Subscriber消费事件的方式。

Flowable

Ⅰ.异步调用时,RxJava中有个缓存池,用来缓存消费者处理不了暂时缓存下来的数据,缓存池的默认大小为128,即只能缓存128个事件。无论request()中传入的数字比128大或小,缓存池中在刚开始都会存入128个事件。当然如果本身并没有这么多事件需要发送,则不会存128个事件。

Ⅱ.request():通过这个方法可以向生产者申请消费数量,采取的是响应式编程,要多少事件,传递多少事件。虽然并不限制向request()方法中传入任意数字,但是如果消费者并没有这么多的消费能力,依旧会造成资源浪费,最后产生OOM。

Backpressure策略的方式:

1.ERROR

ERROR即保证在异步操作中,事件累积不能超过128,超过则缓存池溢出,立刻抛出MissingBackpressureException异常。消费者不能再接收事件了,但生产者并不会停止。

2.BUFFER

把RxJava中默认的只能存128个事件的缓存池换成一个大的缓存池,支持存很多很多的数据。消费者通过request()即使传入一个很大的数字,生产者也会生产事件,并将处理不了的事件缓存。但是这种方式仍然比较消耗内存,除非是我们比较了解消费者的消费能力,能够把握具体情况,不会产生OOM(慎用)。

3.DROP

当消费者处理不了事件,就丢弃。消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。

4.LATEST

LATEST与DROP功能很像,消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。唯一的区别就是LATEST总能使消费者能够接收到生产者产生的最后一个事件。ps:不是最近的事件,是最后一个。

关于Retrofit2+Rxjava2+OkHttp3的网路请求封装

1.导入需要的jar包 2.初始化OkHttpClient,获取Retrofit 3.构造网络请求接口 4.发起网络请求 运行结果

ps:最后别忘记添加网络权限。

上一篇 下一篇

猜你喜欢

热点阅读