RxJava入门笔记

2020-02-24  本文已影响0人  jkCodic

特别注意

下文中的所有 [Observable] 都是指可观察者对象(被观察者)这个概念,在 RxJava 中有一个 [Observable] 的实现,它的名字叫做 Observable.class,一个是概念一个是实现,下面文章看的时候还请特别区分~

前言

RxJavaReactiveXJava 上的一个实现.官网为:

http://reactivex.io/

笔者是一个做 Android 的开发者,RxJava 从入门到现在的水平已经过了两年的时间.其实这东西入门真的挺难的.很多人看了几篇介绍的博客就觉得自己会了.其实并不是这样的.因为很多人看了之后依然不明白内在的很多东西,比如:

所以笔者写这个文章就是想分享和大家讨论以上的情况.下面的内容会陆陆续续的讲解到以上的内容, 但是顺序不一定是上述的顺序, 还请大家专心一点. 我想让大家真正的认识RxJava,而并不是像现在 Android 大多使用的情况一样,只是和 Rretrofit 结合一下做一下网络请求.对于那些不支持 RxJava 的库或者场景就不知道如何去设计成响应式.好了,废话就说到这里了,下面我会对 RxJava 整体做一个简单的介绍和用它到底能做什么

RxJava 到底是什么

官方翻译

RxJava 是一个基于观察者的库,利用可观察的序列然后去编写基于异步的程序. 并且提供了大量的操作符可以让您不再关注多线程和线程切换.让你专注于业务流程的编写

通俗易懂的解释

先罗列一下 RxJava 带来的好处

RxJava 的坏处

一个小故事了解 RxJava

RxJava 的世界中, EveryThing 都是 [Observable](被观察者). 利用一个小故事来解释什么是 RxJava 世界中的 !![Observable]!!(被观察者)

有一个小伙叫做小金子,他在他想喝的时候会去超市买一箱牛奶.

方式1:每次想喝的时候我去到超市,和老板说,老板,给我送一箱牛奶,这是我住的地址,老板让一个小伙 A 拿起牛奶就当当当的往我住的地方扛
方式2:在我第一次发现这个超市的时候,我就和超市老板说,老板,你给我一个电话好吗,以后每次我想喝的时候打你电话告诉你住的地址,你就给我送一箱牛奶,老板每次收到我的电话都会叫小伙 A 当当当的往我住的地方扛

方式3:我自己去超市买牛奶,自己把牛奶扛回来,整个过程自己完成(这种是全程同步完成的,不在这次讲解的范围)

两种方式都可以完成我的需求,但是不知道你们发现两者的区别了没有.

那到底区别是什么呢?

简单的可以把上面的描述分为两个事情:

第一种方式就是把两者结合到一起了

第二种方式是先和老板描述了要做的事情, 至于什么时候触发完全是看后续的心情

所以 RxJava 简单的说就是做了一件上述的事情,让所有的动作或者事情都可以滞后的发生,并且上面我们拿到的超市老板的电话,在 RxJava 中就是 [Observable](被观察者,能够对外发射信号(对应上面的例子中就是小伙 A 送牛奶)). 当你打电话给超市老板的时候,实际上就是对一个 !![Observable]!!(被观察者) 订阅 的过程,当你订阅之后小伙 A 送牛奶

上面解释了 RxJava 在上述的例子中扮演了什么角色,但是你们还不清楚,这么做到底带来了什么好处

再看两个场景,一个使用 RxJava 去完成这个逻辑(登录紧接着获取订单信息),另一个不使用

