基于JDK DelayQueue实现延迟队列

2020-11-16  本文已影响0人  周六不算加班

1、队列初始化

@Component
public class DelayQueueManager implements CommandLineRunner {

    private static final Logger logger = LoggerFactory.getLogger(DelayQueueManager.class);

    /** Pause before the start-up scan of the control table, in milliseconds (1 minute). */
    private static final long INIT_SCAN_DELAY_MS = 60 * 1000L;

    /** Delay applied when a failed task is put back into the queue, in milliseconds (1 hour). */
    private static final long RETRY_DELAY_MS = 60 * 60 * 1000L;

    @Resource
    private AltcPsControlService altcPsControlService;

    /** Pending tasks; {@link DelayQueue#take()} only hands out a task once its delay has elapsed. */
    private final DelayQueue<DelayTask> delayQueue = new DelayQueue<>();

    /**
     * Adds a task to the delay queue.
     *
     * @param task the task to schedule
     */
    public void put(DelayTask task) {
        logger.info("加入延时任务:{}", task);
        delayQueue.put(task);
    }

    /**
     * Cancels a pending task. Matching is done with {@code DelayTask.equals}.
     *
     * @param task the task to cancel
     * @return {@code true} if a matching task was found in the queue and removed
     */
    public boolean remove(DelayTask task) {
        logger.info("取消延时任务:{}", task);
        return delayQueue.remove(task);
    }

    /**
     * Starts the single consumer thread and then loads the pending records into the queue.
     *
     * <p>NOTE(review): this method blocks for {@link #INIT_SCAN_DELAY_MS} before it returns,
     * exactly as before; confirm that delaying the caller for that long is intended.
     * NOTE(review): the executor is never shut down, so its non-daemon worker thread lives
     * until the JVM exits.
     *
     * @param args command-line arguments (unused)
     */
    @Override
    public void run(String... args) throws Exception {
        logger.info("初始化延时队列");
        // Hand the loop to the executor directly; wrapping it in an extra Thread object
        // only used that Thread as a plain Runnable and never started it.
        Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "delay-queue-consumer"))
                .execute(this::executeLoop);
        this.initAltcPsControl();
    }

    /**
     * Consumer loop: blocks until a task is due, processes it, and repeats until the
     * thread is interrupted.
     */
    private void executeLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            DelayTask task;
            try {
                task = delayQueue.take();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever owns this thread, then stop.
                Thread.currentThread().interrupt();
                break;
            }
            try {
                processTask(task);
            } catch (RuntimeException e) {
                // One bad task must not terminate the only consumer thread; otherwise
                // every task still in the queue would silently never run.
                logger.error("延时任务执行异常:{}", task, e);
            }
        }
    }

    /**
     * Waits one minute, then reads the records returned by
     * {@code altcPsControlService.selectInitDelayQueue()} and schedules one task per record.
     *
     * <p>NOTE(review): the delay is {@code endTime - beginTime}, counted from the moment the
     * task is created. After a restart this re-arms the full duration instead of the time
     * remaining until {@code endTime}; confirm whether {@code endTime - now} was intended.
     */
    private void initAltcPsControl() {
        try {
            Thread.sleep(INIT_SCAN_DELAY_MS);
        } catch (InterruptedException e) {
            // Restore the flag instead of swallowing it, and skip the scan.
            Thread.currentThread().interrupt();
            logger.warn("初始化扫描布控表被中断,跳过本次加载", e);
            return;
        }
        logger.info("初始化扫描布控表加入延迟队列");
        List<AltcPsControlEntity> controlEntityList = altcPsControlService.selectInitDelayQueue();
        if (controlEntityList == null) {
            return;
        }
        for (AltcPsControlEntity psControlEntity : controlEntityList) {
            // An incomplete row would otherwise throw a NullPointerException out of run().
            if (psControlEntity == null
                    || psControlEntity.getId() == null
                    || psControlEntity.getBeginTime() == null
                    || psControlEntity.getEndTime() == null) {
                logger.warn("布控记录数据不完整,跳过:{}", psControlEntity);
                continue;
            }
            String identifier = psControlEntity.getId().toString();
            long expire = psControlEntity.getEndTime().getTime()
                    - psControlEntity.getBeginTime().getTime();
            this.put(new DelayTask(new TaskBase(identifier), expire));
        }
    }

    /**
     * Processes one due task: sets the status of the record identified by the task to 3.
     * If the update reports failure or throws, the task is queued again with a one-hour delay.
     *
     * @param task the task whose delay has elapsed
     * @throws NumberFormatException if the task identifier is not a valid {@code long}
     */
    private void processTask(DelayTask task) {
        logger.info("执行延时任务:{}", task);
        // Extension point: dispatch on the payload type of task.getData() if more task kinds are added.
        TaskBase taskBase = task.getData();
        Long id = Long.valueOf(taskBase.getIdentifier());
        AltcPsControlEntity altcPsControlEntity = new AltcPsControlEntity();
        altcPsControlEntity.setId(id);
        // NOTE(review): the meaning of status 3 is not visible here; confirm against the entity definition.
        altcPsControlEntity.setStatus((byte) 3);

        boolean psControl;
        try {
            psControl = altcPsControlService.updateAltcPsControl(altcPsControlEntity);
        } catch (RuntimeException e) {
            // Treat an exception like a reported failure so the task is retried, not lost.
            logger.error("延时任务更新异常:{}", task, e);
            psControl = false;
        }
        if (!psControl) {
            logger.info("任务执行失败,重新放入队列中,一小时后重新执行:{}", task);
            this.put(new DelayTask(taskBase, RETRY_DELAY_MS));
        }
    }
}

