RxJava2系列第二篇---异步
目录
异步
在该系列第一篇最开始,我们已经说了RxJava是一个异步编程框架,之所以这么说,就是因为它在线程的切换方面非常方便。
介绍异步之前,我们先看看下面几个方法
subscribe(Observer<? super T> observer)
subscribe(Consumer<? super T> onNext)
subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete)
subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe)
在基本使用中,我们用的是第一个方法,但此方法需要重写所有的事件,但有的时候我们并不需要对所有的事件进行处理,因此就有了下面几种方法,看参数我们就知道每个方法分别处理的是什么事件,比如第二个方法,只处理next事件,第三个方法,只处理next和error事件等等。
为了代码的简洁性,接下来我将使用Consumer作为观察者。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d("TAG","subscribe:"+Thread.currentThread().getName());
emitter.onNext("1");
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("TAG","accept:"+Thread.currentThread().getName());
}
});
输出结果
D/TAG: subscribe:main
D/TAG: accept:main
结果分析
如果我将上述代码,放在一个子线程中去,发现结果线程的名字将不再是main。说明在哪创建上述代码,则上游和下游就会处于那个线程,并且它们处于同一个线程。
如果我们要在子线程中发送交易,主线程更新UI,这种情况就满足不了我们的需求了。我们需要的是上线处于子线程,负责发送网络请求,下游处于主线程,负责更新UI。通过RxJava的线程调度器可以轻松实现上述需求。
在Observable中有两个方法
subscribeOn(Scheduler scheduler) //指定上游所在的线程
observeOn(Scheduler scheduler) //指定下游所在的线程
先来看看下面的代码
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d("TAG", "subscribe:" + Thread.currentThread().getName());
emitter.onNext("1");
}
});
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("TAG", "accept:" +s+":"+ Thread.currentThread().getName());
}
};
//关注点
observable.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer);
输出结果
D/TAG: subscribe:RxNewThreadScheduler-1
D/TAG: accept:1:main
我们来看看Schedulers和AndroidSchedulers
这两个类并无继承关系,是相互独立的两个final类
AndroidSchedulers
/** A {@link Scheduler} which executes actions on the Android main thread.*/
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
/** A {@link Scheduler} which executes actions on {@code looper}. */
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new HandlerScheduler(new Handler(looper));
}
前者指定方法在主线程中执行
后者指定方法在哪个线程执行,由Looper所在的线程决定
Schedulers
该调度器里面有下面几个主要方法
//新的子线程
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
//计算密集型任务
public static Scheduler computation() {
return RxJavaPlugins.onComputationScheduler(COMPUTATION);
}
//io密集型任务
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
public static Scheduler trampoline() {
return TRAMPOLINE;
}
public static Scheduler single() {
return RxJavaPlugins.onSingleScheduler(SINGLE);
}
其他两个暂时没用到,就先不说明了。
回到开始的异步代码
修改关注点
observable.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.subscribe(consumer);
输出结果
D/TAG: subscribe:RxNewThreadScheduler-1
D/TAG: accept:1:RxCachedThreadScheduler-2
结果分析
修改后的代码指定了2次上游发送事件的线程,下游也指定了2次线程,通过输出结果,我们可以得出结论:上游线程只有第一次指定的有效,下游线程最终会切换至最后一个指定的线程。
为了更加清晰的知道下游线程的切换过程,我们修改代码如下
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d("TAG", "subscribe:" + Thread.currentThread().getName());
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
}
});
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("TAG", "accept:" +s+":"+ Thread.currentThread().getName());
}
};
observable.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("TAG", "accept:" +s+":"+ Thread.currentThread().getName());
}
})
.observeOn(Schedulers.io())
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d("TAG", "accept:" +s+":"+ Thread.currentThread().getName());
}
})
.subscribe(consumer);
输出结果如下
D/TAG: subscribe:RxNewThreadScheduler-1
D/TAG: accept:1:main
D/TAG: accept:2:main
D/TAG: accept:3:main
D/TAG: accept:1:RxCachedThreadScheduler-2
D/TAG: accept:1:RxCachedThreadScheduler-2
D/TAG: accept:2:RxCachedThreadScheduler-2
D/TAG: accept:2:RxCachedThreadScheduler-2
D/TAG: accept:3:RxCachedThreadScheduler-2
D/TAG: accept:3:RxCachedThreadScheduler-2
结果分析
从输出结果,我们可以看出,每一个doOnNext都会接受到全部事件,并且每一个observeOn指定的是它下面的那个事件所处的线程。
线程切换原理分析
RxJava线程切换图.png来一段RxJava的调用链代码
Observable.just(1)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
LogUtil.d("rxjava", "map1: " + Thread.currentThread().getId());
return integer.toString();
}
})
.subscribeOn(Schedulers.newThread()) // s1
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
LogUtil.d("rxjava", "map2: " + Thread.currentThread().getId());
return s.hashCode();
}
})
.observeOn(Schedulers.newThread()) // o1
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
LogUtil.d("rxjava", "map3: " + Thread.currentThread().getId());
return integer.toString();
}
})
.subscribeOn(Schedulers.newThread()) // s2
.observeOn(Schedulers.newThread()) // o2
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
LogUtil.d("rxjava", "accept: " + Thread.currentThread().getId());
}
});
结合图和代码来分析线程的切换过程
这段代码中包含了很多操作符,每一个点后面的都是RxJava的操作符,如just,map,subscribe等等,对应图中的lift
在这些操作符中,每调用一次操作符,都返回Observable,这就像Builder模式,只有subscribe返回的不是Observable,而是Disposable
subscribe意味着RxJava调用链开始启动,对应图中的底端的actual-subscriber。
自下而上找subscribeOn,每经过一个subscribeOn就切换一次线程(如果一个都没有,则线程默认为当前线程),直到到达顶端的Observable,对应图中的onSubscribe
自上而下找observeOn(图中的Subscribe),同样是每经过一个observeOn就切换一次线程
通过这个思路,大家想一想上述程序的打印结果
just,map1和map2处于一个线程,并且是s1所指定的线程
map3处于o1所处的线程
accept处于o2所处的线程
打印验证一下
map1: 732
map2: 732
map3: 733
accept: 734
结果一致
[]