// 使用原生写法
login(new CallBack(){
    void onSuccess(final User user){
        handler.post(new Runnable() {
            @Override
            public void run() {
                // 更新 User 信息
                getOrderDetail(new CallBack(){
                    handler.post(new Runnable() {
                        @Override
                        public void run() {
                            // 更新 Order 信息
                        }
                    });
                });
            }
        });
    }
});
// 使用 RxJava 
login()
    .observeOn(AndroidSchedulers.mainThread())
    .doOnSuccess(new Consumer<User>() {
        @Override
        public void accept(User user) throws Exception {
            // 更新 user 信息
        }
    })
    .observeOn(Schedulers.io())
    .flatMap(new Function<User, SingleSource<Order>>() {
        @Override
        public SingleSource<Order> apply(User user) throws Exception {
            return getOrderDetail();
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .doOnSuccess(new Consumer<Order>() {
        @Override
        public void accept(Order order) throws Exception {
            // 更新 order 信息
        }
    })
    // 订阅
    .subscribe();       

两段代码实现同样的功能,表面上就能看出来 RxJava 方式有以下几个优点

你不容易发现的优点还有

其实最终的效果和 Callback 的方式没有什么实质性的差别,无非就是 RxJava 让你的代码写起来更加的清爽、摆脱了 Callback 嵌套、更方便的线程切换、大量的内置操作符的支持你平常的工作

而使用 RxJava 方式完成上述工作用一个小故事来说如下:

  1. 小金子去到超市,要了一个老板的电话,打这个电话告诉老板地址就会送牛奶
  2. 我拿着第一步拿到的电话号码来到冰激凌店,告诉冰激凌老板 超市老板的电话号码 和我要做牛奶冰激凌的意向,我成功的拿到了冰激凌店的电话
  3. 最后当我想吃牛奶冰激凌的时候,我就打电话给冰激凌店,冰激凌店就会打电话给超市要求送一箱牛奶到冰激凌店用于加工,我只要在家等待 冰激凌店 给我送做好的冰激凌就好了
  4. 以后我只要想吃牛奶冰激凌,就可以打电话冰激凌店,而不需要我每次从告诉超市老板我要买牛奶开始

以上我希望我已经能讲明白 RxJava 到底是一个什么东西了,此时你再回过头看下 RxJava 的定义,应该能更理解一些了吧

RxJava 的几个重要概念

signal(信号)

在很多很多的文章中,这个词叫法很多,比如:元素、Item、信号等等,这些说的其实都是一个东西,说的都是
ObServable 可能会发射出来的东西

Observable(被观察者)

这里指的是可观察者(被观察者)这个概念,表示这个对象是可以被观察的。
理论上被观察者可以是任何一个对象,everything
在你观察的时候,你可以收到它发射出来的信号.在 RxJava 中有五种基本实现.
因为这个 Observable 名词和 Observable 实现类重名了,所以之后的文章,我都会用 被观察者 来表示它,而之后出现的 ObServable 都表示具体的 ObServable 实现类

介绍了这五种操作符, 建议大家平时真的要注意区分, 不要什么场景都使用 Observable, 下面简单的教大家区分一下使用场景:

Observer(观察者)

表示观察者,是去观察被观察者的对象。一个很直观的场景就是 Android 中的监听点击事件,下面的 Button 就是被观察者,listener 就是观察者 button 就是一个 被观察者

button.setOnClickListener(listener);

RxJava 一个简单原理图和解释

<img src="https://xiaojinzi.oss-cn-shanghai.aliyuncs.com/blogImages/20190624165127.png" width=400px height=400px />

看不懂没关系,下面会用大白话来解释清楚

首先有 AA,BB,CC 三个人, AA 是一个卖汽车的销售员,BB 是奔驰4s店,CC 是一个奔驰汽车的提供商.而帅气的我 小金子 是一个穷逼,准备买车!,分为两个过程

上面的例子我简明扼要的说明了 RxJava 整个流程是如何执行的,

RxJava 线程切换原理

上面我们简单的描述了一下 RxJava 的原理,我们可以清楚一个事实那就是,每一个流程都会分为两个过程

所以这就对应了 RxJava 为什么会有两个切换线程的操作符,假设我们的业务流程从上而下应该是

CC 供应商 —> BB 4s店 —> AA 销售员 —> 小金子

你会发现 observeOn 切换线程的方向是和业务代码的书写流程是一致的,所以很多人对 observeOn 切换线程的大致理解是蒙对的,但是很多人对 subscribeOn 操作符是一脸懵逼

这里简单的写了一段代码,代码做的事情很简单简单,最顶层的 [Observable] 是用于发送一个 hello 字符串,然后 map 操作符在原有的基础上拼接上 world,最终完成 hello world 这个信号的发射,然后多个地方被我插入了两个切换线程的操作符

假设当前线程是 MainThread


// 创建Observable(创建 Observable 的过程在当前线程MainThread)
Single<String> singleObservable = Single.just("hello")
            // 这句话让信号发射的线程切换到 Thread-3, 并用 Thread-3 线程继续信号的发射
        .observeOn(Thread-3线程调度器)
            // 这句话让订阅的线程切换到 Thread-2,并用 Thread-2 线程继续订阅上游的 [Observable]
            .subscribeOn(Thread-2线程调度器)
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                // 因为这里是信号的发射流程,所以这里的线程取决上游的最近的一个 observeOn 操作符
                // 所以这里的线程是 Thread-3 而不是 Thread-2
                return s + " world";
            }
        })
            .doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                // 这是订阅流程的 Callback, 所以和下游的声明的最近的一个 subscribeOn 操作符
                // 你可以做一些事情,或者立马取消订阅, 通过 disposable.dispose()
            }
        })
            // 这句话让信号发射的线程切换到 Thread-4, 并用 Thread-4 线程继续信号的发射
        .observeOn(Thread-4线程调度器)
            // 这句话让订阅的线程切换到 Thread-1,并用 Thread-1 线程继续订阅上游的 [Observable]
        .subscribeOn(Thread-1线程调度器); 

