kotlin协程

2022-11-22  本文已影响0人  taoyyyy

协程基础

image.png
GlobalScope.launch(Dispatchers.Main) {//开始协程:主线程
   val result = userApi.getUserSuspend("suming")//网络请求(IO 线程)
   tv_name.text = result?.name //更新 UI(主线程)
}

在主线程中创建协程A中执行整个业务流程,如果遇到异步调用任务则协程A被挂起,切换到IO线程中创建子协程B,获取结果后再恢复到主线程的协程A上,然后继续执行剩下的流程。

xxxScope.launch()、runBlocking:T与async

# Builders.common.kt
    
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

Job与Deffered

public interface Job : CoroutineContext.Element {
    //活跃的,是否仍在执行
    public val isActive: Boolean


    //启动协程,如果启动了协程,则为true;如果协程已经启动或完成,则为false
    public fun start(): Boolean
    
    //取消Job,可通过传入Exception说明具体原因
    public fun cancel(cause: CancellationException? = null)
    
    //挂起协程直到此Job完成
    public suspend fun join()
    
    //取消任务并等待任务完成,结合了[cancel]和[join]的调用
    public suspend fun Job.cancelAndJoin()


    //给Job设置一个完成通知,当Job执行完成的时候会同步执行这个函数
    public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
}

Job 还可以有层级关系,一个Job可以包含多个子Job,当父Job被取消后,所有的子Job也会被自动取消;当子Job被取消或者出现异常后父Job也会被取消。具有多个子 Job 的父Job 会等待所有子Job完成(或者取消)后,自己才会执行完成。

public interface Deferred<out T> : Job {
    //等待协程执行完成并获取结果
    public suspend fun await(): T
}

协程作用域

fun launchTest2() {
    print("start")
    //开启一个IO模式的协程,通过协程上下文创建一个CoroutineScope对象,需要一个类型为CoroutineContext的参数
    val job = CoroutineScope(Dispatchers.IO).launch {
        delay(1000)//1秒无阻塞延迟(默认单位为毫秒)
        print("CoroutineScope.launch")
    }
    print("end")//主线程继续,而协程被延迟
}
private suspend fun testSupervisorScope() = supervisorScope {    
    launch { throw IllegalArgumentException("随便抛一个异常") }    
    launch {        
        delay(1000)        
        Log.e("crx", "另一个协程")
    }
}


private suspend fun testCoroutineScope() = coroutineScope {    
    launch { throw IllegalArgumentException("随便抛一个异常") }    
    launch {        
        delay(1000)        
        Log.e("crx", "另一个协程")    
    }
}

//执行testSupervisorScope方法打印的结果
E/crx: 异常信息: 随便抛一个异常
E/crx: 另一个协程
E/crx: 在执行完了Scope之后


//执行testCoroutineScope方法打印的结果
E/crx: 异常信息: 随便抛一个异常
class MainActivity : AppCompatActivity() {


    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)


        btn_data.setOnClickListener {
            lifecycleScope.launch {//使用lifecycleScope创建协程
                //协程执行体
            }
        }
    }
}


class MainViewModel : ViewModel() {
    fun getData() {
        viewModelScope.launch {//使用viewModelScope创建协程
            //执行协程
        }
    }
}

协程异常

当协程作用域中的一个协程发生异常时,此时的异常流程如下所示:

被封装到deferred对象中的异常才会在调用await时抛出。

    private val job: Job = Job()
    private val scope = CoroutineScope(Dispatchers.Default + job)

    private fun doWork(): Deferred<String> = scope.async { throw NullPointerException("自定义空指针异常") }


    private fun loadData() = scope.launch {
        try {
            doWork().await()
        } catch (e: Exception) {
            Log.d("try catch捕获的异常:", e.toString())
        }
    }

Job.cancel 取消任务时会抛出CancellationException 给指定协程,但是不会结构化并发到父协程。

CoroutineExceptionHandler

CoroutineExceptionHandler只能处理当前域内开启的子协程或者当前协程抛出的异常。

supervisorScope 和SupervisorJob

supervisorScope 和 SupervisorJob的原理是:将异常不传播给自己的父协程。

调度器Dispatcher

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {


    //将可运行块的执行分派到给定上下文中的另一个线程上。这个方法应该保证给定的[block]最终会被调用。
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)


    //返回一个continuation,它封装了提供的[continuation],拦截了所有的恢复。
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>


    //CoroutineDispatcher是一个协程上下文元素,而'+'是一个用于协程上下文的集合和操作符。
    public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other
}

使用方式一:

