RocketMQ

4. RocketMQ Other Example: Ordered Messages

2019-04-11  ASD_92f7

1. Overview

Reference: http://rocketmq.apache.org/docs/order-example/
An OrderedProducer simulates sending order messages, and an OrderedConsumer simulates receiving them in order.

2. OrderedProducer

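How can the receiving order be guaranteed? RocketMQ preserves order only within a single queue, so it requires us to send all messages with the same orderId to the same queue. This is done with MessageQueueSelector, an interface that the example implements as an anonymous inner class by overriding its select method. As the name suggests, it is a queue selector:
MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg)
It takes three parameters:
mqs: all available queues
msg: the message being sent
arg: a business argument; in the example below it is our orderId
The example below sends 100 messages. Messages that belong to the same order are sent in increasing order of i.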
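The routing rule described above can be tried out without a broker. The following is only a minimal sketch and not RocketMQ code: the class name QueueSelectionSketch, the helpers selectIndex and route, and the queue count of 4 are assumptions made for illustration. It shows that taking the orderId modulo the number of queues always maps the same order to the same queue index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class QueueSelectionSketch {

    // Hypothetical helper using the same arithmetic as select() in the example.
    // Math.floorMod keeps the index non-negative even if orderId were negative.
    static int selectIndex(int orderId, int queueCount) {
        return Math.floorMod(orderId, queueCount);
    }

    // Groups message numbers 0..messageCount-1 by the queue index they would be routed to.
    static Map<Integer, List<Integer>> route(int messageCount, int orderCount, int queueCount) {
        Map<Integer, List<Integer>> byQueue = new TreeMap<>();
        for (int i = 0; i < messageCount; i++) {
            int orderId = i % orderCount;
            byQueue.computeIfAbsent(selectIndex(orderId, queueCount), k -> new ArrayList<>()).add(i);
        }
        return byQueue;
    }

    public static void main(String[] args) {
        // Assumed queue count of 4; in the real producer the value comes from mqs.size().
        route(100, 10, 4).forEach((queue, messages) ->
                System.out.println("queue " + queue + " -> " + messages));
    }
}
```

With 10 orders and an assumed 4 queues, several orders share one queue. That is fine: the only requirement is that a single order never spreads across queues.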

package com.asd.rocket.controller.chapter2;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.List;
/**
 * @author zhangluping@sinosoft.com.cn
 * @date 2019/4/10 16:12
 */
public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
        producer.setNamesrvAddr("10.1.11.155:9876");
        //Launch the instance.
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC"};
        for (int i = 0; i < 100; i++) {
            int orderId = i % 10;
            byte[] messageBody =  ("orderId=" + orderId + ",message="+i).getBytes(RemotingHelper.DEFAULT_CHARSET);
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("qqq", tags[i % tags.length], "KEY" + i,messageBody);
            final int f_i = i;
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    System.out.println("Tag:"+tags[f_i % tags.length]+", order:"+arg+", message:"+f_i+", using queue:"+mqs.get(index).getQueueId());
                    return mqs.get(index);
                }
            }, orderId);
        }
        //Shut down the producer once all messages have been sent.
        producer.shutdown();
    }
}

3. OrderedConsumer

package com.asd.rocket.controller.chapter2;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
 * @author zhangluping@sinosoft.com.cn
 * @date 2019/4/10 16:17
 */
public class OrderedConsumer {
    public static void main(String[] args) throws  Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setNamesrvAddr("10.1.11.155:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("qqq", "TagA || TagB || TagC");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            AtomicLong consumeTimes = new AtomicLong(0);
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                       ConsumeOrderlyContext context) {
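                for (MessageExt msg : msgs) {
                    // Pass values as printf arguments so a '%' in the body is not treated as a format specifier.
                    System.out.printf("%s Receive New Messages: %s, from queue: %d%n",
                            Thread.currentThread().getName(),
                            new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8),
                            msg.getQueueId());
                }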
                this.consumeTimes.incrementAndGet();
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
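
To check that the consumer above really receives each order's messages in sending order, the printed bodies can be inspected per orderId. The following is a minimal sketch, not part of the original example: the class name OrderCheckSketch and the helper isOrderedPerOrder are hypothetical, and it assumes the bodies have been collected into a list in the order they were received, using the "orderId=...,message=..." format built by OrderedProducer.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OrderCheckSketch {

    // Returns true if, for every orderId, the message numbers appear in strictly increasing order.
    // Each body is expected to look like "orderId=3,message=13".
    static boolean isOrderedPerOrder(List<String> bodies) {
        Map<Integer, Integer> lastSeen = new HashMap<>();
        for (String body : bodies) {
            String[] parts = body.split(",");
            int orderId = Integer.parseInt(parts[0].substring("orderId=".length()));
            int message = Integer.parseInt(parts[1].substring("message=".length()));
            // put() returns the previously stored message number for this order, if any.
            Integer previous = lastSeen.put(orderId, message);
            if (previous != null && previous >= message) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Different orders may interleave; only the sequence within one orderId matters.
        System.out.println(isOrderedPerOrder(Arrays.asList(
                "orderId=1,message=1", "orderId=2,message=2", "orderId=1,message=11"))); // prints true
        System.out.println(isOrderedPerOrder(Arrays.asList(
                "orderId=1,message=11", "orderId=1,message=1"))); // prints false
    }
}
```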