RocketMQ系列

RocketMQ系列(四):consumer

2020-06-06  本文已影响0人  范柏柏

两种模式

推还是拉

rocketMq可以说是推的。但这个推其实是对拉模式的一种封装。
broker和consumer保持长连接。consumer发送拉取请求。拉取请求的触发条件:

  1. broker有消息进来的时候,会通知consumer,让consumer来拉
  2. consumer在拉完一次后,会继续发出拉取动作,拉完再拉,拉完再拉

PullRequest

consumer发送拉取请求。请求体如下


PullRequest.png

consumerGroup: 消费者组
messageQueue: 待拉取消费队列
processQueue: 消息处理队列。从broker拉取到的消息,先存入processQueue,然后再提交到消费者消息线程池消费。
nextOffset: 待拉取的messageQueue偏移量
lockedFirst: 是否被锁定

请求体中有待拉取消费队列,consumer怎么知道的从哪个messageQueue拉取?

rocketMq底层,消息指定分配给消费者的实现,是通过把queue队列分配给消费者的方式完成的。

将queue队列指定给特定的consumer后,该queue中的所有消息,都由该consumer进行消费。

怎么把queue分配给consumer的呢,当然也是有策略的

/**
 * 为消费者分配queue的策略算法接口
 */
public interface AllocateMessageQueueStrategy {

    /**
     * Allocating by consumer id
     *
     * @param consumerGroup 当前 consumer群组
     * @param currentCID 当前consumer id
     * @param mqAll 当前topic的所有queue实例引用
     * @param cidAll 当前 consumer群组下所有的consumer id set集合
     * @return 根据策略给当前consumer分配的queue列表
     */
    List<MessageQueue> allocate(
        final String consumerGroup,
        final String currentCID,
        final List<MessageQueue> mqAll,
        final List<String> cidAll
    );

    /**
     * 算法名称
     *
     * @return The strategy name
     */
    String getName();
}

当然,rocketMq也提供了默认的分配策略。


分配策略.png
算法名称 含义
AllocateMessageQueueAveragely 平均分配算法
AllocateMessageQueueAveragelyByCircle 基于环形平均分配算法
AllocateMachineRoomNearby 基于机房临近原则算法
AllocateMessageQueueByMachineRoom 基于机房分配算法
AllocateMessageQueueConsistentHash 一致性hash算法
AllocateMessageQueueByConfig 基于配置分配算法

rocketMq默认使用平均分配算法

public class DefaultMQPushConsumer{    
    /**
     * Default constructor.
     */
    public DefaultMQPushConsumer() {
        this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
    }

    /**
     * Constructor specifying consumer group, RPC hook and message queue allocating algorithm.
     *
     * @param consumerGroup Consume queue.
     * @param rpcHook RPC hook to execute before each remoting command.
     * @param allocateMessageQueueStrategy message queue allocating algorithm.
     */
    public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
        this.consumerGroup = consumerGroup;
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
    }
}

也不可能分配一次就不管了。rocketMq的策略是,每20s进行一次消息负载。也就是consumer和broker的重绑定。

消息体中还有offset,offset存哪

广播模式:因为所有队列都会被所有消费者消费。所以读到哪里的标记,记录在消费者那里。offset存在消费者。

集群模式:队列中的消息,只会被group内的一个consumer消费。所以,offset要存在broker上。

消息拉取流程

  1. consumer发送拉取请求
  2. broker收到请求后,根据group、queue、offset返回消息。
  3. consumer收到消息。
上一篇下一篇

猜你喜欢

热点阅读