// 创建 Observer
SingleObserver<String> singleObserver = new SingleObserver<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        // 这是订阅流程的 OnSubscribe Callback, 取决于 Observer 在什么线程上订阅 Observable
        // 所以这个地方的线程是 MainThread
        System.out.println("onSubscribe");
    }

    @Override
    public void onSuccess(String result) {
        // 因为这里是信号的发射流程,所以这里的线程取决上游的最近的一个 observeOn 操作符
        // 所以这里是 Thread-4 线程
        System.out.println("onSuccess:" + result);
    }

    @Override
    public void onError(Throwable e) {
        // 因为这里是信号的发射流程,所以这里的线程取决上游的最近的一个 observeOn 操作符
        // 所以这里是 Thread-4 线程
        System.out.println("onError");
    }
};

// singleObserver 订阅 singleObservable, 在 MainThread 上
singleObservable.subscribe(singleObserver);

RxJava 常用的操作符有哪些

这里推荐一个 rx 思想的操作符可视化的一个网站, 还是一个信号可移动的

http://rxmarbles.com/

几乎常用的操作符在上面都有, 我这里给大家稍微解释下, 教大家如何去看

这里举例看一个 merge 操作符

image

了解了图中各个部分的意义, 就可以来看 merge 操作符到底干了什么了

我们从图中可以看到, 第一个和第二个 [Observable] 发射的信号最终都在第三个 [Observable] 中了. 并且是按照时间的顺序的.

所以 merge 操作符是合并 N 个 [Observable] 发射的信号, 所有的信号按照发射的时间排列.

那你可能会问了, 那 merge 操作符有啥用?请记住一句话, 在 RxJava 的世界中, 万物都是 [Observable], 所以可完成的事情很多,最常用的可以是如下使用:

所以 merge 操作符是可以并发的去完成一些事情的, 从这里你可以感受到你没有用到多线程,但是却完成了多线程的工作. 而你用 RxJava 仅仅要做的只不过是每一个要让 merge 合并的 [Observable] 切换订阅线程到一个独立的线程或者一个 IO 线程即可.

我们继续看一个操作符!

Map 操作符

image

我们可以看到 Map 操作符针做的事情特别简单, 就是改变每一个发射的信号, 从上图中我们看得出, 上图的 Map(x ==> 10 * x) 的具体实现是把每一个信号都 * 10, 所以使用了就可以让每一个信号都乘以 10 的效果.

