Kotlin

Kotlin协程实现 CountDownLatch

2022-09-07  本文已影响0人  ZuYuan
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.suspendCancellableCoroutine
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.resume
import kotlin.math.max


/**
 * 使用mutex或者Semaphore来实现,释放的时候每个Coroutine都要执行一遍获取锁&释放锁的操作,比较慢
 * 下面这个类实现了批量地恢复Coroutine
 *
 * Mutex issue: https://github.com/Kotlin/kotlinx.coroutines/issues/2371
 */
internal class SuspendCountDownLatch {

  private val countDownNumber: AtomicInteger

  //哨兵节点
  private val firstContinuationNode = ContinuationNode(null)

  @Volatile
  private var lastContinuationNode = firstContinuationNode

  constructor(count: Int) {
    countDownNumber = AtomicInteger(count)
  }

  fun getLockedCount() = max(0, countDownNumber.get())

  fun isLocked() = countDownNumber.get() > 0

  fun countDown() {
    if (!isLocked()) return

    if (countDownNumber.decrementAndGet() == 0) {
      val firstContinuableNode = releaseAllContinuableNodes()
      firstContinuableNode?.let { resumeContinuableNodes(it) }
    }
  }

  suspend fun await() {
    if (isLocked()) awaitSlowPath()
  }

  //CancellableContinuationImpl 能够根据resumeMode+协程上下文进行恢复, 并且在协程已取消时不会造成Crash
  private suspend fun awaitSlowPath() = suspendCancellableCoroutine<Unit> { cont ->
    val newContinuationNode = ContinuationNode(cont)
    //抢占链表尾
    while (!tryAddToQueueTail(newContinuationNode)) {
      if (!isLocked()) {
        cont.resume(Unit)
        return@suspendCancellableCoroutine
      }
    }
    //确保其它线程可以接着插入节点
    lastContinuationNode = newContinuationNode
  }

  private fun releaseAllContinuableNodes(): ContinuationNode? {
    val emptyNode = ContinuationNode(null)
    //占位,确保不会有新的节点进入链表
    while (!tryAddToQueueTail(emptyNode)) {
      //...
    }

    if (firstContinuationNode.nextRef.get() == emptyNode) {
      //没有可恢复的节点
      return null
    }

    //第一个是哨兵节点
    val firstContinuableNode = firstContinuationNode.nextRef.get() ?: return null
    //释放除了哨兵以外的所有对象,防止内存泄露
    firstContinuationNode.nextRef.set(null)
    return firstContinuableNode
  }

  private fun resumeContinuableNodes(first: ContinuationNode) {
    var nextNode: ContinuationNode? = first
    while (nextNode != null) {
      nextNode.continuation?.resume(Unit)
      nextNode = nextNode.nextRef.get()
    }
  }

  private fun tryAddToQueueTail(node: ContinuationNode): Boolean =
      lastContinuationNode.nextRef.compareAndSet(null, node)

  override fun toString(): String {
    return super.toString() + "[Count = " + getLockedCount() + "]"
  }

  private class ContinuationNode(val continuation: CancellableContinuation<Unit>?) {

    val nextRef = AtomicReference<ContinuationNode?>(null)
  }
}
上一篇下一篇

猜你喜欢

热点阅读