RxJava-Observable分析

2017-06-25  本文已影响0人  风雪围城

背景

看完Flowable的流控机制之后,对Observable的应对机制好奇 。希望在以后的应用过程中,不会犯一些不必要的错误。

环境与目的

RxJava版本信息如下:

'io.reactivex.rxjava2:rxandroid:2.0.1'
'io.reactivex.rxjava2:rxjava:2.1.0'

目的:探究Observable实现异步事件流的原理。

Obserable的构建过程

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

与Flowable构建过程类似,它也是通过ObservableCreate来构建Observable对象。而ObservableOnSubscribe实际上就是一个接口,接口中只有一个方法:

void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;

用以通过发射器Emitte发射事件。

subscribe过程

该过程实际上是执行了subscribeActual,具体代码为:

    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //observer.onSubscribe会首先执行
        observer.onSubscribe(parent);
        try {
            //这个source是ObservableOnSubscribe对象
            //该过程实际为通过Emitter发射事件
            //parent就是Emitter,在该方法中为CreateEmitter
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

我们通过Emitter的onNext发射事件,在CreateEmitter中,具体为:

public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

可以看到,每次onNext发射事件之后,由于生产和消费是在同一个线程中,马上就通过oberver.onNext进行消费。
即,每发射一个事件,该事件会立即被消费。
以上,观察者和被观察者者默认都运行在主线程中。

Observable涉及到不同运行线程时的构建流程

subscribeOn(Schedulers.newThread())

主要完成功能是将发射事件的过程置于一个新的线程中。
该方法实际返回了一个ObservableSubscribeOn对象,当然,这个类是Observable的子类:

return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));

其subscribeActual为:

public void subscribeActual(final Observer<? super T> s) {
        //SubscribeOnObserver实现了对Observer的一层包装,将s包装成parent
        //(1)
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        //s 实际就是我们自己定义的Observer
        //(2)
        s.onSubscribe(parent);         
        //(3) 重点在这里
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));  (3)
    }

(1) 封装 s 为 parent 。
(2) 对外暴露parent,实际上s就是我们自己定义的Observer,我们可以在其onSubscribe中获取到parent。
(3) 首先我们来看一下scheduler。
我使用了一个新线程Schedulers.newThread()。该scheduler的构建过程如下:

//NEW_THREAD 是一个 Scheduler
public static Scheduler newThread() {
        return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
    }

//进一步查看NEW_THREAD
--------->
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());

//通过NewThreadTask() 的call方法,返回了NEW_THREAD
------>
public Scheduler call() throws Exception {
            return NewThreadHolder.DEFAULT;
        }

//DEFAULT 这个实际上就是我们使用的scheduler的本尊
------>
static final Scheduler DEFAULT = new NewThreadScheduler();

来看看它的scheduleDirect方法:

  public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
        ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
        try {
            Future<?> f;
            if (delayTime <= 0L) {
                //从线程池中拿出线程执行了任务
                f = executor.submit(task);
            } else {
                f = executor.schedule(task, delayTime, unit);
            }
            task.setFuture(f);
            return task;
        } catch (RejectedExecutionException ex) {
            RxJavaPlugins.onError(ex);
            return EmptyDisposable.INSTANCE;
        }
    }

而SubscribeTask就是一个Runnable任务,执行:

public void run() {
            source.subscribe(parent);
        }

以上,就是发射事件能够运行在新线程的根本,同时,后续消费事件默认也运行再该线程中(如果没有主动将消费事件的运行设置在另外一个线程中)。
我们应该注意到,对外暴露给我们的parent,是一个Disposable,它有一个dispose方法,该方法的具体实现在NewThreadWorker里:

 @Override
    public void dispose() {
        if (!disposed) {
            disposed = true;
            executor.shutdownNow();
        }
    }

它关闭了整个线程池。

observeOn(AndroidSchedulers.mainThread())

主要作用是将消费线程置于新的独立线程中。
该方法同样将返回一个ObservableObserveOn对象,实际执行过程:

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

我们可以看到,和subscribeOn相比,这里就有了bufferSize的概念。bufferSize实际上是一个int常量128,不用关注。
它的subscribeActual方法为:

protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

这里可以看到,事件的消费,实际上是通过ObserveOnObserver对象执行的。该对象的onNext方法为:

public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                //将事件先存到队列中
                queue.offer(t);
            }
            //消费事件
            schedule();
        }
上一篇下一篇

猜你喜欢

热点阅读