RxJava
观察者模式和回调机制
基本使用
@Test
public void test01() {
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> observableEmitter) throws Exception {
log.info("可以在这里触发消费者的方法");
observableEmitter.onNext("onNext方法被调用");
observableEmitter.onComplete();
}
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable disposable) {
log.info("Observable调用subscribe方法时会触发这个onSubscribe方法");
}
@Override
public void onNext(Object o) {
log.info(o.toString());
}
@Override
public void onError(Throwable throwable) {
log.info("onError");
}
@Override
public void onComplete() {
log.info("onComplete方法被调用");
}
});
}
输出结果如下
Observable调用subscribe方法时会触发这个onSubscribe方法
可以在这里触发消费者的方法
onNext方法被调用
onComplete方法被调用
源码分析
Observable#create
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
ObservableOnSubscribe
为订阅的每个观察者调用
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param emitter the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
ObservableCreate
实现了Observable接口
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
......
}
RxJavaPlugins.onAssembly
这里的source就是ObservableCreate,先不分析 f != null的情况, 直接返回source
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
Observable的create方法,返回了一个ObservableCreate对象,ObservableCreate内部持有ObservableOnSubscribe实例
上面分析的是create方法,接下分析Observable的subscribe方法
接收一个Observer对象,这里指的是代码中的匿名内部类
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
// 返回observer
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
// 调用ObservableCreate的subscribeActual方法
subscribeActual(observer);
} catch (NullPointerException e) {
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
RxJavaPlugins.onSubscribe
不分享f != null情况,直接返回Observer
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
if (f != null) {
return apply(f, source, observer);
}
return observer;
}
ObservableCreate#subscribeActual
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 将 observer 包装成 CreateEmitter
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 调用 observer的 onSubscribe方法
observer.onSubscribe(parent);
try {
// 调用ObservableOnSubscribe的subscribe方法,前面说过 ObservableCreate 持有一个ObservableOnSubscribe实例,就是create方法传进来的匿名类
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
CreateEmitter
ObservableOnSubscribe的subscribe方法,调用CreateEmitter的相关方法,本质上是调用 Observer 的相关方法
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
}
上面是基本版的,下面来一个简便快捷版的
@Test
public void test02(){
Observable.just("呵呵").subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
log.info(s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
输出结果
呵呵
Observable#just
本质上并没有变化,只不过ObservableCreate换成了ObservableJust
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
ObservableJust
这个地方好像是有一些区别,之前是触发ObservableOnSubscribe的subscribe方法,然后调用CreateEmitter的相关方法,本质上是调用 Observer 的相关方法。不过这里
并没有ObservableOnSubscribe相关概念,而是多了一个ScalarDisposable
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
// 先调用调用Observer的onSubscribe方法
observer.onSubscribe(sd);
sd.run();
}
......
}
ScalarDisposable
继承了AtomicInteger,run方法中调用了observer的onNext和onComplete方法
public static final class ScalarDisposable<T> extends AtomicInteger implements QueueDisposable<T>, Runnable {
static final int START = 0;
static final int FUSED = 1;
static final int ON_NEXT = 2;
static final int ON_COMPLETE = 3;
public ScalarDisposable(Observer<? super T> observer, T value) {
this.observer = observer;
this.value = value;
}
@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
}
背压
@Test
public void test03() {
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
if (!emitter.isCancelled()) {
emitter.onNext("onNext 1");
emitter.onNext("onNext 2");
emitter.onNext("onNext 3");
emitter.onComplete();
}
}
}, DROP).subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(2L);
log.info("背压订阅");
}
@Override
public void onNext(String s) {
log.info(s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
输出结果
背压订阅
onNext 1
onNext 2
在subscribe方法里面调用了三次onNext方法,但是控制台只打印了两次,说明被限制了。注意onSubscribe方法中的subscription.request(2L)
原理分析
基本上和无背压版本的类似,不过这里的create方法传入了两个参数,一个是FlowableOnSubscribe,另一个是一个枚举类型,暂将它理解成背压策略
Flowable#create
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
ObjectHelper.requireNonNull(source, "source is null");
ObjectHelper.requireNonNull(mode, "mode is null");
return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
}
FlowableCreate
public final class FlowableCreate<T> extends Flowable<T> {
final FlowableOnSubscribe<T> source;
final BackpressureStrategy backpressure;
public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
this.source = source;
this.backpressure = backpressure;
}
@Override
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
t.onSubscribe(emitter);
try {
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
}
根据传进来的参数,这里会使用的DropAsyncEmitter
DropAsyncEmitter
继承关系如下
t.onSubscribe(emitter)也就是匿名内部类的onSubscribe方法
public void onSubscribe(Subscription subscription) {
subscription.request(2L);
log.info("背压订阅");
}
然后调用BaseEmitter的request方法,BaseEmitter实现了Subscription接口
BaseEmitter#request
@Override
public final void request(long n) {
// 校验n是否大于0,大于0返回true,小于0返回false
if (SubscriptionHelper.validate(n)) {
// 设置 Emitter的值 = Emitter的值 + n
BackpressureHelper.add(this, n);
// 空实现
onRequested();
}
}
接下来调用FlowableOnSubscribe的subscrib方法,FlowableOnSubscribe中的subscribe即匿名内部类中的subscribe方法会,
先调用调用BaseEmitter中的相关方法,BaseEmitter会根据value值选择是否调用Subscriber的相关方法(onNext、onComplete、onError)
操作符原理
操作符的核心原理就是包一层,类似于代理,这里以map为例
@Test
public void test04() {
Flowable.create(emitter -> emitter.onNext("onNext 1"), DROP)
.map(v -> v + " MAP")
.subscribe(System.out::println);
}
Flowable#map
这里返回的是FlowableMap, 创建FlowableMap时以 当前Flowable实例 和 map操作符对应的逻辑函数Function 为参数。即 FlowableMap 持有上一层的Flowable实例
public final <R> Flowable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new FlowableMap<T, R>(this, mapper));
}
FlowableMap
public final class FlowableMap<T, U> extends AbstractFlowableWithUpstream<T, U> {
final Function<? super T, ? extends U> mapper;
public FlowableMap(Flowable<T> source, Function<? super T, ? extends U> mapper) {
super(source);
this.mapper = mapper;
}
@Override
protected void subscribeActual(Subscriber<? super U> s) {
if (s instanceof ConditionalSubscriber) {
source.subscribe(new MapConditionalSubscriber<T, U>((ConditionalSubscriber<? super U>)s, mapper));
} else {
source.subscribe(new MapSubscriber<T, U>(s, mapper));
}
}
static final class MapSubscriber<T, U> extends BasicFuseableSubscriber<T, U> {
final Function<? super T, ? extends U> mapper;
MapSubscriber(Subscriber<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
// 调用上一层的 onNext 方法, 这里指的是 匿名函数 Subscriber 的onNext方法,如果连接了多个操作符,就是指向上一个操作符的onNext方法
downstream.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}
总结一下
Flowable.create => 传入 FlowableOnSubscribe, 返回 FlowableCreate
.map => 以create返回的FlowableCreate为参数,构建一个FlowableMap并返回
.subscribe => 以原生的Subscriber作为参数调用Flowable的subscribe方法,然后再对原生的Subscriber做一层包装作为参数,调用FlowableMap的subscribeActual,
然后再调用FlowableCreate的subscribe方法(即lowable的subscribe方法),然后再以上一层包装的Subscriber作为参数调用FlowableCreate的subscribeActual方法,
更加背压策略,以包装的Subscriber的作为参数创建BaseEmitter对象,调用包装的Subscriber的onSubscribe方法。以BaseEmitter为参数调用FlowableOnSubscribe的subscribe
方法,即调用BaseEmitter的相关方法(onNext......),但这个其实本质上还是调用包装的Subscriber的相关方法(onNext......)。在包装的Subscriber内部,执行map中的相关
逻辑修改值,然后再以新值作为参数,调用原生的Subscriber的相关方法。
也就是说,有多个个操作符,就会包装多少层
线程切换
subscribeOn
指定在哪个线程上发射数据
@Test
public void test05() throws InterruptedException {
Flowable.create(emitter -> {
log.info("发射数据的线程 => {}", Thread.currentThread().getName());
emitter.onNext("DD");
}, BUFFER)
// 指定在哪个线程上发射数据
.subscribeOn(Schedulers.io())
// 指定接收数据后在哪个线程上执行
.observeOn(Schedulers.newThread())
.subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription subscription) {
log.info("onSubscribe Thread => {}", Thread.currentThread().getName());
}
@Override
public void onNext(Object s) {
log.info("onNext Data => {}, Thread => {}", s, Thread.currentThread().getName());
}
@Override
public void onError(Throwable throwable) {
log.info("onError Thread => {}", Thread.currentThread().getName());
}
@Override
public void onComplete() {
log.info("onComplete Thread => {}", Thread.currentThread().getName());
}
}
);
Thread.sleep(3000);
}
-
subscribeOn方法返回一个
FlowableSubscribeOn
对象,创建FlowableSubscribeOn
的时候以FlowableCreate
和scheduler
作为入参; -
在
FlowableSubscribeOn.subscribe
方法内部,先创建一个Scheduler.Worker
对象,也就是实际的线程调度者,这里的Scheduler为IoScheduler
,所以对应的Worker为EventLoopWorker
;然后将入参Subscriber
封装成一个SubscribeOnSubscriber
对象;然后在主线程上调用Subscriber.onSubscribe
方法;最后以SubscribeOnSubscriber
为入参,调用Worker.schedule
方法; -
在
EventLoopWorker.schedule
方法内部调用NewThreadWorker.scheduleActual
方法; -
在
NewThreadWorker.scheduleActual
方法内部将入参SubscribeOnSubscriber
转换成ScheduledRunnable
,然后以ScheduledRunnable
为入参,调用ScheduledExecutorService.submit
方法;
最终ScheduledExecutorService.submit
方法调用顺序如下
ScheduledRunnable.call =>
ScheduledRunnable.run =>
SubscribeOnSubscriber.run =>
Flowable.subscribe 这里指 FlowableCreate
observeOn
指定接收数据后在哪个线程上执行
-
observeOn方法返回一个
FlowableObserveOn
对象,创建FlowableObserveOn
的时候以FlowableCreate
和scheduler
作为入参; -
在
FlowableObserveOn.subscribeActual
方法内部,先创建一个Scheduler.Worker
对象,也就是实际的线程调度者,这里的Scheduler为NewThreadScheduler
,所以对应的Worker为NewThreadWorker
;然后将入参Subscriber
封装成一个ObserveOnSubscriber
对象; -
Subscriber.onSubscribe
方法在主线程上执行; -
在调用
Subscriber.onNext
等相关方法的时候,通过 worker 进行调度;