JVM · Java虚拟机原理 · JVM上语言·框架· 生态系统 移动 前端 Python Android Java知识图谱

RxJava 原理和封装使用<1>

2021-03-17  本文已影响0人  zcwfeng

前言:RxJava 本质压缩一句话,异步操作库

好用并且流行的原因:简洁(针对思维和处理业务)

RxJava扩展的观察者模式

① OnSubscribe参数对象
相当于计划表,被订阅后call()方法自动背调用,事件按顺序触发

② create()
RxJava最基本的创造事件序列的方法
基于这个方法,提供了一些方法来快捷创建事件队列

Subscriber,扩展Observer,通常我们使用Observer和Subscribe基本一致。只不过Subscriber 加入了一些方便的方法。

终点列出几个方法和用途。

① 增加onStart() 方法
subscribe刚开始未发送之前
做一些准备工作

② unsubscribe()取消订阅
subscribe之后Observable持有Observer引用
不使用的时候释放,如onPause,onStop中

③ doOnSubScribe()
和onStart一样,subscribe()调用后,
发送事件前调用。
区别,可以指定线程

observable.subscribe(observer);
或者:
observable.subscribe(subscriber);

Observable 和 Observer搭建链条关系

    // 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。 
    // 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
    public Subscription subscribe(Subscriber subscriber) {
        subscriber.onStart();
        onSubscribe.call(subscriber);
        return subscriber;
    }

subscribe() 做了三件事

  1. 调用Subscribe.onStart().是一个可选的。subscribe刚开始未发送之前做一些准备工作
  2. 调用Subscribe.call(subscriber).事件开始发送的逻辑。只有在被订阅subscribe()时执行(创建的时候不会立即开始)
  3. 将传入的Subscriber 作为Subscribtion 返回,为了方便unsubscribe()

onComplete()
onError()
onNext()

Scheduler线程控制

① Schedulers.immediate()
直接在当前线程执行,不指定线程

② Schedulers.newThread()
总是启用新线程运行,
并在新线程操作

③ Schedurers.io()
I/O操作,内部是一个无数量上限
的线程池,可以重用空闲线程。
不要把计算工作放在IO,
避免创建不必要的线程

④ Schdulers.computation()
计算所使用的Scheduler。
指的是CPU密集型计算,
例如图形的计算,会被IO限制的操作。
线程池大小为CPU的核数。
不能放IO操作,否则IO操作等待时间浪费CPU

⑤ AndroidSchedulers.mainThread()
Android专用操作,指定操作在Android主线程

① subscribeOn()
事件产生线程,subscribe()发生的线程

② observerOn()
事件消费的线程,Subscriber所在线程

Observable.just(1, 2, 3, 4)
                .subscribeOn(Schedulers.io())
                // 指定 subscribe() 发生在 IO 线程
                .observeOn(AndroidSchedulers.mainThread())
                // 指定 Subscriber 的回调发生在主线程
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer number) {
                        Log.d("zcw", "number:" + number);
                    }
                });

内部原理也是基于lift()的

  1. subscribeOn()和observeOn() 都做了线程切换。

  2. subscribeOn() 的线程切换发生在OnSubscribe中,(也就是在她通知上一级OnSubscribe还没有开始发送)subscribeOn()的线程控制可以从事件发出的开端造成影像;

  3. observerOn() 线程切换发生在内建的Subscriber 中,(发生在给下一级Subscriber发送事件时候),observerOn()控制的是它后面的线程。

可以有多次切换。subscribeOn,observeOn.
但是当使用了锁哥subscrib

变换和原理

① lift()
针对事件项和事件序列

② compose(Transformer)
对Observable自身整体的变换

RxJava 内部基于同一个基础变换
lisft()

  // 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
    // 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
    public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
        return Observable.create(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber subscriber) {
                Subscriber newSubscriber = operator.call(subscriber);
                newSubscriber.onStart();
                onSubscribe.call(newSubscriber);
            }
        });
    }
  1. 生成了新的Observable,创建Observable所用的参数OnSubscribe,它的回调call()和Observable.subscribe() 不是一样的

  2. 当含有lift()时候·

① lift()创建了一个Observable后,加上之前的Observable,已经有两个Observable了。

新Observable新OnSubscribe 加上之前原始 Observable中的OnSubscribe,也就有了两个OnSubscribe

③ 当用户调用lift()后的Observable的subscribe()的时候,使用的是lift()返回的新的Observable,于是出发的onSubscribe.call(subscriber),也是用新的Observable中的新OnSubscribe---<就是在lift()中生成的OnSubscribe>

④ 这个新的OnSubscribe的call()方法中的onSubscribe,就是指的原始Observable中的原始OnSubscribe对象,在这个call()方法里,新OnSubscribe利用operator.call(subscriber) 生成了新的Subscriber
Operator 就是在这里,通过自己call()方法将新Subscriber和 原始Subscriber进行关联,并插入到自己的“变换代码”中实现变换。
然后用新Subscribe向原始Observer进行订阅【有点类似代理,通过拦截和处理实现序列的变换】

总结
在 Observable 执行了 lift(Operator) 方法之后,会返 回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。

操作符

Rxjava操作符应用
创建操作符
    复杂数据遍历
        just
        fromArray
        fromIterable
        range
    定时任务
        interval
        intervalRange
    嵌套回调异步事件
        create
    延迟任务
        defer
        timer
