自定义view

RxJava 3.x系列(一)简介及其Observable订阅

2020-09-20  本文已影响0人  文泰ChrisTwain
优先参考:给初学者的RxJava2.0教程 1-10系列

RxJava 3.x系列(二)操作符 & 背压(BackPressure)

一、RxJava简介

二、RxJava第一行代码与Observable订阅过程

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "subscribe");
    }

    @Override
    public void onNext(Integer value) {
        Log.d(TAG, "onNext: " + value);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "error");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "complete");
    }
});
1.整体流程:上游创建、下游创建、订阅
2.Observable create后subscribe下游Observer,使得Observable触发subscribeActual()方法
3.ObservableCreate类中subscribeActual()方法通过ObservableOnSubscribe类的subscribe()方法订阅CreateEmitter
4.Observer作为成员变量暴露给ObservableEmitter,ObservableEmitter调用onNext()、onError()、onComplete()将回调下游Observer的onNext()、onError()、onComplete()方法
注:RxJava基于观察者模式,而观察者模式本质是回调。上游订阅下游实际表现为:CreateEmitter传入Observer作为其成员变量,CreateEmitter调用onNext()发射数据,将立即回调Observer的onNext()方法

三、线程切换过程

    ...
        emitter.onNext(3);
        emitter.onComplete();
    }
}) .subscribeOn(Schedulers.io())                // 切换上游线程
   .observeOn(AndroidSchedulers.mainThread())   // 切换下游线程
   .subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "subscribe");
    }

    @Override
    public void onNext(Integer value) {
        Log.d(TAG, "onNext: " + value);
    }
    ...

原理:RxJava提供Scheduler作为线程调度器,底层通过封装HandlerExecutor实现线程切换

参考:

ReactiveX - Operators
ReactiveX/RxJava文档中文翻译
Github-ReactiveX-RxJava
详解RxJava消息订阅和线程切换原理
Rxjava线程变换原理浅谈

P.S. 本文源码基于RxJava 3.0.4 结尾时序图引用自微信公众号
By the way: 非常感谢能提出批评和修改意见
contact: tu.wentai@nexuslink.cn
WeChat:HUC-Kris

上一篇 下一篇

猜你喜欢

热点阅读