Kotlin协程实现资源竞争锁

2023-02-09  本文已影响0人  ZuYuan
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.suspendCancellableCoroutine
import java.util.*
import kotlin.coroutines.resume

/**
 * A fair, suspending counting lock over a fixed pool of [resourceCount] resources.
 *
 * Callers take one resource with [acquire] (suspends while none is free) or
 * [tryAcquire] (never suspends) and give it back with [release].
 *
 * Fairness: suspended callers are kept in FIFO order and [release] hands the freed
 * resource directly to the oldest waiter instead of putting it back into the pool,
 * so a newcomer cannot overtake a coroutine that is already waiting.
 *
 * Cancellation: a waiter that is cancelled is removed from the queue; if it had
 * already been handed a resource that it will never use, that resource is passed on
 * to the next waiter or returned to the pool.
 *
 * All mutable state is guarded by a private monitor. The critical sections never
 * suspend and never resume a continuation while the monitor is held.
 *
 * @param resourceCount total number of resources managed by this lock.
 * @param occupiedCount number of resources considered already taken at construction time.
 * @throws IllegalArgumentException if [occupiedCount] is negative or exceeds [resourceCount].
 */
class SuspendResourceLock(
    private val resourceCount: Int,
    occupiedCount: Int = 0
) {

    /** Monitor guarding [availableCount] and [suspendedContinuations]. */
    private val lock = Any()

    /** Waiters in arrival order. Invariant: non-empty only while [availableCount] is 0. */
    private val suspendedContinuations = LinkedList<CancellableContinuation<Unit>>()

    /** Number of resources currently free; only read or written while holding [lock]. */
    private var availableCount: Int

    init {
        require(occupiedCount >= 0) { "occupiedCount must not be negative, but was $occupiedCount" }
        require(resourceCount >= occupiedCount) {
            "resourceCount: $resourceCount, but occupiedCount: $occupiedCount"
        }
        availableCount = resourceCount - occupiedCount
    }

    /**
     * Takes a resource if one is free right now.
     *
     * @return `true` if a resource was taken, `false` if none was available.
     */
    fun tryAcquire(): Boolean = withLock {
        if (availableCount > 0) {
            --availableCount
            true
        } else {
            false
        }
    }

    /**
     * Takes a resource, suspending until one becomes available.
     *
     * When a resource is free the call returns without suspending. Otherwise the caller
     * is queued behind earlier waiters. If the coroutine is cancelled while waiting, the
     * call throws the cancellation exception and no resource stays assigned to it.
     */
    suspend fun acquire() {
        if (tryAcquire()) return
        awaitUntilResourceAvailable()
    }

    /**
     * Slow path of [acquire]: re-checks availability under the lock and, if there is
     * still nothing free, parks the continuation at the tail of the waiter queue.
     */
    private suspend fun awaitUntilResourceAvailable() = suspendCancellableCoroutine<Unit> { cont ->
        // The monitor is reentrant, so calling tryAcquire() while holding it is safe.
        // Checking and enqueueing in one critical section closes the window in which a
        // concurrent release() could otherwise be missed.
        val acquiredImmediately = withLock {
            tryAcquire().also { acquired ->
                if (!acquired) suspendedContinuations.add(cont)
            }
        }
        // Registered only after the acquire-or-enqueue decision, so that inside the handler
        // "not in the queue" always means "a resource is assigned to this continuation".
        // If cont is already cancelled the handler runs immediately.
        cont.invokeOnCancellation { onWaiterCancelled(cont) }
        if (acquiredImmediately) {
            // Ignored if cont was cancelled meanwhile; the handler above returns the resource.
            cont.resume(Unit)
        }
    }

    /**
     * Cancellation cleanup for [cont]. Relies on the documented kotlinx.coroutines guarantee
     * that the handler runs whenever the suspended caller ends up with a cancellation
     * exception, including cancellation after a successful resume but before dispatch.
     */
    private fun onWaiterCancelled(cont: CancellableContinuation<Unit>) {
        val next = withLock {
            if (suspendedContinuations.remove(cont)) {
                // Still queued: no resource was ever assigned to it.
                null
            } else {
                // Already dequeued (or acquired directly): it owns a resource it will never use.
                passResourceLocked()
            }
        }
        next?.resume(Unit)
    }

    /**
     * Returns one resource.
     *
     * The key to fairness: the freed resource goes straight to the first queued waiter,
     * if there is one, rather than back into the pool.
     *
     * @throws IllegalStateException if every resource is already available, i.e. there is
     * nothing to release.
     */
    fun release() {
        val next = withLock {
            check(availableCount < resourceCount) {
                "availableCount:$availableCount, no resource to release."
            }
            passResourceLocked()
        }
        // Resume outside the monitor so the resumed coroutine never runs under our lock.
        // If `next` was cancelled in the meantime this resume is ignored and its
        // cancellation handler forwards the resource.
        next?.resume(Unit)
    }

    /**
     * Transfers one free resource to the oldest waiter, or back to the pool when nobody
     * is waiting. Must be called while holding [lock].
     *
     * @return the waiter that now owns the resource and must be resumed by the caller
     * after leaving the monitor, or `null` if the resource went back to the pool.
     */
    private fun passResourceLocked(): CancellableContinuation<Unit>? {
        val next: CancellableContinuation<Unit>? = suspendedContinuations.poll()
        if (next == null) {
            ++availableCount
        }
        return next
    }

    /**
     * Runs [block] while holding [lock] and returns its result.
     *
     * A monitor is used rather than a CAS loop on the counter because an atomic
     * counter alone is exposed to the ABA problem.
     */
    private inline fun <T> withLock(block: () -> T): T {
        synchronized(lock) {
            return block()
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读