RocketMQ系列(四):consumer
2020-06-06 本文已影响0人
范柏柏
两种模式
- 集群模式
- 广播模式
集群模式:topic下的同一条消息只允许被同一个group下的其中一个消费者消费
广播模式:topic下的同一条消息被集群内所有消费者消费
推还是拉
rocketMq可以说是推的。但这个推其实是对拉模式的一种封装。
broker和consumer保持长连接。consumer发送拉取请求。拉取请求的触发条件:
- broker有消息进来的时候,会通知consumer,让consumer来拉
- consumer在拉完一次后,会继续发出拉取动作,拉完再拉,拉完再拉
PullRequest
consumer发送拉取请求。请求体如下
PullRequest.png
consumerGroup: 消费者组
messageQueue: 待拉取消费队列
processQueue: 消息处理队列。从broker拉取到的消息,先存入processQueue,然后再提交到消费者消息线程池消费。
nextOffset: 待拉取的messageQueue偏移量
lockedFirst: 是否被锁定
请求体中有待拉取消费队列,consumer怎么知道的从哪个messageQueue拉取?
rocketMq底层,消息指定分配给消费者的实现,是通过把queue队列分配给消费者的方式完成的。
将queue队列指定给特定的consumer后,该queue中的所有消息,都由该consumer进行消费。
怎么把queue分配给consumer的呢,当然也是有策略的
/**
* 为消费者分配queue的策略算法接口
*/
public interface AllocateMessageQueueStrategy {
/**
* Allocating by consumer id
*
* @param consumerGroup 当前 consumer群组
* @param currentCID 当前consumer id
* @param mqAll 当前topic的所有queue实例引用
* @param cidAll 当前 consumer群组下所有的consumer id set集合
* @return 根据策略给当前consumer分配的queue列表
*/
List<MessageQueue> allocate(
final String consumerGroup,
final String currentCID,
final List<MessageQueue> mqAll,
final List<String> cidAll
);
/**
* 算法名称
*
* @return The strategy name
*/
String getName();
}
当然,rocketMq也提供了默认的分配策略。
分配策略.png
算法名称 | 含义 |
---|---|
AllocateMessageQueueAveragely | 平均分配算法 |
AllocateMessageQueueAveragelyByCircle | 基于环形平均分配算法 |
AllocateMachineRoomNearby | 基于机房临近原则算法 |
AllocateMessageQueueByMachineRoom | 基于机房分配算法 |
AllocateMessageQueueConsistentHash | 一致性hash算法 |
AllocateMessageQueueByConfig | 基于配置分配算法 |
rocketMq默认使用平均分配算法
public class DefaultMQPushConsumer{
/**
* Default constructor.
*/
public DefaultMQPushConsumer() {
this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
}
/**
* Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
*
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy message queue allocating algorithm.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
}
也不可能分配一次就不管了。rocketMq的策略是,每20s进行一次消息负载。也就是consumer和broker的重绑定。
消息体中还有offset,offset存哪
广播模式:因为所有队列都会被所有消费者消费。所以读到哪里的标记,记录在消费者那里。offset存在消费者。
集群模式:队列中的消息,只会被group内的一个consumer消费。所以,offset要存在broker上。
消息拉取流程
- consumer发送拉取请求
- broker收到请求后,根据group、queue、offset返回消息。
- consumer收到消息。