Kotlin从入门到放弃Android 开发经验集Android开发经验谈

使用Kotlin协程撸一个简易异步调用库

2017-11-10  本文已影响733人  027f63d16800

由于android限制了只能在UI线程更新视图,而在UI线程中做耗时任务又会导致ANR,因此在平时的开发中,需要将耗时的数据请求工作放到子线程中执行,而视图更新工作放到UI线程中,使用传统的handler或者asyncTask,需要将逻辑分到多个函数内

使用kotlin的协程机制,可以用同步的方式实现异步
kotlin的协程机制是基于状态机模型和C-P-S风格实现的。
一个协程通过resume启动,当协程内部调用supended函数时,协程会被暂停,通过调用 resume可以再次启动协程。每次暂停都会修改协程的状态,再次启动协程时,会从新的状态处开始执行。

现在通过kotlin的基础api实现一个简单的异步调用接口,最后的效果如下:

btn.setOnClickListener {
           runOnUI {  
               //执行在主线程,可以做一些初始化操作                         
               Log.e("log", Thread.currentThread().name)
               var used = async {               //从工作线程直接返回数据到主线程
                  //切换到工作线程执行,而且lambda可以直接访问外部变量,构成闭包
                   Log.e("log", Thread.currentThread().name)
                   var start = System.currentTimeMillis()
                   Thread.sleep(3000)
                   System.currentTimeMillis() - start
               }
               //继续执行在主线程
               Log.e("log", Thread.currentThread().name)
               Toast.makeText(this@MainActivity, "后台线程用时${used}ms", Toast.LENGTH_SHORT).show()
           }
       }

在后续的内容中,我将在实现的过程中逐步分析kotlin协程机制的基本原理

首先声明一个创建协程的函数:

//该函数接收一个 suspend类型的lambda
inline fun runOnUI(noinline block: suspend () -> Unit) {
    var continuation = object : Continuation<Unit> {
      //ThreadSwitcher是ContinuationInterceptor的子类,用于在协程resume时切换到主线程执行
        override val context: CoroutineContext
            get() = ThreadSwitcher()  

        override inline fun resume(value: Unit) = Unit

        override inline fun resumeWithException(exception: Throwable) = Unit
    }
        //使用suspend类型的lambda创建一个协程并启动
        block.createCoroutine(continuation).resume(Unit)
}

createCoroutine是官方提供的一个基础api,该函数如下:

public fun <T> (suspend () -> T).createCoroutine(
        completion: Continuation<T>
): Continuation<Unit> = SafeContinuation(createCoroutineUnchecked(completion), COROUTINE_SUSPENDED)

可以看到调用了createCoroutineUnchecked创建一个Coroutine,继续查看该方法:

@SinceKotlin("1.1")
@kotlin.jvm.JvmVersion
public fun <T> (suspend () -> T).createCoroutineUnchecked(
        completion: Continuation<T>
): Continuation<Unit> =
//这里的this是执行createCoroutine函数的block
        if (this !is kotlin.coroutines.experimental.jvm.internal.CoroutineImpl)
            buildContinuationByInvokeCall(completion) {
                @Suppress("UNCHECKED_CAST")
                (this as Function1<Continuation<T>, Any?>).invoke(completion)
            }
        else
//编译时,block会被编译成一个CoroutineImpl的子类,所以走这个分支
            (this.create(completion) as kotlin.coroutines.experimental.jvm.internal.CoroutineImpl).facade

查看编译之后生成的block

//查看在Activity#onCreate调用runOnUI处传入的lambda的编译类
final class ymc/demo/com/asyncframe/MainActivity$onCreate$1$1 
          extends kotlin/coroutines/experimental/jvm/internal/CoroutineImpl   
          implements kotlin/jvm/functions/Function1  {      //lambda编译类都实现FunctionN函数
  ...
}

可以看到传入runOnUIlambda确实被编译成了一个CoroutineImpl,这是因为编译器推断出了这个lambdasuspend类型的。

继续上面的分析,创建协程所涉及到的两个方法中都出现了 Continuation这个类,那么这个类是干嘛的呢?
首先,先看看completion,这个是我们调用createCoroutine手动传入的,当协程结束时,他的resume会被调用,当协程异常结束时,他的resumeWithException会被调用。
再看看createCoroutineUnchecked,这个函数也返回了一个Continuation,那么这个又是什么呢?

 (this.create(completion) as kotlin.coroutines.experimental.jvm.internal.CoroutineImpl).facade

可以看到,返回的是CoroutineImplfacade,那这个又是什么呢?
我们进入CoroutineImpl,可以看到

abstract class CoroutineImpl(
        arity: Int,
        @JvmField
        protected var completion: Continuation<Any?>?
) : Lambda(arity), Continuation<Any?> {     //Coroutine本身是一个Continuation

  override val context: CoroutineContext
          get() = _context!!

  private var _facade: Continuation<Any?>? = null
 
  val facade: Continuation<Any?> get() {
          if (_facade == null) _facade = interceptContinuationIfNeeded(_context!!, this)
          return _facade!!
      }
  ...
}

原来这是一个代理属性,接着查看interceptContinuationIfNeeded

internal fun <T> interceptContinuationIfNeeded(
        context: CoroutineContext,
        continuation: Continuation<T>
) = context[ContinuationInterceptor]?.interceptContinuation(continuation) ?: continuation

这个函数从Coroutine的上下文中查找ContinuationInterceptor,如果有就调用他的interceptContinuation对传入的continuation进行包装,否则直接返回传入的continuation

