RocketMQRocketMQ专辑程序员

RocketMQ顺序消息

2017-09-14  本文已影响92人  运维开发笔记

顺序消息

之前我本地使用的client版本是3.6.2的,但是公司服务器上安得是3.2.6的版本。导致我测试顺序消息一直不成功。后来将client版本降低到3.2.6终于测试成功。所以在使用时,还是要注意一下版本的匹配,否则可能有诡异的错误。

producer

package com.yunsheng.orderExample;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;

import java.util.List;


public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
        producer.setNamesrvAddr("10.135.17.26:9876;10.135.17.27:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 10; i++) {
            int orderId = 0;
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicOrder","TagA", "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes());
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);

            System.out.printf("%s%n", sendResult);
        }
        //server shutdown
        producer.shutdown();
    }
}

解析:
要保证消息的顺序性,在发送消息时,这一组消息必须发送到同一个queue中。(一个broker默认4个queue)。
在上面的代码中,orderId表示一个订单号。
在send方法中实现了一个选择器。这个选择器的作用就是根据orderId对queue的数量取模,保证同一个orderId的所有消息落到同一个queue上。

consumer

package com.yunsheng.orderExample;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.yunsheng.Factory;

import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;

public class OrderedConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setNamesrvAddr("10.135.17.26:9876;10.135.17.27:9876");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicOrder", "TagA");

        consumer.registerMessageListener(new MessageListenerOrderly() {

            Random random = new Random(10);
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                       ConsumeOrderlyContext context) {
                context.setAutoCommit(true);

                for (MessageExt msg : msgs) {
                    System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + new String(msg.getBody()) + "%n");

                }

                try {
                    Thread.sleep(random.nextInt());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                return ConsumeOrderlyStatus.SUCCESS;

            }
        });

        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

解析:
上面保证了生产端的消息顺序性,那么消费端必须保证消息被顺序的消费。使用MessageListenerOrderly。作用是,必须等前面的消息消费完,后面的消息才能进行消费。
在代码里加了sleep验证。

结果:

ConsumeMessageThread_1 Receive New Messages: Hello RocketMQ 1
ConsumeMessageThread_2 Receive New Messages: Hello RocketMQ 2
ConsumeMessageThread_4 Receive New Messages: Hello RocketMQ 3
ConsumeMessageThread_6 Receive New Messages: Hello RocketMQ 4
ConsumeMessageThread_5 Receive New Messages: Hello RocketMQ 5
ConsumeMessageThread_7 Receive New Messages: Hello RocketMQ 6
ConsumeMessageThread_8 Receive New Messages: Hello RocketMQ 7
ConsumeMessageThread_9 Receive New Messages: Hello RocketMQ 8
ConsumeMessageThread_10 Receive New Messages: Hello RocketMQ 9

可以看到并不是单线程处理的,但是保证了顺序消费。

上一篇 下一篇

猜你喜欢

热点阅读