四、RocketMQ Other Example 有序消息
2019-04-11 本文已影响3人
ASD_92f7
一、概述
参考链接:http://rocketmq.apache.org/docs/order-example/
一个OrderedProducer模拟订单的发送,OrderedConsumer 模拟顺序接收
二、OrderedProducer
如何才能保证接收顺序?RocketMQ要求我们把同一orderId的消息发送到同一个队列上,这就用到了
MessageQueueSelector这个接口(下面的例子中以匿名内部类的方式实现),实现其 select 方法即可,从名字上也很直观,即队列选择器
MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg)
三个参数:
mqs:所有可用的队列
msg:发送的消息
arg:业务参数,在下面的例子中就是我们的orderId
下面的例子,模拟发送了100条数据,相同订单的发送顺序是按照 i 递增的
package com.asd.rocket.controller.chapter2;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
/**
* @author zhangluping@sinosoft.com.cn
* @date 2019/4/10 16:12
*/
/**
 * Demo producer for ordered messages: sends 100 messages spread over 10 simulated
 * order ids, using a {@link MessageQueueSelector} so that every message carrying the
 * same order id is routed to the queue at index {@code orderId % mqs.size()}.
 */
public class OrderedProducer {

    /**
     * Starts the producer, synchronously sends the messages in increasing {@code i},
     * and shuts the producer down.
     *
     * @param args unused
     * @throws Exception if starting the producer, encoding the body, or sending fails
     */
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
        producer.setNamesrvAddr("10.1.11.155:9876");
        // Launch the instance.
        producer.start();
        try {
            String[] tags = new String[] {"TagA", "TagB", "TagC"};
            for (int i = 0; i < 100; i++) {
                // Ten distinct orders; messages of one order are sent in increasing i.
                int orderId = i % 10;
                byte[] messageBody = ("orderId=" + orderId + ",message=" + i)
                        .getBytes(RemotingHelper.DEFAULT_CHARSET);
                // Create a message instance, specifying topic, tag, key and message body.
                Message msg = new Message("qqq", tags[i % tags.length], "KEY" + i, messageBody);
                // Effectively-final copy of the loop index for use inside the anonymous class.
                final int f_i = i;
                producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // arg is the orderId passed as the last argument of send(...).
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        MessageQueue chosen = mqs.get(index);
                        // The printed value is the message tag, so it is labelled as such.
                        System.out.println("Tag:" + tags[f_i % tags.length] + ",订单:" + arg
                                + ",消息:" + f_i + " 使用的Queue为:" + chosen.getQueueId());
                        return chosen;
                    }
                }, orderId);
            }
        } finally {
            // Always release the producer, even when a send fails part-way through.
            producer.shutdown();
        }
    }
}
三、OrderedConsumer 消费者
package com.asd.rocket.controller.chapter2;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author zhangluping@sinosoft.com.cn
* @date 2019/4/10 16:17
*/
/**
 * Demo consumer for ordered messages: subscribes to topic {@code qqq} with tags
 * TagA/TagB/TagC and registers a {@link MessageListenerOrderly} that prints each
 * received message together with the id of the queue it came from.
 */
public class OrderedConsumer {

    /**
     * Configures and starts the push consumer.
     *
     * @param args unused
     * @throws Exception if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setNamesrvAddr("10.1.11.155:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("qqq", "TagA || TagB || TagC");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            // Counts how many times the listener callback has been invoked.
            AtomicLong consumeTimes = new AtomicLong(0);

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeOrderlyContext context) {
                String threadName = Thread.currentThread().getName();
                // Print every message in the batch, not just the first one.
                for (MessageExt received : msgs) {
                    // Decode with an explicit charset instead of the platform default.
                    // NOTE(review): assumes the producer's RemotingHelper.DEFAULT_CHARSET
                    // is UTF-8 — confirm against the RocketMQ client in use.
                    String body = new String(received.getBody(),
                            java.nio.charset.StandardCharsets.UTF_8);
                    // Pass payload data as format arguments so a '%' in the body or the
                    // thread name cannot be interpreted as a format specifier.
                    System.out.printf("%s Receive New Messages: %s,从队列:%s获取,%n",
                            threadName, body, received.getQueueId());
                }
                this.consumeTimes.incrementAndGet();
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}