redis实现延时队列
2019-04-11 本文已影响0人
echo不扣
需求是想用redis做一个延时的队列,每次内容必须在一定时间后才能被取出,
比如说:有未支付订单要在一定时间内关闭,假设为30秒,存入的时候我们使用redis的有序集合进行添加,用当前时间戳加上30秒来排序(zadd),然后每次消费者轮询的时候就只取出开始时间0到当前时间这个时间段(zrangeByScore)
1.生产类 Producer.java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Transaction;
public class Producer {
static final String QueueName = "delay-queue";
public static void main(String[] args)throws InterruptedException {
JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000, "123456", 1);
Jedis jedis = pool.getResource();
try {
int count = 0;
while (true) {
String message = "Message #" + count;
String key = "foobar:" + count;
System.out.println("Queueing message: " + message);
queueMessage(jedis, QueueName, key, message, 5);
// delete every 5th Action
if (count != 0 && count % 5 == 0) {
System.out.println("Deleting msg with id " + count);
jedis.del(key);
}
count += 1;
Thread.sleep(3000L);
}
} finally {
jedis.close();
pool.destroy();
}
}
private static void queueMessage(Jedis jedis, String queue, String key, String message, Integer delay) {
long time = System.currentTimeMillis() / 1000 + delay;//当前时间的秒数加上要延时的秒数
Transaction t = jedis.multi();
t.zadd(queue, time, key);
t.set(key, message);
t.exec();
}
}
2.消费者类 Consumer.java
代码如下:
public class Consumer {
public static void main(String[] args) throws InterruptedException {
JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000, "123456", 1);
Jedis jedis = pool.getResource();
try {
while (true) {
getMessages(jedis, Producer.QueueName);
Thread.sleep(1000L);
}
} finally {
jedis.close();
pool.destroy();
}
}
private static void getMessages(Jedis jedis, String queue) {
int startTime = 0;
long endTime = System.currentTimeMillis() / 1000;
Transaction t = jedis.multi();
Response<Set<String>> setResponse = t.zrangeByScore(queue, startTime, endTime);//在startTime和endTime之间的数
t.zremrangeByScore(queue, startTime, endTime);//移除所有startTime-endTime中的所有成员
t.exec();
List<String> keys = new ArrayList();
keys.addAll(setResponse.get());//将所有的key添加到list中
String[] keyArray = keys.toArray(new String[keys.size()]);//然后转换成数组
if (keyArray.length > 0) {
Transaction tMessage = jedis.multi();
Response<List<String>> messageResponse = tMessage.mget(keyArray);//获取多个键值对
tMessage.del(keyArray);
tMessage.exec();
List<String> messages = messageResponse.get();
for (int i = 0; i < messages.size(); i++) {
String key = keys.get(i);
String message = messages.get(i);
System.out.print("Received key: " + key + ". ");
if (message == null) {
System.out.println("Message for key " + key + " is gone!");
} else {
System.out.println("Message for key " + key + " is " + message);
}
}
}
}
}