Java 核心技术

生产级rocketMQ延时消息+redis去重+最好的序列化

2020-01-03  本文已影响0人  rs汀

1.话不多说,先提问题(某互联网公司实际需求~~~~)

一生成订单后如果一个小时没有打款,就自动撤单,并做出惩罚措施。

2.pom文件(部分)

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.3.1</version>
        </dependency>
        <dependency>
            <groupId>com.dyuproject.protostuff</groupId>
            <artifactId>protostuff-core</artifactId>
            <version>1.1.3</version>
        </dependency>
        <dependency>
            <groupId>com.dyuproject.protostuff</groupId>
            <artifactId>protostuff-runtime</artifactId>
            <version>1.1.3</version>
        </dependency>

3.直接上代码!生产者

    @Autowired
    private NoticeService noticeService; // 封装的一个mq service类
    private RuntimeSchema<String> timeSchema = RuntimeSchema.createFrom(String.class);//序列化需要使用
    public void test() {
      String messages = buildMQMessage(merchantOrder.getOrderNo(), p.getInvoke(), p.getMethod(), Datas.BORROW);
      this.noticeService.delayNotice(messages, this.timeSchema, "OtcTimer", "timer", p.getTimeLevel());
    }
///
  //构建JSON消息体,有orderNo,需要定时结束执行的反射方法,当前方法(方便日志),类型(区分业务)
   protected String buildMQMessage(String orderNo, String invoke, String method, String type) {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("orderNo", orderNo);
        jsonObject.put("invoke", invoke);
        jsonObject.put("method", method);
        jsonObject.put("type", type);
        return jsonObject.toJSONString();
    }
//重点讲讲这个方法
/**
*messages:消息体
schema:加一个缓冲区,加快序列化速度
topic:根据topic找消费者
tags:标签
timeLevel:延迟等级messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h (取小标为获取对应的延迟时间,也可自定义)
*/
    public <T> void delayNotice(T messages, RuntimeSchema<T> schema, String topic, String tags, Integer timeLevel) {
        String key = OtcUtil.createUUId();
        try {
            byte[] bytes = ProtostuffIOUtil.toByteArray(messages, schema,
                    LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE));//加一个缓冲区,加快序列化速度
            Message message = new Message(topic, tags, key, bytes);//组建消息体
            message.setDelayTimeLevel(timeLevel);//设置等级(下标)
            SendResult sendResult = this.defaultMQProducer.send(message);//(发送)
            if (sendResult.getSendStatus() == SendStatus.SEND_OK) {

            }
        } catch (Exception e) {
            // TODO: handle exception
            logger.error("sendMq onException , key : " + key, e);
        }
      }
       
                 

4.直接上代码!消费者

@Component
public class OtcTimerConsumer {

    private final static Logger logger = LoggerFactory.getLogger(OtcTimerConsumer.class);

    private RuntimeSchema<String> schema = RuntimeSchema.createFrom(String.class);

    @Autowired
    private RedisService redisService;//redis去重,防止重复消费

    @Qualifier("borrowProcessTimerService")
    @Autowired
    private ProcessTimerService borrowProcessTimerService;

    @EventListener(condition = "#event.topic == 'OtcTimer'")
    public void rocketmqMsgListen(DefaultMQCustomerEvent event) throws Exception {
        try {
            // 判断key是否存在,去重
            String key = event.getMsg().getKeys();
            Set<Object> set = this.redisService.getRepeat(key);
            if (set.size() > 0) {
                return;
            }
            // 参数解析,反序列化解析参数
            String paramter = schema.newMessage();
            ProtostuffIOUtil.mergeFrom(event.getMsg().getBody(), paramter, schema);
            if (StringUtils.isEmpty(paramter)) {
                throw new BusinessException(Codes.CODE_500, Messages.OTC_MQ_MESSAGE_ISNULL);
            }
            JSONObject jsonObject = JSONObject.parseObject(paramter);
            String orderNo = jsonObject.getString("orderNo");
            String method = jsonObject.getString("method");
            String invoke = jsonObject.getString("invoke");//需要反射的方法
            String type = jsonObject.getString("type");
            // 重点讲解
            //getMethod(需要执行的反射方法,方法里的参数类型)
            //invoke(需要执行的反射类,方法里的参数)
           if (type.equals(Datas.BORROW)) {
                this.borrowProcessTimerService.getClass().getMethod(invoke, String.class, String.class, String.class)
                        .invoke(this.borrowProcessTimerService, orderNo, method, invoke);
            }
            // 将消费完的key放入缓存,去重
            this.redisService.setRepeat("Otc:Timer:" + orderNo + ":" + invoke, paramter,
                    Double.valueOf(System.nanoTime()));
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new BusinessException(Codes.CODE_500, e.getMessage());
        }
    }

}

5.直接上代码!反射方法,处理撤单,惩罚相关逻辑

@Service
public class BorrowProcessTimerService extends BorrowSuperService implements ProcessTimerService {
    @Transactional(rollbackFor = Exception.class)
    public void orderTimer(String orderNo, String method, String invoke) {
            // 具体逻辑
            // 我这里用到了事务,分布式锁==保证安全
            // 处理异常日志
        }
}

总结:
(1)无需再轮询全部订单,效率高
(2)一个订单,任务只执行一次
(3)时效性好

IMG_2423.JPG
上一篇下一篇

猜你喜欢

热点阅读