Rxjava解析
Rxjava git:地址
RxAndroid git:地址
至于为什么提到RxAndroid,切换线程时将会用到。
RxJava中文文档
先说Rxjava 的好处,Rxjava的好处是异步调用,那么Android中已经有Handler,AsynTask了,为什么还需要Rxjava呢?首先是Android中进行耗时操作不能在UI线程,当进行网络请求的时候,需要另开线程,当更新UI时又需要切换到UI线程,首先引起的是代码可阅读性不是强,但是Rxjava通过操作符,实现链式编程,用户可以关注业务逻辑的实现,不用去管线程调度的问题,保持代码的可读性,再有就是Rxjava提供各种操作符,可以将请求过程中的数据更新,过滤成为自己想要的数据。
Rxjava操作符,如何使用可以看下官方文档,下面来看下Rxjava实现。
首先Rxjava中必须要的Observable(被观察者)observer(观察者),因为只有观察者订阅被观察者,被观察者中的事件才会发送发送出来。
看一个Rxjava的简单实现。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("111");
}
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("info","onSubscribe");
}
@Override
public void onNext(Object o) {
Log.e("info","onNext");
}
@Override
public void onError(Throwable e) {
Log.e("info","onError");
}
@Override
public void onComplete() {
Log.e("info","onComplete");
}
});
首先来看下Observer对象 ,如果其中onError
,onComplete
这俩个方法不能同时执行,如果俩个方法已经执行了了,则不能调用onNext
方法了,也就是当前Observable
和Observer
已经无订阅关系了。
再来看下 为什么要先说只有观察者订阅被观察者之后才能发送消息呢?看下subscribe
方法,是将Observable
与Observer
连接起来,看下其中的实现
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
...
//关键的实现
subscribeActual(observer);
...
}
发现调用了subscribeActual(observer)
方法,这是一个抽象方法。然后再看下Observable.create()
方法中
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
获取一个Observable 的真正实例ObservableCreate
,看下它的subscribeActual(observer)
方法
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);//1
observer.onSubscribe(parent);//2
try {
source.subscribe(parent);//3
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
CreateEmitter
extends Disposable
这个类的主要作用是让观察者与被观察者建立联系,也通过此将联系中断。看到上面的2
,我们知道系统会默认回调Disposable
对象给我们,如果当前Disposable
调用dispose()
,将会可以手动释放掉当前的联系。
1
处将发射器与观察者关联起来,在发射器中存放一个观察者的实例。
3
处是将发射器与被观察者关联起来,在被观察中通过发射器发送一个消息,通过发射器转发给关联转发器的观察者对象,所以emitter.onNext("111");
那么我们将会在观察者中的onNext(Object o)
方法中收到一个消息中CreateEmitter
中的接收。
@Override
public void onNext(T t) {
...
if (!isDisposed()) {
observer.onNext(t);
}
}
上面就是一个简单的Rxjava调用已经接收,以及处理数据流的流程。
如果我们想取消订阅
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
Log.e("info","--subscribe");
emitter.onNext("11");
// emitter.onComplete();
// emitter.onNext("22");
// emitter.onNext("22");
// emitter.onNext("33");
}
}).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
disposable =d;
//关键代码
disposable.dispose();
}
...
}
disposable.dispose()
直接取消订阅即可,我们看下里面如何取消订阅的 disposable
其实就是前面的parent
的,也就是CreateEmitter
实例,CreateEmitter
继承自AtomicReference<Disposable>
,AtomicReference<Disposable>
这个类是对对象
的原子操作。disposable.dispose()
调用
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
执行的是DisposableHelper.dispose(this);
方法
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
// 如果当前的对象不为DISPOSED
if (current != d) {
//设置当前为状态为DISPOSED,并且返回原来的值
current = field.getAndSet(d);
if (current != d) {
// 如果前面的原子引用不为null,则废弃前面的。
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
再 onNext之前先判断当前的状态
public void onNext(T t) {
...
if (!isDisposed()) {
observer.onNext(t);
}
}
看下isDisposed ()
方法
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
也是调用的DisposableHelper
中的方法
/**
* Checks if the given Disposable is the common {@link #DISPOSED} enum value.
* @param d the disposable to check
* @return true if d is {@link #DISPOSED}
*/
public static boolean isDisposed(Disposable d) {
return d == DISPOSED;
}
下面看下Rxjava的线程切换模型。
*Schedulers.immediate()
: 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler
。
-
Schedulers.newThread()
: 总是启用新线程,并在新线程执行操作。 -
Schedulers.io(): I/O
操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler
。行为模式和newThread()
差不多,区别在于io()
的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比newThread()
更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。 -
Schedulers.computation()
: 计算所使用的Scheduler
。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个Scheduler
使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在computation()
中,否则 I/O 操作的等待时间会浪费 CPU。 - Android 还有一个专用的
AndroidSchedulers.mainThread()
,它指定的操作将在 Android 主线程运行。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("111");
Log.e("info","---->"+Thread.currentThread());
}
}).flatMap(new Function<String, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(String s) throws Exception {
Log.e("info","---->"+Thread.currentThread()); //4
return null;
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("info","--onSubscribe" +Thread.currentThread()); //5
}
@Override
public void onNext(Object o) {
Log.e("info","--onNext"+Thread.currentThread());
}
@Override
public void onError(Throwable e) {
Log.e("info","--onError"+Thread.currentThread());
}
@Override
public void onComplete() {
Log.e("info","--onComplete"+Thread.currentThread());
}
});
}
Rxjava的订阅流程是自下而上传递的。也即是说先会执行离Observable.subscribe(observer)
最近的Obsevable,然后再向上传递。
subscribeOn()
表示订阅的线程,observeOn ()
表示观察的线程,
subscribeOn
表示被观察者中的需要执行操作执行所在的线程,并非指的是当前线程。
先看下subscribeOn()
方法,
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
...
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);//1
observer.onSubscribe(parent);//2
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));//3
}
...
}
具体实现在ObservableSubscribeOn
类中
- 首先从上面
2
,5
可以确认Observer.onSubscribe()
是在当前线程执行的。 - 通过查看日志
4
可以确认订阅流程是在新开的线程中。
下面分析代3
: - 首先
3
中的scheduler
为IoScheduler
。 - parent.setDisposable()方法设置返回的Disposable对象,这个Disposable对象实例是一个DisposeTask对象。
由外而内的分析,
parent.setDisposable()
方法的调用
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
具体的调用是
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
ObjectHelper.requireNonNull(d, "d is null");
if (!field.compareAndSet(null, d)) {
d.dispose();
if (field.get() != DISPOSED) {
reportDisposableSet();
}
return false;
}
return true;
}
默认field字段是无值的,将DisposeTask
传递给SubscribeOnObserver
,上面的主要目的是设置 为SubscribeOnObserver
默认值,DisposeTask
可以关联到订阅的线程,这样的话,如果取消订阅就可以将正在订阅的任务废弃掉。
事实也是这样,当调用废弃线程时的操作。
@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
w.dispose();
}
}
w
是 EventLoopWorker实例,所以这个地方调用的是EventLoopWorker.dispose()
。
现在知道dispose ()
的真实调用地址了,
现在知道如何取消Observer
和Observable
的订阅关系了,那么Observable
是如何切换到其他线程的?
我们知道Observable是从下往上传递的,而Observable 中的回调是从上往下传递的。可能听起来很蛋疼,以上面的例子来说
Observable.create(new ObservableOnSubscribe<Object>() {
....
}).flatMap(new Function<Object, ObservableSource<?>>() {
...
}).subscribeOn(Schedulers.io())
...
.subscribe(Observer<? super T> observer)
....
首先这个执行顺序是从subscribe ()
调用开始 ,这个很关键,前面说了,如果没有订阅,那么被观察者中的将不会执行,只有在该方法执行的情况下,订阅者与被订阅者才会产生联系。在subscribe ()
方法中,
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
...
subscribeActual(observer);
...
}
实现的是抽象方法是subscribeActual ()
,具体是实现是在子类中,不同子类的实现不同,每个子类中都有上一个ObservableSource
的对象source
,然后在子类的subscribeActual ()
方法中调用上一个Observable
(也就是source
)的subscribe()
方法以此向上传递。到这里也就明白,为什么在subscribeOn(Schedulers.io())
中线程切换之后其他Observable中的调用就都是在新的线程了。
如何切换到新线程?实现在scheduler.scheduleDirect(new SubscribeTask(parent))
方法中。
SubscribeTask
实现Runnable
接口,在run
方法中
@Override
public void run() {
source.subscribe(parent);
}
这个source
就是上一个ObservableSource
对象,parent
猜也能猜到是一个Observer
对象,现在的任务是要将new SubscribeTask(parent)
在一个新线程中执行,就可实现订阅在新线程执行的操作。由此可知scheduler.scheduleDirect
必定开了一个新线程,scheduler
实现是IoScheduler
,具体的调用是在EventLoopWorker
类中的父类NewThreadWorker
,通过线程池调用。
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
...
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
//decoratedRun 就是前面的 SubscribeTask
....
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
...
}
return sr;
}
启动新线程的订阅的过程分心完成。
至于如何切回到主线程,其实是通过Handler
,通过主线程的handler
来实现切换。