Rxjava2源码分析

2021-04-01  本文已影响0人  快乐的橙橙宝

分析目的

  1. Observable发出数据和Observer接收数据
  2. 如何实现线程调度和操作符原理

文章仅分析Observable不分析带背压的Flowable

Observable创建和订阅

一个常见例子

 Observable.create((ObservableOnSubscribe<String>) emitter -> {
            emitter.onNext("test");
            emitter.onComplete();
        }).map(s -> s + "111")
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull String s) {
                        LPLogger.e("onNext:" + s);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        e.printStackTrace();
                        LPLogger.e("onError:" + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        LPLogger.e("onComplete");
                    }
                });
    }

以上常见例子展示了Observable 创建,订阅和调度以及转换操作符的整个过程下面分析各个过程

Observable创建

Observable.create()方法参数ObservableOnSubscribe

public interface ObservableOnSubscribe<T> {
    void subscribe(ObservableEmitter<T> e) throws Exception;
}  

该接口方法subscribe(ObservableEmitter)参数ObservableEmitter实现了onNext(),onError()onComplete()即我们使用发送数据的地方

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

其中RxJavaPlugins.onAssembly()方法是hook方法,默认返回原值即ObservableCreate(source)是一个Observable,实现了subscribeActual()此方法是订阅真正执行的方法,先不用关注
只需要注意Observable.create()方法传入参数ObservableOnSubscribe返回ObservableCreate即可

Observable订阅

先不看线程调度和操作符转换处理仅看最简单的部分subscribe()

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        //..
        subscribeActual(observer);
    }

实际是调用的subscribeActual(observer),即前面创建的ObservableCreate里面的subscribeActual()

protected void subscribeActual(Observer<? super T> observer) {
     //实现自ObservableEmitter
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    //这里是外界的回调
    observer.onSubscribe(parent);
    try {
        //这里source即create传入的ObservableOnSubscribe
        //parent即前面传入的ObservableEmitter,即emitter.onNext("test")中的emitter
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

看下CreateEmitter源码

static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {

    private static final long serialVersionUID = -3434801548987643227L;

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("xxx"));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onError(Throwable t) {
        if (!tryOnError(t)) {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public boolean tryOnError(Throwable t) {
        if (t == null) {
            t = new NullPointerException("xxx");
        }
        if (!isDisposed()) {
            try {
                observer.onError(t);
            } finally {
                dispose();
            }
            return true;
        }
        return false;
    }

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }
  1. 以上onNext(T)调用了observer.onNext(t)即让订阅者接收到发送端数据
    我们可以看到subscribeActual(observer)中的方法已经将ObservableObserver联系起来
  2. 同时注意到CreateEmitter是继承自Disposable即我们可以使用回调中的onSubscribe(Disposable d)中的Disposable去结束Observable发送,当我们subscribe(Consumer)获取的返回值同理
  3. 从上面代码可以得出我们之前记住的一些结论如onError()onComplete()只能调用一次,onSubscribeonNext()之前执行等

线程调度

subcribeOn

subscribeOn(Schedulers.io())返回值类似于create(),返回的是ObservableSubscribeOn<T>(this, scheduler)ObservableCreate<T>(source)多一个线程处理,典型的装饰器模式应用
源码也和ObservableCreate类似

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        //和CreateEmitter一样也是包装Observer,最终调用Observer.onNext之类方法
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        observer.onSubscribe(parent);
        //线程切换SubscribeTask是Runnable最终执行的还是source.subscribe(parent);
        //source.subscribe(parent)执行后会执行到ObservableCreate的subscribeActual()
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

继续看

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    //实际是一个Dispose并设置到ObservableSubscribeOn方便管理任务
    DisposeTask task = new DisposeTask(decoratedRun, w);
    w.schedule(task, delay, unit);
    return task;
}

执行调用如下不一一展开:
w.schedule(task, delay, unit)->IoScheduler.EventLoopWorker.schedule->NewThreadWorker.scheduleActual()->ScheduledRunnable.call()->ScheduledRunnable.run()->DisposeTask.run()->new SubscribeTask(parent)
即在使用线程池执行了source.subscribe(parent)

observeOn

observeOn(AndroidSchedulers.mainThread())中创建的是ObservableObserveOn

protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        //创建worker
        Scheduler.Worker w = scheduler.createWorker();
        //还是和source.subscribe(parent)一致
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

查看ObserveOnObserver是继承Runnable

public void onNext(T t) {
    if (done) {
        return;
    }
    //先把值存储到队列中,然后切换线程处理
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}
//run方法运行到这里
void drainNormal() {
    int missed = 1;

    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = downstream;

    for (;;) {
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }

        for (;;) {
            boolean d = done;
            T v;

            try {
                v = q.poll();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                disposed = true;
                upstream.dispose();
                q.clear();
                a.onError(ex);
                worker.dispose();
                return;
            }
            boolean empty = v == null;

            if (checkTerminated(d, empty, a)) {
                return;
            }

            if (empty) {
                break;
            }
            //切到线程后再执行onNext
            a.onNext(v);
        }

        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}
  1. ObserveOnObserveronNext()中把发送值存到队列,然后调用schedule()
  2. 调用的是worker.schedule(this);和前面分析subcribeOn一样直接查看run(),此时已经完成线程切换
  3. run()中调用的是drainNormal(),从1中存储的队列中取出值发送
切换线程分析

AndroidSchedulers.mainThread()实现

private static final class MainHolder {
    static final Scheduler DEFAULT
        = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}

private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
        new Callable<Scheduler>() {
            @Override public Scheduler call() throws Exception {
                return MainHolder.DEFAULT;
            }
        });

/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
    return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}

继续看HandlerScheduler

public Worker createWorker() {
return new HandlerWorker(handler, async);
}

private static final class HandlerWorker extends Worker {
private final Handler handler;
private final boolean async;

private volatile boolean disposed;

HandlerWorker(Handler handler, boolean async) {
    this.handler = handler;
    this.async = async;
}

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    if (run == null) throw new NullPointerException("run == null");
    if (unit == null) throw new NullPointerException("unit == null");

    if (disposed) {
        return Disposables.disposed();
    }

    run = RxJavaPlugins.onSchedule(run);
    
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    //设置了runnable()后续发送到主线程会执行run()
    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.

    if (async) {
        message.setAsynchronous(true);
    }

    handler.sendMessageDelayed(message, unit.toMillis(delay));

    // Re-check disposed state for removing in case we were racing a call to dispose().
    if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
    }

    return scheduled;
}

ObservableObserveOn中的subscribeActual()创建的worker就是HandlerWorker

  1. AndroidSchedulers.mainThread() 创建了一个带主线程HandlerHandlerScheduler
  2. schedule()中通过handler.sendMessageDelayed(msg,delay)发送消息到主线程,因为message设置了Runnable(),消息发送到主线程后会调用message.callback.run()从而调用schedule()中的参数run(),即实际完成切换线程回调到ObserveOnObserverrun()
操作符原理

仅分析下map操作符,其它操作符类似
直接看ObservableMap源码

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qd.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}
  1. 可以看到subscribeActual()直接是source.subscribe(parent)类型,我们直接看MapObserveronNext()
  2. 非聚合模式sourceMode的值是NONE,相当于Observer.onNext(mapper.apply(t))mapper.apply(t)则是我们写的lambda表达式的返回值即s + "111",由此可以看出map是直接把值返回
总结
  1. subscribeActual()方法中实际完成订阅,subscribe订阅后各个操作符才完成订阅,即订阅是自下而上进行的
  2. 线程操作是通过线程池和Handler完成切换
上一篇下一篇

猜你喜欢

热点阅读