Android

RxJava->doOnNext()

2017-09-13  本文已影响5773人  冉桓彬

example:

// Minimal doOnNext() demo: the source logs and emits 1, 2, 3 and then completes;
// the doOnNext() consumer logs each value it receives, and the final Observer
// logs every callback it gets.
Observable
    .create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            // Each log is written just before the matching emission and is labelled
            // "subscribe()" so it cannot be confused with the Observer's onNext() log.
            LogUtils.log(Test01.class, "subscribe()->1");
            emitter.onNext(1);
            LogUtils.log(Test01.class, "subscribe()->2");
            emitter.onNext(2);
            LogUtils.log(Test01.class, "subscribe()->3");
            emitter.onNext(3);
            LogUtils.log(Test01.class, "subscribe()->onComplete()");
            emitter.onComplete();
        }
    })
    .doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            LogUtils.log(Test01.class, "accept()->integer:" + integer);
        }
    })
    .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable disposable) {
            // Keep the Disposable in sDisposable (declared outside this snippet).
            sDisposable = disposable;
            LogUtils.log(Test01.class, "onSubscribe()");
        }

        @Override
        public void onNext(Integer value) {
            LogUtils.log(Test01.class, "onNext()->value:" + value);
        }

        @Override
        public void onError(Throwable e) {
            LogUtils.log(Test01.class, "onError()");
        }

        @Override
        public void onComplete() {
            LogUtils.log(Test01.class, "onComplete()");
        }
    });

doOnNext():

// The operator call under analysis: it registers a Consumer whose accept()
// logs every value it is handed.
.doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        LogUtils.log(Test01.class, "accept()->integer:" + integer);
    }
})

/**
 * Single-method callback that is handed one value of type {@code T}.
 *
 * @param <T> the type of the value passed to {@code accept}
 */
public interface Consumer<T> {
    /**
     * Handles the given value.
     *
     * @param t the value to handle
     * @throws Exception if the implementation fails
     */
    void accept(T t) throws Exception;
}

/** Excerpt of Observable showing how the doOnNext() operator is assembled. */
public abstract class Observable<T> implements ObservableSource<T> {
    /**
     * Delegates to doOnEach(), passing {@code onNext} together with the
     * Functions.emptyConsumer() / Functions.EMPTY_ACTION placeholders for the
     * other three callbacks.
     *
     * @param onNext the callback supplied by the caller
     * @return whatever doOnEach() returns
     */
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Observable<T> doOnNext(Consumer<? super T> onNext) {
        return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
    }

    /**
     * Creates a new ObservableDoOnEach around {@code this} and the four
     * callbacks, then passes it through RxJavaPlugins.onAssembly().
     */
    @SchedulerSupport(SchedulerSupport.NONE)
    private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
        return RxJavaPlugins.onAssembly(new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate));
    }
}

public final class RxJavaPlugins {
    return source;
}

// Inheritance chain of the operator object, followed by the expression that creates it:
class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T>;
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U>;
new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate)

/**
 * Observable returned by doOnEach(): it remembers the upstream source (via the
 * superclass) and the four callbacks so they are available at subscription time.
 */
public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
    final Consumer<? super T> onNext;
    final Consumer<? super Throwable> onError;
    final Action onComplete;
    final Action onAfterTerminate;

    /**
     * @param source           the upstream this operator wraps
     * @param onNext           callback for emitted values
     * @param onError          callback for an error
     * @param onComplete       callback for completion
     * @param onAfterTerminate callback passed by doOnEach() as its last argument
     */
    public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
                              Consumer<? super Throwable> onError,
                              Action onComplete,
                              Action onAfterTerminate) {
        super(source);
        // Keep every callback; dropping them here would leave nothing to invoke later.
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
        this.onAfterTerminate = onAfterTerminate;
    }
}

/**
 * Base class for an Observable that keeps a reference to its upstream
 * ObservableSource in the {@code source} field.
 *
 * @param <T> the upstream value type
 * @param <U> the value type of this Observable
 */
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
    /** The upstream source; visible to subclasses. */
    protected final ObservableSource<T> source;
    /** Stores the given upstream source. */
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }
}

什么时候能模仿着写出这种结构的代码, 什么时候就牛逼了

subscribe():

/** Excerpt of Observable showing the subscribe() entry point. */
public abstract class Observable<T> implements ObservableSource<T> {
    /**
     * Passes the observer through RxJavaPlugins.onSubscribe() and hands the
     * result to subscribeActual().
     *
     * @param observer the observer being subscribed
     */
    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        subscribeActual(observer);
    }
    /** Subclass-specific subscription logic; invoked by subscribe(). */
    protected abstract void subscribeActual(Observer<? super T> observer);
}
/** Excerpt of ObservableDoOnEach showing what happens when it is subscribed to. */
public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
    /**
     * Subscribes a new DoOnEachObserver, built from the given observer {@code t}
     * and the four callbacks, to the upstream {@code source}.
     */
    @Override
    public void subscribeActual(Observer<? super T> t) {
        source.subscribe(new DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate));
    }
}
/**
 * Excerpt of DoOnEachObserver showing only onSubscribe().
 * The {@code actual} field used below is declared outside this excerpt.
 */
static final class DoOnEachObserver<T> implements Observer<T>, Disposable {
    /** The Disposable received in onSubscribe(). */
    Disposable s;
    /**
     * When DisposableHelper.validate() accepts the incoming Disposable, stores it
     * and calls onSubscribe() on {@code actual}, passing this object as the Disposable.
     */
    @Override
    public void onSubscribe(Disposable s) {
        // NOTE(review): validate() presumably rejects a second onSubscribe call; confirm against DisposableHelper.
        if (DisposableHelper.validate(this.s, s)) {
            this.s = s;
            actual.onSubscribe(this);
        }
    }
}

/** Observable that holds an ObservableOnSubscribe and invokes it on subscription. */
public final class ObservableCreate<T> extends Observable<T> {
    /** The ObservableOnSubscribe given to the constructor. */
    final ObservableOnSubscribe<T> source;

    /** Stores the given ObservableOnSubscribe. */
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    /**
     * Wraps the observer in a CreateEmitter, hands that emitter to the observer
     * via onSubscribe(), and only then invokes the stored source with it.
     */
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        source.subscribe(parent);
    }
}

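这段代码应该用到的是适配器模式: CreateEmitter 把下游的 Observer 包装(适配)成 ObservableEmitter 接口, 再交给 ObservableOnSubscribe.subscribe() 使用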


// Skeleton of CreateEmitter: every method body is elided with {...}.
// It extends AtomicReference<Disposable>, implements both ObservableEmitter<T>
// and Disposable, and is constructed with an Observer.
static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        CreateEmitter(Observer<? super T> observer) {...}

        @Override
        public void onNext(T t) {...}

        @Override
        public void onError(Throwable t) {...}

        @Override
        public void onComplete() {...}

        @Override
        public void setDisposable(Disposable d) {...}

        @Override
        public void setCancellable(Cancellable c) {...}

        @Override
        public ObservableEmitter<T> serialize() {...}

        @Override
        public void dispose() {...}

        @Override
        public boolean isDisposed() {...}
}

总结:

试试连续调用多个doOnNext()方法:

// Source: logs before every emission, emits 1, 2, 3 and then completes.
ObservableOnSubscribe<Integer> emittingSource = new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> out) throws Exception {
        LogUtils.log(Test01.class, "subscribe->onNext()->1");
        out.onNext(1);
        LogUtils.log(Test01.class, "subscribe()->onNext()->2");
        out.onNext(2);
        LogUtils.log(Test01.class, "subscribe()->onNext()->3");
        out.onNext(3);
        LogUtils.log(Test01.class, "subscribe()->onComplete()");
        out.onComplete();
    }
};

// First doOnNext() callback in the chain.
Consumer<Integer> firstTap = new Consumer<Integer>() {
    @Override
    public void accept(Integer item) throws Exception {
        LogUtils.log(Test01.class, "accept()_1->integer:" + item);
    }
};

// Second doOnNext() callback in the chain.
Consumer<Integer> secondTap = new Consumer<Integer>() {
    @Override
    public void accept(Integer item) throws Exception {
        LogUtils.log(Test01.class, "accept()_2->integer:" + item);
    }
};

// Final observer: logs every callback and keeps the Disposable in sDisposable.
Observer<Integer> loggingObserver = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        sDisposable = d;
        LogUtils.log(Test01.class, "onSubscribe()");
    }

    @Override
    public void onNext(Integer item) {
        LogUtils.log(Test01.class, "onNext()->value:" + item);
    }

    @Override
    public void onError(Throwable error) {
        LogUtils.log(Test01.class, "onError()");
    }

    @Override
    public void onComplete() {
        LogUtils.log(Test01.class, "onComplete()");
    }
};

// Same chain as before, now assembled from the named pieces above.
Observable
        .create(emittingSource)
        .doOnNext(firstTap)
        .doOnNext(secondTap)
        .subscribe(loggingObserver);

打印结果如下所示:

09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->onSubscribe()
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->subscribe->onNext()->1
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->accept()_1->integer:1
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->accept()_2->integer:1
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->onNext()->value:1
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->subscribe()->onNext()->2
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->accept()_1->integer:2
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->accept()_2->integer:2
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->onNext()->value:2
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->subscribe()->onNext()->3
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->accept()_1->integer:3
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->accept()_2->integer:3
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->onNext()->value:3
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->subscribe()->onComplete()
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->onComplete()

结合源码看看为何打印会是这种打印结果;
下面的分析可能会很绕, 也可能会让人感觉废话连篇; 这也体现了RxJava架构的复杂性;

/** Excerpt of Observable showing the create() factory. */
public abstract class Observable<T> implements ObservableSource<T> {
    /**
     * Wraps {@code source} in a new ObservableCreate and passes it through
     * RxJavaPlugins.onAssembly().
     */
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
}
/** Excerpt of ObservableCreate: it only stores the ObservableOnSubscribe it is given. */
public final class ObservableCreate<T> extends Observable<T> {
    /** The ObservableOnSubscribe passed to the constructor. */
    final ObservableOnSubscribe<T> source;
    /** Stores the given ObservableOnSubscribe. */
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
}
/** Excerpt of Observable showing how each doOnNext() call creates a new wrapper. */
public abstract class Observable<T> implements ObservableSource<T> {
    /** Delegates to doOnEach() with {@code onNext} and placeholder values for the other callbacks. */
    public final Observable<T> doOnNext(Consumer<? super T> onNext) {
        return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
    }
    /**
     * Creates a new ObservableDoOnEach whose upstream is {@code this}, so
     * chained calls nest one wrapper around the previous Observable.
     */
    private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
        return RxJavaPlugins.onAssembly(new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate));
    }
}
public final class RxJavaPlugins {
    public static <T> Observable<T> onAssembly(Observable<T> source) {
        source;
    }
}
/**
 * Observable returned by doOnEach(): it remembers the upstream source (via the
 * superclass) and the four callbacks so they are available at subscription time.
 */
public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
    final Consumer<? super T> onNext;
    final Consumer<? super Throwable> onError;
    final Action onComplete;
    final Action onAfterTerminate;

    /**
     * @param source           the upstream this operator wraps
     * @param onNext           callback for emitted values
     * @param onError          callback for an error
     * @param onComplete       callback for completion
     * @param onAfterTerminate callback passed by doOnEach() as its last argument
     */
    public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
                              Consumer<? super Throwable> onError,
                              Action onComplete,
                              Action onAfterTerminate) {
        super(source);
        // Store all four callbacks, not just onNext.
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
        this.onAfterTerminate = onAfterTerminate;
    }
}
/**
 * Base class for an Observable that keeps a reference to its upstream
 * ObservableSource in the {@code source} field.
 */
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
    /** The upstream source; visible to subclasses. */
    protected final ObservableSource<T> source;
    /** Stores the given upstream source. */
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }
}

接下来看subscribe(...)如何实现doOnNext()的连续调用:

/** Simplified excerpt of Observable: subscribe() just forwards to subscribeActual(). */
public abstract class Observable<T> implements ObservableSource<T> {
    /** Hands the observer directly to the subclass's subscribeActual(). */
    public final void subscribe(Observer<? super T> observer) {
        subscribeActual(observer);
    }
    /** Subclass-specific subscription logic; invoked by subscribe(). */
    protected abstract void subscribeActual(Observer<? super T> observer);
}
/** Excerpt of ObservableDoOnEach showing what happens when it is subscribed to. */
public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
    /**
     * Wraps the given observer {@code t} in a new DoOnEachObserver and subscribes
     * that wrapper to the upstream {@code source}.
     */
    public void subscribeActual(Observer<? super T> t) {
        source.subscribe(new DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate));
    }
}
/** Excerpt of ObservableCreate showing its subscription logic. */
public final class ObservableCreate<T> extends Observable<T> {
    /**
     * Wraps the observer in a CreateEmitter, hands that emitter to the observer
     * via onSubscribe(), and only then invokes {@code source} with it.
     */
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        source.subscribe(parent);
    }
}
/**
 * Excerpt of CreateEmitter showing only onNext().
 * The {@code observer} field used below is declared outside this excerpt.
 */
static final class CreateEmitter<T> {
    /** Forwards the emitted value straight to {@code observer}. */
    @Override
    public void onNext(T t) {
        observer.onNext(t);
    }
}
/**
 * Excerpt of DoOnEachObserver showing only onNext().
 * The {@code onNext} and {@code actual} fields used below are declared outside this excerpt.
 */
static final class DoOnEachObserver<T> {
    /**
     * Invokes the {@code onNext} callback with the value first, then forwards the
     * same value to {@code actual}.
     */
    @Override
    public void onNext(T t) {
        onNext.accept(t);
        actual.onNext(t);
    }
}

通过几张图来对文字进行归纳总结:

上一篇下一篇

猜你喜欢

热点阅读