变换操作符
    变换
        map
        flatMap
        concatMap
合并操作符
    组合多个被观察者,合并事件
        concatArray(发送事件--串行)
        concatDelayError
        megerArray(发送事件--并行)
        megerArrayDelayError
    组合多个被观察者,合并为一个被观察者
        zip
        combineLatest
    发送事件前追加其他事件
        startWithArray
    组合多个事件为一个事件
        reduce
        collect
    汇总发送事件数量
        count
过滤操作符
    指定过滤条件,过滤需要的事件/数据
        filter
    过滤指定类型的事件/数据
        ofType
    过滤条件不满足的事件/数据
        skip
    过滤掉重复的事件/数据
        distinct
        distinctUntilChanged
    按时间或者数量过滤事件/数据
        take
    过滤指定位置的事件
        elementAt
    按事件段过滤事件
        throttleFirst
        throttleLast
条件操作符
    判断所有事件是否满足
        all
    发送的事件判断条件不满足时,就会终止后续事件接收
        takeWhile
    发送的事件判断条件不满足时,才接收后续的事件
        skipWhile
    过滤事件(同filter)
        takeUntil
    接收判断条件满足之外的事件
        skipUntil
    判断2个被观察者发生的事件是否一样
        sequenceEqual
    判断发送的数据里面是否包含指定数据
        contains
    判断发送的数据是否为空
        isEmpty
    多个被观察者,只接收"第一个成功发送数据的被观察者"
        amb
其他功能操作符
    常用的do系列操作符
        doOnEach
        doOnSubscribe
        doOnNext
        doAfterNext
        doOnComplete
        doOnErro
    错误/异常处理
        抓捕异常并反馈特殊的结果&正常终止
            onErrorRetuturn
        抓捕异常并返回一个新的事件
            onErrorResumeNext
        异常重试
            retry
    事件重发
        repeat
        repeatWhen
        repeatUntil
    延迟发送被观察者的事件
        delay
    发送事件超时处理
        timeout

接口说明

Action1<String> onNextAction = new Action1<String>() { // onNext()
        @Override
        public void call(String s) {
            Log.d(tag, s);
        }
    };
    Action1<Throwable> onErrorAction = new Action1<Throwable>() { // onError()
        @Override
        public void call(Throwable throwable) { // Error handling
        }
    };

    Action0 onCompletedAction = new Action0() { // onCompleted()
        @Override
        public void call() {
            Log.d(tag, "completed");
        }
    };

        // 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
        observable.subscribe(onNextAction);
        // 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError() observable.subscribe(onNextAction, onErrorAction);
        // 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
        observable.subscribe(onNextAction,onErrorAction,onCompletedAction);

简单解释一下这段代码中出现的 Action1 和 Action0。

Action0 是 RxJava 的一个接口,
它只有一个方法 call(),这个方法是无参无返回值的;由于 onCompleted() 方法也是无 参无返回值的,
因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包 起来将自己作为一个参数传入 subscribe()
以实现不完整定义的回调。这样其实也可 以看做将 onCompleted()方法作为参数传进了 subscribe(),相当于其他某些语言中的『闭 包』。

Action1 也是一个接口,它同样只有一个方法 call(T param),这个方法也无返回 值,但有一个参数;
与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参 数无返回值的,
因此 Action1 可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe()
以实现不完整定义的回调。事实上,虽然 Action0 和 Action1 在 API 中使用最广泛,

但 RxJava 是提供了多个 ActionX 形式的接口 (例如 Action2, Action3) 的,它们可以被
用以包装不同的无返回值的方法。
注:正如前面所提到的,Observer 和 Subscriber 具有相同的角色,而且 Observer 在 subscribe() 过程中最终会被转换成 Subscriber 对象,
因此,描述用 Subscriber 来代替 Observer ,这样更加严 谨。

这只是1.x里面的,现在知识说原理。2.x会有变化,甚至现在最新3.x

Observable.just("images/logo.png") // 输入类型 String
                .map(new Func1<String, Bitmap>() {

                    @Override
                    public Bitmap call(String filePath) { // 参数类型 String 
                        return getBitmapFromPath(filePath); // 返回类型 Bitmap
                    }
                })
                .subscribe(new Action1<Bitmap>() {
                    @Override
                    public void call(Bitmap bitmap) { // 参数类型 Bitmap 
                        // showBitmap(bitmap);
                    }
                });

这里出现了一个叫做 Func1 的类。它和 Action1 非常相似,也是 RxJava 的一个接 口,用于包装含有一个参数的方法。
Func1 和 Action 的区别在于, Func1 包装的是 有返回值的方法。另外,和 ActionX 一样,
FuncX 也有多个,用于不同参数个数的 方法。FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法。

这只是1.x里面的,现在知识说原理。2.x会有变化,甚至现在最新3.x

rxjava2与rxjava1的一些函数变化,核心思想不变的
Action
Action0 改成了 Action
Action1 改成了 Consumer
Action2 改成了 BiConsumer
删除了Action 3-9具体原因不清楚
Func -> Function
Func1 改成了 Function
Func2 改成了 BiFunction
Func[3-9] 改成了 Function[3-9


开源库地址

【Github RxJava】

【Github RxAndroid】

上一篇下一篇

猜你喜欢

热点阅读