RxJava Source Code Analysis: Specifying the Thread with observeOn

2017-11-19  Rogge666

Based on RxJava 1.1.0 and RxAndroid 1.0.1

Example code ↓
        Observable<String> observable1 = Observable.create(new Observable.OnSubscribe<String>() {
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("1");
                subscriber.onCompleted();
            }
        });

        Subscriber<String> subscriber1 = new Subscriber<String>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(String s) {
                Log.e("haha",s);
            }
        };
        observable1.observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber1);
observeOn source (simplified) ↓
public final Observable<T> observeOn(Scheduler scheduler) {
        return lift(new OperatorObserveOn<T>(scheduler));
    }
AndroidSchedulers source ↓
public final class AndroidSchedulers {
    private AndroidSchedulers() {
        throw new AssertionError("No instances");
    }

    private static final Scheduler MAIN_THREAD_SCHEDULER =
            new HandlerScheduler(new Handler(Looper.getMainLooper()));

    ①
    public static Scheduler mainThread() {
        Scheduler scheduler =
                RxAndroidPlugins.getInstance().getSchedulersHook().getMainThreadScheduler();
        return scheduler != null ? scheduler : MAIN_THREAD_SCHEDULER;
    }
}
lift source (simplified) ↓
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        ②
        //create Observable2  OnSubscribe2
        return new Observable<R>(new OnSubscribe<R>() {
            ③
            @Override
            public void call(Subscriber<? super R> o) {
                Subscriber<? super T> st = hook.onLift(operator).call(o);
                st.onStart();
                ⑤
                onSubscribe.call(st);//onSubscribe1.call(subscriber2)
            }
        });
    }
OperatorObserveOn source excerpt ↓
    ④
    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {//child = subscriber1
        if (scheduler instanceof ImmediateScheduler) {
            // avoid overhead, execute directly
            return child;
        } else if (scheduler instanceof TrampolineScheduler) {
            // avoid overhead, execute directly
            return child;
        } else {
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child);
            parent.init();
            return parent;
        }
    }

        ⑥
        @Override
        public void onNext(final T t) {
            if (isUnsubscribed()) {
                return;
            }
            ⑦
            if (!queue.offer(on.next(t))) {
                onError(new MissingBackpressureException());
                return;
            }
            schedule();
        }

        final Action0 action = new Action0() {
            @Override
            public void call() {
                pollQueue();
            }
        };

        protected void schedule() {
            if (counter.getAndIncrement() == 0) {
                recursiveScheduler.schedule(action);
            }
        }

pollQueue (simplified) ↓
void pollQueue() {
        // Drain the queue until it is empty (the loop was dropped in the simplification)
        for (;;) {
            Object o = queue.poll();
            if (o != null) {
                ⑧
                child.onNext(on.getValue(o));
            } else {
                break;
            }
        }
    }

OperatorObserveOn.ObserveOnSubscriber source excerpt ↓
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
            this.child = child;
            this.recursiveScheduler = scheduler.createWorker();
            if (UnsafeAccess.isUnsafeAvailable()) {
                queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);
            } else {
                queue = new SynchronizedQueue<Object>(RxRingBuffer.SIZE);
            }
            this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler);
        }

The call flow runs from ① through ⑧.
Code breakdown
observable1.observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber1) =
observable1.lift(new OperatorObserveOn<String>(AndroidSchedulers.mainThread())).subscribe(subscriber1) =
observable2.subscribe(subscriber1)

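Execution starts at ①, where AndroidSchedulers.mainThread() supplies a HandlerScheduler whose Handler is bound to the main thread's Looper (unless a scheduler hook provides a different one).
Next, ② creates observable2 and OnSubscribe2, so the subscription becomes observable2.subscribe(subscriber1). Subscribing invokes observable2.OnSubscribe2.call(subscriber1), which reaches ③. There subscriber1 is passed into ④ as the argument of call(), so child = subscriber1, and subscriber2 (an ObserveOnSubscriber wrapping subscriber1) is created.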
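The wrapping done by lift can be modelled without RxJava at all. The following is a minimal sketch, assuming hypothetical miniature types (MiniSubscriber, MiniOnSubscribe, MiniOperator, MiniObservable) that are not part of RxJava; it drops the hook, onStart, error handling and the wildcard generics of the real signature, and only records the order in which the pieces are called.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature types for illustration; these are NOT RxJava classes.
interface MiniSubscriber<T> {
    void onNext(T value);
}

interface MiniOnSubscribe<T> {
    void call(MiniSubscriber<? super T> subscriber);
}

interface MiniOperator<R, T> {
    // Receives the downstream subscriber, returns the subscriber handed upstream.
    MiniSubscriber<? super T> call(MiniSubscriber<? super R> child);
}

final class MiniObservable<T> {
    private final MiniOnSubscribe<T> onSubscribe;

    MiniObservable(MiniOnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    // Simplified lift: returns a new observable whose OnSubscribe wraps the
    // downstream subscriber and then subscribes the wrapper to the source.
    <R> MiniObservable<R> lift(final MiniOperator<R, T> operator) {
        final MiniOnSubscribe<T> source = this.onSubscribe; // plays the role of onSubscribe1
        return new MiniObservable<R>(child -> {
            MiniSubscriber<? super T> wrapped = operator.call(child); // subscriber2
            source.call(wrapped); // onSubscribe1.call(subscriber2)
        });
    }

    void subscribe(MiniSubscriber<? super T> subscriber) {
        onSubscribe.call(subscriber);
    }
}

final class LiftSketch {
    // Runs the chain once and returns the recorded call order.
    static List<String> run() {
        final List<String> log = new ArrayList<>();

        MiniObservable<String> observable1 = new MiniObservable<String>(s -> {
            log.add("onSubscribe1.call");
            s.onNext("1");
        });

        // Stand-in for OperatorObserveOn: wraps the child but does no thread switch.
        MiniOperator<String, String> operator = child -> {
            log.add("operator.call");
            return value -> {
                log.add("subscriber2.onNext(" + value + ")");
                child.onNext(value);
            };
        };

        MiniSubscriber<String> subscriber1 =
                value -> log.add("subscriber1.onNext(" + value + ")");

        observable1.lift(operator).subscribe(subscriber1);
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

In this sketch the recorded order is operator.call, onSubscribe1.call, subscriber2.onNext(1), subscriber1.onNext(1), which mirrors the ③ → ④ → ⑤ → ⑥ sequence described above, minus the thread hop.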

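Execution then reaches ⑤, which is equivalent to onSubscribe1.call(subscriber2), i.e. subscriber2.onNext("1"), arriving at ⑥. Inside subscriber2.onNext, step ⑦ stores the value in the queue and then calls schedule(), which hands action to recursiveScheduler so that pollQueue() runs on the scheduler's thread. At ⑧, on that specified thread, the value is taken from the queue and re-emitted with child.onNext(on.getValue(o)). Here child is subscriber1, so this is the call subscriber1.onNext("1").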
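The queue-then-schedule hand-off in ⑥ to ⑧ can be tried out in plain Java. This is only a sketch under stated assumptions: a single-thread executor named "fake-main" stands in for the HandlerScheduler worker, an unbounded ConcurrentLinkedQueue replaces the bounded SpscArrayQueue, and backpressure, errors, completion and unsubscription are left out. QueueingSubscriber and ObserveOnSketch are hypothetical names, not RxJava classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class ObserveOnSketch {

    // Stand-in for subscriber2: buffers values, then drains them on the worker thread.
    static final class QueueingSubscriber {
        private final Queue<String> queue = new ConcurrentLinkedQueue<>();
        private final AtomicInteger counter = new AtomicInteger();
        private final ExecutorService worker;   // plays the role of recursiveScheduler
        private final Consumer<String> child;   // plays the role of subscriber1

        QueueingSubscriber(ExecutorService worker, Consumer<String> child) {
            this.worker = worker;
            this.child = child;
        }

        void onNext(String value) {
            queue.offer(value); // store on the caller's thread
            schedule();
        }

        private void schedule() {
            // Only the 0 -> 1 transition submits a drain task; later calls
            // just bump the counter so the running drain loops once more.
            if (counter.getAndIncrement() == 0) {
                worker.execute(this::pollQueue);
            }
        }

        private void pollQueue() {
            do {
                counter.set(1);
                String value;
                while ((value = queue.poll()) != null) {
                    child.accept(value); // re-emit on the worker thread
                }
            } while (counter.decrementAndGet() > 0);
        }
    }

    // Emits three values from the calling thread and returns "thread:value" entries.
    static List<String> run() {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-main"));
        List<String> received = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch done = new CountDownLatch(3);

        QueueingSubscriber subscriber2 = new QueueingSubscriber(worker, value -> {
            received.add(Thread.currentThread().getName() + ":" + value);
            done.countDown();
        });

        try {
            subscriber2.onNext("1");
            subscriber2.onNext("2");
            subscriber2.onNext("3");
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for delivery");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Whichever thread calls onNext, the child only ever sees values on the "fake-main" thread and in the order they were offered. The counter keeps at most one drain task in flight, so values offered while a drain is running are picked up by the next pass of that same drain instead of being scheduled again.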

This completes the flow.
