[架构基本功]kotlin协程的协议改造
两年前,到微信面试的时候,人家问我懂不懂协程,知不知道里面原理,我当时懵B。一年前,去另外一家公司面试的时候,人家也是这样问我kotlin协程会用吗?我也是无法回答。
如果没有实践过,估计也无法说出个所以来,因为压根不知道究竟他怎么用,使用的时候需要注意什么。
终于近来有一个节点,自己可以去接触协程了,需要写出对接口使用协程的方式扩展。
1.协程
很多人都会讲进程,线程,协程来讨论。
其实就简单的说一下我所理解的吧
进程可以有多个线程,线程可以有多个协程,使用协程其实还是线程切换。
使用协程必定要有作用域(Scope),有一个全局的作用域GlobalScope是供App内全局使用的。
然后需要一个标识上下文的context
说一下协程的优势
1.无需系统内核的上下文切换,减小开销;
2.无需原子操作锁定及同步的开销,不用担心资源共享的问题;
3.单线程即可实现高并发,单核 CPU 即便支持上万的协程都不是问题,所以很适合用于高并发处理,尤其是在应用在网络爬虫中;
需要注意的地方:
GlobalScope调用withContext时无法使用Dispatch.Default,因为默认是一个EmptyCoroutineContext,协程不会运行,并且不会有任何报错。
2.请求 Continuation
大家可以考虑一个场景,当网络协议发送后,通过非堵塞的机制来等待协议结果返回,在不改动原有的协议的使用情况下,加入协程去改造协议处理。
初学者,其实很容易会想到使用一个协程去完成发送,然后使用另外一个协程来完成接收,这样做就可以简单完成操作。
那么有没更优美的编写呢,先给大家看一个简单的代码
Continuation.png
Continuation.resume.png
可以看到通过suspendCoroutine可以创建一个continuation对象,此对象是用于协程结果回调。当suspendCorountine执行完成后,rsp会堵塞等待continuation返回结果,再次执行。代码是堵塞的,然而线程并没有堵塞。只要对应返回的地方使用continuation来完成回调。
这里模拟使用handler.sendMessage来模拟发送,然后通过handler.handleMessage来模拟接收。只会产生一个协程对象,任何的协议结果处理后通过continuation.resume就可以返回成功的结果到rsp去。
3.suspend 泛型 内联
需要注意的是,协程域里面,全部都需要声明为suspend fun的形式,提示是协程的方法,程序执行挂起的时候估计是需要特殊的标记。
协程编写泛型的形式和java差距不是很大,但是需要注意的是,使用了协程包含了泛型对象,使用is判断,会提示你,需要使用内联。
inline.png
使用内联那么私有变量全部都需要变为public, 而T会被转变为 reified T。其实到编译阶段内联的T泛型是类型是确定的,编译系统会将其替换掉。
而且有使用内联,那么方法无法声明为接口方法,产生很大的局限。
基本来说我们确定类型,直接强转T就可以了。
4.广播 channel received 标记
协议也并非只有请求接收,特别如果是使用socket,那么你肯定是能有接收广播的情况。而上面使用协程continuation只能模拟出请求和接收的情况,那是否有办法接收些成广播呢?
这里可以使用
private val channel = Channel<Ent>()
fun send() {
async {
withContext(coroutineContext) {
val obj = ChildEnt()
obj.name = "协程广播"
obj.count = 4
channel.send(obj)
// channel.close()
}
}
}
fun <T : Ent> register(callback: CoroutinesCallback<T>): Job {
return async {
withContext(coroutineContext) {
for (ent in channel) {
callback.block.invoke(ent as T)
}
}
}
}
这里需要使用channel,使用一个协程来做发送,另外一个协程需要来接收。
如果你使用channel.receive()只能接收到一条数据,这里使用,in channel的方式可以一直监听到channel.send的数据。
当然如果确定通道不可用,要使用channel.close关闭通道。
5.java调用协程
如果你使用java的代码,你会发现无法使用协程,无法使用域声明。
那怎么怎么才能调用协程?
java中还是能声明域对象以及CoroutineContext上下文对象的,那么只能传输作用域,context,以及使用的回调的方法来做处理。
class CoroutinesCallback<T : IEnt>(
var scope: CoroutineScope,
var context: CoroutineContext,
var block: (suspend (T) -> Unit),
var error: (suspend (Exception) -> Unit)? = null
)
override fun <T : IEnt> sendAsCoroutineAsync(
rspClass: Class<T>,
scope: CoroutineScope,
context: CoroutineContext,
s: (T) -> Unit,
e: ((Exception) -> Unit)?
): Deferred<Unit?>? {
return sendAsCoroutineAsync(rspClass,
CoroutinesCallback(scope, context, {
s.invoke(it)
}, {
e?.invoke(it)
}))
}
java传输这些可以声明的对象,再通过kotlin转包一层。那为何不让外层直接传入一个CoroutinesCallback回调对象就可以呢?
java是无法办法初始化suspend的初始方法的,这就非常尴尬了。折中的方法,只能使用suspend block的再包一层普通block的方法,而普通block s: (T) -> Unit可以对应java中的Function1<T, Unit> s的方法。
6.协程的回收
当然是需要考虑协程的回收的,特别在外Activity生命周期结束后,才到达协程结果返回,如果你只是封装消息外抛或者不在主线程还好,不然就很有可能造成崩溃了。
协程域使用async的方法会传回一个Deffered<T>的对象,和Job类似,可以通过这个对象cancel的方法可以完成释放,自己挑选时机就好。
想要更加智能,参照rxjava的处理,是需要绑定lifecycle,改造的时候也是这样做的。新版本的lifecycle加入了对协程的支持,直接是有lifecycle CoroutineScope,执行的时候,直接使用这个域就非常安全了。旧版的lifecycle并没有,那这时候绑定释放就只能自己编写了。
class LifecycleCoroutineListener(
private val job: Job, private val cancelEvent: Lifecycle.Event =
Lifecycle.Event.ON_DESTROY
) : LifecycleObserver {
@OnLifecycleEvent(Lifecycle.Event.ON_PAUSE)
fun pause() = handleEvent(Lifecycle.Event.ON_PAUSE)
@OnLifecycleEvent(Lifecycle.Event.ON_STOP)
fun stop() = handleEvent(Lifecycle.Event.ON_STOP)
@OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
fun destroy() = handleEvent(Lifecycle.Event.ON_DESTROY)
private fun handleEvent(e: Lifecycle.Event) {
if (e == cancelEvent && !job.isCancelled){
job.cancel()
}
}
}
//使用的时候参数传入lifecycle,然后完成绑定
lifecycle?.addObserver(LifecycleCoroutineListener(j))
这里还有优化的地方,协程域上下文CoroutineContext是带有isActive的方法的。通过封装extendsion的方法,来对Continuation回调时先对存活判断
private fun <T> Continuation<T>.resumeIfActive(value: T) {
if (this.context.isActive) {
resume(value)
}
}
7.协程的异常处理
上面介绍了continuation的对象,使用resume可以返回结果到挂起的等待的地方,如果失败了的情况,可以放回resumeWithException的方法来返回Exception内容到接收处,但是这里需要try catch来获得Exception。
8.线程池问题
协程自身也是会开通线程池的,如果本来就有rxjava的一套代码,无疑会增加线程数量的。有没很好的方法规避呢,可以选择和rxjava公用线程池。
object XXDispatchers {
/**
* 后台任务分发器, 使用的线程池与 Schedulers.computation() 一样
*/
@JvmStatic
val Default: CoroutineDispatcher = Schedulers.computation().asCoroutineDispatcher()
/**
* 主线程
*/
@JvmStatic
val Main: CoroutineDispatcher = Dispatchers.Main
/**
* 协程挂起后恢复回到的线程, 与最后挂起函数运行时所在的线程相同. 即与 Dispatchers.Unconfined 相同
*/
@JvmStatic
val Unconfined: CoroutineDispatcher = Dispatchers.Unconfined
/**
* IO任务分发器, 使用的线程池与 Schedulers.io() 一样
*/
@JvmStatic
val IO: CoroutineDispatcher = Schedulers.io().asCoroutineDispatcher()
}
最后的提醒,使用协程一定是需要作用域和上下文的,并且要考虑释放等问题。暂时并没有像rxjava一样链式调用那么方便
如果有更优化的方案,可以再评论区评论,我会认真跟进。
两个群号都可以加入,群2群号763094035,我在这里期待你们的加入!!!
image群1号是316556016。
image