Concat 操作符

image

有了上面两个例子,相信这个操作符也不是什么问题了.

我们可以看到, concat 操作符可以把 N 个 [Observable] 发射的信号按照订阅 [Observable] 的顺序给排列

这非常符合我们要做一些有顺序的事情

比如你要做两件毫不相干的事情, 但是却有顺序, 那么 concat 再适合不过了.

如果你要做的事情不仅有顺序,而且前后之间还有联系, 那么情使用 FlatMap

FlatMap 操作符

FlatMap 可以把一个信号转化为另一个 [Observable], 这个操作符可以很好的做

一个逻辑和另一个逻辑的衔接.

比如请求 A 接口成功了之后, 需要使用 A 接口返回的数据继续请求 B 接口.

更多的操作符

其他更多的操作符希望大家多去 http://rxmarbles.com/ 网站了解下

并且 RxJava 库的每一个操作符方法的注释上都有一个这个操作符的原理图, 比如 filter 操作符

image

“Hot” and “Cold” Observables

下文中的

When does an Observable begin emitting its sequence of items? It depends on the Observable. A “hot” Observable may begin emitting items as soon as it is created, and so any observer who later subscribes to that Observable may start observing the sequence somewhere in the middle. A “cold” Observable, on the other hand, waits until an observer subscribes to it before it begins to emit items, and so such an observer is guaranteed to see the whole sequence from the beginning.

以上是官方文档中对热 Observable 和冷 Observable 的一个解释. 英文还行的同学其实对上面的这段话应该不难理解, 大概的意思就是:

一个冷 [Observable] , 不管订阅者何时来订阅它, 它都会从头到尾的发射所有的信号给这个订阅者. 而我们上述的的篇幅介绍的所有内容都是属于冷 [Observable] .

而一个热的 [Observable] 可能发射信号当它创建的时候, 当订阅者订阅了这个 热的 [Observable] 有可能只能收到订阅关系建立之后的信号. 热 [Observable] 的行为取决于热 [Observable] 的实现

作者建议一种方法, 可以十分方便的区分出来这个 [Observable] 是一个热的还是冷的

[Observable] 信号的产生是否和订阅者的订阅有关. 请注意看前面的几个字 "信号的产生"

因为一个热 [Observable] 信号的产生往往和订阅者无关, 比如你关注的一个博主发布了一篇文章, 而你作为关注者收到了, 这个博主发布这个文章不会因为你是否关注它而收到影响! 而如果你之前没有关注这个博主, 那么很可惜你就收不到这篇文章的推送了.

一个冷 [Observable] 信号是在订阅者订阅自己之后产生, 每一个订阅者订阅都会重新产生一份发射给订阅者.

从上面的一个解释可以看出, 热 [Observable] 才是真的体现观察者模式的 ,在我看来冷的 [Observable] 只是一个 "假的" 观察者模式.

我们上面所有的例子, 其实都是一个冷 [Observable] 的一个使用, 比如:

[Observable]RxJava 中的实现类为 Subject, 它既是一个 [Observable] 也是一个 Observer , 它有多个子类, 用于实现热 [Observable] 不同的行为

那热 [Observable] 到底在哪里使用?可以是以下几个场景

RxJavaMVVM 是什么关系

RxJava 其实是可以看成是一个 MVVM 的框架,它可以让您的项目整体架构是一个 MVVM 的风格. 这得益于 RxJava 中的热 [Observable], 它可以描述任何一个数据和其他数据的一个关系. 可以组建出一个关系网, 描述了哪个数据的变化会影响到哪个数据, 利用观察者的模式去写项目.

MVVM 只是一个概念, 是 Model-View-ViewModel的简写, 具体的解释, 可以进入 百度百科 进行了解 , 在 Android 中, 大家对 MVVM 的理解可能就局限于 DataBinding 这个框架, 认为它就是 MVVM.

DataBinding

