生产级rocketMQ延时消息+redis去重+最好的序列化
2020-01-03 本文已影响0人
rs汀
1.话不多说,先提问题(某互联网公司实际需求~~~~)
一生成订单后如果一个小时没有打款,就自动撤单,并做出惩罚措施。
- 本文所涉及技术RocketMQ版本:4.3.1 ,JDK1.8,protostuff版本1.1.3
2.pom文件(部分)
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.1</version>
</dependency>
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.1.3</version>
</dependency>
3.直接上代码!生产者
@Autowired
private NoticeService noticeService; // 封装的一个mq service类
private RuntimeSchema<String> timeSchema = RuntimeSchema.createFrom(String.class);//序列化需要使用
public void test() {
String messages = buildMQMessage(merchantOrder.getOrderNo(), p.getInvoke(), p.getMethod(), Datas.BORROW);
this.noticeService.delayNotice(messages, this.timeSchema, "OtcTimer", "timer", p.getTimeLevel());
}
///
//构建JSON消息体,有orderNo,需要定时结束执行的反射方法,当前方法(方便日志),类型(区分业务)
protected String buildMQMessage(String orderNo, String invoke, String method, String type) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("orderNo", orderNo);
jsonObject.put("invoke", invoke);
jsonObject.put("method", method);
jsonObject.put("type", type);
return jsonObject.toJSONString();
}
//重点讲讲这个方法
/**
*messages:消息体
schema:加一个缓冲区,加快序列化速度
topic:根据topic找消费者
tags:标签
timeLevel:延迟等级messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h (取小标为获取对应的延迟时间,也可自定义)
*/
public <T> void delayNotice(T messages, RuntimeSchema<T> schema, String topic, String tags, Integer timeLevel) {
String key = OtcUtil.createUUId();
try {
byte[] bytes = ProtostuffIOUtil.toByteArray(messages, schema,
LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE));//加一个缓冲区,加快序列化速度
Message message = new Message(topic, tags, key, bytes);//组建消息体
message.setDelayTimeLevel(timeLevel);//设置等级(下标)
SendResult sendResult = this.defaultMQProducer.send(message);//(发送)
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
}
} catch (Exception e) {
// TODO: handle exception
logger.error("sendMq onException , key : " + key, e);
}
}
4.直接上代码!消费者
@Component
public class OtcTimerConsumer {
private final static Logger logger = LoggerFactory.getLogger(OtcTimerConsumer.class);
private RuntimeSchema<String> schema = RuntimeSchema.createFrom(String.class);
@Autowired
private RedisService redisService;//redis去重,防止重复消费
@Qualifier("borrowProcessTimerService")
@Autowired
private ProcessTimerService borrowProcessTimerService;
@EventListener(condition = "#event.topic == 'OtcTimer'")
public void rocketmqMsgListen(DefaultMQCustomerEvent event) throws Exception {
try {
// 判断key是否存在,去重
String key = event.getMsg().getKeys();
Set<Object> set = this.redisService.getRepeat(key);
if (set.size() > 0) {
return;
}
// 参数解析,反序列化解析参数
String paramter = schema.newMessage();
ProtostuffIOUtil.mergeFrom(event.getMsg().getBody(), paramter, schema);
if (StringUtils.isEmpty(paramter)) {
throw new BusinessException(Codes.CODE_500, Messages.OTC_MQ_MESSAGE_ISNULL);
}
JSONObject jsonObject = JSONObject.parseObject(paramter);
String orderNo = jsonObject.getString("orderNo");
String method = jsonObject.getString("method");
String invoke = jsonObject.getString("invoke");//需要反射的方法
String type = jsonObject.getString("type");
// 重点讲解
//getMethod(需要执行的反射方法,方法里的参数类型)
//invoke(需要执行的反射类,方法里的参数)
if (type.equals(Datas.BORROW)) {
this.borrowProcessTimerService.getClass().getMethod(invoke, String.class, String.class, String.class)
.invoke(this.borrowProcessTimerService, orderNo, method, invoke);
}
// 将消费完的key放入缓存,去重
this.redisService.setRepeat("Otc:Timer:" + orderNo + ":" + invoke, paramter,
Double.valueOf(System.nanoTime()));
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new BusinessException(Codes.CODE_500, e.getMessage());
}
}
}
5.直接上代码!反射方法,处理撤单,惩罚相关逻辑
@Service
public class BorrowProcessTimerService extends BorrowSuperService implements ProcessTimerService {
@Transactional(rollbackFor = Exception.class)
public void orderTimer(String orderNo, String method, String invoke) {
// 具体逻辑
// 我这里用到了事务,分布式锁==保证安全
// 处理异常日志
}
}
总结:
(1)无需再轮询全部订单,效率高
(2)一个订单,任务只执行一次
(3)时效性好