平台架构定时任务Redis

使用Redis实现延迟短信发送

2019-11-13  本文已影响0人  了不起的贰寳

场景概述

下单成功后,N分钟客户没有付款,发送一条提醒待付款短信给下单客户。

添加短信任务

定义简单的短信发送对象(订单ID、手机号、短信内容、发送时间),转为JSON字符串作为zSet的value,发送时间(转秒数)作为score。将对象用zSet(有序列表)存入Redis。

消费短信任务

  1. 获取任务列表总数,计算最大扫描次数。
  2. 每次扫描读取前50个任务,如果当次扫描的任务全部消费完,则进入下一轮扫描,如果当次扫描任务没有被消费完,则退出本次扫描。
  3. 调度服务添加定时任务,10秒轮询请求消费任务接口。

细节问题

  1. 10秒内还没执行完本次消费任务,下一次轮询开始,可能会获取到正在执行中的队列任务如何避免。
    解决方法:redis添加一个任务锁,如果当前还有队列任务正在执行,退出本次任务。
  2. 如果分布式部署,多节点对应一个Redis缓存库的话,可能出现多个节点跳过锁验证。
    解决方法:一个节点对应一个Redis库

参考代码

  1. 定义短信任务对象
public class SmsMessage {
    
    private long id;
    private int orderId;
    private String phone;
    private String content;
    private long sendTime;// 发送时间
    
    public SmsMessage (long id, int orderId,String phone, String content,long sendTime) {
        this.id = id;
        this.orderId = orderId;
        this.phone = phone;
        this.content= content;
        this.sendTime = sendTime;
    }
    
    public long getId() {
        return id;
    }
    public void setId(long id) {
        this.id = id;
    }
    public int getOrderId() {
        return orderId;
    }
    public void setOrderId(int orderId) {
        this.orderId = orderId;
    }
    public String getPhone() {
        return phone;
    }
    public void setPhone(String phone) {
        this.phone = phone;
    }
    public String getContent() {
        return content;
    }
    public void setContent(String content) {
        this.content= content;
    }
    public long getSendTime() {
        return sendTime;
    }
    public void setSendTime(long sendTime) {
        this.sendTime = sendTime;
    }
}
import java.math.BigDecimal;

public class NumberUtil {
    public static final int MIN = 100000;
    public static final int MAX = 999999;
    /**
     * 获取随机数
     */
    public static int getRandNum(int min, int max) {
        int randNum = min + (int) (Math.random() * ((max - min) + 1));
        return randNum;
    }
    /**
     * 将数字精确到指定位数(四舍五入)
     */
    public static double convertDouble(double number, int newScale) {
        BigDecimal b = new BigDecimal(number);
        return b.setScale(newScale, BigDecimal.ROUND_HALF_UP).doubleValue();
    }
}
  1. 添加任务
  private SmsMessage  getSmsMsg() {
      SmsMessage test = new SmsMessage();
      long currentTimeMillis = System.currentTimeMillis();
      //Id字段是为了避免Set集合相同内容被覆盖
      test.setId(Long.valueOf(currentTimeMillis + NumberUtil.getRandNum(NumberUtil.MIN, NumberUtil.MAX)));
      test.serOrderId(10000);
      test.setPhone(10086);
      tset.setContent("测试短信");
      test.setSendTime(1573702940L);
      return test;
  }

  public boolean addRedisZset(SmsRedisDelayedQueueMessage queueMessage) {
      SmsMessage smsMsg = getSmsMsg();
      long time = smsMsg.getSendTime();
      String key = "sms_redis_zset_key";
      String smsMsgStr = JSONObject.toJSONString(smsMsg);
      Map<String, Double> redisZset = new HashMap<String, Double>();
      redisQueue.put(smsMsgStr, (double) time);
      //RedisUtils.setZsetValue工具类在这个链接:https://www.jianshu.com/p/2a7298c09059
      boolean result = RedisUtils.setZsetValue(cacheKey, redisQueue);
      return result;
    }
  1. 消费任务
  public boolean smsRedisZsetConsumer() {
     String lockKey = "sms_redis_lock_key";
      //验证是否有队列正在运行
      boolean existsKey = RedisUtils.existsKey(lockKey);
      if(existsKey) {
          //还有未执行完的队列,跳过本次扫描任务!
          return false;
      }
      try {
        //开始执行定时任务
        //加锁
        RedisUtils.setValue(lockKey, "1");
        String smsZsetKey = "sms_redis_zset_key";
        //选获取Zset长度
        long zsetCount = RedisUtils.getZsetCount(smsZsetKey);
        if(zsetCount == 0) {
            return false;
        }
        //扫描队列次数
        int forCount = (int) (zsetCount / 49) + 1;
        //扫描任务
        for (int i = 0; i < forCount; i++) {
        //获得列表前50个任务
            Set<String> smsZset = RedisUtils.getZsetValuesByRange(smsZsetKey, 0, 49);
            //已消费的任务个数
            int index = 0;
            for (String s : smsZset) {
                SmsMessage  smsMessage= JSONObject.parseObject(s, SmsRedisDelayedQueueMessage.class);
                //如果还没到发送时间就忽略
                if(smsMessage.getSendTime() > (System.currentTimeMillis() / 1000)) {
                    break;
                }
                //执行发送短信任务
                boolean executeSendMsg = executeSendMsg(smsMessage);
                //如果任务消费成功
                if(executeSendMsg) {
                    RedisUtils.remZsetValue(smsZsetKey, s);
                }
                index ++;
            }
            if(index < smsZset.size()) {
                break;
            }
        }
    } catch (Exception e) {
        //扫描短信任务异常
            return false
    }finally {
        RedisUtils.delKey(lockKey);
    }
        return true;
  }

注:RedisUtils工具类在这个链接:https://www.jianshu.com/p/2a7298c09059

上一篇下一篇

猜你喜欢

热点阅读