Android 架构师之路19 响应式编程RxJava基本元素
概念介绍
响应式编程是一种面向数据流和变化传播的编程范式
1.基础概念定义
- 数据流:只能以事先规定好的顺序被读取一次的数据的一个序列
- 变化传播:类似观察者模式,变化了要通知别人。
- 编程范式:计算机编程的基本风格或典范模式
1.1、数据流
- 在计算机中是数据
- 在现实中可以任意对象组成的有顺序的队列
- 就象水管里的水流,在水管的一端一点一点地供水,而在水管的另一端看到的是一股连续不断的水流。
1.2、变化传播
前一个程序的执行的结果影响接下来程序的执行与结果
1.3、编程范式
编程范式(Programming Paradigm)是某种编程语言的典型编程风格或者说是编程方式。
随着编程方法学和软件工程学的深入,特别是OO思想的普及,范式(Paradigm)以及编程范式等术语渐渐出现在人们面前。面向对象编程(OOP)常常被誉为是一种革命性的的思想,正因为它不同于其他的各种编程范式。编程范式也许是学习任何一门编程语言时要理解的最重要的术语。
托马斯.库尔提出“科学的革命”的范式论后,Robert Floyd在1979年图灵奖的颁奖演说中使用了编程范式一词。编程范式一般包括三个方面,以OOP为例:
1,学科的逻辑体系——规则范式:如 类/对象、继承、动态绑定、方法改写、对象替换等等机制。
2,心理认知因素——心理范式:按照面向对象编程之父Alan Kay的观点,“计算就是模拟”。OO范式极其重视隐喻(metaphor)的价值,通过拟人化,按照自然的方式模拟自然。
3,自然观/世界观——观念范式:强调程序的组织技术,视程序为松散耦合的对象/类的组合,以继承机制将类组织成一个层次结构,把程序运行视为相互服务的对象之间的对话。
简单来说,编程范式是程序员看待程序应该具有的观点。
为了进一步加深对编程范式的认识,这里介绍几种最常见的编程范式。
需要再次提醒的是:编程范式是编程语言的一种分类方式,它并不针对某种编程语言。就编程语言而言,一种语言可以适用多种编程范式。
编程范式示例
-
自省编程或称反射编程
2.RxJava简介
A library for composing asynchronous and event-based programs by using observable sequences.
Asynchronous:
1.异步的,RxJava是一个异步的库
2.基于回调的
Event-based:
1.基于事件的
2.事件分发的库,消息传递
2.1RxJava五大元素
1.Observable
2.Observer
3.Subscription
4.OnSubscribe
5.Subscriber
3 RxJava2五大元素
背压概念
1.异步环境下产生的问题
2.发送和处理速度不统一
3.是一种流速控制解决策略
从RxJava一个比较常见的工作场景说起。
RxJava是一个观察者模式的架构,当这个架构中被观察者(Observable)和观察者(Subscriber)处在不同的线程环境中时,由于者各自的工作量不一样,导致它们产生事件和处理事件的速度不一样,这就会出现两种情况:
被观察者产生事件慢一些,观察者处理事件很快。那么观察者就会等着被观察者发送事件,(好比观察者在等米下锅,程序等待,这没有问题)。
被观察者产生事件的速度很快,而观察者处理很慢。那就出问题了,如果不作处理的话,事件会堆积起来,最终挤爆你的内存,导致程序崩溃。(好比被观察者生产的大米没人吃,堆积最后就会烂掉)。
下面我们用代码演示一下这种崩溃的场景:
//被观察者在主线程中,每1ms发送一个事件
Observable.interval(1, TimeUnit.MILLISECONDS)
//.subscribeOn(Schedulers.newThread())
//将观察者的工作放在新线程环境中
.observeOn(Schedulers.newThread())
//观察者处理每1000ms才处理一个事件
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.w("TAG","---->"+aLong);
}
});
在上面的代码中,被观察者发送事件的速度是观察者处理速度的1000倍
这段代码运行之后:
Caused by: rx.exceptions.MissingBackpressureException
抛出MissingBackpressureException往往就是因为,被观察者发送事件的速度太快,而观察者处理太慢,而且你还没有做相应措施,所以报异常。
而这个MissingBackpressureException异常里面就包含了Backpressure这个单词,看来背压肯定和这种异常情况有关系。
关于背压(Backpressure)
我这两天翻阅了大量的中文和英文资料,我发现中文资料中,很多人对于背压(Backpressure)的理解是有很大问题的,有的人把它看作一个需要避免的问题,或者程序的异常,有的人则干脆避而不谈,模棱两可,着实让人尴尬。
通过参考和对比大量的相关资料,我在这里先对背压(Backpressure)做一个明确的定义:背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略
简而言之,背压是流速控制的一种策略。
需要强调两点:
背压策略的一个前提是异步环境,也就是说,被观察者和观察者处在不同的线程环境中。
背压(Backpressure)并不是一个像flatMap一样可以在程序中直接使用的操作符,他只是一种控制事件流速的策略。
那么我们再回看上面的程序异常就很好理解了,就是当被观察者发送事件速度过快的情况下,我们没有做流速控制,导致了异常。
3.1RxJava2基本元素
非背压
1.Observable
- 1.观察得到的-被观察者,不支持背压
- 2.通过Observable创建一个可观察的序列(create方法)
- 3.通过subscribe去注册一个观察者
2.Observer
- 1.用于接收数据-观察者
- 2.作为Observable的subscribe方法的参数
3.Disposable
- 1.和RxJava1的Subscription的作用相当
- 2.用于取消订阅和获取当前的订阅状态
4.OnSubscrible
- 1.当订阅时会触发此接口调用
- 2.在Observable内部,实际作用是向观察者发射数据
5.Emitter
- 1.一个发射数据的接口,和Observer的方法类似
- 2.本质是对Observer和Subscriber的包装
背压
1.Flowable
- 1.易流动的---被观察者,支持背压
- 2.通过Flowable创建一个可观察的序列(create方法)
- 3.通过subscribe去注册一个观察者
2.Subscriber
- 1.一个单独接口,和Observer的方法类似
- 2.作为Flowable的subscribe方法的参数
3.Subscription
- 1.订阅,和RxJava1的有所不同
- 2.支持背压,有用于背压的request方法
4.FlowableOnSubscribe
- 1.当订阅时会触发此接口调用
- 2.在Flowable内部,实际作用是向观察者发射数据
5.Emitter
- 1.一个发射数据的接口,和Observer的方法类似
- 2.本质是对Observer的Subscriber的包装
3.2自定义实现RxJava1基本功能
自定义映射关系类(仿写相应功能),如下:
- Observable -> Caller
- Observer -> Callee
- Subscription -> Calling
- OnSubscribe -> OnCall
- Subscriber -> Receiver
/**
* Created by Xionghu on 2018/6/6.
* Desc: 简单的调用
*/
public interface Action1<T>{
void call(T t);
}
public class Caller<T> {
final OnCall<T> onCall;
public Caller(OnCall<T> onCall) {
this.onCall = onCall;
}
public static <T> Caller<T> create(OnCall<T> onCall) {
return new Caller<>(onCall);
}
public Calling call(Receiver<T> receiver) {
this.onCall.call(receiver);
return receiver;
}
public interface OnCall<T> extends Action1<Receiver<T>> {
}
}
public interface Callee<T>{
void onCompleted();
void onError(Throwable t);
void onReceive(T t);
}
/**
* Created by Xionghu on 2018/6/6.
* Desc: 描述打电话
*/
public interface Calling {
void unCall();
boolean isUnCalled();
}
/**
* Created by Xionghu on 2018/6/6.
* Desc: 接收信息的人
*/
public abstract class Receiver<T> implements Callee<T>, Calling {
private volatile boolean unCalled;
@Override
public void unCall() {
unCalled = true;
}
@Override
public boolean isUnCalled() {
return unCalled;
}
}
调用:
@OnClick(R.id.async)
public void onViewClicked() {
Calling tCalling = Caller.create(new Caller.OnCall<String>() {
@Override
public void call(Receiver<String> stringReceiver) {
if (!stringReceiver.isUnCalled()) {
stringReceiver.onReceive("test");
stringReceiver.onCompleted();
}
}
}).call(new Receiver<String>() {
@Override
public void onCompleted() {
Log.d("kpioneer", "onCompleted");
}
@Override
public void onError(Throwable t) {
}
@Override
public void onReceive(String s) {
Log.d("kpioneer", "onReceive:" + s);
}
});
Caller caller = Caller.create(new Caller.OnCall<String>() {
@Override
public void call(Receiver<String> stringReceiver) {
if (!stringReceiver.isUnCalled()) {
stringReceiver.onReceive("test");
stringReceiver.onCompleted();
}
}
});
caller.call(new Receiver<String>() {
@Override
public void onCompleted() {
Log.d("kpioneer", "onCompleted");
}
@Override
public void onError(Throwable t) {
}
@Override
public void onReceive(String s) {
Log.d("kpioneer", "onReceive:" + s);
}
});
}
}
Log 打印
06-06 18:35:55.210 28537-28537/com.haocai.rxjavademo D/kpioneer: onReceive:test
06-06 18:35:55.210 28537-28537/com.haocai.rxjavademo D/kpioneer: onCompleted
06-06 18:35:55.210 28537-28537/com.haocai.rxjavademo D/kpioneer: onReceive:test
06-06 18:35:55.210 28537-28537/com.haocai.rxjavademo D/kpioneer: onCompleted
3.3自定义实现RxJava2(无背压)基本功能
自定义映射关系类(仿写相应功能),如下:
1.Observable ->Caller
2.Observer ->Callee
3.Disposable ->Release
4.OnSubscribe -> OnCall
5.Emitter -> Emitter
RxJava2(无背压)UML
public abstract class Caller<T> {
public static <T> Caller<T> create(CallerOnCall<T> callerOnCall) {
return new CallerCreate<>(callerOnCall);
}
public void call(Callee<T> callee) {
callActual(callee);
}
protected abstract void callActual(Callee<T> callee);
}
/**
* Created by Xionghu on 2018/6/7.
* Desc: 接电话的人
*/
public interface Callee<T> {
void onCall(Release release);
void onReceive(T t);
void onCompleted();
void onError(Throwable t);
}
public class CallerCreate<T> extends Caller<T> {
private CallerOnCall<T> mCallerOnCall;
public CallerCreate(CallerOnCall<T> mCallerOnCall) {
this.mCallerOnCall = mCallerOnCall;
}
@Override
protected void callActual(Callee<T> callee) {
CreateEmitter<T> tCallerEmitter = new CreateEmitter<>(callee);
callee.onCall(tCallerEmitter);
mCallerOnCall.call(tCallerEmitter);
}
public static final class CreateEmitter<T> extends AtomicReference<Release> implements CallerEmitter<T>, Release {
private Callee<T> mCallee;
public CreateEmitter(Callee<T> mCallee) {
this.mCallee = mCallee;
}
@Override
public void onReceive(T t) {
if (!isReleased()) {
mCallee.onReceive(t);
}
}
@Override
public void onCompleted() {
if (!isReleased()) {
mCallee.onCompleted();
}
}
@Override
public void onError(Throwable t) {
if (!isReleased()) {
mCallee.onError(t);
}
}
@Override
public boolean isReleased() {
return ReleaseHelper.isReleased(get());
}
@Override
public void release() {
ReleaseHelper.release(this);
}
}
}
public interface CallerEmitter<T> extends Emitter<T> {
}
/**
* Created by Xionghu on 2018/6/7.
* Desc: 当打电话的时候
*/
public interface CallerOnCall<T> {
void call(CallerEmitter<T> callerEmitter);
}
/**
* Created by Xionghu on 2018/6/7.
* Desc:
*/
public interface Emitter<T> {
void onReceive(T t);
void onCompleted();
void onError(Throwable t);
}
/**
* Created by Xionghu on 2018/6/7.
* Desc: 挂断电话
*/
public interface Release {
boolean isReleased();
void release();
}
import java.util.concurrent.atomic.AtomicReference;
/**
* Created by Xionghu on 2018/6/7.
* Desc:
*/
public enum ReleaseHelper implements Release {
RELEASED;
public static boolean isReleased(Release release) {
return release == RELEASED;
}
public static boolean release(AtomicReference<Release> releaseAtomicReference) {
Release current = releaseAtomicReference.get();
Release d = RELEASED;
if (current != d) {
current = releaseAtomicReference.getAndSet(d);
if (current != d) {
if (current != null) {
current.release();
}
return true;
}
}
return false;
}
@Override
public boolean isReleased() {
return true;
}
@Override
public void release() {
}
}
主程序调用:
@OnClick(R.id.async)
public void onViewClicked() {
Caller.create(new CallerOnCall<String>() {
@Override
public void call(CallerEmitter<String> callerEmitter) {
callerEmitter.onReceive("test");
callerEmitter.onCompleted();
}
}).call(new Callee<String>() {
@Override
public void onCall(Release release) {
Log.d("kpioneer", "onCall");
}
@Override
public void onReceive(String s) {
Log.d("kpioneer", "onReceive:" + s);
}
@Override
public void onCompleted() {
Log.d("kpioneer", "onCompleted");
}
@Override
public void onError(Throwable t) {
}
});
}
06-08 13:24:20.536 20720-20720/com.haocai.rxjavademo D/kpioneer: onCall
06-08 13:24:20.536 20720-20720/com.haocai.rxjavademo D/kpioneer: onReceive:test
06-08 13:24:20.536 20720-20720/com.haocai.rxjavademo D/kpioneer: onCompleted
3.4自定义实现RxJava2(有背压)基本功能
1.Flowable -> Telephoner
2.Subscriber -> Receiver
3.Subscription -> Drop
4.OnSubscribe -> OnCall
5.Emitter -> Emitter
RxJava2(有背压)UML核心代码:
public class TelephonerCreate<T> extends Telephoner<T> {
private TelephonerOnCall<T> mTelephonerOnCall;
public TelephonerCreate(TelephonerOnCall<T> telephonerOnCall) {
mTelephonerOnCall = telephonerOnCall;
}
@Override
protected void callActual(Receiver<T> receiver) {
DropEmitter<T> tDropEmitter = new DropEmitter<>(receiver);
receiver.onCall(tDropEmitter);
mTelephonerOnCall.call(tDropEmitter);
}
private static class DropEmitter<T>
extends AtomicLong
implements TelephonerEmitter<T>, Drop {
private Receiver<T> mReceiver;
private volatile boolean mIsDropped;
private DropEmitter(Receiver<T> receiver) {
mReceiver = receiver;
}
@Override
public void onReceive(T t) {
if (get() != 0) {
mReceiver.onReceive(t);
BackpressureHelper.produced(this, 1); //消耗事件 直到为0
}
}
@Override
public void request(long n) {
BackpressureHelper.add(this, n); //n表示数据的数量 添加事件
}
@Override
public void onCompleted() {
mReceiver.onCompleted();
}
@Override
public void drop() {
mIsDropped = true;
}
@Override
public void onError(Throwable t) {
mReceiver.onError(t);
}
}
}
public final class BackpressureHelper {
public static void add(AtomicLong requested, long n) {
long r = requested.get();
if (r == Long.MAX_VALUE) {
return;
}
long u = r + n;
if (u < 0L) {
u = Long.MAX_VALUE;
}
// compareAndSet:如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。这里需要注意的是这个方法的返回值实际上是是否成功修改,而与之前的值无关。
requested.compareAndSet(r, u);//把requested中的值设置为u
}
public static void produced(AtomicLong requested, long n) {
long current = requested.get();
if (current == Long.MAX_VALUE) {
return;
}
long update = current - n;
if (update < 0L) {
update = 0L;
}
requested.compareAndSet(current, update);
}
}
public interface Drop {
void request(long n);
void drop();
}
@OnClick(R.id.async)
public void onViewClicked() {
Telephoner.create(new TelephonerOnCall<String>() {
@Override
public void call(TelephonerEmitter<String> telephonerEmitter) {
telephonerEmitter.onReceive("test");
telephonerEmitter.onCompleted();
}
}).call(new Receiver<String>() {
@Override
public void onCall(Drop d) {
d.request(Long.MAX_VALUE); //注释带调 该句 话 就不会打印onReceive:test
Log.d("kpioneer", "onCall");
}
@Override
public void onReceive(String s) {
Log.d("kpioneer", "onReceive:" + s);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
Log.d("kpioneer", "onCompleted");
}
});
}
···
06-08 15:06:25.686 11378-11378/com.haocai.rxjavademo D/kpioneer: onCall
06-08 15:06:25.686 11378-11378/com.haocai.rxjavademo D/kpioneer: onReceive:test
06-08 15:06:25.686 11378-11378/com.haocai.rxjavademo D/kpioneer: onCompleted
···
注意:
d.request(Long.MAX_VALUE); //注释带调 该句 话 就不会打印onReceive:test