redis 延时队列
2018-08-30 本文已影响0人
SingleException
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.google.gson.Gson;
import redis.clients.jedis.Jedis;
/**
* 延时队列
* @author zhang
*
*/
public class MyRedisQueue<T> {
private static final ExecutorService eService = Executors.newCachedThreadPool();
public void delay(T op,String queueKey){
eService.execute(new PushThread(op,queueKey));
}
public void pop(String queueKey){
eService.execute(new PopThread(queueKey));
}
class PopThread extends Thread{
private String queueKey ;
public PopThread(String queueKey){
this.queueKey = queueKey;
}
public void run(){
System.out.println("开始监听:"); //对消息的业务处理
while(!Thread.interrupted()){
Jedis jedis = RedisPool.getJedis();
Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);//取出消息
if(values.isEmpty()){
try {
if(jedis!=null){
RedisPool.closeConn(jedis);
}
Thread.sleep(5000);
} catch (InterruptedException e) {
break;
}
continue;
}
String s = values.iterator().next();
if(jedis.zrem(queueKey, s)>0){ //抢到了
System.out.println("处理消息:"+s); //对消息的业务处理
if(jedis != null){
RedisPool.closeConn(jedis);
}
}
}
}
};
class PushThread extends Thread{
private T op ;
private String queueKey ;
public PushThread(T op,String queueKey){
this.op = op;
this.queueKey = queueKey;
}
public void run(){
Gson gson = new Gson();
String value = gson.toJson(op);
System.out.println("生产消息:"+value); //对消息的业务处理
Jedis jedis = RedisPool.getJedis();
jedis.zadd(queueKey, System.currentTimeMillis() + 5000, value); // 塞入延时队列 ,5s 后再试
if(jedis != null){
RedisPool.closeConn(jedis);
}
}
};
}