Kotlin协程实现资源竞争锁
2023-02-09 本文已影响0人
ZuYuan
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.suspendCancellableCoroutine
import java.util.*
import kotlin.coroutines.resume
/**
* is fair lock.
*/
class SuspendResourceLock(
private val resourceCount: Int,
private val occupiedCount: Int = 0
) {
private var availableCount: Int
private val suspendedContinuations = LinkedList<CancellableContinuation<Unit>>()
private val lock = Any()
init {
if (resourceCount < occupiedCount) {
throw IllegalArgumentException("resourceCount: $resourceCount, but occupiedCount: $occupiedCount")
}
availableCount = resourceCount - occupiedCount
}
fun tryAcquire(): Boolean {
if (availableCount > 0) {
withLock {
if (availableCount > 0) {
--availableCount
return true
}
}
}
return false
}
suspend fun acquire() {
var needAwait = false
if (availableCount > 0) {
withLock {
if (availableCount > 0) {
--availableCount
} else {
needAwait = true
}
}
} else {
needAwait = true
}
if (needAwait) {
awaitUntilResourceAvailable()
}
}
private suspend fun awaitUntilResourceAvailable() = suspendCancellableCoroutine<Unit> { cont ->
var canResume = false
withLock {
if (tryAcquire()) {
canResume = true
} else {
suspendedContinuations.add(cont)
}
}
if (canResume) {
cont.resume(Unit)
}
}
fun release() {
if (availableCount == resourceCount) {
throw IllegalStateException("availableCount:$availableCount, no resource to release.")
}
var continuation: CancellableContinuation<Unit>? = null
//保证公平的关键: 释放锁之后立马释放队列的第一个元素
withLock {
++availableCount
if (suspendedContinuations.isNotEmpty()) {
--availableCount
continuation = suspendedContinuations.removeFirst()
}
}
continuation?.resume(Unit)
}
//atomic lock has ABA problem
private inline fun withLock(block: () -> Unit){
synchronized(lock) {
block()
}
}
}