Redis 延时队列

2018-08-30  本文已影响0人  SingleException
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.google.gson.Gson;
import redis.clients.jedis.Jedis;

/**
 * A small delay queue backed by a Redis sorted set.
 *
 * <p>Producers add the JSON form of a message to the sorted set stored at
 * {@code queueKey}, using the wall-clock time at which the message becomes
 * due (epoch milliseconds) as its score. Consumers repeatedly look for the
 * member with the lowest score that is not in the future and try to remove
 * it; the consumer whose {@code ZREM} actually removes the member is the one
 * that handles the message.
 *
 * <p>Note that a message is removed from Redis <em>before</em> it is
 * handled, so a failure while handling it does not put it back.
 *
 * <p>All work runs on a shared cached thread pool that this class never
 * shuts down.
 *
 * @param <T> type of the message payload; it is serialized with Gson
 * @author zhang
 */
public class MyRedisQueue<T> {

    /** Delay used by {@link #delay(Object, String)}, in milliseconds. */
    private static final long DEFAULT_DELAY_MILLIS = 5000L;

    /** How long a consumer sleeps when nothing is due yet, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 5000L;

    /** Shared serializer; one instance is enough for every producer task. */
    private static final Gson GSON = new Gson();

    private static final ExecutorService  eService = Executors.newCachedThreadPool();

    /**
     * Asynchronously enqueues {@code op} so that it becomes due 5 seconds
     * from the moment the producer task runs.
     *
     * @param op       message payload
     * @param queueKey Redis key of the sorted set used as the queue
     */
    public void delay(T op,String queueKey){
        delay(op, queueKey, DEFAULT_DELAY_MILLIS);
    }

    /**
     * Asynchronously enqueues {@code op} so that it becomes due
     * {@code delayMillis} milliseconds after the producer task runs.
     *
     * @param op          message payload
     * @param queueKey    Redis key of the sorted set used as the queue
     * @param delayMillis delay in milliseconds; a value of zero or less makes
     *                    the message due immediately
     */
    public void delay(T op, String queueKey, long delayMillis){
        eService.execute(new PushThread(op, queueKey, delayMillis));
    }

    /**
     * Starts a background consumer for {@code queueKey}. The consumer keeps
     * polling until the thread running it is interrupted.
     *
     * @param queueKey Redis key of the sorted set used as the queue
     */
    public void pop(String queueKey){
        eService.execute(new PopThread(queueKey));
    }

    /** Hands a connection back to the pool; tolerates {@code null}. */
    private static void release(Jedis jedis){
        if (jedis != null) {
            RedisPool.closeConn(jedis);
        }
    }

    /**
     * Consumer task. It is submitted to the executor and run as a plain task
     * there (it is never {@code start()}ed by this class); the {@code Thread}
     * superclass is retained so existing references to this type keep
     * compiling.
     */
    class PopThread extends Thread{
        private final String queueKey;

        public PopThread(String queueKey){
            this.queueKey = queueKey;
        }

        @Override
        public void run(){
            System.out.println("开始监听:");
            // Check the thread that is actually executing this task (the pool
            // worker), not this never-started Thread object.
            while (!Thread.currentThread().isInterrupted()) {
                boolean nothingDue;
                String claimed = null;

                // Borrow a connection for exactly one poll and always give it
                // back, including when we lose the race or a Redis call throws.
                Jedis jedis = RedisPool.getJedis();
                try {
                    // At most one member whose score is in [0, now].
                    Set<String> due = jedis.zrangeByScore(
                            queueKey, 0, System.currentTimeMillis(), 0, 1);
                    nothingDue = due.isEmpty();
                    if (!nothingDue) {
                        String candidate = due.iterator().next();
                        // Several consumers may see the same member; only the
                        // one whose ZREM removes it gets to handle it.
                        if (jedis.zrem(queueKey, candidate) > 0) {
                            claimed = candidate;
                        }
                    }
                } finally {
                    release(jedis);
                }

                if (claimed != null) {
                    // Business handling of the message goes here.
                    System.out.println("处理消息:"+claimed);
                } else if (nothingDue) {
                    try {
                        Thread.sleep(POLL_INTERVAL_MILLIS);
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to whoever runs this task.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
                // Lost the race: poll again straight away.
            }
        }
    };

    /**
     * Producer task. Like {@link PopThread}, it is run by the executor as a
     * plain task; the {@code Thread} superclass is retained for compatibility.
     */
    class PushThread extends Thread{
        private final T op;
        private final String queueKey;
        private final long delayMillis;

        public PushThread(T op,String queueKey){
            this(op, queueKey, DEFAULT_DELAY_MILLIS);
        }

        public PushThread(T op, String queueKey, long delayMillis){
            this.op = op;
            this.queueKey = queueKey;
            this.delayMillis = delayMillis;
        }

        @Override
        public void run(){
            String value = GSON.toJson(op);
            System.out.println("生产消息:"+value);
            Jedis jedis = RedisPool.getJedis();
            try {
                // Score = the moment (epoch millis) at which the message is due.
                jedis.zadd(queueKey, System.currentTimeMillis() + delayMillis, value);
            } finally {
                release(jedis);
            }
        }
    };
}


上一篇下一篇

猜你喜欢

热点阅读