《Redis深度历险》 读书笔记 (1)

2019-02-17  本文已影响0人  芒果菠萝蛋炒饭

分布式锁

问题

解决方法

延时队列

比较适合异步消息处理,将当前冲突的请求扔到另一个队列延后处理以避开冲突

Redis的 list 常常被用来作为异步消息队列使用,使用 rpush/lpush 入队, lpop/rpop 出队

问题

  1. 队列为空
    队列为空时,客户端会陷入pop操作的死循环,导致空轮询,空轮询多的话,会导致Redis的慢查询显著增多

让线程sleep来解决这个问题,请求不到的话,让线程睡1秒钟,这样能降低客户端的CPU占用和Redis的QPS

  1. 队列延迟
    睡眠的方法虽然可以解决问题,但是会导致消息延迟增大,如何降低延迟呢?

使用blpop/brpop,这俩个指令的前缀 d 代表blocking,阻塞读。
阻塞读在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立刻醒过来,消息延迟几乎为0.

  1. 空闲连接自动断开
    若果线程一直阻塞,Redis在连接闲置过久的时候会主动断开连接以减少闲置资源占用,这个时候 bpop/lpop 会抛出异常

在编写客户端消费者时要注意捕获异常,重新连接

延时队列的实现

延时队列可以通过Redis的zset(有序列表)实现,将消息序列化作为 zsetvalue ,将到期处理时间作为 score
使用多个线程轮询 zset 获取到期的任务进行处理(保证一个线程挂了其他的线程可以继续处理)

def delay(msg):
    msg.id = str(uuid.uuid4())  # 保证 value 值唯一
    value = json.dumps(msg)
    retry_ts = time.time() + 5  # 5 秒后重试
    redis.zadd("delay-queue", retry_ts, value)


def loop():
    while True:
        # 最多取 1 条
        values = redis.zrangebyscore("delay-queue", 0, time.time(), start=0, num=1)
        if not values:
            time.sleep(1)  # 延时队列空的,休息 1s
            continue
        value = values[0]  # 拿第一条,也只有一条
        success = redis.zrem("delay-queue", value)  # 从消息队列中移除该消息
        if success:  # 因为有多进程并发的可能,最终只会有一个进程可以抢到消息
            msg = json.loads(value)
            handle_msg(msg)...

使用 zrem 来保证任务被唯一的线程获取并执行

上一篇 下一篇

猜你喜欢

热点阅读