Android开发经验谈Android开发Android技术知识

Rxjava2解析-订阅流程

2019-02-28  本文已影响13人  Colaman丶

订阅

首先创建一个observerobservable

new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Object o) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };
 Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {

            }
        });
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        // 1
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 2
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
        // 3 
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
1. 在这里把传进来的ObservableOnSubscribe对象保存起来
2. 这里new了一个Emitter,也就是我们用来onNext,onError的类,然后把Emitter传给observer中,由于CreateEmitter实现了Disposable接口,所以可以看到Observer里有一个onSubscribe(Disposable d),我们可以用来控制流的结束等操作,实际上这个Disposable就是源Obserable创建的,
3. 这里把Emitter传给source,也就是传给我们new的ObservableOnSubscribesubscribe()

到这里可以看出最简单的一个订阅流程是什么样的

变化操作

image.png image.png
image.png

结论

image.png
上一篇 下一篇

猜你喜欢

热点阅读