RxJava2 源码分析(二)

2019-08-08  本文已影响0人  徘徊0_

简述:简单分析Rxjava2 常见操作符的源码

range操作符原理

range是一个生产操作符,例如下面的示例代码,发送0~5给下游:

Disposable disposable1 = Observable.range(0, 5).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "accept: t" + integer);
            }
        });

生产操作符主要是在range实现,具体看一下:

  //规定了类型为Integer
public static Observable<Integer> range(final int start, final int count) {
        //一些判断操作
        if (count < 0) {
            throw new IllegalArgumentException("count >= 0 required but it was " + count);
        }
        if (count == 0) {
            return empty();
        }
        if (count == 1) {
            return just(start);
        }
        if ((long)start + (count - 1) > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("Integer overflow");
        }
        //主要的方法
        return RxJavaPlugins.onAssembly(new ObservableRange(start, count));
    }

继续看一下ObservableRange,类不是太长主要包含RangeDisposable静态内部类和subscribeActual();方法:

public final class ObservableRange extends Observable<Integer> {
    private final int start;
    private final long end;

    public ObservableRange(int start, int count) {
        this.start = start;
        this.end = (long)start + count;
    }

    //将观察者传入
    @Override
    protected void subscribeActual(Observer<? super Integer> o) {
        RangeDisposable parent = new RangeDisposable(o, start, end);
        o.onSubscribe(parent); // 观察者跟Disposable 关联,可以截断从上游到下游的数据
        parent.run();
    }

    static final class RangeDisposable
    extends BasicIntQueueDisposable<Integer> {

        private static final long serialVersionUID = 396518478098735504L;

        final Observer<? super Integer> downstream;

        final long end;

        long index;

        boolean fused;

        RangeDisposable(Observer<? super Integer> actual, long start, long end) {
            this.downstream = actual;
            this.index = start;
            this.end = end;
        }

        void run() {
            if (fused) {
                return;
            }
            Observer<? super Integer> actual = this.downstream;
            long e = end;
            for (long i = index; i != e && get() == 0; i++) {
                actual.onNext((int)i);
            }
            if (get() == 0) {
                lazySet(1);
                actual.onComplete();
            }
        }

        @Nullable
        @Override
        public Integer poll() throws Exception {
            long i = index;
            if (i != end) {
                index = i + 1;
                return (int)i;
            }
            lazySet(1);
            return null;
        }

        @Override
        public boolean isEmpty() {
            return index == end;
        }

        @Override
        public void clear() {
            index = end;
            lazySet(1);
        }

        @Override
        public void dispose() {
            set(1);
        }

        @Override
        public boolean isDisposed() {
            return get() != 0;
        }

        @Override
        public int requestFusion(int mode) {
            if ((mode & SYNC) != 0) {
                fused = true;
                return SYNC;
            }
            return NONE;
        }
    }
}
      void run() {
            if (fused) {
                return;
            }
            Observer<? super Integer> actual = this.downstream;
            long e = end;
            // 这里for循环,调用观察者的onNext,依次发送 int,
            for (long i = index; i != e && get() == 0; i++) {
                actual.onNext((int)i);
            }
            if (get() == 0) {
                lazySet(1);
                actual.onComplete();
            }
        }
Map操作符原理

map是转换操作符,例如下面的代码,将String类型转换为Integer类型:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                //发送 String 类型
                emitter.onNext("1");
            }
        }).map(new Function<String, Integer>() {
            @Override
            public Integer apply(String s) throws Exception {
                //String 类型 转换为 Integer
                return Integer.valueOf(s);
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "accept: " + integer);
            }
        });

转换操作,是在map这里实现,跟一下代码:

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
         //判空操作
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        // 重要方法 
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

注意:

ObservableMap 分析

ObservableMap类中包含一个静态内部类:MapObserver 具体如下:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qd.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}
        @Override
        public void onNext(T t) { // 这个T = String ,代表转换前的类型
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v; // U: Integer ,指的是需要转换的类型

            /**
             *具体的转换方法是:mapper.apply(t);,将t转换为v类型
             */
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            //将转换好的v ,调用onNext()发送给下游,从而实现了类型转换
            downstream.onNext(v);
        }

注意

public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    R apply(@NonNull T t) throws Exception; 
}

所以,上面调用mapper.apply(t)其实是找具体实现的apply方法(这里apply返回值类型R其实就是Integer)

其实最开始代码已经具体实现了Function方法:

Function具体实现.png

到这里,也就完成了一次类型转换,将String 转换成了 Integer。

总结:

通过上面的2个操作符的分析,其实都是对Observable进行包装/变换,例如下面的两个操作符(可以猜想其它操作符应该也是差不多的流程):
1,range使用new ObservableRange(start, count);Observable进行包装处理。
2,map 使用new ObservableMap<T, R>(this, mapper)Observable进行包装处理。

上一篇 下一篇

猜你喜欢

热点阅读