程序员程序猿阵线联盟-汇总各类技术干货

Python 简单封装 Redis,实现批量增删查取

2018-05-29  本文已影响12人  geekrun

摘要

Redis 的常用命令本身已经足够简单,但有些业务需要批量地增、删、查、取,于是我对 redis-py 做了一层简单的封装,同时为一些功能方法统一了接口,方便调用。

代码

import time
import redis

# Small utility: turn bytes into str, leave every other value untouched.
def bytes_to_str(s, encoding='utf-8'):
    """Decode *s* with *encoding* if it is ``bytes``; otherwise return it as-is."""
    return s.decode(encoding) if isinstance(s, bytes) else s


class RedisClient(object):
    """Convenience wrapper around a redis-py client with batch helpers.

    Constructing the class repeatedly with the same URL returns the same
    object and reuses its connection, which keeps the number of connections
    down. Different URLs get different objects.

    The underlying redis-py client is exposed as ``self.redis``.
    """

    _DEFAULT_URL = 'redis://@localhost:6379/0'

    # Already-built clients, keyed by (class, url).
    _instances = {}

    # Redis type name -> client method that reports the size of that type.
    _LEN_COMMANDS = {
        'hash': 'hlen',
        'zset': 'zcard',
        'list': 'llen',
        'set': 'scard',
    }

    # Redis type name -> pipeline method that pops one element of that type.
    _POP_COMMANDS = {
        'set': 'spop',
        'list': 'lpop',
    }

    def __new__(cls, url=_DEFAULT_URL, *args, **kwargs):
        """Return the cached instance for ``url``, creating it on first use."""
        # The cache is a plain (non double-underscore) attribute on purpose:
        # a ``__name`` attribute is name-mangled inside the class body, so a
        # ``hasattr(cls, '__name')`` check with a string can never find it.
        cache_key = (cls, url)
        instance = cls._instances.get(cache_key)
        if instance is None:
            instance = object.__new__(cls)
            cls._instances[cache_key] = instance
        return instance

    def __init__(self, url=_DEFAULT_URL):
        """Connect to ``url`` unless this cached instance is already connected."""
        # __init__ runs on every ``RedisClient(...)`` call, including calls
        # that returned a cached instance; do not rebuild the connection then.
        if 'redis' in self.__dict__:
            return
        self.redis = redis.from_url(url=url)

    @staticmethod
    def _is_single_key(args):
        """Tell a single key (str or bytes) apart from an iterable of keys."""
        return isinstance(args, (str, bytes))

    def get_all_keys(self, match=None, cursor=0):
        """Collect every key matching ``match`` by walking SCAN to completion.

        SCAN is used (rather than fetching all keys in one command) so a big
        keyspace is read in cursor-sized steps.

        :param match: optional glob-style pattern passed to SCAN.
        :param cursor: SCAN cursor to start from (0 starts a fresh scan).
        :return: an iterable of key names passed through ``bytes_to_str``
            (a one-shot ``map`` object on Python 3).
        """
        result_list = []
        while True:
            cursor, keys = self.redis.scan(match=match, cursor=cursor)
            result_list.extend(keys)
            # A returned cursor of 0 means the scan is complete.
            if cursor == 0:
                return map(bytes_to_str, result_list)

    def get_len(self, args):
        """Return the number of elements stored under one or several keys.

        Supports hash, zset, list and set keys; any other type (including a
        missing key) is reported as ``None``.

        :param args: a single key (str or bytes) or an iterable of keys.
        :return: the length for a single key, or a ``{key: length}`` dict for
            an iterable of keys.
        """
        if self._is_single_key(args):
            return self.get_len([args])[args]

        len_dict = dict()
        for key in args:
            # Resolve the size command per key so an unsupported type yields
            # None instead of inheriting the previous key's length.
            command = self._LEN_COMMANDS.get(self.get_kind(key))
            len_dict[key] = getattr(self.redis, command)(key) if command else None
        return len_dict

    def get_kind(self, args):
        """Return the Redis type of one or several keys.

        :param args: a single key (str or bytes) or an iterable of keys.
        :return: the type name as ``str`` for a single key, or a
            ``{key: type}`` dict for an iterable of keys.
        """
        if self._is_single_key(args):
            return bytes_to_str(self.redis.type(args))
        return {item: bytes_to_str(self.redis.type(item)) for item in args}

    def batch_fetch(self, name, count=1, kind=None):
        """Pop elements from a set or list in batches of up to ``count``.

        Elements are popped (removed) rather than read and deleted afterwards,
        so several processes can consume the same key without handing out the
        same element twice. Each batch is sent as one pipeline.

        :param name: key to consume.
        :param count: number of pop commands queued per batch.
        :param kind: ``'set'`` or ``'list'``; looked up with ``get_kind`` when
            omitted. Any other type yields nothing.
        :return: generator of batches. A set key yields ``set`` objects; a
            list key yields ``list`` objects so pop order and duplicate
            elements are preserved. Stops at the first completely empty batch.
        """
        if not kind:
            kind = self.get_kind(name)
        pop_command = self._POP_COMMANDS.get(kind)
        if pop_command is None:
            return

        pipe = self.redis.pipeline()
        queue_pop = getattr(pipe, pop_command)
        while True:
            for _ in range(count):
                queue_pop(name=name)
            # Pops against an exhausted key come back as None; drop them.
            popped = [item for item in pipe.execute() if item is not None]
            if not popped:
                break
            yield set(popped) if kind == 'set' else popped

    def batch_find(self, name, count, kind=None, cursor=0):
        """Read (without removing) the elements of a set or list in batches.

        :param name: key to read.
        :param count: for a set, the COUNT hint given to SSCAN; for a list,
            the page size (must be at least 1).
        :param kind: ``'set'`` or ``'list'``; looked up with ``get_kind`` when
            omitted. Any other type yields nothing.
        :param cursor: for a set, the SSCAN cursor to start from; for a list,
            the index of the first element to return.
        :return: generator of batches (lists as returned by the client).
        :raises ValueError: for a list key when ``count`` is less than 1.

        List paging is index based, so elements are skipped or repeated if the
        list is modified while it is being paged.
        """
        if not kind:
            kind = self.get_kind(name)

        if kind == 'set':
            while True:
                cursor, result = self.redis.sscan(name=name, count=count, cursor=cursor)
                yield result
                # SSCAN hands back cursor 0 once the whole set was visited.
                if not cursor:
                    break

        elif kind == 'list':
            if count < 1:
                raise ValueError('count must be at least 1 to page through a list')
            start = cursor
            while True:
                # LRANGE bounds are inclusive, hence the -1.
                page = self.redis.lrange(name=name, start=start, end=start + count - 1)
                if not page:
                    break
                yield page
                # A short page means the end of the list was reached.
                if len(page) < count:
                    break
                start += count

    def batch_delete(self, name, value, kind=None, count=None):
        """Remove a batch of elements from a set or list.

        For a set, the members in ``value`` are removed. For a list, the first
        ``count`` elements are trimmed off the head; ``value`` is only used to
        derive ``count`` when ``count`` is not given (this assumes ``value``
        is the batch previously read from the head of the list).

        :param name: key to modify.
        :param value: iterable of members (set) or the head batch (list).
        :param kind: ``'set'`` or ``'list'``; looked up with ``get_kind`` when
            omitted. Any other type is ignored.
        :param count: number of leading list elements to drop.
        """
        if not kind:
            kind = self.get_kind(name)

        if kind == 'set':
            # SREM needs at least one member; an empty batch is a no-op.
            if value:
                self.redis.srem(name, *value)

        elif kind == 'list':
            if count is None:
                count = len(value) if value else 0
            self.redis.ltrim(name=name, start=count, end=-1)

    def batch_insert(self, name, value, kind=None):
        """Add a batch of elements to a set (SADD) or to a list tail (RPUSH).

        :param name: key to modify.
        :param value: iterable of elements to add.
        :param kind: ``'set'`` or ``'list'``; looked up with ``get_kind`` when
            omitted. Pass it explicitly for a key that does not exist yet,
            because the looked-up type of a missing key is neither of the two
            and nothing would be inserted.
        :return: the client's reply for the insert command, or 0 when nothing
            was sent (empty ``value`` or unsupported type).
        """
        if not kind:
            kind = self.get_kind(name)
        # SADD / RPUSH need at least one element; an empty batch is a no-op.
        if not value:
            return 0
        if kind == 'set':
            return self.redis.sadd(name, *value)
        if kind == 'list':
            return self.redis.rpush(name, *value)
        return 0


if __name__ == '__main__':
    # Demo: drain a queue in batches of up to 10000, pausing between batches.
    # The client is bound to `client`, not `redis`, so the imported `redis`
    # module that RedisClient.__init__ relies on is not rebound.
    client = RedisClient()
    # Look up the type of a key (the result is not used by the demo).
    client.get_kind('hhh')
    name = '998_info_video_queue'
    for batch in client.batch_fetch(name=name, count=10000):
        print(batch)
        # A fetched batch can be pushed back with
        # client.batch_insert(name=name, value=batch).
        time.sleep(1)

上一篇 下一篇

猜你喜欢

热点阅读