这里还是要描述清楚, MVVM 只是一个概念, 它的实现中有一个叫做 DataBinding 的框架. DataBinding 框架帮助你实现了视图中的控件和 ViewModel 中的数据进行了一个双向的绑定, 另外它也有提供数据的观察者模式, 比如 ObservableField, 利用诸如 ObservableField 的类对真实的数据进行包装, 你就可以描述数据和数据之间的关系. 比如 B数据 的改变是因为 A数据. 然后我们就可以利用 ObservableField 提供的监听方法设置监听然后拿到数据 A 之后去影响 B.

为什么 RxJava 也可以是一个 MVVM 框架

我们上面说过 RxJava 的热 [Observable], 如果我们不手动让热[Observable] 结束, 它可以永远都处于待发射信号的状态(也就是没有完成), 而我们的 App 在没有被杀死之前, 其实很多时候就是在描述数据和数据、数据和视图之间的关系. 而 RxJava[Observable] 正好可以胜任这件事情, 而且可以做的非常出色.下面我从简单的几个例子中说明 RxJavaMVVM 方面的好处:

用观察者模式去描述数据之间的关系

MVP 模式下, 如果多个流程会影响到同一个视图, 我们通常都是每一个流程的代码中会去调用 View 层的接口达到我们的目的. 示意图如下

graph LR
        A流程 --> A数据
        A1流程 --> A数据
    A流程 --> T视图的显示
    A1流程 --> T视图的显示
    B流程 --> B数据
    B流程 --> T视图的显示
    其他流程 --> 其他流程数据
    其他流程 --> T视图的显示

我们可以看到 T视图 的显示的代码嵌入在每一个流程中, 当以后维护的时候, 假如我其他地方一个地方修改到了 A产物, 我们都得记得去通知 T视图 显示, 那么很明显我们的设计是失败的, 因为我们后续的代码其实就是想修改 A产物 即可, 而我们还需关系 T视图 的显示. 很明显不利于维护

而用热 Observable 是怎么样的呢?

graph TB
        A流程 --> A数据
        A1流程 --> A数据
    B流程 --> B数据
    其他流程 --> 其他流程数据
    A数据 --> T视图的显示
    B数据 --> T视图的显示
    其他流程数据 --> T视图的显示

首先我们可以看见流程变清晰的, 每一个流程只会专注于修改自己流程上产生的数据. 而我们还可以看到各个数据都指向了 T视图 的显示, 这里是因为 T视图 的显示去订阅了 A数据B数据其他数据, 当其中任何一个数据变化都能导致 T视图 的变化. 很明显这种方式后期维护的时候更加的容易并且不容易出错.当添加了一个 A11 流程也是修改 A数据 的, 那么再也不需要关心 A数据 的修改还需要通知 T视图 这件事了.

我们这种场景对应的热 ObservableBehaviorSubject 这个实现类, 示例代码如下:

BehaviorSubject<A数据> aBehaviorSubject = BehaviorSubject.create();
BehaviorSubject<B数据> bBehaviorSubject = BehaviorSubject.create();

// 构造函数
xxx(){
  aBehaviorSubject
      // 中间你还可以添加很多很多的数据错误的操作符去写你的流程
        .subscribe(new Consumer<Object>() {
        @Override
        public void accept(Object o) throws Exception {
            // 通知 T视图
            view.xxx();
        }
      });
  bBehaviorSubject
      // 中间你还可以添加很多很多的数据错误的操作符去写你的流程
        .subscribe(new Consumer<Object>() {
        @Override
        public void accept(Object o) throws Exception {
            // 通知 T视图
            view.xxx();
        }
      });
}

感觉代码量上涨了?哈哈,确实是的, 但是我们要关注的是代码的一个维护性, 如果使用 java8 代码量会少很多.

上述有些人可以还是没有多大的感觉, 就觉得这只是把之前流程中的一些代码抽取出来写而已.

