RxJava source code analysis: specifying the thread with observeOn
2017-11-19
Rogge666
Based on RxJava 1.1.0 and RxAndroid 1.0.1
Example code ↓
Observable<String> observable1 = Observable.create(new Observable.OnSubscribe<String>() {
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("1");
        subscriber.onCompleted();
    }
});
Subscriber<String> subscriber1 = new Subscriber<String>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(String s) {
        Log.e("haha", s);
    }
};
observable1.observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber1);
Simplified observeOn source ↓
public final Observable<T> observeOn(Scheduler scheduler) {
    return lift(new OperatorObserveOn<T>(scheduler));
}
AndroidSchedulers source ↓
public final class AndroidSchedulers {
    private AndroidSchedulers() {
        throw new AssertionError("No instances");
    }
    private static final Scheduler MAIN_THREAD_SCHEDULER =
            new HandlerScheduler(new Handler(Looper.getMainLooper()));
①
    public static Scheduler mainThread() {
        Scheduler scheduler =
                RxAndroidPlugins.getInstance().getSchedulersHook().getMainThreadScheduler();
        return scheduler != null ? scheduler : MAIN_THREAD_SCHEDULER;
    }
}
Simplified lift source ↓
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
②
    // creates observable2 with OnSubscribe2
    return new Observable<R>(new OnSubscribe<R>() {
③
        @Override
        public void call(Subscriber<? super R> o) {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            st.onStart();
⑤
            onSubscribe.call(st); // onSubscribe1.call(subscriber2)
        }
    });
}
OperatorObserveOn source fragments ↓
④
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) { // child = subscriber1
    if (scheduler instanceof ImmediateScheduler) {
        // avoid overhead, execute directly
        return child;
    } else if (scheduler instanceof TrampolineScheduler) {
        // avoid overhead, execute directly
        return child;
    } else {
        ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child);
        parent.init();
        return parent;
    }
}
// The members below belong to the inner class ObserveOnSubscriber (subscriber2).
⑥
@Override
public void onNext(final T t) {
    if (isUnsubscribed()) {
        return;
    }
⑦
    if (!queue.offer(on.next(t))) {
        onError(new MissingBackpressureException());
        return;
    }
    schedule();
}
final Action0 action = new Action0() {
    @Override
    public void call() {
        pollQueue();
    }
};
protected void schedule() {
    if (counter.getAndIncrement() == 0) {
        recursiveScheduler.schedule(action);
    }
}
Simplified pollQueue ↓
void pollQueue() {
    // Simplified: backpressure accounting and termination handling are omitted.
    for (;;) {
        Object o = queue.poll();
        if (o != null) {
⑧
            child.onNext(on.getValue(o));
        } else {
            break;
        }
    }
}
OperatorObserveOn.ObserveOnSubscriber source fragment ↓
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
    this.child = child;
    this.recursiveScheduler = scheduler.createWorker();
    if (UnsafeAccess.isUnsafeAvailable()) {
        queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);
    } else {
        queue = new SynchronizedQueue<Object>(RxRingBuffer.SIZE);
    }
    this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler);
}
The call flow follows the markers in order, from ① through ⑧.
Code breakdown
observable1.observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber1)
= observable1.lift(new OperatorObserveOn<String>(scheduler)).subscribe(subscriber1)
= observable2.subscribe(subscriber1)
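Execution starts at ①: a HandlerScheduler is created around a Handler bound to the main thread's Looper, and mainThread() returns it unless the scheduler hook supplies a replacement.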
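To make the idea behind ① concrete outside Android, here is a minimal sketch in plain Java. It is not RxAndroid code: android.os.Handler and Looper are unavailable on a desktop JVM, so a single-thread executor is assumed as a stand-in for the main Looper, and the names LooperLikeScheduler, schedule, shutdownAndWait and runOnce are hypothetical. It only illustrates that "scheduling on the main thread" amounts to posting a task to the one thread that owns the loop.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for HandlerScheduler (not RxAndroid code): one dedicated
// thread plays the role of the Android main thread and its Looper.
final class LooperLikeScheduler {
    private final ExecutorService loop;

    LooperLikeScheduler(final String threadName) {
        // A single-thread executor runs posted tasks one at a time, in order.
        this.loop = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
    }

    // Analogous to Handler.post(runnable): queue the action for the loop thread.
    void schedule(Runnable action) {
        loop.execute(action);
    }

    // Stops accepting new work and waits for already queued actions to finish.
    void shutdownAndWait() {
        loop.shutdown();
        try {
            if (!loop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("loop thread did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Schedules one action and returns the name of the thread that executed it.
    static String runOnce(String threadName) {
        LooperLikeScheduler scheduler = new LooperLikeScheduler(threadName);
        AtomicReference<String> ranOn = new AtomicReference<>();
        scheduler.schedule(() -> ranOn.set(Thread.currentThread().getName()));
        scheduler.shutdownAndWait();
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("action ran on: " + runOnce("fake-main"));
    }
}
```

The caller never runs the action itself; it only hands the action to the loop thread, which is the property observeOn relies on.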
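Next, at ②, lift creates observable2 together with its OnSubscribe2, so the subscription becomes observable2.subscribe(subscriber1). Subscribing executes OnSubscribe2.call(subscriber1), which reaches ③. There subscriber1 is passed into ④ as the argument of the operator's call(), so child = subscriber1, and subscriber2 (an ObserveOnSubscriber wrapping child) is created.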
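The wrapping performed by lift can be sketched with a few stripped-down types. This is an illustration under assumptions, not RxJava's implementation: MiniLift and its nested Observable, Subscriber, OnSubscribe and Operator are hypothetical miniatures, the wildcard bounds are simplified, and hook.onLift, onStart, error handling and unsubscription are all left out. The recorded log shows the order in which the pieces are invoked.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the lift mechanism (not the real rx.Observable).
final class MiniLift {
    interface Subscriber<T> { void onNext(T t); }
    interface OnSubscribe<T> { void call(Subscriber<? super T> subscriber); }
    interface Operator<R, T> { Subscriber<? super T> call(Subscriber<? super R> child); }

    static final class Observable<T> {
        final OnSubscribe<T> onSubscribe;

        Observable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }

        <R> Observable<R> lift(final Operator<R, T> operator) {
            final OnSubscribe<T> source = this.onSubscribe;   // onSubscribe1
            return new Observable<R>(o -> {                   // OnSubscribe2
                Subscriber<? super T> st = operator.call(o);  // builds subscriber2
                source.call(st);                              // onSubscribe1.call(subscriber2)
            });
        }

        void subscribe(Subscriber<? super T> subscriber) { onSubscribe.call(subscriber); }
    }

    // Runs observable1.lift(operator).subscribe(subscriber1) and returns the call order.
    static List<String> demo() {
        final List<String> log = new ArrayList<>();

        Observable<String> observable1 = new Observable<String>(s -> {
            log.add("onSubscribe1.call");
            s.onNext("1");
        });

        Operator<String, String> operator = child -> {
            log.add("operator.call");
            // subscriber2 wraps child (subscriber1) and forwards each value to it.
            Subscriber<String> subscriber2 = t -> {
                log.add("subscriber2.onNext(" + t + ")");
                child.onNext(t);
            };
            return subscriber2;
        };

        Subscriber<String> subscriber1 = t -> log.add("subscriber1.onNext(" + t + ")");

        observable1.lift(operator).subscribe(subscriber1);
        return log;
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

In this sketch the operator forwards values synchronously; the real ObserveOnSubscriber differs exactly at that point, because it queues the value and hands delivery to the scheduler's worker.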
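Execution then reaches ⑤, which is equivalent to onSubscribe1.call(subscriber2), that is, subscriber2.onNext("1"), which brings us to ⑥. Inside subscriber2.onNext, step ⑦ stores the value in the queue and then calls schedule(). At ⑧, pollQueue runs on the specified thread, takes the value out of the queue and re-emits it with child.onNext(on.getValue(o)). Since child is subscriber1, this is the call subscriber1.onNext("1").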
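The handoff in ⑦ and ⑧ can be sketched as a queue plus a counter that guarantees at most one drain loop is active. The following is a simplified illustration, not the library's code: QueueDrainSketch is a hypothetical class, a single-thread executor is assumed in place of the Scheduler.Worker, a ConcurrentLinkedQueue replaces SpscArrayQueue, a List stands in for the child subscriber, and backpressure, NotificationLite wrapping, errors and completion are omitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical miniature of the queue-and-drain handoff (not RxJava code).
final class QueueDrainSketch {
    private final Queue<String> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger counter = new AtomicInteger();
    private final ExecutorService worker;   // stands in for the Scheduler.Worker
    private final List<String> child;       // stands in for subscriber1

    QueueDrainSketch(ExecutorService worker, List<String> child) {
        this.worker = worker;
        this.child = child;
    }

    // Called on the emitting thread: enqueue the value, then request a drain.
    void onNext(String t) {
        queue.offer(t);
        schedule();
    }

    // Only the caller that moves the counter from 0 to 1 submits a drain task.
    private void schedule() {
        if (counter.getAndIncrement() == 0) {
            worker.execute(this::pollQueue);
        }
    }

    // Runs on the worker thread: empty the queue, repeat if more work arrived meanwhile.
    private void pollQueue() {
        do {
            counter.set(1);
            for (;;) {
                String o = queue.poll();
                if (o == null) {
                    break;
                }
                child.add(Thread.currentThread().getName() + ":" + o);
            }
        } while (counter.decrementAndGet() > 0);
    }

    // Emits three values from the calling thread and returns what the child received.
    static List<String> demo() {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "observe-thread"));
        List<String> received = Collections.synchronizedList(new ArrayList<String>());
        QueueDrainSketch subscriber2 = new QueueDrainSketch(worker, received);
        subscriber2.onNext("1");
        subscriber2.onNext("2");
        subscriber2.onNext("3");
        worker.shutdown();
        try {
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The counter is what keeps delivery serialized: values that arrive while a drain is running only bump the counter, and the running drain loops again instead of a second task being scheduled.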
This completes the flow.