fun dispatchersTest() {
    //创建一个在主线程执行的协程作用域
    val mainScope = MainScope()
    mainScope.launch {
        launch(Dispatchers.Main) {//在协程上下参数中指定调度器
            print("主线程调度器")
        }
        launch(Dispatchers.Default) {
            print("默认调度器")
        }
        launch(Dispatchers.Unconfined) {
            print("任意调度器")
        }
        launch(Dispatchers.IO) {
            print("IO调度器")
        }
    }
}

使用方式二:

//用给定的协程上下文调用指定的挂起块,挂起直到它完成,并返回结果。
public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T


GlobalScope.launch(Dispatchers.Main) {//开始协程:主线程
    val result: User = withContext(Dispatchers.IO) {//网络请求(IO 线程)
        userApi.getUserSuspend("FollowExcellence")
    }
    tv_title.text = result.name //更新 UI(主线程)
}

启动模式

CoroutineStart是一个枚举类,为协程构建器定义启动选项。在协程构建的start参数中使用。

image.png

协程上下文

协程使用以下几种元素集定义协程的行为,它们均继承自CoroutineContext:

suspend本质

//Continuation接口表示挂起点之后的延续,该挂起点返回类型为“T”的值。
public interface Continuation<in T> {
    //对应这个Continuation的协程上下文
    public val context: CoroutineContext


    //恢复相应协程的执行,传递一个成功或失败的结果作为最后一个挂起点的返回值。
    public fun resumeWith(result: Result<T>)
}


//将[value]作为最后一个挂起点的返回值,恢复相应协程的执行。
fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))


//恢复相应协程的执行,以便在最后一个挂起点之后重新抛出[异常]。
fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
    resumeWith(Result.failure(exception))

Kotlin 使用堆栈帧来管理要运行哪个函数以及所有局部变量。挂起(暂停)协程时,会复制并保存当前的堆栈帧以供稍后使用,将信息保存到Continuation对象中。恢复协程时,会将堆栈帧从其保存位置复制回来,对应的Continuation通过调用resumeWith函数才会恢复协程的执行,然后函数再次开始运行。同时返回Result<T>类型的成功或者异常的结果。

@GET("users/{login}")
suspend fun getUserSuspend(@Path("login") login: String): User

