RocketMQ 的OrderMessage demo

2018-04-27  本文已影响0人  totohui

如何保证消息的顺序消费,只要将一组需要顺序消费的消息发送到同一个broker的同一个队列上,并且消费者采用有序Listener即可。

下面的代码,发送十个订单,每个订单有创建,支付,发货状态;

@RequestMapping("/order_mq")

@ResponseBody

public Result oreder_mq() {

String[] tags = new String[] { "createTag", "payTag", "sendTag" };

for (int orderId = 0; orderId < 10; orderId++) {

for (int type = 0; type < 3; type++) {

Message msg = new Message("orderTopic", tags[type % tags.length], orderId + ":" + type, (orderId + ":" + type).getBytes());

sender.sendOrderMessage(orderId, msg);

}

}

return Result.success("Hello,world"); }

MQSender.java

public void sendOrderMessage(int orderId, Message msg) {

log.info("send message:" + msg);

rocketMQTemplate.asyncSendOrderly("orderTopic", msg, orderId + "", null, 3000);

}

MQReceiver.java需要实现RocketMQPushConsumerLifecycleListener,并且注册MessageListenerOrderly

@Service

@RocketMQMessageListener(topic = "orderTopic", consumerGroup = "my-consumer_orderTopic")

public class MQReceiver1 implements RocketMQListener, RocketMQPushConsumerLifecycleListener {

private static Logger log = LoggerFactory.getLogger(MQReceiver1.class);

public void prepareStart(DefaultMQPushConsumer consumer) { consumer.registerMessageListener(new MessageListenerOrderly(){

public ConsumeOrderlyStatus consumeMessage(

List msgs, ConsumeOrderlyContext context) {

try {

log.info(new String(msgs.get(0).getBody(), "UTF-8"));

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

}

return ConsumeOrderlyStatus.SUCCESS;

} }); } 

}

上一篇下一篇

猜你喜欢

热点阅读