Continuation是一个可继续执行体的抽象,每个Coroutine都是一个可继续执行体,Continuation是一个协程对外的接口,启动/恢复协程的resume就是在该接口中定义的。
协程可以是链式连接的,一个协程可以有子协程,子协程持有父协程的引用,当子协程执行时,父协程暂停,子协程结束时,内部通过调用父协程的resume返回父协程。

还记得我们前面用到的ThreadSwitcher吗,他就是一个ContinuationInterceptor
我们来看看来看ThreadSwitcher的实现:

/**
Interceptor用于用于拦截并包装Continuation,让我们有机会在协程resume前做一些额外的操作,比如线程切换
**/
class ThreadSwitcher : ContinuationInterceptor, AbstractCoroutineContextElement(ContinuationInterceptor.Key) {

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
            = object : Continuation<T> by continuation {

        override fun resume(value: T) {
          //如果在主线程,直接执行
            if (Looper.getMainLooper() === Looper.myLooper()) {
                continuation.resume(value)
            } else {
            //否则,使用handler机制post到主线程执行
                postman.post {
                    resume(value)
                }
            }
        }

        override fun resumeWithException(exception: Throwable) {
            if (Looper.getMainLooper() === Looper.myLooper()) {
                continuation.resumeWithException(exception)
            } else {
                postman.post {
                    resumeWithException(exception)
                }
            }
        }
    }
}

从上面的分析中,我们可以想象,我们创建的协程会被ThreadSwitcher包装,

block.createCoroutine(continuation).resume(Unit)

createCoroutine返回的实际是ThreadSwitcher返回的Continuation,所以当我们执行resume启动协程时,会先切换到主线程执行。

紧接着,我们来实现async

suspend inline fun <T> async(crossinline block: () -> T): T
        = suspendCoroutine {
//dispatcher是一个对线程池的封装,将任务分发到子线程中
    dispatcher.dispatch {
        it.resume(block())
    }
}

使用suspend修饰的方法只可以在协程内部调用,而suspendCoroutine方法是kotlin提供的一个基础api,用于实现暂停协程。
我们接着来分析suspendCoroutine,查看他的实现:

public inline suspend fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T =
        suspendCoroutineOrReturn { c: Continuation<T> ->
            val safe = SafeContinuation(c)
            block(safe)
            safe.getResult()
        }

可以看到这个方法接收的block是带Continuation参数的
真正实现功能的是suspendCoroutineOrReturn,当我们继续跟进时,发现:

public inline suspend fun <T> suspendCoroutineOrReturn(crossinline block: (Continuation<T>) -> Any?): T =
        throw NotImplementedError("Implementation is intrinsic")

what!直接抛出异常了???
这是因为这是一个特殊的函数,需要编译器特殊处理,他需要将当前协程内的_facade属性,包装成SafeContinuation,再作为我们传入的block的参数,而且这个_facade是经过ContinuationInterceptor处理过的,也就是说当我们调用resume恢复线程时,会先切换到主线程。
为了验证上面的分析,我们查看async编译之后的字节码:

//可以看到编译之后,我们的async多了一个Continuation类型的参数
 private final static async(Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
   L0
    LINENUMBER 70 L0
    NOP
   L1
    LINENUMBER 77 L1
    ICONST_0
    INVOKESTATIC kotlin/jvm/internal/InlineMarker.mark (I)V
    ALOAD 1  //将第二个参数,也就是Continuation入栈
//调用CoroutineIntrinsics.normalizeContinuation 
    INVOKESTATIC kotlin/coroutines/experimental/jvm/internal/CoroutineIntrinsics.normalizeContinuation (Lkotlin/coroutines/experimental/Continuation;)Lkotlin/coroutines/experimental/Continuation;  
//将返回值存到slot3
    ASTORE 3
   L2
    LINENUMBER 78 L2
//new 一个SafeContinuation
    NEW kotlin/coroutines/experimental/SafeContinuation
    DUP  
  //将刚刚normalizeContinuation返回的continuation传入SafeContinuation的构造函数
    ALOAD 3
    INVOKESPECIAL kotlin/coroutines/experimental/SafeContinuation.<init> (Lkotlin/coroutines/experimental/Continuation;)V
    ASTORE 4
   L3
  ...

我们可以看到,编译之后的字节码已经没有了suspendCoroutinesuspendCoroutineOrReturn的身影,因为这两个函数都是inline函数。
我们接着来看CoroutineIntrinsics.normalizeContinuation的实现:

fun <T> normalizeContinuation(continuation: Continuation<T>): Continuation<T> =
        (continuation as? CoroutineImpl)?.facade ?: continuation

还记得我们刚刚分析过facade这个属性吗?他是对_facade的代理,这个函数返回的是经过拦截器处理过的Continuation
根据刚刚的字节码,我们可以发现suspend类型的函数,都会隐式额外接受一个当前协程的引用,但是又不能在函数中直接访问。

最后,还有两个上文出现过的线程切换处理类,postmandispatcher,使用的是单例模式:

object postman : Handler(Looper.getMainLooper()) {
    override fun handleMessage(msg: Message?) {
        msg?.callback?.run()
    }
}

object dispatcher {
    val mCachedThreads = Executors.newCachedThreadPool()
    inline fun dispatch(noinline block: () -> Unit) {
        mCachedThreads.execute(block)
    }
}

到此,我们实现了一个简易的异步调用库!

上一篇 下一篇

猜你喜欢

热点阅读