继续来,同我一起撸Kotlin Channel 深水区

2023-10-22  本文已影响0人  进击的老六

前言

本篇文章将着重分析协程间的通信方式。通过本篇文章,你将了解到:

  1. Channel的引入及简单使用
  2. Channel的原理
  3. Channel四种类型深入解析
  4. produce/actor的使用与原理

1. Channel的引入及简单使用

初级版协程间通信

先来看一个简单的通信Demo:

    fun testChannel() {
        //协程1
        var deferred= GlobalScope.async {
            //假装在加工数据
            Thread.sleep(2000)
            "Hello fishforest"
        }
        //协程2
        GlobalScope.launch {
            var result = deferred.await()
            println("get result from coroutine1: $result")
        }
    }

如上,协程2拿到了协程1的值,这就是一次简单的协程间通信过程。
现在需求变了,协程1一直在生产数据,协程2也需要不断地从中取数据,此时靠async/await 配合无能为力了。当然,我们很容易想到的方案是:

共享一个变量,这个变量可以是个队列。

于是Demo改造如下:

    fun testChannel2() {
        //阻塞队列
        var queue = ArrayBlockingQueue<String>(5)
        //协程1
        GlobalScope.launch {
            var count = 0
            while (true) {
                //假装在加工数据
                Thread.sleep(1000)
                queue.put("fish ${count++}")
            }
        }

        //协程2
        GlobalScope.launch {
            while (true) {
                Thread.sleep(1000)
                println("get result from coroutine1:${queue.take()}")
            }
        }
    }

通过阻塞队列,当协程2取数据时,如果队列是空,那么等待协程1往队列里放数据;当协程1放数据时,如果队列满了,那么等待协程2从队列里取出数据。如此,就是简单的协程通信。
看似美好,实际上此处有个很大的漏洞:

队列满/队列空 时,此时等待动作阻塞的是线程,而我们知道协程的挂起并不阻塞线程,因此此种方式并没有利用到协程的优势。

我们期望协程发现队列满/空时将自己挂起等待,此时就引入了Channel。

升级版协程间通信-Channel

同样的需求,我们用Channel 实现:

    fun testChannel3() {
        //定义Channel
        var channel = Channel<String>()
        //协程1
        GlobalScope.launch {
            var count = 0
            while (true) {
                //假装在加工数据
                Thread.sleep(1000)
                var sendStr = "fish ${count++}"
                println("send $sendStr")
                channel.send("$sendStr")
            }
        }

        //协程2
        GlobalScope.launch {
            while (true) {
                Thread.sleep(1000)
                println("receive:${channel.receive()}")
            }
        }
    }

与之前的实现方案相比,仅仅只是将队列换成了Channel,可以看出,Channel 和队列比较类似,而Channel的send/recevie 函数并没有阻塞线程,仅仅只是挂起了协程。
查看打印结果:

你可能发现了端倪:发送者和接收者是成对出现的,难道Channel的内部实现不是队列?
要想解开这个谜题,最好的方法是从源码入手深究其原理。

2. Channel的原理

Channel的构造

先从Channel 构造开始:

