rocketmq-consumer

2018-08-19  本文已影响0人  划水者

rocketmq 消费消息大致有以下几种场景类型

乱序消费,消息被乱序的发送的队列,消费者在消费各个队列时是并行消费,所以不能保证消息的有序性

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    consumer.subscribe("TestTopic", "*");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.setConsumeTimestamp("20170422221800");
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");

顺序消息,发送到同一个队列的消息需要保证有序消费

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

    consumer.subscribe("TestTopic", "*");

    consumer.setNamesrvAddr("localhost:9876");

    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

    consumer.setConsumeTimestamp("20170422221800");

    consumer.registerMessageListener(new MessageListenerOrderly() {

        @Override
        public ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs,
                                            final ConsumeOrderlyContext context){
            for(MessageExt messageExt : msgs){
                System.out.println(new String(messageExt.getBody());
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start();

    System.out.printf("Consumer Started.%n");

顺序消息用的比较多的是订单系统,订单状态之间的扭转需要保证有序,所以通常同一个订单ID发送相关的状态消息需要保证有序

集群消费和广播消费的区别
集群消费,同一个消费组,均匀的消费该topic下的消息,该消费组下所有的消费者消费的总消息等于该topic下的消息

广播消费,同一个消费组下的每个消费者都消费该topic下的所有消息

上一篇下一篇

猜你喜欢

热点阅读