redis实现延时队列

2019-04-11  本文已影响0人  echo不扣

需求是想用redis做一个延时的队列,每次内容必须在一定时间后才能被取出,

比如说:有未支付订单要在一定时间内关闭,假设为30秒,存入的时候我们使用redis的有序集合进行添加,用当前时间戳加上30秒来排序(zadd),然后每次消费者轮询的时候就只取出开始时间0到当前时间这个时间段(zrangeByScore)

1.生产类 Producer.java

  import redis.clients.jedis.Jedis;

  import redis.clients.jedis.JedisPool;

  import redis.clients.jedis.JedisPoolConfig;

  import redis.clients.jedis.Transaction;

  public class Producer {

    static final String QueueName = "delay-queue";

    public static void main(String[] args)throws InterruptedException {

        JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000, "123456", 1);

        Jedis jedis = pool.getResource();

        try {

            int count = 0;

            while (true) {

                String message = "Message #" + count;

                String key = "foobar:" + count;

                System.out.println("Queueing message: " + message);

                queueMessage(jedis, QueueName, key, message, 5);

               // delete every 5th Action

                if (count != 0 && count % 5 == 0) {

                    System.out.println("Deleting msg with id " + count);

                    jedis.del(key);

                }

                count += 1;

               Thread.sleep(3000L);

            }

        } finally {

            jedis.close();

            pool.destroy();

        }

    }

    private static void queueMessage(Jedis jedis, String queue, String key, String message, Integer delay) {

        long time = System.currentTimeMillis() / 1000 + delay;//当前时间的秒数加上要延时的秒数

        Transaction t = jedis.multi();
        t.zadd(queue, time, key);
        t.set(key, message);
        t.exec();

}

}

2.消费者类 Consumer.java

代码如下:

public class Consumer {

public static void main(String[] args) throws InterruptedException {

    JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000, "123456", 1);

    Jedis jedis = pool.getResource();

    try {

        while (true) {

            getMessages(jedis, Producer.QueueName);

            Thread.sleep(1000L);

        }

    } finally {

        jedis.close();

        pool.destroy();

    }

}

private static void getMessages(Jedis jedis, String queue) {

    int startTime = 0;

    long endTime = System.currentTimeMillis() / 1000;

    Transaction t = jedis.multi();


    Response<Set<String>> setResponse = t.zrangeByScore(queue, startTime, endTime);//在startTime和endTime之间的数

    t.zremrangeByScore(queue, startTime, endTime);//移除所有startTime-endTime中的所有成员

    t.exec();

    List<String> keys = new ArrayList();

    keys.addAll(setResponse.get());//将所有的key添加到list中

    String[] keyArray = keys.toArray(new String[keys.size()]);//然后转换成数组

    if (keyArray.length > 0) {

        Transaction tMessage = jedis.multi();

        Response<List<String>>  messageResponse = tMessage.mget(keyArray);//获取多个键值对

        tMessage.del(keyArray);

        tMessage.exec();

        List<String> messages = messageResponse.get();

        for (int i = 0; i < messages.size(); i++) {

            String key = keys.get(i);

            String message = messages.get(i);

            System.out.print("Received key: " + key + ". ");

            if (message == null) {

                System.out.println("Message for key " + key + " is gone!");

            } else {

                System.out.println("Message for key " + key + " is " + message);

            }

        }

    }

}

}

上一篇下一篇

猜你喜欢

热点阅读