Android开发经验谈Android开发Android技术知识

Rxjava2-线程切换解析

2019-02-28  本文已影响57人  Colaman丶

ObservableOn()

直接查看实现,会发现onSubscribe()中做了一些判断,比如82 104等几行都是做了一些同步 异步 等的判断,然后初始化DisposableonSubscribe()是上游Observable完成了整条订阅链之后调用的,所以这些操作是在开始订阅之后才初始化操作,然后106行可以看出把一个包装处理过的Disposable传递给下游
image.png
和之前的一样,subscribeActual方法里会将observer进行包装,然后传递给source也就是上游进行订阅
* `40`行进行了判断所传进来的`scheduler`是否跟原本的线程一致,如果是一样的就直接传递不用进行处理
*  `43`行创建了一个对应`scheduler`的`worker`,`worker`在后续负责把数据在对应的线程进行发射操作
image.png
发射数据onNext处理
        @Override
        public void onNext(T t) {
          ...
          // 前面的都先忽略掉,会发现最后会调用这个方法
            schedule();
        }
        
      void schedule() {
         if (getAndIncrement() == 0) {
         // 在这个可以看到,上面根据schedule的worker执行了schedule(),并且把自身传进去,this其实实现了runnable,所以可以理解为传了一个runnable进去
            worker.schedule(this);
        }
    }
接着上面的以AndroidSchedulers.mainThread()这个scheduler为例,这里实际上是将主线程的looper传进去了
image.png

查看一下这个scheduler的worker,会发现worker的基类schedule()方法是相同的互相调用的,所以可以直接看多个参数的schedule(),可以看到73行创建了一个ScheduledRunnable对象,并且把主线程的handler以及外面的Observer传递过去,接着82行用主线程的handler发送消息,119ScheduledRunnable里的run被调用,接着Observer也就是runnable也调用run方法

image.png image.png
到这里可以看出,实际上当切换线程的时候,observer(也实现了Runnable)的onNext往scheduler里发送自身,让scheduler来决定自身应该在什么线程执行run方法,接下来看回observer的run方法,就是判断了一下要执行哪个方法
image.png
可以看到最后是调用了onNext方法,到这里就完成了指定线程发射数据的功能
        void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = downstream;

            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        disposed = true;
                        upstream.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

值得注意的是可以看到的是v也就是我们要发射的数据,是通过poll方法获取的,查看代码可以发现

queue实际上就是一个Disposable也就是说是上游Observable,通过上游的poll方法去获取要onNext的数据

image.png
查看Observable其中一个实现ObservableMap的poll方法,可以看到这里实际上也是调用上游的poll方法,并且对数据的格式也就是不允许为null做了一层判断
        public U poll() throws Exception {
            T t = qd.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
poll方法操作的对象实际上是下图104行的时候new出来的,具体查看其实就是缓存数据,类似一个容量池的作用

[图片上传失败...(image-a0f00c-1551301878533)]

image.png
用一段伪代码来展示切换线程之后的observer,其实相当于onNext等方法都被放在指定的线程里去发射数据
public class Observer {
    Observer oldObserver;

    public Observer(Observer observer) {
        oldObserver = observer;
    }

    public void onNext(T t) {
        // 一些其他操作
        new Thread("Android mainThread") {
            @Override
            public void run() {
                oldObserver.onNext(t);
            }
        } .start();
    }

    public void onError(Throwable e) {
        // 一些其他操作
        new Thread("Android mainThread") {
            @Override
            public void run() {
                oldObserver.onError(e);
            }
        } .start();
    }

    public void onComplete() {
        // 一些其他操作
        new Thread("Android mainThread") {
            @Override
            public void run() {
                oldObserver.onComplete();
            }
        } .start();
    }
}
上一篇下一篇

猜你喜欢

热点阅读