初探Rxjava2
Rxjava的基本概念:
1.自我介绍:
a library for composing asynchronous and event-based programs using observable sequences for the Java VM"
一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。
2.特性:
1.异步
2.简洁
ps:随着代码量的增加,Rxjava可以保持逻辑的简洁明了,替代了层层的嵌套。
3.角色:
1.观察者:Observer/Subscriber
2.被观察者:Observable/Flowable
3.订阅/注册:subscribe
ps:实际开发中常用到的两个组合Observable/Observer和Flowable/Subscriber。
Rxjava的基本使用:
1.引入:
compile'io.reactivex.rxjava2:rxjava:2.0.1'
compile'io.reactivex.rxjava2:rxandroid:2.0.1'
2.步骤:
1.创建被观察者,生产事件;
2.根据需求利用操作符来实现事件传递中的变换等加工操作;
3.订阅观察者,实现事件处理。

talk is less
1.创建



Observable其他创建方式:
just(),from(),fromIterable(),range( ),timer( ),repeat( ),defer()
2.Consumer
没必要使用Observer时,可以根据需要使用Consumer来分别实现


3.ActionX/FuncX
相较于Rxjava1.x的改动如下:
其中,Action0 改名成Action,Action1改名成Consumer,而Action2改名成了BiConsumer,而Action3 - Action9都不再使用了,ActionN变成了Consumer。
同样,Func改名成Function,Func2改名成BiFunction,Func3 - Func9 改名成 Function3 - Function9,FuncN 由Function取代。
4.操作符





lift()和compose()等操作符可参考 给 Android 开发者的 RxJava 详解 、给初学者的RxJava2.0教程。
5.线程调度
如果不指定线程的话,Rxjava是同步进行的,即在哪个线程调用subscribe()就在那个线程生产事件和消费事件,切换线程使用Schedule调度器。
Schedule线程控制器:
1.Schedulers.immediate():默认的,直接在当前线程执行,相当于不指定线程。
2.Schedulers.newThread():总是启用新线程,在新线程执行操作。
3.Schedulers.io():耗时操作(读写文件、读写数据库、访问网络等),和newThread()的区别在于,io() 的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。注意不要把计算工作放在 io() 中,可以避免创建不必要的线程。
4.Schedulers.computation():CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
5.AndroidSchedulers.mainThread():指定的操作将在 Android 主线程运行。
调用方式:
1.subscribeOn():指定Observable(被观察者)所在线程,或叫做事件产生的线程。
2.observeOn():指定Observer(观察者)所在线程,或叫做事件消费的线程。
6.Disposable/Subscription
在Rxjava中,disposable/subscription用来切断观察者和被观察者之间的连接,当调用.dispose()/.cancel()时,观察者和被观察者连接被切断,观察者(Observer)收不到事件,但是被观察者(Observable)还是会发送事件。
disposable/subscription的获取方式:



详情可参考 Rxjava结合Retrofit,如何优雅的取消请求 。
7.Flowable
Backpressure(背压):当被观察者发送消息的速度大于观察者响应事件的速度时,造成事件堆积,最后OOM。Flowable就可以解决此类问题。
生产者和消费者在同意线程(同步)不会出现Backpressure问题,但生产者和消费者在不同线程(异步)就有可能出现,Backpressure策略并不会影响Flowable发送事件的速度和方式,主要是改变Subscriber消费事件的方式。

Ⅰ.异步调用时,RxJava中有个缓存池,用来缓存消费者处理不了暂时缓存下来的数据,缓存池的默认大小为128,即只能缓存128个事件。无论request()中传入的数字比128大或小,缓存池中在刚开始都会存入128个事件。当然如果本身并没有这么多事件需要发送,则不会存128个事件。
Ⅱ.request():通过这个方法可以向生产者申请消费数量,采取的是响应式编程,要多少事件,传递多少事件。虽然并不限制向request()方法中传入任意数字,但是如果消费者并没有这么多的消费能力,依旧会造成资源浪费,最后产生OOM。
Backpressure策略的方式:
1.ERROR
ERROR即保证在异步操作中,事件累积不能超过128,超过则缓存池溢出,立刻抛出MissingBackpressureException异常。消费者不能再接收事件了,但生产者并不会停止。
2.BUFFER
把RxJava中默认的只能存128个事件的缓存池换成一个大的缓存池,支持存很多很多的数据。消费者通过request()即使传入一个很大的数字,生产者也会生产事件,并将处理不了的事件缓存。但是这种方式仍然比较消耗内存,除非是我们比较了解消费者的消费能力,能够把握具体情况,不会产生OOM(慎用)。
3.DROP
当消费者处理不了事件,就丢弃。消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。
4.LATEST
LATEST与DROP功能很像,消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。唯一的区别就是LATEST总能使消费者能够接收到生产者产生的最后一个事件。ps:不是最近的事件,是最后一个。
关于Retrofit2+Rxjava2+OkHttp3的网路请求封装





ps:最后别忘记添加网络权限。