2、消息体结构

  /**
   * A queue element for {@code java.util.concurrent.DelayQueue}: a payload plus the
   * absolute time (epoch milliseconds) at which it becomes due.
   *
   * <p>Two tasks are equal when their payload identifiers are equal, regardless of the
   * due time. Ordering is by due time, so {@code compareTo} is intentionally not
   * consistent with {@code equals}.
   */
  public class DelayTask implements Delayed {

      /** Payload handed to the consumer once the task is due. */
      private final TaskBase data;

      /** Absolute due time in epoch milliseconds. */
      private final long expire;

      /**
       * Creates a task that becomes due {@code expire} milliseconds from now.
       *
       * @param data   the payload
       * @param expire delay in milliseconds, measured from the moment of construction
       */
      public DelayTask(TaskBase data, long expire) {
          this.data = data;
          this.expire = expire + System.currentTimeMillis();
      }

      /**
       * @return the payload of this task
       */
      public TaskBase getData() {
          return data;
      }

      /**
       * @return the absolute due time in epoch milliseconds
       */
      public long getExpire() {
          return expire;
      }

      /**
       * Compares tasks by payload identifier only.
       *
       * @param obj the object to compare with
       * @return {@code true} if {@code obj} is a {@code DelayTask} with an equal identifier
       */
      @Override
      public boolean equals(Object obj) {
          if (this == obj) {
              return true;
          }
          if (obj instanceof DelayTask) {
              return this.data.getIdentifier().equals(((DelayTask) obj).getData().getIdentifier());
          }
          return false;
      }

      /**
       * Hash code derived from the same identifier that {@link #equals(Object)} compares,
       * so equal tasks always share a hash code.
       *
       * @return the hash code of the payload identifier
       */
      @Override
      public int hashCode() {
          return this.data.getIdentifier().hashCode();
      }

      @Override
      public String toString() {
          return "{" + "data:" + data.toString() + "," + "expire:" + new Date(expire) + "}";
      }

      /**
       * Returns the remaining delay in the requested unit.
       *
       * @param unit the unit of the result
       * @return the remaining delay; zero or negative once the task is due
       */
      @Override
      public long getDelay(TimeUnit unit) {
          // The difference is in milliseconds, so that must be the source unit.
          // Converting from `unit` to `unit` returned the millisecond count unchanged,
          // which under-reports the delay whenever a finer unit is requested.
          return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
      }

      /**
       * Orders tasks by due time, earliest first.
       *
       * @param o the other delayed element
       * @return a negative, zero, or positive value as this task is due before, at the
       *         same time as, or after {@code o}
       */
      @Override
      public int compareTo(Delayed o) {
          if (o == this) {
              return 0;
          }
          if (o instanceof DelayTask) {
              // Comparing the fixed due times avoids reading the clock twice.
              return Long.compare(this.expire, ((DelayTask) o).expire);
          }
          // Long.compare instead of casting the difference to int, which truncates
          // and can flip the sign for large differences.
          return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
      }
  }

3、消息内容

/**
 * Payload of a delayed task: a single string identifier.
 */
public class TaskBase {

    /** Identifier of the record this task refers to. */
    private String identifier;

    /**
     * @param identifier the identifier to carry
     */
    public TaskBase(String identifier) {
        this.identifier = identifier;
    }

    /**
     * Renders this object via {@code JSON.toJSONString}.
     *
     * @return the string produced by {@code JSON.toJSONString(this)}
     */
    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }

    /**
     * @param identifier the new identifier
     */
    public void setIdentifier(String identifier) {
        this.identifier = identifier;
    }

    /**
     * @return the current identifier
     */
    public String getIdentifier() {
        return this.identifier;
    }
}
上一篇 下一篇

猜你喜欢

热点阅读