rxKotlin 响应式编程
前言
rxKotlin :一个在JVM上使用可观测的序列来组成异步的、基于事件的程序的库。
我所理解的 rxKotlin 是一个实现异步操作的库,Android开发过程中将会用到很多异步操作,这种响应式编程的方式能使程序可读性提高,思路清晰,使开发人员能更好地去做代码维护。
为什么推荐RxKotlin以及Kotlin语言
Kotlin是由JetBrains公司最新开发的基于JVM的编程语言,2017 的Google IO 上Kotlin正式成为Android 开发的官方语言。
在我们的日常Android开发过程中,有太多的业务需要用到异步操作,时而开启新线程,时而切回主线程。业务庞大之后,看着自己之前写的业务代码,容易一脸懵逼 +_+ ,如果写了批注估计能好点,要是当时没写批注,估计会瞬间爆炸💥。
先贴一组我分别用java 和 kotlin 写的例子: 读取一组文件夹中png格式的图片,并在UI界面上进行相应的处理操作。
先贴出的是我用java写的..以前我可能还真会这么去写...有时候写着写着代码就直接飞到屏幕外面去了 O(∩_∩)O
final List<File> folders = new ArrayList<>();
new Thread(){
@Override
public void run() {
super.run();
for (File folder : folders) {
File[] files = folder.listFiles();
for (File file : files) {
if (file.getName().endsWith(".png")) {
Bitmap bitmap = BitmapFactory.decodeFile(file.getAbsolutePath());
runOnUiThread(new Runnable() {
@Override
public void run() {
// 处理 bitmap
}
});
}
}
}
}
}.start();
接下来是用rxKotlin写的相同功能的代码
Observable.from(folders)
.flatMap {
Observable.from(it.listFiles())
}
.filter {
it.name.endsWith(".png")
}
.map {
BitmapFactory.decodeFile(it.absolutePath)
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
// 处理 bitmap
}
肿么样?是不是感觉瞬间逻辑上就变得清晰了起来?(主要是逼格一下子就上去了🙃)
这就是我为什么推崇kotlin和响应式编程的理由。可能一个人学会一门技能,他可以靠着这项技能吃一辈子的饭,但是,如果他不断更新或加强这项技能,也许他很快就能吃上肉吧。
原理解析
1. 核心:观察者模式
该库的异步实现,是通过扩展的观察者模式来实现的。
什么是观察者模式?用一个通俗点的例子说明。
这里,Button
作为一个被观察的对象,观察者OnClickListenner
监听其点击事件。 当按钮被点击时,触发一个 OnClick
的消息事件,然后传递给 OnClickListener
。如果用观察者模式的方式去定义此次点击事件的话,我会这么说: 观察者 OnClickListener
通过setOnClickListener()
去订阅了 被观察者Button
,一旦Button
被点击,变回触发事件 OnClick
。
换言之,就有了如下的对应关系:
Button ---> 被观察者
OnclickListener ---> 观察者
setOnClickListener() ---> 订阅
OnClick() ---> 事件
2. 实现方式
哦差点忘了,本文中的链式操作如
map
,filter
之类的api在这里我就不一一介绍了,感兴趣的话可以去找一些rxJava的文章看看它们的介绍与用法,一抹多的文章都介绍了它们的基本用法。
1)创建 observer
创建观察者observer
,它将决定事件到来的时候做出什么样的动作。
var observer = object : Observer<String> {
override fun onNext(t: String?) {
Log.e("name",t)
}
override fun onError(e: Throwable?) {
Log.e("error",e.toString())
}
override fun onCompleted() {
Log.e("status","completed!!")
}
}
它里面有三个会掉方法,可以自行定义当事件到来时,执行什么样的响应操作。随着每一个事件到来,正常的话会在回调onNext()
中响应。若所有事件都执行完毕,则会调用onCompleted()
。
2) 创建Observable
Observable
被观察者,它将决定触发什么事件以及事件触发的规则。
var observable = Observable.create<String> {
it.onStart()
it.onNext("jiangyu")
it.onNext("jy")
it.onNext("god")
it.onCompleted()
}
这里,我创建了三个事件,依次是发送"jiangyu","jy","god",这三个字符串,然后调用onCompleted()
作为事件发送完毕标记。
3)Subscribe 订阅
有了观察者和被观察者,使用subscribe
订阅让二者连接起来。
即:
observable.subscribe(observer)
发现没有,这里逻辑上是反的,可能会和思维上有些出入。明显这里的逻辑变成了被观察者去订阅观察者,为什么要和人们的惯性思维背道而驰呢?下面来讲解一下这流式API的工作方式
3. API工作原理
首先放上一张刚才的程序运行截图
运行截图仔细回想一下刚才所创建的观察者和被观察者,看看他们都做了些什么事。我总结性的概括一下 :1.首先被观察者定义了发生的事件以及事件发生的顺序(就是这里的发送字符串的顺序)2.每一个事件到来时,观察者对到来的事件进行处理(这里为打印日志)
以上代码和逻辑分析又可以写成这样一个等价的代码段(除了没有重写onCompleted
),更能方便对于整个过程的理解:
Observable
.just("jiangyu", "jy", "god")
.subscribe {
Log.e("name",it)
}
这是一种API提供的更加简洁的写法,网上对于这种流式操作有着各种理解,有人说这就像是一个发射器,将事件一个一个的发射然后处理。
为了更进一步展示工作原理,我对代码做了如下的变形:
Observable
.just("jiangyu", "jy", "god","000")
.filter {
Log.e("start with j", it)
it.startsWith("j")
}
.map {
Log.e("name", it)
it.toUpperCase()
}
.subscribe {
Log.e("after map name", it.toString())
}
测试结果
结果表明,不是说所有元素执行完filter
过滤再执行map
映射的,意思是流上的事件或元素都沿着链垂直移动。这正是印证了之前所说的,这就好比发射器,将事件一个一个发射出去并依次处理。
线程控制
终于,讲了辣磨多观察者模式的原理,现在正是进入正题!一起来研究rxKotlin响应式编程在Andorid中的使用方式。在Android开发过程中,最重要的无非就是线程间的切换。因此,掌握Scheduler
调度器(线程控制器)的工作方式和使用方法,我认为大概率就可以在Android开发过程中使用rxKotlin掌控雷电了⚡️
先介绍几个常用的API自带的Scheduler:
- Schedulers.newThread(): 总是启用新线程并执行操作
- Schedulers.io(): 主要用于读写文件、网络请求等,功能上和newThread()差不多,但是他是用了无上限线程池,并能够复用空闲的线程。
- AndroidSchedulers.mainThread(): 指定在主线程运行
那我们怎么来对线程进行控制呢?先来看如下代码:
@GET("getUsers")
fun getUsers(@Query("token") token: String):Array<User>
val retrofit = Retrofit
.Builder()
.client(OkHttpClient())
.addConverterFactory(GsonConverterFactory.create())
.build()
val userService = retrofit.create(UserService::class.java)
Observable.from(userService.getUsers("token"))// io 线程
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.filter {
it.age < 18
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
// 在UI主线程上操作
}
这里结合了使用Retrofit网络框架,举了一个线程切换的栗子,可能这个栗子有点烂....主要操作就是先获取用户信息,然后过滤得到年龄小于18岁的用户,最后在UI上完成操作。
我将subsribeOn
和 observeOn
的作用域简单的标注了一下:
简单总结一下的话,可以分为如下三点:
- subscribeOn 控制其之前的语句所处线程
- observeOn 控制其之后语句,到下一个observeOn之前的语句所处线程
- observeOn 之后如果再添加subscribeOn会没有任何作用
线程间的切换游刃有余,是什么样的强大原理支撑这样的操作呢?让我们来慢慢研究。首先来普及并解释一下我认为rxKotlin中比较关键的操作 - 变换。
map & flatMap
前面的代码中其实我已经用到很多了,个人呢认为这两兄弟在rxKotlin中是相当重要的。
map:
这里呢我们可以认为他相当于映射,也有那么点意思在里面,但是也不完全是吧。我认为更好的解释是 对于事件对象的变化操作吧,即由一个事件对象转变为另一个事件对象。
Observable
.just(user1, user2, user3)
.map {
it.name
}
.subscribe {
// print 这里打印的是每个user的名字
}
这很好理解,我传入三个user
,对于每一个user
我通过map
来返回他们对应的名字,然后再打印出来。
flatMap:
这个的概念比较难去理解,我把它理解为"铺平"操作。何谓"铺平"操作呢?再来看一个栗子🌰
// User的数据结构
data class User(val name: String, val age: Int, val courses : Array<Course>)
Observable
.from(arrayOf(user1,user2,user3))
.flatMap {
Observable.from(it.courses)
}
.filter {
it.classRoom.equals("528")
}
首先还是将三个user
作为输入并依次处理,对于每一个user
,在flatMap
操作中取出它的courses
,此时并不发送它们,而是激活。接着再初始化一个新的Observable
,相当于将每一个user
对应的courses
这一堆课程再次分发下去。 很难理解......用心体会😄
为了更好的去理解这些操作的原理,先来看看map
的API:
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
map
中传入的是一个function
(将T 转换为 R),再通过lift()
方法返回一个Observable<R>
。没错,传入的是一个函数,这属于高阶函数,即传入的参数或返回的参数是一个函数。
然后我们再来看看lift()
的API:
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
onSubscribe.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
});
}
很有意思的是他在返回的Observable<R>
中有一个OnSubscribe<R>
的一个监听,是对该Observable<R>
的subscibe
的一个监听,只有当发生该事件的subscribe时
,才会触发上述代码中call()
方法里面的操作。仔细回想一下之前我们提到的,为什么事件的处理顺序是一个垂直的方向进行的,这就很好的解释了这个疑惑点。像map
这样的中间操作过程,都相当于是一个模板,它会定义一个事件会进行怎样的变换操作,但是却不会立刻执行,只有等到其监听到subscribe()
方法时才会触发变换。
线程切换的原理也大同小异,同样是用到这个核心的lift()
方法,具体可以自己去参见API中的源码。
总结
研究Kotlin语言 和 rxKotlin、rxJava 我也处于一个起步的阶段,也算是发现了这些新鲜东西的一些亮点之处吧,自己研究了一番然后把研究出来的一些东西总结与分享了出来。希望可以在以后的项目和学习过程中更深入地去理解它的原理。
By the way , 不管是看到第一段就直接跳到这儿的,还是中途睡着的无意间手抖滑到这儿的...
↙️↙️↙️左下角 ❤️ 谢谢
😄😄😄😄😄😄😄😄