redis-delayqueue

2018-12-29  本文已影响0人  zhangsanzhu

redis-delayqueue

适用场景

相比于专业的消息队列,Redis的优点就是简单,适用于只有一个消费者的场景.

重点

没有ack机制保障,肯能会丢.可靠性相对于kafka差一点.
brpop/blpop 属于阻塞读,队列为空的时候,会进入sleep,队列不为空,会被唤醒.
brpop/blpop 有个问题,长时间阻塞会被认为是空闲连接,服务器会自动断开连接,要记得捕获异常,并重试.

实现原理

基本操作

127.0.0.1:6379> rpush n kkkk kkkk lll
(integer) 3
127.0.0.1:6379> lpop n
"kkkk"
127.0.0.1:6379> lpop n
"kkkk"
127.0.0.1:6379> lpop n
"lll"
127.0.0.1:6379> rpush n kkkk kkkk lll
(integer) 3
127.0.0.1:6379> rpop n
"lll"
127.0.0.1:6379> rpop n
"kkkk"
127.0.0.1:6379> rpop n
"kkkk"
127.0.0.1:6379> rpush n kkkk kkkk lll  dasfasd asdfasd ddd assaa qqq eee rrr ooo ppp
(integer) 12
127.0.0.1:6379> brpop n 1
1) "n"
2) "ppp"
127.0.0.1:6379> brpop n 1
1) "n"
2) "ooo"
127.0.0.1:6379> brpop n 1
1) "n"
2) "rrr"
127.0.0.1:6379> brpop n 1

实现

public class RedisDelayingQueue<T> {

    static class TaskItem<T> {
        public String id;
        public T msg;
    }

    private Type TaskType = new TypeReference<TaskItem<T>>() {}.getType();

    private Jedis jedis;
    private String queueKey;

    public RedisDelayingQueue(Jedis jedis, String queueKey) {
        this.jedis = jedis;
        this.queueKey = queueKey;
    }

    public void delay(T msg) {
        TaskItem<T> task = new TaskItem<T>();
        task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid
        task.msg = msg;
        String s = JSON.toJSONString(task); // fastjson 序列化
        jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试
    }

    public void loop() {
        while (!Thread.interrupted()) {
            // 只取一条
            Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
            if (values.isEmpty()) {
                try {
                    Thread.sleep(500); // 歇会继续
                } catch (InterruptedException e) {
                    break;
                }
                continue;
            }
            String s = values.iterator().next();
            if (jedis.zrem(queueKey, s) > 0) { // 抢到了
                TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化
                this.handleMsg(task.msg);
            }
        }
    }

    public void handleMsg(T msg) {
        System.out.println(msg);
    }

    public static void main(String[] args) {
        Jedis jedis = new Jedis("127.0.0.1", 6379);
        RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
        Thread producer = new Thread() {

            public void run() {
                for (int i = 0; i < 10; i++) {
                    queue.delay("codehole" + i);
                }
            }

        };
        Thread consumer = new Thread() {

            public void run() {
                queue.loop();
            }

        };
        producer.start();
        consumer.start();
        try {
            producer.join();
            Thread.sleep(6000);
            consumer.interrupt();
            consumer.join();
        } catch (InterruptedException e) {
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读