#Channel.kt
public fun <E> Channel(
    //Channel 容量/叫做Channel类型更合理一些
    capacity: Int = Channel.RENDEZVOUS,
    //缓冲区满后,发送端的处理方式
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    //信息没有传递出去时的回调
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
    when (capacity) {
        //默认是约会模式
        Channel.RENDEZVOUS -> {
            //默认挂起
            if (onBufferOverflow == BufferOverflow.SUSPEND)
                RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channel
            else
                ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
        }
        //...
    }

此处的Channel() 并不是构造函数,而是顶层函数,Kotlin里有很多伪装为构造函数的顶层函数。该顶层函数默认构造并返回RendezvousChannel类型的Channel。
RendezvousChannel 类本身很简单,就重写了一些属性,它继承自AbstractChannel。

重点在AbstractChannel/AbstractSendChannel及其子类里。

Channel的队列结构

AbstractSendChannel 里有个很重要的成员变量:

    protected val queue = LockFreeLinkedListHead()

LockFreeLinkedListHead 继承自LockFreeLinkedListNode,而这个Node 我们在分析Kotlin 协程之取消与异常处理探索之旅(上) 有提及过,此处再拎出来说说。
先看其定义:

#LockFreeLinkedList.kt
public actual open class LockFreeLinkedListNode {
    //后驱指针
    private val _next = atomic<Any>(this) // Node | Removed | OpDescriptor
    //前驱指针
    private val _prev = atomic(this) 
    //...
}

很典型的一个链表结构,并且是无锁链表,意思是它的插入/删除是无需上锁的,核心是使用了CAS。 回到Channel里的成员变量queue,初始的链表结构如下:

可以看出,当前节点的_next、_prev分别指向自己。

当往链表里面添加Node时,形成如下结构:

链表头为固定节点,通过它构造了双向循环链表。
AbstractSendChannel 里的queue 就是个链表头,通过它我们可以快速找到链表里的第1个节点(_next 指向的节点),也可以快速找到链表的最后一个节点(_prev指向的节点)。
于是形成了一个队列结构,每次往队列里放入数据,就放到链表的尾部,每次从队列里取数据,就从链表头后的第一个节点取。

Channel的send/receive

send 分析

#AbstractChannel.kt
    public final override suspend fun send(element: E) {
        //快速判断是否可以放入queue 队列
        //若能成功,则直接返回
        if (offerInternal(element) === kotlinx.coroutines.channels.OFFER_SUCCESS) return
        //不能退出,则挂起协程
        return sendSuspend(element)
    }

    protected open fun offerInternal(element: E): Any {
        while (true) {
            //先找到队列第一个Node节点,如果存在并且是Receive 类型,说明有接收者在等待
            val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
            //给接收者协程赋值
            val token = receive.tryResumeReceive(element, null)
            if (token != null) {
                kotlinx.coroutines.assert { token === RESUME_TOKEN }
                //重新恢复接收者协程
                receive.completeResumeReceive(element)
                //返回插入的结果
                return receive.offerResult
            }
        }
    }

    private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutineReusable sc@ { cont ->
        //suspendCancellableCoroutineReusable 里有挂起协程的逻辑
        loop@ while (true) {
            if (isFullImpl) {
                //构造SendElement,它是Node类型 
                val send = if (onUndeliveredElement == null)
                    //SendElement 有两个成员变量:1是具体的值,2是当前协程的封装体cont
                    SendElement(element, cont) else
                    SendElementWithUndeliveredHandler(element, cont, onUndeliveredElement)
                //将Element 加入到队列尾部
                val enqueueResult = enqueueSend(send)
                //插入成功,则返回
            }
        }
    }

用图表示以上流程:

接收者协程被恢复后,重新调度执行协程,而传入的值即为send发送的值,最终recevie返回的即是send过来的值。
对协程的挂起有疑惑请移步:讲真,Kotlin 协程的挂起没那么神秘(原理篇)

receive 分析
与send流程类似,就不贴代码了,仅用图表示:

可以看出,send/receive 通过判断queue的状态来决定是否挂起当前协程,而queue里的Node 又分为三种类型:

[图片上传失败...(image-51eac8-1698040950561)]

综合以上得出:

在RENDEZVOUS类型(默认类型)下,发送者协程需要等待接收者就位了(到队列里等待)才会继续往下走。同样的,接收者协程需要等待发送者就位了(到队列里等待)才会继续往下走。因此,形成的现象是发送者/接收者成对出现。

成对出现的场景,我们称RENDEZVOUS 为约会类型。

3. Channel四种类型深入解析

CONFLATED 类型

前面的分析是基于约会类型,实际上Channel还有其它类型,通过其构造过程可看出:

#AbstractChannel.kt
public fun <E> Channel(
    capacity: Int = Channel.RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
    when (capacity) {
        //约会类型
        Channel.RENDEZVOUS -> {
            if (onBufferOverflow == BufferOverflow.SUSPEND)
                RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channel
            else
                //转为缓冲类型
                ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
        }
        //混合类型
        Channel.CONFLATED -> {
            //此种类型下必须是挂起模式
            require(onBufferOverflow == BufferOverflow.SUSPEND) {
                "CONFLATED capacity cannot be used with non-default onBufferOverflow"
            }
            ConflatedChannel(onUndeliveredElement)
        }
        //无限制类型
        Channel.UNLIMITED -> LinkedListChannel(onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows
        //缓冲类型
        Channel.BUFFERED -> ArrayChannel( // uses default capacity with SUSPEND
            if (onBufferOverflow == BufferOverflow.SUSPEND) Channel.CHANNEL_DEFAULT_CAPACITY else 1,
            onBufferOverflow, onUndeliveredElement
        )
        //没有指定具体类型是以上4种内的组合
    }

先看CONFLATED(混合)类型。
ConflatedChannel 继承自AbstractChannel,有个成员变量:value。
重点来看其重写的函数:offerInternal与pollInternal,分别对应send与receive的逻辑。

send 分析

#ConflatedChannel.kt
protected override fun offerInternal(element: E): Any {
    var receive: ReceiveOrClosed<E>? = null
    //先上锁
    lock.withLock {
        //如果value 为空,也就是之前没有发送过,说明可能有接收者在等待。
        if (value === kotlinx.coroutines.channels.EMPTY) {
            loop@ while(true) {
                //尝试取出接收者
                receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
                if (receive is Closed) {
                    //是关闭Node,直接返回
                    return receive!!
                }
                //赋值给接收者协程
                val token = receive!!.tryResumeReceive(element, null)
                if (token != null) {
                    //跳出锁
                    return@withLock
                }
            }
        }
        //更新发送值到value里
        updateValueLocked(element)?.let { throw it }
        //成功插入
        return OFFER_SUCCESS
    }
    //如果找到接收者,则恢复接收者协程
    receive!!.completeResumeReceive(element)
    return receive!!.offerResult
}

receive 分析

#ConflatedChannel.kt
protected override fun pollInternal(): Any? {
    var result: Any? = null
    //上锁
    lock.withLock {
        //如果value 为空,说明没数据,取数据失败
        if (value === kotlinx.coroutines.channels.EMPTY) return closedForSend ?: POLL_FAILED
        //从value 里取数据
        result = value
        //恢复到无数据状态
        value = EMPTY
    }
    return result
}

由此可见:

在 CONFLATED类型下,发送者无需等待接收者就位,它可以一直更新数据。

BUFFERED 类型

此为缓冲类型,与其它类型最大的不同之处在于它内部有数据缓冲区。
ArrayChannel 继承自AbstractChannel,其成员变量:

//数据缓冲区
    private var buffer: Array<Any?> = arrayOfNulls<Any?>(min(capacity, 8)).apply { fill(EMPTY) }

重点函数还是offerInternal与pollInternal。
send 分析

#ArrayChannel.kt
protected override fun offerInternal(element: E): Any {
    var receive: ReceiveOrClosed<E>? = null
    lock.withLock {
        //size 为buffer 当前的实际存储数据的个数
        val size = this.size.value
        //更新size,此处根据发送策略,有可能会直接退出
        updateBufferSize(size)?.let { return it }
        if (size == 0) {
            //当前缓冲区没有数据
            loop@ while (true) {
                //查看是否有接收者等待
                receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
                //给接收者协程赋值
                val token = receive!!.tryResumeReceive(element, null)
                if (token != null) {
                    //缓冲区数量不变
                    this.size.value = size
                    return@withLock
                }
            }
        }
        //加入到缓冲队列
        enqueueElement(size, element)
        //插入成功
        return OFFER_SUCCESS
    }
    //恢复接收者协程
    receive!!.completeResumeReceive(element)
    return receive!!.offerResult
}

private fun updateBufferSize(currentSize: Int): Symbol? {
    if (currentSize < capacity) {
        //还可以继续存放数据
        size.value = currentSize + 1 // tentatively put it into the buffer
        return null // proceed
    }
    //缓冲区满
    return when (onBufferOverflow) {
        //协程需要挂起
        BufferOverflow.SUSPEND -> OFFER_FAILED
        //舍弃最新数据,相当于发送永远是成功的
        BufferOverflow.DROP_LATEST -> OFFER_SUCCESS
        //舍弃旧的数据,发送继续走下面的流程
        BufferOverflow.DROP_OLDEST -> null // proceed, will drop oldest in enqueueElement
    }
}

receive 分析
receive 过程与send类似,就不贴源码了,直接上图对比:

可以看出,对于发送者来说:

先将数据放入数据缓冲队列,当缓冲区满后才会考虑是否需要挂起发送者协程。同样的,对于接收者来说,先从缓冲队列取数据,当缓冲区没数据时才会挂起自身。

UNLIMITED 类型

此类型为无限制类型,网上一些文章将此与BUFFERED类型类比,并归为“无限缓冲类型”,该说法是否正确,接下来一步步印证。
同样的,LinkedListChannel继承自AbstractChannel。
重点函数还是offerInternal与pollInternal。
send 分析

#LinkedListChannel.kt
protected override fun offerInternal(element: E): Any {
    while (true) {
        //快速查找是否有接收者等待
        val result = super.offerInternal(element)
        when {
            //找到接收者,插入算是成功
            result === OFFER_SUCCESS -> return OFFER_SUCCESS
            //没找到
            result === OFFER_FAILED -> {
                //加入到协程缓冲队列
                when (val sendResult = sendBuffered(element)) {
                    null -> return OFFER_SUCCESS
                    is Closed<*> -> return sendResult
                }
            }
        }
    }
}

protected fun sendBuffered(element: E): ReceiveOrClosed<*>? {
    //将SendBuffered 加入到queue里(队尾)
    queue.addLastIfPrev(AbstractSendChannel.SendBuffered(element)) { prev ->
        if (prev is ReceiveOrClosed<*>) return@sendBuffered prev
        true
    }
    //添加成功
    return null
}

receive 分析 receive 过程完全依靠父类AbstractChannel完成,此处就不再赘述,用图表示:

可以看出:

此类型下,发送者不会挂起,会一直往队列里存放数据,理论上是可以无限制存放的。与BUFFERED类型不同的是,UNLIMITED 缓冲数据使用的是queue,它是链表。而BUFFERED 缓冲数据使用的是数组。

Channel 四种类型比对

对于接收者来说,只有一种逻辑:

有数据则消费数据,没数据则挂起等待。

4. produce/actor的使用与原理

使用

通过上面的分析,我们知道接收者有可能会阻塞,怎样才能让接收者知道数据已经发送完毕了呢?
答案是:Channel.close()。
当调用该函数时,会往queue里加入Closed节点,当send/receive 取出该节点时就知道Channel关闭了。
你说,能不能不手动调用该函数呢?刚好,produce可以解决该问题:

    fun testProduce() {
        //返回接收者
        var receiveChannel = GlobalScope.produce<String> {
            //
            for (x in 1..5) {
                var sendStr = "fish $x"
                println("send $sendStr")
                send("$sendStr")
            }
        }
        //接收数据
        GlobalScope.launch {
            while (true) {
                println("job2 receive:${receiveChannel.receive()}")
            }
            println("job2 end")
        }
        GlobalScope.launch {
            while (true) {
                println("job3 receive:${receiveChannel.receive()}")
            }
            println("job3 end")
        }
    }

produce 函数返回Channel,在produce的协程体里可以发送数据,而通过返回的Channel,其它协程可以接收数据。当produce协程执行完毕后,将会主动调用close关闭Channel,其它Receive的Channel就会有感知,从而退出挂起状态。
这是一个典型的单生产者--多消费者的模型。
反之单消费者--多生产者的模型如下:

    fun testActor() {
        //返回发送者
        var sendChannel = GlobalScope.actor<String> {
            //
            for (x in 1..5) {
                println("job1 receive:${receive()}")
            }
            println("actor end")
        }
        //发送者
        GlobalScope.launch {
            sendChannel.send("send from job2")
        }
        GlobalScope.launch {
            sendChannel.send("send from job3")
        }
    }

原理

produce和actor内部创建了RENDEZVOUS 类型的Channel,它们返回的Channel以及协程体里的Channel都是委托这个内部的Channel来完成功能的,并且Channel绑定了协程的生命周期,当协程取消时将会取消Channel。(由于篇幅原因,就不展开细说了,有兴趣可以自行阅读源码或是留言)。

更多Kotlin可以查看我的个人介绍!!!

上一篇下一篇

猜你喜欢

热点阅读