但是请大家注意, 上述两张图表达的编码方式和维护性其实完成不一样. 图一更加的专注数据的处理过程, 而图2更专注于数据和数据、数据和视图之间的关系. 当你描述完这些关系之后, 你写代码会更加的清晰明了并且不易出错. 而且有一句话是最好的注释其实是你的代码, 用 RxJavaObservable 去描述数据和数据、数据和视图之间的关系在代码上就会有体现, 而不是原始的方式你需要去研究各个的流程代码之后才能得到 xxx 和 xxx 之间的一个关系.

基于第一点我们再来一个经典例子(多表单验证)

我们都做过类似于登录注册的界面, 各个的影响关系的关系如下

graph LR
    用户输入name --> name输入框
    name输入框 --> check方法
        check方法 --> 按钮是否可用
        点击清除密码图标 --> password输入框
        password输入框 --> 清除密码图标是否显示
        用户输入password --> password输入框
        password输入框 --> check方法

代码很简单, 很多人第一反应就是去设置 name输入框pass输入框的文本改变监听, 然后在监听方法中调用同一个方法 check(), 去检查所有的输入框是否满足需求, 然后决定按钮是否可用.

换句话说就是上述的任何一个影响到 按钮是否可用 的流程你都要调用检查全部控件是否满足条件的方法.这种方式其实是对的, 但是却让人写代码不愉快

如果通过 RxJava 来构建他们的关系, 关系图如下. 关系图没有变多少, 但是内在的实现变化很大

graph LR
            用户输入name --> name输入框
      name输入框 --> ViewModel中的nameObservable
      点击清除密码图标 --> password输入框
      用户输入password --> password输入框
      password输入框 --> ViewModel中的passwordObservable
      ViewModel中的passwordObservable --> 清除密码图标是否显示
      ViewModel中的nameObservable --> 按钮是否可用
      ViewModel中的passwordObservable --> 按钮是否可用

配合操作符 combineLatest, 可以让代码变得非常的舒服

N 个 [Observable], 任何一个 [Observable] 发射一个信号都会产生一个结合信号, 组合每一个 [Observable] 最后一个信号. 效果图如下, 这个正好符合我们上述的多个输入框的值影响到同一个视图的场景.

image
// 名称的 [Observable]
BehaviorSubject<String> nameBehaviorSubject = BehaviorSubject.create();
// 密码的 [Observable]
BehaviorSubject<String> passwordBehaviorSubject = BehaviorSubject.create();
// 构造函数
LoginViewModel() {
// 可以合并 N 个 [Observable], 这里就账号和密码的数据
// 当账号和密码的数据其中一个发生变化都会触发一次组合合并出一个新的信号
    Observable
    .combineLatest(nameBehaviorSubject,passwordBehaviorSubject,....)
        .combineLatest(observable, observable, 
               (BiFunction<String, String, Boolean>) (name, pass) -> {
                             // 对 name 和 pass 一顿判断之后得出一个 boolean
                       return result;
               }
    })
    .subscribe(aBoolean -> {
      // 拿到这个 boolean 就可以去控制 按钮是否可用啦
    })
}

// 以下我没有用 databinding 的双向绑定实现, 用了最普通的代码

//  Activity 监听 name 输入框变化调用此方法
public void setName(String name) {
    nameBehaviorSubject.onNext(name);
}

//  Activity 监听 password 输入框变化调用此方法
public void setPassword(String password) {
    passwordBehaviorSubject.onNext(password);
}

// Activity 初始化的时候就调用此方法订阅 name Observable
// 请注意订阅的这个 Observable 是一个热 [Observable], 不会因为你的订阅而触发信号的产生
public Observable getNameObservable(){
    return nameBehaviorSubject
                        // 去重, 防止死循环
                        .distinct();
}

// Activity 初始化的时候就调用此方法订阅 password Observable
// 请注意订阅的这个 Observable 是一个热 [Observable], 不会因为你的订阅而触发信号的产生
public Observable getPasswordObservable(){
    return passwordBehaviorSubject
                        // 去重, 防止死循环
                        .distinct();
}

用热 [Observable] 代替静态变量和广播

我们 App 中经常会有静态变量, 它是一个好东西, 但是有一个很明显的问题是:

