rocketmq

顺序消费

2021-06-17  本文已影响0人  念䋛

生产者
根据自定义的规则,将某一类的消息发送到同一个queue中,可以hash与队列的个数取余

public static void main(String[] args) throws UnsupportedEncodingException {
    try {
        //默认情况下,broker为每一个topic创建4个queue,生产者把要顺序生产的消息一次的发送到同一个队列
        DefaultMQProducer producer = new DefaultMQProducer ("please_rename_unique_group_name");
        producer.setNamesrvAddr ("192.168.44.145:9876");
        producer.start ();
        for (int i = 0; i < 10; i++) {
            int orderId = i;
            for (int j = 0; j <= 5; j++) {
                Message msg =
                        new Message ("OrderTopicTest", "order_" + orderId, "KEY" + orderId,
                                ("order_" + orderId + " step " + j).getBytes (RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send (msg, new MessageQueueSelector () {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = ( Integer ) arg;
                        int index = id % mqs.size ();
                        //返回就是消息发往到queue的id
                        return mqs.get (index);
                    }
                  //orderId就是select的arg变量
                }, orderId);
                System.out.printf ("%s%n", sendResult);
            }
        }

        producer.shutdown ();
    } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
        e.printStackTrace ();
    }
}

消费者
消费者要注意使用的监听器MessageListenerOrderly,处理同一个queue的消息使用的是一个线程,单线程获取一个queue的消息保证了消息的顺序消费

public static void main(String[] args) throws MQClientException {
    //顺序消费不是全局顺序,只是分区顺序。要全局顺序只能分配一个queue。
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("please_rename_unique_group_name_3");
    consumer.setNamesrvAddr ("192.168.44.145:9876");
    consumer.setConsumeFromWhere (ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    consumer.subscribe ("OrderTopicTest", "*");
    //顺序消费,就需要保证消费端用同一个线程处理一个queue的消息
    consumer.registerMessageListener (new MessageListenerOrderly () {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            context.setAutoCommit (true);
            for (MessageExt msg : msgs) {
                System.out.println ("收到消息内容 " + new String (msg.getBody ())+"消息队列id-"+msg.getQueueId ());
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start ();
    System.out.printf ("Consumer Started.%n");
}

消费者收到的消息, order_X代表同一类的消息,step X 是该类消息的步骤,
可以看出来,同一类的消息的步骤是按照顺序的,但是不同类之间可能会相互的穿插,证明了顺序消费是局部消费,不是全局消费.如果要实现全局消费的话,topic创建的时候只创建一个queue(默认为4个)
收到消息内容 order_3 step 0消息队列id-3
收到消息内容 order_2 step 0消息队列id-2
收到消息内容 order_0 step 0消息队列id-0
收到消息内容 order_1 step 0消息队列id-1
收到消息内容 order_1 step 1消息队列id-1
收到消息内容 order_3 step 1消息队列id-3
收到消息内容 order_2 step 1消息队列id-2
收到消息内容 order_0 step 1消息队列id-0
收到消息内容 order_3 step 2消息队列id-3
收到消息内容 order_1 step 2消息队列id-1
收到消息内容 order_0 step 2消息队列id-0
收到消息内容 order_2 step 2消息队列id-2
收到消息内容 order_1 step 3消息队列id-1
收到消息内容 order_3 step 3消息队列id-3
收到消息内容 order_2 step 3消息队列id-2
收到消息内容 order_0 step 3消息队列id-0
收到消息内容 order_3 step 4消息队列id-3
收到消息内容 order_1 step 4消息队列id-1

上一篇 下一篇

猜你喜欢

热点阅读