RocketMQ笔记-Consumer(消费者订阅模式)

2018-07-22  本文已影响1173人  兴浩

1.订阅模式

分2种模式

1.1测试代码

public class DefaultMQPushConsumerTest {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //设置订阅模式
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

发送10个消息

CLUSTERING模式结果:2个进程各自消费5个消息

BROADCASTING模式结果:2个进程各自消费10个消息


参考:
https://help.aliyun.com/document_detail/29551.html

上一篇下一篇

猜你喜欢

热点阅读