rocketmq-consumer
2018-08-19 本文已影响0人
划水者
rocketmq 消费消息大致有以下几种场景类型
乱序消费,消息被乱序的发送的队列,消费者在消费各个队列时是并行消费,所以不能保证消息的有序性
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("TestTopic", "*");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20170422221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
顺序消息,发送到同一个队列的消息需要保证有序消费
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("TestTopic", "*");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20170422221800");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs,
final ConsumeOrderlyContext context){
for(MessageExt messageExt : msgs){
System.out.println(new String(messageExt.getBody());
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
顺序消息用的比较多的是订单系统,订单状态之间的扭转需要保证有序,所以通常同一个订单ID发送相关的状态消息需要保证有序
集群消费和广播消费的区别
集群消费,同一个消费组,均匀的消费该topic下的消息,该消费组下所有的消费者消费的总消息等于该topic下的消息
广播消费,同一个消费组下的每个消费者都消费该topic下的所有消息