反编译后
public abstract getUserSuspend(Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public interface Continuation<in T> {
    //协程上下文
    public val context: CoroutineContext


    //恢复相应协程的执行,传递一个成功或失败的[result]作为最后一个挂起点的返回值。
    public fun resumeWith(result: Result<T>)
}

协程原理

https://mp.weixin.qq.com/s/nXfweTaOCpm6Bj34rW-wLA 协程的本质和原理:基于CPS( Continuation-Passing-Style Transformation)和状态机

image.png image.png
internal abstract class BaseContinuationImpl(...) {
// 实现 Continuation 的 resumeWith,并且是 final 的,不可被重写
public final override fun resumeWith(result: Result<Any?>) {
...
val outcome = invokeSuspend(param)
...
}
// 由编译生成的协程相关类来实现,例如 postItem$1
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}
suspend fun testCoroutine() {
    log("start")
    val user = getUserInfo()
    log(user)
    val friendList = getFriendList(user)
    log(friendList)
    val feedList = getFeedList(friendList)
    log(feedList)
}
fun testCoroutine(completion: Continuation<Any?>): Any? {

    //completion表示运行完testCoroutine挂起函数后需要运行的代码
    class TestContinuation(completion: Continuation<Any?>?) : ContinuationImpl(completion) {
        // 表示协程状态机当前的状态
        var label: Int = 0
        // 协程返回结果
        var result: Any? = null


        // 用于保存之前协程的计算结果
        var mUser: Any? = null
        var mFriendList: Any? = null


        // invokeSuspend 是协程的关键(Continuation#resumeWith中会调用invokeSuspend)
        // 它最终会调用 testCoroutine(this) 开启协程状态机
        // 状态机相关代码就是后面的 when 语句
        // 协程的本质,可以说就是 CPS + 状态机
        override fun invokeSuspend(_result: Result<Any?>): Any? {
            result = _result
            label = label or Int.Companion.MIN_VALUE
            return testCoroutine(this)
        }
    }

    //说明在运行期间只会生成一个Continuation对象
    val continuation = if (completion is TestContinuation) {
        completion
    } else {
        //                作为参数
        //                   ↓
        //1.初次运行
        //2.用一个新的Continuation包装了旧的Continuation
        TestContinuation(completion)
    }


        // 三个变量,对应原函数的三个变量
    lateinit var user: String
    lateinit var friendList: String
    lateinit var feedList: String


    // result 接收协程的运行结果
    var result = continuation.result


    // suspendReturn 接收挂起函数的返回值
    var suspendReturn: Any? = null


    // CoroutineSingletons 是个枚举类
    // COROUTINE_SUSPENDED 代表当前函数被挂起了
    val sFlag = CoroutineSingletons.COROUTINE_SUSPENDED

    when (continuation.label) {
    0 -> {
        // 检测异常
        throwOnFailure(result)


        log("start")
        // 将 label 置为 1,准备进入下一次状态
        continuation.label = 1


        // 执行 getUserInfo
        suspendReturn = getUserInfo(continuation)


        // 判断是否挂起
        if (suspendReturn == sFlag) {
            return suspendReturn
        } else {
            result = suspendReturn
            //go to next state
        }
    }


    1 -> {
        throwOnFailure(result)


        // 获取 user 值
        user = result as String
        log(user)
        // 将协程结果存到 continuation 里
        continuation.mUser = user
        // 准备进入下一个状态
        continuation.label = 2


        // 执行 getFriendList
        suspendReturn = getFriendList(user, continuation)


        // 判断是否挂起
        if (suspendReturn == sFlag) {
            return suspendReturn
        } else {
            result = suspendReturn
            //go to next state
        }
    }


    2 -> {
        throwOnFailure(result)


        user = continuation.mUser as String


        // 获取 friendList 的值
        friendList = result as String
        log(friendList)


        // 将协程结果存到 continuation 里
        continuation.mUser = user
        continuation.mFriendList = friendList


        // 准备进入下一个状态
        continuation.label = 3


        // 执行 getFeedList
        suspendReturn = getFeedList(friendList, continuation)


        // 判断是否挂起
        if (suspendReturn == sFlag) {
            return suspendReturn
        } else {
            result = suspendReturn
            //go to next state
        }
    }


    3 -> {
        throwOnFailure(result)


        user = continuation.mUser as String
        friendList = continuation.mFriendList as String
        feedList = continuation.result as String
        log(feedList)
        loop = false
    }
}

}

协程线程切换原理

https://mp.weixin.qq.com/s/iitYHxn6vPpE_wMsiPoplg

协程的并发处理

https://mp.weixin.qq.com/s/6paEFQDD-lHYMjcWmwZdhw

非阻塞式锁Mutex:拿不到锁时协程就挂起

线程中锁都是阻塞式,在没有获取锁时无法执行其他逻辑,而协程可以通过挂起函数解决这个,没有获取锁就挂起协程,获取后再恢复协程,协程挂起时线程并没有阻塞可以执行其他逻辑。这种互斥锁就是Mutex,它与synchronized关键字有些类似,还提供了withLock扩展函数,替代常用的mutex.lock; try {...} finally { mutex.unlock() }:

image.png

Mutex大致逻辑还是非常清晰的,协程先获取锁,然后执行代码块,然后释放锁,其他协程如果进入,必须先获取锁,获取不到协程执行挂起方法suspend fun lockSuspend(owner: Any?), 加入等待队列,挂起协程。等待其他协程释放锁之后,恢复协程。

整体还是建立在CAS基础上,封装的一套解决方案。

协程的异常处理方式

https://juejin.cn/post/6935472332735512606/

如果协程本身不使用try-catch子句自行处理异常,则不会重新抛出该异常,因此无法通过外部try-catch子句进行处理。

异常会在“Job层次结构中传播”,可以由已设置的CoroutineExceptionHandler处理。 如果未设置,则调用该线程的未捕获异常处理程序。如下代码依然会崩溃

fun main() {

    val topLevelScope = CoroutineScope(Job())

    topLevelScope.launch {

        try {

            launch {

                throw RuntimeException("RuntimeException in nested coroutine")

            }

        } catch (exception: Exception) {

            println("Handle $exception")

        }

    }

    Thread.sleep(100)

}

为了使CoroutineExceptionHandler起作用,必须将其设置在CoroutineScope或顶级协程中, 给子协程设置CoroutineExceptionHandler是没有效果的。

// ...

val topLevelScope = CoroutineScope(Job() + coroutineExceptionHandler)

// ...

// ...

topLevelScope.launch(coroutineExceptionHandler) {

// ...

在代码的特定部分处理异常,可使用try-catch。

全局捕获异常,并且其中一个任务异常,其他任务不执行,可使用CoroutineExceptionHandler,节省资源消耗。

并行任务间互不干扰,任何一个任务失败,其他任务照常运行,可使用SupervisorScope+async模式。

协程的结构化并发

1、父作用域的生命周期持续到所有子作用域执行完;

2、当结束父作用域结束时,同时结束它的各个子作用域;

3、子作用域未捕获到的异常将不会被重新抛出,而是一级一级向父作用域传递,这种异常传播将导致父作用域失败,进而导致其子作用域的所有请求被取消。

上一篇 下一篇

猜你喜欢

热点阅读