一个静态变量的值的改变有时候一些地方是关心的, 是需要迫切的知道这个静态变量已经被改变了.

这个问题其实可以让访问这个变量的方式改变一下, 静态变量不再是 public, 而是一个 private 的, 为这个变量添加 get set 方法, 让别的地方可以设置监听器来监听这个变量的值. 然后在 set 方法中我们可以发送一个广播或者调用监听器的回调通知外部. 这是完全可行的, 并且是易于维护的.

上面的优化方式使用 RxJava 的热 [Observable] 更为方便. 你只需设计原有的静态变量为

// 热 [Observable]
BehaviorSubject<String> xxxBehaviorSubject = BehaviorSubject.create();

别的地方如果关心这个变量只需订阅即可, 如果后面需要改变变量的值, 只需执行下面的代码发射一个新的信号即可

xxxBehaviorSubject.onNext("newString");

这样子所有关心此数据的地方都会收到通知, 这种方式不仅简写了代码, 而且代替了广播和少设计了监听器.

这种属于比较深入的使用 RxJava 了, 市面上很多的博客其实都是抄来抄去, 讲的都是操作符怎么使用. 其实没啥意思. 很多博客的比喻还都是错误的, 就好比一篇博客中拿 开关灯泡 之间的关系来举例, 说的都是冷 [Observable]的内容, 但是这个例子其实是一个热 [Observable]的典型代表. 你觉得你灯泡订阅了开关 和 开关产生 的信号有关吗?

开关产生信号和我们人什么去触发有关系, 和灯泡的订阅无关!热冷 [Observable] 的我小金子说的最典型的两个例子为:

这两种不用我说谁是冷的谁是热的了吧?自己好好体会去吧.

如何利用 RxJava 写多线程并发

很多人其实用 RxJava 根本不会写并发, 也不知道从哪里看. 其实这和我们平常写代码是一样的, 只不过要转化为 RxJava 的方式. 为什么要用 RxJava 的方式?因为它提供的操作符很简单呀, 我们根本不用例会底层是如何实现的, 也不用管线程之间的消息的传递, 更不用管如果丢弃操作.下面这块我就简明扼要的说关键点, 让大家知道如何使用 RxJava 写并发!

假设现在有一个需求, 我们有 10 个文件, 我们需要把它上传到服务器, 按顺序 拿到文件的地址

需求很简单. 但是不用 RxJava 会很恶心, 因为你需要循环做这些事情, 并且还要创建子线程, 最恶心的是你还要管理每一个子线程成功与否, 都要计数, 最终都回来了之后才能进行排序和下一步的处理

那用 RxJava 怎么做?请记住一点, 任何代码都要学会拆分, 首先我们的 10 个文件我们可以先考虑一个文件是怎么做的, 这个很简单, 顺手就写好了, 很简单:

// 这里描述了一个上传的操作, 入参是一个文件的本地地址, 返回值是一个 Single<String>
// 如果成功就拿到一个 url 地址, 如果失败就收到 error 回调
Single<String> upload(String filePath){
        return Single.just(filePath)
                .map(path -> new File(path))
                // 做一个文件是否存在检查
                .doOnSuccess(file -> {
                    // 如果不存在就抛异常
                    if (!file.exists()) {
                        throw new FileNotFoundException(file.getPath());
                    }
                })
                // 信号转换为另一个 [Observable]
                .flatMap((Function<File, SingleSource<? extends String>>) file -> {
                    // 调用网络请求
                    return Xxx.upload(file);
                });
    }

我们接下去写, 如果让 10 个都异步的跑起来. 我们定义一个类

