
RxJava subscribeOn and observeOn: a source code walkthrough

2019-06-03  103style

When reposting, please credit the source with a link:
This article comes from 103style's blog

Based on RxJava 2.x

Introduction

First, let's look at how the two methods subscribeOn and observeOn are implemented:

We can see that they return an ObservableSubscribeOn object and an ObservableObserveOn object respectively. The two classes are covered one at a time below.
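To make the "each operator returns a new wrapper object" idea concrete, here is a minimal sketch in plain Java. It is not the RxJava source: MiniObservable, MiniCreate, MiniSubscribeOn and MiniObserveOn are hypothetical stand-ins, and java.util.concurrent.Executor is assumed as a placeholder for Scheduler. It only shows that calling the operators builds a chain of wrappers and runs nothing.

```java
import java.util.concurrent.Executor;

// Hypothetical miniature model; none of these types are RxJava classes.
final class AssemblySketch {

    abstract static class MiniObservable {
        // Each operator only wraps "this" in a new object; nothing runs yet.
        final MiniObservable subscribeOn(Executor scheduler) {
            return new MiniSubscribeOn(this, scheduler);
        }

        final MiniObservable observeOn(Executor scheduler) {
            return new MiniObserveOn(this, scheduler);
        }

        // Renders the chain from the outermost wrapper down to the source.
        abstract String describe();
    }

    static final class MiniCreate extends MiniObservable {
        @Override
        String describe() {
            return "MiniCreate";
        }
    }

    static final class MiniSubscribeOn extends MiniObservable {
        final MiniObservable source;
        final Executor scheduler;

        MiniSubscribeOn(MiniObservable source, Executor scheduler) {
            this.source = source;
            this.scheduler = scheduler;
        }

        @Override
        String describe() {
            return "MiniSubscribeOn(" + source.describe() + ")";
        }
    }

    static final class MiniObserveOn extends MiniObservable {
        final MiniObservable source;
        final Executor scheduler;

        MiniObserveOn(MiniObservable source, Executor scheduler) {
            this.source = source;
            this.scheduler = scheduler;
        }

        @Override
        String describe() {
            return "MiniObserveOn(" + source.describe() + ")";
        }
    }

    static String build() {
        Executor inline = Runnable::run; // placeholder scheduler, never invoked here
        return new MiniCreate().subscribeOn(inline).observeOn(inline).describe();
    }

    public static void main(String[] args) {
        System.out.println(build()); // MiniObserveOn(MiniSubscribeOn(MiniCreate))
    }
}
```

The outermost wrapper is the last operator applied, so a later subscribe call enters the chain at MiniObserveOn and works its way back to the source.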


ObservableSubscribeOn source code analysis

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

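    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        // Wrap the downstream observer.
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        // Called right away on the subscribing thread, before anything is scheduled.
        observer.onSubscribe(parent);
        // Hand the upstream subscription to the scheduler and keep the returned Disposable.
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }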
    ....
}
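The order of calls in subscribeActual can be sketched with the standard library alone. This is a minimal model, not the library code: MiniObserver and MiniSource are hypothetical interfaces, an ExecutorService is assumed in place of Scheduler, and the scheduled task is assumed to do nothing more than subscribe the wrapper to the upstream source.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical miniature model of subscribeOn; not RxJava code.
final class SubscribeOnSketch {

    interface MiniObserver {
        void onSubscribe();
        void onNext(String item);
    }

    interface MiniSource {
        void subscribe(MiniObserver observer);
    }

    // Same order as subscribeActual: onSubscribe first, then schedule the subscription.
    static void subscribeOn(MiniSource source, ExecutorService scheduler, MiniObserver observer) {
        observer.onSubscribe();                              // caller's thread
        scheduler.execute(() -> source.subscribe(observer)); // scheduler's thread
    }

    // Labels the current thread as the worker or the caller.
    static String label() {
        return Thread.currentThread().getName().equals("sketch-worker") ? "worker" : "caller";
    }

    static List<String> run() {
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());
        ExecutorService single =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-worker"));

        MiniSource source = observer -> {
            log.add("subscribe:" + label());
            observer.onNext("0");
        };

        subscribeOn(source, single, new MiniObserver() {
            @Override
            public void onSubscribe() {
                log.add("onSubscribe:" + label());
            }

            @Override
            public void onNext(String item) {
                log.add("onNext " + item + ":" + label());
            }
        });

        // Wait for the scheduled subscription to finish before reading the log.
        single.shutdown();
        try {
            single.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [onSubscribe:caller, subscribe:worker, onNext 0:worker]
    }
}
```

Because the source emits from inside its subscribe call, every event it produces inherits the worker thread, which is why only onSubscribe stays on the caller.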

Source of SubscribeOnObserver:

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    private static final long serialVersionUID = 8094547886072529208L;
    final Observer<? super T> downstream;
    final AtomicReference<Disposable> upstream;

    SubscribeOnObserver(Observer<? super T> downstream) {
        this.downstream = downstream;
        this.upstream = new AtomicReference<Disposable>();
    }

    @Override
    public void onSubscribe(Disposable d) {
        DisposableHelper.setOnce(this.upstream, d);
    }

    @Override
    public void onNext(T t) {
        downstream.onNext(t);
    }

    @Override
    public void onError(Throwable t) {
        downstream.onError(t);
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
    }
    ...
    void setDisposable(Disposable d) {
        DisposableHelper.setOnce(this, d);
    }
}
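Both onSubscribe and setDisposable go through DisposableHelper.setOnce. The helper's source is not shown in this article, so the following is only a sketch of the "set once" idea as we understand it: the first value wins, and a late value is disposed rather than stored. Handle is a hypothetical stand-in for Disposable, and the real helper has more to it (for example, dealing with a slot that has already been disposed), which this sketch leaves out.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical miniature model of a set-once slot; not the RxJava helper.
final class SetOnceSketch {

    // Stand-in for Disposable.
    static final class Handle {
        volatile boolean disposed;

        void dispose() {
            disposed = true;
        }
    }

    // Stores the handle only if the slot is still empty.
    // A rejected handle is disposed so the resource behind it does not leak.
    static boolean setOnce(AtomicReference<Handle> slot, Handle next) {
        if (slot.compareAndSet(null, next)) {
            return true;
        }
        next.dispose();
        return false;
    }

    static String run() {
        AtomicReference<Handle> slot = new AtomicReference<Handle>();
        Handle first = new Handle();
        Handle second = new Handle();

        boolean firstAccepted = setOnce(slot, first);
        boolean secondAccepted = setOnce(slot, second);

        return firstAccepted + "," + secondAccepted + ","
                + first.disposed + "," + second.disposed + ","
                + (slot.get() == first);
    }

    public static void main(String[] args) {
        System.out.println(run()); // true,false,false,true,true
    }
}
```

The compare-and-set makes the check and the store a single atomic step, which matters here because the two references in SubscribeOnObserver can be touched from different threads.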

Let's walk through an example:

Observable
        .create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                System.out.println("subscribe = " + Thread.currentThread().getName());
                for (int i = 0; i < 3; i++) {
                    emitter.onNext(String.valueOf(i));
                }
                emitter.onComplete();
            }
        })
        .subscribeOn(Schedulers.single())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe thread name = " + Thread.currentThread().getName());
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext s = " + s + " thread name = " + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError thread name = " + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete thread name = " + Thread.currentThread().getName());
            }
        });

Output:

onSubscribe thread name = main
subscribe = RxSingleScheduler-1
onNext s = 0 thread name = RxSingleScheduler-1
onNext s = 1 thread name = RxSingleScheduler-1
onNext s = 2 thread name = RxSingleScheduler-1
onComplete thread name = RxSingleScheduler-1

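From the output we can see that onSubscribe is called on the subscribing thread (main), while the subscribe callback and every onNext and onComplete run on the thread pool built by Schedulers.single().
Now let's go through it step by step, which also serves as a quick review:
The flow is roughly as follows:

[Figure: SubscribeOnObserver example flowchart]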

ObservableObserveOn source code analysis

Let's continue with another example by adding an observeOn call to the subscribeOn example:

Observable
        .create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                System.out.println("subscribe = " + Thread.currentThread().getName());
                for (int i = 0; i < 3; i++) {
                    emitter.onNext(String.valueOf(i));
                }
                emitter.onComplete();
            }
        })
        .subscribeOn(Schedulers.single())
        .observeOn(Schedulers.io())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("d.classname = " + d.getClass().getSimpleName());
                System.out.println("onSubscribe thread name = " + Thread.currentThread().getName());
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext s = " + s + " thread name = " + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError thread name = " + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete thread name = " + Thread.currentThread().getName());
            }
        });

Output:

System.out: d.classname = ObserveOnObserver
System.out: onSubscribe thread name = main
System.out: subscribe = RxSingleScheduler-1
System.out: onNext s = 0 thread name = RxCachedThreadScheduler-1
System.out: onNext s = 1 thread name = RxCachedThreadScheduler-1
System.out: onNext s = 2 thread name = RxCachedThreadScheduler-1
System.out: onComplete thread name = RxCachedThreadScheduler-1

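From the output we can see that onSubscribe still runs on main, the subscribe callback runs on RxSingleScheduler-1 (the scheduler given to subscribeOn), and onNext and onComplete run on RxCachedThreadScheduler-1 (the scheduler given to observeOn, Schedulers.io()).

Next, let's look at the subscribeOn flowchart:

[Figure: SubscribeOnObserver example flowchart]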
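The thread switch that observeOn performs can be modelled as a wrapper that re-dispatches every event. This is a simplified sketch rather than the library's implementation: MiniObserver is a hypothetical interface, an Executor is assumed in place of Scheduler, and ordering relies on a single-threaded executor's FIFO task queue, whereas the real ObserveOnObserver, as far as we recall, buffers items in its own queue and drains them on a scheduler worker.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical miniature model of observeOn; not RxJava code.
final class ObserveOnSketch {

    interface MiniObserver {
        void onNext(String item);
        void onComplete();
    }

    // Wraps the downstream observer so each event is delivered on the scheduler.
    static MiniObserver observeOn(final MiniObserver downstream, final Executor scheduler) {
        return new MiniObserver() {
            @Override
            public void onNext(final String item) {
                scheduler.execute(() -> downstream.onNext(item));
            }

            @Override
            public void onComplete() {
                scheduler.execute(downstream::onComplete);
            }
        };
    }

    // Labels the current thread as the worker or the caller.
    static String label() {
        return Thread.currentThread().getName().equals("sketch-worker") ? "worker" : "caller";
    }

    static List<String> run() {
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());
        ExecutorService single =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-worker"));

        MiniObserver wrapped = observeOn(new MiniObserver() {
            @Override
            public void onNext(String item) {
                log.add("onNext " + item + ":" + label());
            }

            @Override
            public void onComplete() {
                log.add("onComplete:" + label());
            }
        }, single);

        // Emit from the calling thread; delivery happens on the worker.
        for (int i = 0; i < 3; i++) {
            wrapped.onNext(String.valueOf(i));
        }
        wrapped.onComplete();

        // Wait for all dispatched events before reading the log.
        single.shutdown();
        try {
            single.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        // [onNext 0:worker, onNext 1:worker, onNext 2:worker, onComplete:worker]
        System.out.println(run());
    }
}
```

Note the difference from subscribeOn: here the upstream keeps emitting on whatever thread it is on, and only the delivery to the downstream observer moves to the scheduler.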

Summary

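subscribeOn returns an ObservableSubscribeOn object.
ObservableSubscribeOn's subscribeActual runs the subscribeActual method of the object returned by the previous step on the XXXScheduler that was passed in.

observeOn returns an ObservableObserveOn object.
ObservableObserveOn's subscribeActual wraps the XXXScheduler that was passed in and the observer into a new Observer, then hands it to the subscribeActual method of the object returned by the previous step, so that onNext, onError and onComplete all run on the thread pool built by that XXXScheduler.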

So, do you now see how RxJava switches threads?

That's all.
