DelayQueue之限时订单实现方式
2022-03-14 本文已影响0人
xiaohei_e853
支付宝押金支付时,生成押金订单,用户超过15分钟未支付,订单状态改为超时取消。(生活中还有很多场景:在淘宝购物下单后没有付款,会提示多长时间订单失效;春季过年回家买火车票,下了订单后半个小时不付款改订单就会取消;点外卖。。。)
解决方式一
轮询数据库:到实现一个定时器,每隔一段时间去检查一遍数据库里的所有订单,查看其状态是否是未支付并且已经期。并修改这些数据的状态为已过期。
优点:方法简单,容易实现
缺点:订单状态处理不及时,轮询数据库的次数中可能很多都并没有修改订单,数据库频繁多次被连接浪费数据库资源开销,因为数据库资源非常宝贵。
因此以上方式实际开发中基本不予采用。
采用延时队列并且与时间有关系的延时队列DelayQueue。
原理
捕获.PNG
1、用户下单,保存订单到数据库的同时,将该订单以及订单的过期时间推入DelayQueue;
2、启动一个检查订单到期的线程,该线程使用delayQueue的take()方法获取到期订单,该方法为阻塞方法,如果当前没有到期订单,该方法会一直阻塞等待,直到获取到订单后继续往下执行;
3、当take()获取到一个到期订单后,该线程按获取到的订单的id去数据库查询订单并去检查订单状态,如果为未支付,则将状态修改为已关闭;
4、当项目重启后,DelayQueue中的信息都没有了。所以项目启动扫描所有过期未支付的订单并修改为已关闭状态,扫描所有未过期未支付的订单到DelayQueue中。
代码如下
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* 延时队列实体Delayed
*/
public class DelayedVo<T> implements Delayed {
/**
* 过期时长/单位毫秒
*/
private Long expireTime;
/**
* 目标对象
*/
private T target;
public DelayedVo(Long expireTime, T target) {
super();
this.expireTime = expireTime + System.currentTimeMillis();
this.target = target;
}
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
public T getTarget() {
return this.target;
}
}
延迟订单
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.isuinfo.admin.modules.apppay.entity.DepositStateEnum;
import org.isuinfo.admin.modules.apppay.entity.FeYltDeposit;
import org.isuinfo.admin.modules.apppay.mapper.FeYltDepositMapper;
import org.isuinfo.admin.queue.DelayedVo;
import org.isuinfo.admin.utils.ConstantUtil;
import org.isuinfo.admin.utils.DateUtils;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.DelayQueue;
/**
* 延迟订单
*/
@Service
@Slf4j
public class DelayDepositService {
private DelayQueue<DelayedVo<FeYltDeposit>> delayQueue = new DelayQueue<>();
@Resource
private FeYltDepositMapper feYltDepositMapper;
/**
* 添加订单到DelayQueue
*
* @param feYltDeposit
* @param expireTime
*/
public void save(FeYltDeposit feYltDeposit, Long expireTime) {
DelayedVo<FeYltDeposit> delayedVo = new DelayedVo<>(expireTime, feYltDeposit);
delayQueue.put(delayedVo);
log.info("订单【超时时间:{}毫秒】被推入延时队列,订单详情:{}", expireTime, feYltDeposit);
}
/**
* 异步线程处理DelayQueue
*/
class DepositTask implements Runnable {
@Override
public void run() {
try {
//noinspection InfiniteLoopStatement
while (true) {
DelayedVo<FeYltDeposit> delayedVo = delayQueue.take();
FeYltDeposit feYltDeposit = delayedVo.getTarget();
FeYltDeposit selDeposit = feYltDepositMapper.selectById(feYltDeposit.getId());
//判断数据库中订单是否未支付
if (selDeposit.getState().equals(DepositStateEnum.un_auth.getKey())) {
selDeposit.setState(DepositStateEnum.timeout_cancel.getKey());
log.info("订单关闭:order={}", selDeposit);
feYltDepositMapper.updateById(selDeposit);
} else {
log.info("订单已处理:feYltDeposit={}", selDeposit);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 启动异步线程
*/
@PostConstruct
public void init() {
new Thread(new DepositTask()).start();
}
/**
* 启动修改过期未支付订单为已关闭状态
* 启动扫描数据库中的订单未过期未支付到DelayQueue
*/
@PostConstruct
public void initDelayOrder() {
//1. 处理过期未支付的订单...
Integer count = feYltDepositMapper.updateCrontab(ConstantUtil.EXPIRE_TIME);
log.info("系统启动,扫描处理【{}】个过期未支付的订单...", count);
//2. 获取未过期未支付的订单
QueryWrapper<FeYltDeposit> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("state", DepositStateEnum.un_auth.getValue());
List<FeYltDeposit> orders = feYltDepositMapper.selectList(queryWrapper);
log.info("系统启动,发现【{}】个未过期未支付的订单...", orders.size());
//3. 未过期未支付的订单推入延时队列
if (!orders.isEmpty()) {
for (FeYltDeposit deposit : orders) {
//计算剩余的过期时间
long expireTime = DateUtils.LocalDateTime2timestamp(deposit.getCreateTime().plusMinutes(ConstantUtil.EXPIRE_TIME))
- DateUtils.LocalDateTime2timestamp(LocalDateTime.now());
if (expireTime > 0) {
DelayedVo<FeYltDeposit> delayedVo = new DelayedVo<>(expireTime, deposit);
delayQueue.put(delayedVo);
log.info("订单【超时时间:{}毫秒】被推入延时队列,订单详情:{}", expireTime, deposit);
} else {
log.warn("订单:{},没有加入延时队列", deposit.getOutOrderNo());
}
}
}
}
}
@Transactional(propagation = Propagation.REQUIRED)
public void saveDeposit(FeYltDeposit feYltDeposit) {
this.baseMapper.insert(feYltDeposit);
//加入延迟队列
delayDepositService.save(feYltDeposit, ConstantUtil.EXPIRE_TIME_LONG);
}
注意:blockingqueue接口中,只有take和put方法才会阻塞,offer,add,remove这些都不是阻塞方法