// 任务上传的类, 方便 RxJava 流程处理的时候一直用一个类
// 不然你中途数据类型变换一定会用到成员变量的
class FileUploadTask {
        public String filePath;
        public int index;
        public String resultUrl;
        public FileUploadTask(String filePath, int index) {
            this.filePath = filePath;
            this.index = index;
        }
}
        String[] files = new String[]{"xxx", "xxx", "xxx"};
                // 转化为 N 个任务类
        FileUploadTask[] tasks = new FileUploadTask[files.length];
        for (int i = 0; i < files.length; i++) {
            tasks[i] = new FileUploadTask(files[i], i);
        }
        Observable.fromArray(tasks)
                // 这一步的操作至关重要, flatMap 操作符能让一个信号转化为一个新的 [Observable]
                    // 然后让这个 [Observable] 在另一个线程上执行即可
                .flatMapSingle((Function<FileUploadTask, 
                                SingleSource<FileUploadTask>>) task -> {
                  
                    final FileUploadTask currentTask = task;
                    return upload(currentTask.filePath)
                            // 切换订阅的线程为一个新的线程
                            // 为什么用 subscribeOn 可以看上面切换线程的介绍
                            // !!!!!!!!!!!!! 这个切换是最重要的 !!!!!!!!!!!!!
                            .subscribeOn(Schedulers.newThread())
                            .map(url -> {
                                currentTask.resultUrl = url;
                                return currentTask;
                            });

                })
                    // 为什么要排序?因为多线程会打乱数据会来的顺序, 如果你不在意顺序, 那就不要排序
                // 这里的排序是倒序还是正序我忘了, 不影响大局, 如果你们发现错了, o1 和 o2 记得换一下
                .sorted((o1, o2) -> o1.index - o2.index)
                // 只要 url 结果
                .map(task -> task.resultUrl)
                // 所有信号收集成为一个 List
                .toList()
                // 切换信号发射的线程到主线程
                .observeOn(AndroidSchedulers.mainThread())
                // 切换订阅的线程为 IO 线程
                .subscribeOn(Schedulers.io())
                .subscribe(urls -> {
                    // 这里拿到的就是所有上传好的 urls
                }, throwable -> {
                    // 就是就是其中发生了错误
                });

上面的代码我写了一个典型的例子, 希望大家能有所理解. 上面的使用 flatMap 操作符让新的 [Observable] 运行在新线程上, 其实还可以先创建多 N 个 [Observable] , 利用上面说过的 merge 操作符.

                String[] files = new String[]{"xxx", "xxx", "xxx"};
                // 先创建多 N 个上传任务的 Observable
        Observable<FileUploadTask>[] uploadTaskObservables = new Observable[files.length];
        for (int i = 0; i < files.length; i++) {
            final FileUploadTask fileUploadTask = new FileUploadTask(files[i], i);
            uploadTaskObservables[i] = upload(files[i])
                    .map(url -> {
                        fileUploadTask.resultUrl = url;
                        return fileUploadTask;
                    })
                    // 切换订阅的线程为一个新的线程
                    // !!!!!!!!!!!!! 这个切换是最重要的 !!!!!!!!!!!!!
                    .subscribeOn(Schedulers.newThread())
                    // 转化为 Observable, 只有 Observable 才能使用 merge
                    .toObservable();
        }
                // 利用 merge 操作符
        Observable.mergeArray(uploadTaskObservables)
                    // 为什么要排序?因为多线程会打乱数据会来的顺序, 如果你不在意顺序, 那就不要排序
                // 这里的排序是倒序还是正序我忘了, 不影响大局, 如果你们发现错了, o1 和 o2 记得换一下
                .sorted((o1, o2) -> o1.index - o2.index)
                // 只要 url 结果
                .map(task -> task.resultUrl)
                // 切换信号发射的线程到主线程
                .observeOn(AndroidSchedulers.mainThread())
                // 切换订阅的线程为 IO 线程
                .subscribeOn(Schedulers.io())
                .subscribe(urls -> {
                    // 这里拿到的就是所有上传好的 urls
                }, throwable -> {
                    // 就是就是其中发生了错误
                });

利用这种也是可以完成的, 好啦, 多线程的演示就到这里结束吧, 希望你们会喜欢. 如果觉得不错请点赞或者评论!!!

上一篇下一篇

猜你喜欢

热点阅读