
Implementing a Distributed Lock

2019-12-28  feihui_

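In a multi-instance deployment, concurrency control across instances calls for a distributed lock. Common implementations are built on Redis, ZooKeeper, or a database (for example Mongo). This post looks at how to implement one (the sample code gives a Mongo implementation).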

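A distributed lock has three key points:

  1. Set the lock marker and its timeout in a single atomic operation. The timeout prevents a lock that was never released properly (for example, because the holding instance crashed) from becoming impossible to acquire again. Atomicity matters because an instance that crashes right after setting the marker, but before setting the timeout, would cause exactly that problem.
  2. Compare the owning instance and delete the lock marker in a single atomic operation. The owner check prevents deleting someone else's lock: the lock may still be yours just before the delete, yet already be held by another instance by the time the delete runs, because there is a delay between sending a command and executing it. Atomicity closes that window.
  3. Be aware that concurrency problems can still occur even when both operations are correct. A lock only lasts for a limited period; if the holder does not finish its work within that period (for example, a loop never checks the remaining time, or the thread is blocked), another instance can acquire the lock while the first is still working. One mitigation is a somewhat longer timeout (but not too long, otherwise an abnormally held lock takes a long time to be released and needs manual intervention); in addition, the business logic should tolerate some duplicate processing (for example, by being idempotent).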
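The three points above can be sketched without any external system. The following is a minimal illustration only, not the implementation used later in this post: `InMemoryLockStore` and `processWhileLeased` are hypothetical names, the map stands in for Redis or Mongo, and the `done` set stands in for whatever durable record makes the business operation idempotent.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical in-memory stand-in for the lock store (Redis, Mongo, ...).
// The clock is injected so that expiry can be exercised without sleeping.
class InMemoryLockStore(private val nowMillis: () -> Long = System::currentTimeMillis) {

    private data class Entry(val owner: String, val expiresAt: Long)

    private val entries = ConcurrentHashMap<String, Entry>()

    // Point 1: the owner marker and its expiry are written in one atomic step.
    // compute() runs atomically per key, so a free or expired lock goes to exactly one caller.
    fun tryAcquire(key: String, owner: String, ttlMillis: Long): Boolean {
        val now = nowMillis()
        var acquired = false
        entries.compute(key) { _, current ->
            if (current == null || current.expiresAt <= now) {
                acquired = true
                Entry(owner, now + ttlMillis)
            } else {
                current
            }
        }
        return acquired
    }

    // Point 2: the owner comparison and the delete happen in one atomic step.
    fun release(key: String, owner: String): Boolean {
        var released = false
        entries.computeIfPresent(key) { _, current ->
            if (current.owner == owner) {
                released = true
                null // returning null removes the entry
            } else {
                current
            }
        }
        return released
    }

    // Remaining lease for this owner; 0 if the lock is lost or expired.
    fun remainingMillis(key: String, owner: String): Long {
        val entry = entries[key] ?: return 0L
        return if (entry.owner == owner) maxOf(0L, entry.expiresAt - nowMillis()) else 0L
    }
}

// Point 3: stop before the lease runs out and skip items that were already handled,
// so a holder that takes over after expiry does not repeat side effects.
fun processWhileLeased(
    store: InMemoryLockStore, key: String, owner: String,
    items: List<String>, done: MutableSet<String>, safetyMarginMillis: Long,
    handle: (String) -> Unit
): Int {
    var handled = 0
    for (item in items) {
        if (store.remainingMillis(key, owner) <= safetyMarginMillis) break
        if (!done.add(item)) continue // idempotency: already processed
        handle(item)
        handled++
    }
    return handled
}
```

In a real deployment the atomicity has to come from the data source itself (a single Redis command or script, a single Mongo `findAndModify`), since the JVM-level atomicity used here does not extend across instances.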
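Template (note that none of the methods below support multiple local threads concurrently acquiring the same lock; you need to handle that yourself)
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import javax.annotation.PreDestroy
import mu.KLogging
import org.bson.types.ObjectId
import org.springframework.stereotype.Service

@Service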
class LockService(
    private val lockDao: LockDao
) {

    private companion object : KLogging()

    fun acquire(key: String): Boolean =
        doAcquire(key)

    fun release(key: String): Boolean =
        doRelease(key)

    // =================================================================================================================

    @PreDestroy
    fun destroy() = locks.forEach { doRelease(it.key) }

    // Local cache of the locks held by the current instance
    private val locks: ConcurrentHashMap<String, Lock> = ConcurrentHashMap()

    private fun doAcquire(key: String): Boolean {
        fun apply(): Lock =
            Lock(key = key).run {
                (lockDao.getLockByKey(this.key) ?: lockDao.setLockAndReturnExisted(this)).also { locks[key] = it }
            }
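        /**
         * 1. Look up the lock in the local cache first; if it is missing, read it from the data source, and if it does
         *    not exist there either, try to create it and return the effective lock (the new one or the existing one).
         * 2. Timeout check: if the lock has timed out, try to acquire it again and return the effective lock; if it is
         *    about to time out (within two seconds), sleep until then, then try to acquire it and return the effective lock.
         * 3. Owner check: if the lock belongs to the current instance and will time out soon (within one minute),
         *    extend its timeout (by ten minutes). This is very handy for task scenarios.
         */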
        return (locks[key] ?: apply())
            .run {
                // Timeout check
                if (this.isTimeout()) apply()
                else if (this.isGoingToBeTimeout()) this.waitMoment().run { apply() }
                else this
            }
            .run {
                // Owner check
                if (this.isOwner()) {
                    if (this.isTimeToBeExtended()) {
                        if (lockDao.extendLockByKeyAndOwner(this.key, this.owner)) {
                            logger.info("extend successfully with key(${this.key}) and owner ${this.owner}")
                            this.extended()
                        } else {
                            logger.info("could not extend with key(${this.key}) and owner ${this.owner}")
                            false
                        }
                    } else {
                        true
                    }
                } else {
                    logger.info("failed to acquire lock with key($key) by ${Lock.HOST}, current owner is ${this.owner}!")
                    locks.remove(this.key)
                    false
                }
            }
    }

    /**
     * When releasing a lock, also check its current holder to avoid deleting another instance's lock
     */
    private fun doRelease(key: String) =
        (locks[key] ?: shouldNotReachHere()).run {
            if (this.isOwner() && this.isAvailable()) {
                lockDao.deleteLockByKeyAndOwner(this.key, this.owner)
                logger.info("released lock with key(${this.key}) and owner (${this.owner})")
            }
        }.run {
            locks.remove(key)
            true
        }
}

interface LockDao {

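    /**
     * Get the lock
     */
    fun getLockByKey(key: String): Lock?

    /**
     * Set the lock and return the lock that now exists (which may be the one just set)
     */
    fun setLockAndReturnExisted(lock: Lock): Lock

    /**
     * Extend the lock; note that by the time of the extension the lock may already have expired and been deleted
     */
    fun extendLockByKeyAndOwner(key: String, owner: String): Boolean

    /**
     * Release the lock; note that by the time of the delete the lock may already have expired and been deleted
     */
    fun deleteLockByKeyAndOwner(key: String, owner: String): Boolean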
}

class Lock(
    val key: String,
    val owner: String = HOST
) {
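    // Expiry time in epoch seconds. A fresh lock starts at the current time and is extended by doAcquire.
    var timestamp: Long = currentTimeSeconds()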
        private set

    constructor(
        key: String, owner: String, timestamp: Long
    ) : this(key = key, owner = owner) {
        this.timestamp = timestamp
    }

    fun isOwner() = owner == HOST

    fun isAvailable() = timestamp >= currentTimeSeconds()

    fun isTimeout() = timestamp <= currentTimeSeconds()

    fun isGoingToBeTimeout() = (timestamp - currentTimeSeconds()) <= IS_GOING_TO_BE_TIMEOUT

    fun isTimeToBeExtended() = (timestamp - currentTimeSeconds()) <= IS_TIME_TO_BE_EXTENDED

    fun waitMoment() = TimeUnit.SECONDS.sleep(this.timestamp - currentTimeSeconds())

    fun extended(): Boolean {
        timestamp += EXTENDED_TIME
        return true
    }

    companion object {
        private fun currentTimeSeconds() = System.currentTimeMillis() / 1000

        val HOST = ObjectId().toString()

        // timestamp is in seconds (see currentTimeSeconds), so the durations below must be in seconds too
        private const val ONE_SECOND = 1
        private const val TWO_SECONDS = ONE_SECOND * 2
        private const val ONE_MINUTE = ONE_SECOND * 60
        private const val TEN_MINUTES = ONE_MINUTE * 10

        const val IS_GOING_TO_BE_TIMEOUT = TWO_SECONDS
        const val IS_TIME_TO_BE_EXTENDED = ONE_MINUTE
        const val EXTENDED_TIME = TEN_MINUTES
    }
}
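The template leaves local multi-threaded access to the caller. One possible way to handle it, sketched here under assumptions, is to put a per-key in-process mutex in front of the distributed lock. `LocalGuard` is a hypothetical helper, not part of the template; it takes the acquire and release operations as plain functions so that it does not depend on `LockService`.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock

// Hypothetical wrapper: a per-key local mutex in front of any distributed acquire/release pair.
class LocalGuard(
    private val acquireRemote: (String) -> Boolean,
    private val releaseRemote: (String) -> Boolean
) {
    private val localLocks = ConcurrentHashMap<String, ReentrantLock>()

    // Runs block only if the calling thread wins both the local mutex and the distributed lock.
    // Returns null when either of them is unavailable.
    fun <T> withLock(key: String, block: () -> T): T? {
        val local = localLocks.computeIfAbsent(key) { ReentrantLock() }
        if (!local.tryLock()) return null           // another local thread is inside
        try {
            if (!acquireRemote(key)) return null    // another instance holds the lock
            try {
                return block()
            } finally {
                releaseRemote(key)
            }
        } finally {
            local.unlock()
        }
    }
}
```

With the template above it could be wired as `LocalGuard(lockService::acquire, lockService::release)`. `ReentrantLock` lets the same thread re-enter; a `Semaphore(1)` would be the choice if re-entry should be rejected as well.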
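Mongo version (this version exists because the project already used Mongo; implementing the lock directly on Mongo avoids the extra complexity of introducing another system, and the concurrency level is not high anyway)
import java.time.Instant
import java.time.LocalDateTime
import java.time.ZoneId
import org.springframework.data.annotation.PersistenceConstructor
import org.springframework.data.mongodb.core.FindAndModifyOptions
import org.springframework.data.mongodb.core.MongoTemplate
import org.springframework.data.mongodb.core.index.CompoundIndex
import org.springframework.data.mongodb.core.index.Indexed
import org.springframework.data.mongodb.core.mapping.Document
import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.Query
import org.springframework.data.mongodb.core.query.Update
import org.springframework.stereotype.Component

@Component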
class MongoLockDaoImpl(
    private val mongoTemplate: MongoTemplate
) : LockDao {

    override fun getLockByKey(key: String): Lock? =
        mongoTemplate.findOne(
            Query.query(
                Criteria.where(MongoLock.KEY).`is`(key)
            ),
            MongoLock::class.java
        )?.toLock()

    override fun setLockAndReturnExisted(lock: Lock): Lock =
        mongoTemplate.findAndModify(
            Query.query(
                Criteria.where(MongoLock.KEY).`is`(lock.key)
            ),
            Update()
                .setOnInsert(MongoLock.OWNER, lock.owner)
                .setOnInsert(MongoLock.TIMESTAMP, lock.timestamp)
                .setOnInsert(MongoLock.EXPIRE, timestampToLocalDateTime(lock.timestamp)),
            FindAndModifyOptions().apply { upsert(true) }.apply { returnNew(true) },
            MongoLock::class.java
        ).toLock()

    // The entity class must be MongoLock (mapped to the "Lock" collection), not Lock, which carries no mapping.
    // Note: `n` is WriteResult.getN() from Spring Data MongoDB 1.x. From 2.x on, updateFirst returns an
    // UpdateResult (use modifiedCount) and remove returns a DeleteResult (use deletedCount).
    override fun extendLockByKeyAndOwner(key: String, owner: String): Boolean =
        mongoTemplate.findOne(
            Query.query(
                Criteria.where(MongoLock.KEY).`is`(key).and(MongoLock.OWNER).`is`(owner)
            ),
            MongoLock::class.java
        )?.let {
            val timestamp = it.timestamp + Lock.EXTENDED_TIME
            mongoTemplate.updateFirst(
                Query.query(Criteria.where(MongoLock.KEY).`is`(key).and(MongoLock.OWNER).`is`(owner)),
                Update()
                    .set(MongoLock.TIMESTAMP, timestamp)
                    .set(MongoLock.EXPIRE, timestampToLocalDateTime(timestamp)),
                MongoLock::class.java
            ).run {
                this.n == 1
            }
        } ?: false

    override fun deleteLockByKeyAndOwner(key: String, owner: String): Boolean =
        mongoTemplate.remove(
            Query.query(Criteria.where(MongoLock.KEY).`is`(key).and(MongoLock.OWNER).`is`(owner)),
            MongoLock::class.java
        ).run {
            this.n == 1
        }

    private fun timestampToLocalDateTime(timestamp: Long) =
        LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault())

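    // Note that declaring indexes this way is not ideal: Spring checks on every startup whether they already exist.
    // Creating them ahead of time is recommended; this is for illustration only.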
    @Document(collection = "Lock")
    @CompoundIndex(name = "key", def = "{'key': 1}", unique = true, background = true)
    class MongoLock @PersistenceConstructor constructor(
        val key: String,
        val owner: String,
        val timestamp: Long,
        // Timeout setting: the TTL index removes the document 30 minutes after the `expire` value
        @field:Indexed(expireAfterSeconds = 60 * 30, background = true)
        val expire: LocalDateTime
    ) {
        fun toLock(): Lock = Lock(key = key, owner = owner, timestamp = timestamp)

        companion object {
            const val KEY = "key"
            const val OWNER = "owner"
            const val TIMESTAMP = "timestamp"
            const val EXPIRE = "expire"
        }
    }
}
Redis version -- pseudocode (to be continued)
class RedisLockDaoImpl(
    private val redisTemplate: RedisTemplate<String, String>
) : LockDao {

    override fun getLockByKey(key: String): Lock? {
        /**
         * val owner = GET key
         * if owner is null then return null
         * val ttl = TTL key
         * return Lock(key = key, owner = owner, timestamp = current time + ttl)
         */
        TODO("not implemented")
    }

    override fun setLockAndReturnExisted(lock: Lock): Lock {
        /**
         * val result = SET key lock.owner NX EX (lock.timestamp - current time)
         * if result is "OK" then return lock
         *
         * val existed = GET key
         * if existed is null then return setLockAndReturnExisted(lock)
         * else return getLockByKey(lock.key)
         */
        TODO("not implemented")
    }

    override fun extendLockByKeyAndOwner(key: String, owner: String): Boolean {
        /** Lua script, or WATCH + MULTI/EXEC
         * (commands sent after MULTI are only queued, so the checks have to run before it)
         *
         * WATCH key
         *
         * val existed = GET key
         * if existed is null then UNWATCH and return false
         * else if existed != owner then UNWATCH and return false
         *
         * val expire = TTL key + ten minutes
         *
         * MULTI
         * SET key owner XX EX expire
         * EXEC
         *
         * if EXEC returned null (the key changed after WATCH) then return false
         * else return true
         */
        TODO("not implemented")
    }

    override fun deleteLockByKeyAndOwner(key: String, owner: String): Boolean {
        /** Lua script, or WATCH + MULTI/EXEC
         * (commands sent after MULTI are only queued, so the checks have to run before it)
         *
         * WATCH key
         *
         * val existed = GET key
         * if existed is null then UNWATCH and return false
         * else if existed != owner then UNWATCH and return false
         *
         * MULTI
         * DEL key
         * EXEC
         *
         * if EXEC returned null (the key changed after WATCH) then return false
         * else return true
         */
        TODO("not implemented")
    }
}

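There are quite a few Redis-based distributed lock variants, from the early GETSET, to SET with options (since version 2.6.12), to MULTI/EXEC, and each leaves something to be desired. For example: GETSET overwrites the current value, so the data source can no longer tell which instance holds the lock (and without knowing the owner, a lock can be deleted by mistake); SET does not support Hash; MULTI/EXEC offers no rollback ("It's important to note that even when a command fails, all the other commands in the queue are processed – Redis will not stop the processing of commands."); and the atomicity of Lua scripts comes from Redis executing them on a single thread. Redisson, a Java Redis client that ships ready-made distributed locks, is worth recommending here.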
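As an illustration of the Lua route, the owner-checked delete and extend can each be written as one script. The sketch below is hedged: `ScriptRunner` is a hypothetical abstraction standing in for whatever EVAL call your Redis client exposes, and `ScriptedRedisLock` is an illustrative name; only the Redis commands inside the scripts (`GET`, `DEL`, `TTL`, `EXPIRE`) are real.

```kotlin
// Hypothetical abstraction over a Redis client's EVAL command; not a real library type.
fun interface ScriptRunner {
    fun eval(script: String, keys: List<String>, args: List<String>): Long
}

object RedisLockScripts {
    // Delete the key only if it still holds our owner id.
    val RELEASE = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
    """.trimIndent()

    // Add ARGV[2] seconds to the remaining TTL, but only for the current owner.
    val EXTEND = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            local ttl = redis.call("ttl", KEYS[1])
            if ttl < 0 then ttl = 0 end
            return redis.call("expire", KEYS[1], ttl + tonumber(ARGV[2]))
        else
            return 0
        end
    """.trimIndent()
}

class ScriptedRedisLock(private val runner: ScriptRunner) {

    // Both scripts return 1 on success and 0 when the caller is not the owner.
    fun release(key: String, owner: String): Boolean =
        runner.eval(RedisLockScripts.RELEASE, listOf(key), listOf(owner)) == 1L

    fun extend(key: String, owner: String, extraSeconds: Long): Boolean =
        runner.eval(RedisLockScripts.EXTEND, listOf(key), listOf(owner, extraSeconds.toString())) == 1L
}
```

Because Redis runs a script to completion before serving any other command, the comparison and the write cannot be interleaved with another client's commands, which is exactly what the second key point asks for.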

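Unlike the two approaches above, which release abnormally held locks through a timeout, a ZooKeeper-based distributed lock releases them through heartbeat detection.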
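The heartbeat idea can be modelled in a few lines. This is only a conceptual sketch: `HeartbeatLockRegistry` is a hypothetical in-memory class that imitates how ZooKeeper drops ephemeral nodes when their session ends, and it uses no ZooKeeper API.

```kotlin
// Hypothetical in-memory model of session-based locking.
class HeartbeatLockRegistry(
    private val sessionTimeoutMillis: Long,
    private val nowMillis: () -> Long = System::currentTimeMillis
) {
    private val lastBeat = HashMap<String, Long>()   // session id -> time of last heartbeat
    private val holders = HashMap<String, String>()  // lock key -> session id

    // Called periodically by a live instance to keep its session (and its locks) alive.
    @Synchronized
    fun heartbeat(session: String) {
        lastBeat[session] = nowMillis()
    }

    // A lock is granted only to a live session, and only if no live session holds it.
    @Synchronized
    fun tryAcquire(key: String, session: String): Boolean {
        expireDeadSessions()
        if (session !in lastBeat) return false
        val existing = holders.putIfAbsent(key, session)
        return existing == null || existing == session
    }

    @Synchronized
    fun holder(key: String): String? {
        expireDeadSessions()
        return holders[key]
    }

    // Drops every session whose heartbeat is overdue, together with all locks it held.
    private fun expireDeadSessions() {
        val now = nowMillis()
        val dead = lastBeat.filterValues { now - it > sessionTimeoutMillis }.keys
        lastBeat.keys.removeAll(dead)
        holders.values.removeAll(dead)
    }
}
```

The lock itself carries no expiry here: it lives exactly as long as its holder keeps sending heartbeats, so a slow but healthy holder does not lose the lock halfway through its work.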
