队列redis

翻译-使用redis做任务队列

2016-08-31  本文已影响125人  岳艳涛

前段时间,在工作中我们使用azure storage队列作为任务队列引擎,但过段时间后我们发现它并没有我们希望的那么快,随之,我们集中地使用redis并开始考虑将redis作为任务队列。虽然有许多的文档介绍如何使用redis的发布/订阅服务,但使用redis作为任务队列的屈指可数,所以我决定来描述如何去做。

什么是任务队列?

任务队列允许某些服务的客户端异步发送任务给它。通常服务有许多clients,可能许多处理的workers。总之整个工作流程是这样的:

  1. client将任务放到队列中
  2. workers定期循环检查队列中的新任务,如何存在,则执行任务
    但也有一个队列一些额外的要求:
  3. 服务质量:client不应该阻塞其他client的请求。
  4. 批处理:clients & workers应该能够获得多个任务以获得更好的性能。
  5. 可靠性:如果worker在处理task失败时,该任务可以被其他worker再次处理。
  6. 死信:如果某些任务被多次尝试后失败,它可以放在死信存储。
  7. 一个任务仅可以被成功处理一次

每个客户端将使用一个redis list,list key将使用一个任务队列名作为前缀,第二部分将是client id。当client准备把消息放入队列前,会将队列名和它自己的ID的进行关联成列表键。我们将使用Redis的list为每个客户端。列表键将使用一个任务队列名作为前缀和第二部分将是客户端ID。当客户希望将消息放入队列会由队列名和它自己的ID的级联计算列表键。会有很多的list存入单独的redis db中,但在该情况下,我们须共用一个redis db和其他一些代码,同时允许在它们的名字前添加额外的前缀,如:"queues:"。因此,我们定义一个类RedisQueue来隐藏这些细节。

import json
import datetime
import pytz
from random import randint
import logging
import time

main_prefix = "bqueues:"

class ClientRedisQueue():
    def __init__(self, qname, clientid, redis):
        self.client_id = clientid
        self.queue_name = main_prefix + qname + ":" + clientid
        logging.debug("created queue %s" % self.queue_name)
        self.redis = redis

    def send(self, msgs):
        jmsgs = [json.dumps({ "client_id" : self.client_id, "msg" :msg, "attempts" : 0}) for msg in msgs]
        self.redis.lpush(self.queue_name, *jmsgs)

    def exists(self):
        return self.redis.exists(self.queue_name)

    def length(self):
        return self.redis.llen(self.queue_name) 

    def delete(self):
        self.redis.delete(self.queue_name)

r = redis.StrictRedis("localhost")
cq = ClientRedisQueue("worker1", "client", r)

cq2 = ClientRedisQueue("worker1", "client2", r)
cq.send([1,2])
cq2.send([3,4,0]) 

所以发送端容易实现,那接受端呢?首先,我们需要找到所有队列列表list。有三种方式:

  1. 使用KEYS“prefix:*"命令, 该命令能够列出来所有列表。但这个命令可能会导致生产出现严重的问题,当针对大型数据库中执行它可能毁性能。所以永远不要使用此方式。
  2. 使用SCAN命令, 该命令的作用相当于上一条命令,但没有性能问题。
  3. 使用redis set存储所有list名字,即发送消息时将list名字添加到redis set中,当消息被处理时,删除名字。不幸的是,该步需要额外的代码来实现,所以我们将使用第二个选项。

当我们发现的所有的队列,我们​​需要他们随机排序以保证所有的list以相同的概率处理。之后,我们需通过redis pipeline的方式,一次处理一批大量的消息,随后,如果没有找到的消息,我们需要删除它们。此外,我们需要防止消息的双重处理列表,并防止消息因失败等异常情况造成的消息丢失。要做到这一点,我们将使用RPOPLPUSH命令,它以原子从列表中删除的消息,并把它变成一个额外的“processing”列表,并返回至调用者。因此,我们将使用其他列表中为每个队列与关键“processing:queue_name”。消息处理后,我们必须从prccessing列表中删除。但在几次不成功的尝试过程中消息的情况下,我们需要最终将其移动到死信中。并将之设置为:"dead:queue_name"。不时,我们需要检查的处理列表,如果算上尝试的消息低于允许最大计数然后把它返回到客户端列表或在其他情况下,把它设置成一纸空文。

AX_ATTEMPTS_COUNT = 4
class WorkerRedisQueue():
    def __init__(self, name, redis):
        self.queue_name = main_prefix + name
        self.processing_lname = main_prefix + "processing:" + name
        self.dead_sname = main_prefix + "dead:" + name
        self.refresh_period = datetime.timedelta(seconds=2)
        self.queue_pattern = self.queue_name + "*"
        self.redis = redis
        self.last_refresh_time = datetime.datetime.now(pytz.utc) - self.refresh_period - self.refresh_period
        self.list_names = []

    def proccessed(self, msg):
        self.redis.lrem(self.processing_lname, 0, json.dumps(msg))

    # start this from time to time
    def recover(self):
        recovered = 0
        proc_msgs = self.redis.lrange(self.processing_lname, -5,-1)
        for (msg, orig) in [(json.loads(msg),msg) for msg in proc_msgs if msg]:
            if msg["attempts"] > MAX_ATTEMPTS_COUNT:
                print "found dead letter"
                self.redis.sadd(self.dead_sname, orig)
            else:
                print "recovering"
                recovered = recovered + 1
                msg["attempts"] = msg["attempts"] + 1
                self.redis.lpush("%s:%s" % (self.queue_name, msg["client_id"]), json.dumps(msg))
            self.redis.lrem(self.processing_lname, 0, orig)

        return recovered

    def get_list_names(self):
        lists = []
        print "searching pattern", self.queue_pattern
        for l in self.redis.scan_iter(self.queue_pattern):
            print "found list", l
            lists.append(l)
        return lists

    def refresh(self, force = False):
        now = datetime.datetime.now(pytz.utc)
        time_to_refresh = now - self.last_refresh_time > self.refresh_period
        if force or time_to_refresh:
            self.list_names = self.get_list_names()
            self.last_refresh_time = now
        else:
            print "skip refresh"

    def receive(self, msg_count):
        self.refresh()
        count = len(self.list_names)
        if count == 0:
            print "queues not found"
            return []
        p = self.redis.pipeline()
        for i in range(msg_count):
            l = self.list_names[randint(0, count - 1)]
            p.rpoplpush(l,self.processing_lname)
        msgs = p.execute()
        return [json.loads(msg) for msg in msgs if msg]

    def length(self):
        self.refresh(True)
        res = 0
        for l in self.list_names:
            res = res + self.redis.llen(l)
        return res

wq = WorkerRedisQueue("worker1", r)
while(True):
    time.sleep(1)
    msgs = wq.receive(2)
    if len(msgs) == 0:
        if randint(0, 10) == 0 and wq.length() == 0 and wq.recover() == 0:
            print "sleeping"
            time.sleep(1)
            
    for msg in msgs:
        print "received msg", msg
        try:
            a = 10/msg["msg"]
            wq.proccessed(msg)
        except Exception,e: 
            print "exception", str(e)   

原文:http://hodzanassredin.github.io/2016/03/29/redis_task_queue.html
翻译:yyt030

上一篇 下一篇

猜你喜欢

热点阅读