RocketMQ源码解读程序员

顺序消息

2021-01-15  本文已影响0人  93张先生

顺序消息

顺序消息是指消息消费的顺序和生产者发送消息的顺序一样的。

例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。

分区有序

分区有序是指这个Topic下这个队列下的消息是有顺序的,生产者发送消息的时候,将严格按照消息的顺序,将消息们发送到一个Topic下的一个队列,从而保证了生产者分区消息有序,消费者进行消费时,进行单线程单队列消费,保证了消费有序。
适用场景:性能要求高,以 sharding key 作为分区字段,在同一个队列中严格的按照 FIFO 原则进行消息发布和消费的场景。sharding key 比如订单Id,一个订单的创建、付款、完成有序的,根据算法将这个订单的所有事件发送到同一个队列中去。

全局有序

全局有序是指某个Topic下的所有消息都要保证顺序,可以通过一个Topic只有一个消息队列,保证了全局有序,实际上市分区有序的变种。

消息顺序性保证

全局有序是分区有序的一个特列,只需要设置Topic下消息队列的个数为1即可,因此分区有序消息有序,就可以保证顺序消息。

顺序消息保证三个条件:

  1. 生产者将消息有序的发送到同一个分区队列
  2. 同一个队列的消息是顺序存储的
  3. 消费者以这个发送顺序进行消费
消费者顺序消费实现

消息消费是以消费者组为维度的,一个消费者组可以消费这个topic下的所有消息队列,要保证顺序消费,这个topic下的一个消息队列只能由消费者组中的一个消费者消费,然后消费者消费这个消息队列是单线程消费的,这样就保证了顺序消息消费。

topic下的一个消息队列只能由消费者组中的一个消费者消费,这个由Broker端对消息队列加锁来实现。加锁采用了ConcurrentHashMap。ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable,
一个消费者组对应ConcurrentHashMap<MessageQueue, LockEntry>,LockEntry包含clientId属性,clientId代表一个消费者实例,key为消费者组,一个topic的消息队列,只能由这个消费者中的clientId的消费者消费消息。

一个消息队列由一个线程消费,由一个消息队列一个锁、消费时synchronized关键字共同维护单线程消息消费的。

顺序消费步骤

并发消息消费的流程包含4个步骤:消息队列负载均衡、消息拉取、消息消费、消息消费进度存储。顺序消费略有不同,每个步骤都有加锁或并发控制。

消息队列负均衡

RebalanceService服务每隔20秒执行一次负载均衡方法,在负载均的过程中,针对顺序消息,lock()方法会向Broker端申请锁定MessageQueue,如果锁定失败,说明messageQueue正在消费者消费,不能被拉取消息,等待下次锁定。

// mqSet,为这次负载均衡之后需要消费的队列
for (MessageQueue mq : mqSet) {
    // 新的MessageQueue,新建对应的ProcessQueue
    if (!this.processQueueTable.containsKey(mq)) {
        // 顺序消息,锁定broker端的MessageQueue消息队列,锁定失败,说明messageQueue正在消费者消费,不能被拉取消息;等待下次锁定
        if (isOrder && !this.lock(mq)) {
            log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
            continue;
        }
        // 清空这个消费队列原来的消费进度
        this.removeDirtyOffset(mq);
        // 新建MessageQueue对应的消息处理队列ProcessQueue队列
        ProcessQueue pq = new ProcessQueue();
        // 计算从哪里拉取message
        long nextOffset = this.computePullFromWhere(mq);
        if (nextOffset >= 0) {
            ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
            if (pre != null) {
                log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
            } else {
                log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                // 一个PullRequest对应一个MessageQueue,一个ProcessQueue
                PullRequest pullRequest = new PullRequest();
                pullRequest.setConsumerGroup(consumerGroup);
                pullRequest.setNextOffset(nextOffset);
                pullRequest.setMessageQueue(mq);
                pullRequest.setProcessQueue(pq);
                pullRequestList.add(pullRequest);
                changed = true;
            }
        } else {
            log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
        }
    }
}
Broker端MessageQueue加锁

RebalanceLockManager是处理Broker的MessageQueue加锁的类,加锁采用了ConcurrentHashMap。
ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable,
topic下的一个消息队列只能由消费者组中的一个消费者消费,一个消费者组对应ConcurrentHashMap<MessageQueue, LockEntry>,LockEntry包含clientId属性,clientId代表一个消费者实例,key为消费者组,一个topic的消息队列,只能由这个消费者中的clientId的消费者消费消息。

LockEntry判定一个MessageQueue是否被锁定,默认锁定60秒,60秒之后消息队列解锁,下次再去锁定。
负载均衡时会执行MessageQueue锁定方法,默认20秒一次负载均衡定时任务,因此下次再锁定时间间隔为20秒。

// 顺序消息,判断MessageQueue是否被锁定
private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
    ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
    if (groupValue != null) {
        LockEntry lockEntry = groupValue.get(mq);
        if (lockEntry != null) {
            boolean locked = lockEntry.isLocked(clientId);
            if (locked) {
                lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
            }

            return locked;
        }
    }

    return false;
}

public boolean isLocked(final String clientId) {
    boolean eq = this.clientId.equals(clientId);
    return eq && !this.isExpired();
}

public boolean isExpired() {
    boolean expired =
        (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;

    return expired;
}
/**
 * 顺序消息broker锁定消息队列集合
 * @param group
 * @param mqs
 * @param clientId
 * @return
 */
public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
    final String clientId) {

    Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
    Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());

    for (MessageQueue mq : mqs) {
        // 锁定加入到锁定队列
        if (this.isLocked(group, mq, clientId)) {
            lockedMqs.add(mq);
        } else {
            // 未锁定队列
            notLockedMqs.add(mq);
        }
    }
    // 存在未锁定队列,进行队列锁定
    if (!notLockedMqs.isEmpty()) {
        try {
            // 获取线程锁,进行锁定操作
            this.lock.lockInterruptibly();
            try {
                // 新建被锁定组的HashMap
                ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
                if (null == groupValue) {
                    groupValue = new ConcurrentHashMap<>(32);
                    this.mqLockTable.put(group, groupValue);
                }
                // 进行队列锁定
                for (MessageQueue mq : notLockedMqs) {
                    LockEntry lockEntry = groupValue.get(mq);
                    if (null == lockEntry) {
                        lockEntry = new LockEntry();
                        lockEntry.setClientId(clientId);
                        groupValue.put(mq, lockEntry);
                        log.info(
                            "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}",
                            group,
                            clientId,
                            mq);
                    }
                    // JVM MQClientInstance 实例锁定,一个JVM实例下,两个消费者,不能属于同一个组,
                    // 要是消费者组相同,只能是两个JVM实例,构成消费者Cluster。
                    if (lockEntry.isLocked(clientId)) {
                        lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                        lockedMqs.add(mq);
                        continue;
                    }

                    String oldClientId = lockEntry.getClientId();
                    // 锁定实销,重新锁定
                    if (lockEntry.isExpired()) {
                        lockEntry.setClientId(clientId);
                        lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                        log.warn(
                            "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}",
                            group,
                            oldClientId,
                            clientId,
                            mq);
                        lockedMqs.add(mq);
                        continue;
                    }

                    log.warn(
                        "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}",
                        group,
                        oldClientId,
                        clientId,
                        mq);
                }
            } finally {
                this.lock.unlock();
            }
        } catch (InterruptedException e) {
            log.error("putMessage exception", e);
        }
    }

    return lockedMqs;
}

消息拉取

DefaultMQPushConsumerImpl#pullMessage为消息拉取的主要方法,在这里针对顺序消息进行了PullRequest拉取请求锁定:

  1. ProcessQueue被锁定,第一次拉取消息,pullRequest初始化为未被锁定,首先计算拉取偏移量,然后向消息服务端拉取消息。
  2. processQueue未被上锁,推迟3秒进行pullRequest提交,放入pullRequestQueue队列中,等待broker端对messageQueue进行锁定。
// ProcessQueue被锁定
if (processQueue.isLocked()) {
    // 第一次拉取消息,pullRequest初始化为未被锁定,首先计算拉取偏移量,然后向消息服务端拉取消息。
    if (!pullRequest.isLockedFirst()) {
        // 获取messageQueue的开始消费位置
        final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
        boolean brokerBusy = offset < pullRequest.getNextOffset();
        log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
            pullRequest, offset, brokerBusy);
        if (brokerBusy) {
            log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                pullRequest, offset);
        }
        // 设置pullRequest被锁定
        pullRequest.setLockedFirst(true);
        // 修正offset,从上次broker开始位置消费
        pullRequest.setNextOffset(offset);
    }
} else {
    // processQueue未被上锁,推迟3秒进行pullRequest提交,放入pullRequestQueue队列中,等待broker端对messageQueue进行锁定。
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    log.info("pull message later because not locked in broker, {}", pullRequest);
    return;
}

消费消费

ConsumeMessageOrderlyService是消息顺序消费的类。

MessageQueueLock messageQueueLock对象消息队列锁容器,严格保证一个消息只有一个线程消费,通过队列锁来实现,一个队列一个锁,获得锁才能进行消息消费。

start()方法每隔20秒,执行一次锁定分配给自己的消息消费队列,该值建议与一次消息负载频率设置相同。在未锁定消息队列之前无法执行消息拉取任务,ConsumeMessageOrderlyService 以每秒20s频率对分配给自己的消息队列进行自动加锁操作,从而消费加锁成功的消息消费队列。

持续消费消息,这个消费是以时间为维度的,每次在broker端锁定一个队列60秒,因此线程消费消息60秒。

public void start() {
    // 默认每隔20秒,执行一次锁定分配给自己的消息消费队列,该值建议与一次消息负载频率设置相同。
    // 集群模式下顺序消息消费在创建拉取任务时并未将ProcessQueue的locked状态设置为true,(在负载均衡新建ProcessQueue时,默认locked = false)
    // 在未锁定消息队列之前无法执行消息拉取任务,ConsumeMessageOrderlyService 以每秒20s频率对分配给自己的消息队列进行自动加锁操作,
    // 从而消费加锁成功的消息消费队列。
    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // 顺序消息,每20秒,在broker端进行一次消费队列锁定
                ConsumeMessageOrderlyService.this.lockMQPeriodically();
            }
        }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
    }
}

ConsumeRequest消费消息请求,实现了Runnable接口,可以被提交到消息消费的线程池中,被并发消费。
这里通过messageQueueLock获取消息队列锁,保证一个消息队列一个线程消费。synchronized保证了消费过程也是单线程的。

public void run() {
    if (this.processQueue.isDropped()) {
        log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
        return;
    }
    // 获取消息队列锁,一个线程消费一个消息队列
    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
    synchronized (objLock) {
        // 广播模式||processQueue被锁定||processQueue没有失效
        if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
            || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
            final long beginTime = System.currentTimeMillis();
            // 持续消费消息,这个是以时间为消费为维度的,每次锁定线程消费60秒;
            for (boolean continueConsume = true; continueConsume; ) {
                if (this.processQueue.isDropped()) {
                    log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                    break;
                }
                // 集群模式&&processQueue未被锁定,尝试加锁,并延迟提交请求,在进行拉取消息
                if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    && !this.processQueue.isLocked()) {
                    log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                    break;
                }
                // 集群模式&&processQueue已失效,尝试加锁,并延迟提交请求,在进行拉取消息
                if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    && this.processQueue.isLockExpired()) {
                    log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                    break;
                }
                // 顺序消息消费处理逻辑,每一个ConsumeRequest消费任务不是以消费消息条数来计算的,
                // 而是根据消费时间,默认当消费时长大于MAX_TIME_CONSUME_CONTINUOUSLY,
                // 默认60s后,本次消费任务结束,由消费组内其他线程继续消费
                // 消费时间间隔,每次消费任务最大持续时间,60s;延迟提交请求,在进行拉取消息
                long interval = System.currentTimeMillis() - beginTime;
                if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                    ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                    break;
                }
                // ConsumeRequest 中包含的消息条数,默认1条
                final int consumeBatchSize =
                    ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
                // 取出消息进行消费,并放入ProcessQueue的consumingMsgOrderlyTreeMap临时存储
                List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
                // 还原真实的topic
                defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
                // 有消息需要消费
                if (!msgs.isEmpty()) {
                    final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

                    ConsumeOrderlyStatus status = null;
                    // 消息钩子
                    ConsumeMessageContext consumeMessageContext = null;
                    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                        consumeMessageContext = new ConsumeMessageContext();
                        consumeMessageContext
                            .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
                        consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
                        consumeMessageContext.setMq(messageQueue);
                        consumeMessageContext.setMsgList(msgs);
                        consumeMessageContext.setSuccess(false);
                        // init the consume context type
                        consumeMessageContext.setProps(new HashMap<String, String>());
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                    }

                    long beginTimestamp = System.currentTimeMillis();
                    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                    boolean hasException = false;
                    // 申请消息消费锁,如果消息队列被丢弃,放弃该消息消费队列的消费,
                    // 然后执行消息消息监听器,调用业务方具体消息监听器执行真正的消息消费处理逻辑,
                    // 并通知RocketMQ消息消费结果。
                    // processQueue 上锁
                    try {
                        this.processQueue.getLockConsume().lock();
                        if (this.processQueue.isDropped()) {
                            log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                this.messageQueue);
                            break;
                        }
                        // 消息消费
                        status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                    } catch (Throwable e) {
                        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                            RemotingHelper.exceptionSimpleDesc(e),
                            ConsumeMessageOrderlyService.this.consumerGroup,
                            msgs,
                            messageQueue);
                        hasException = true;
                    } finally {
                        // 释放锁
                        this.processQueue.getLockConsume().unlock();
                    }
                    // 日志
                    if (null == status
                        || ConsumeOrderlyStatus.ROLLBACK == status
                        || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                        log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
                            ConsumeMessageOrderlyService.this.consumerGroup,
                            msgs,
                            messageQueue);
                    }
                    // 设置返回状态
                    long consumeRT = System.currentTimeMillis() - beginTimestamp;
                    if (null == status) {
                        if (hasException) {
                            returnType = ConsumeReturnType.EXCEPTION;
                        } else {
                            returnType = ConsumeReturnType.RETURNNULL;
                        }
                    } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                        returnType = ConsumeReturnType.TIME_OUT;
                    } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                        returnType = ConsumeReturnType.FAILED;
                    } else if (ConsumeOrderlyStatus.SUCCESS == status) {
                        returnType = ConsumeReturnType.SUCCESS;
                    }

                    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                        consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
                    }

                    if (null == status) {
                        status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                    // 执行消息钩子
                    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                        consumeMessageContext.setStatus(status.toString());
                        consumeMessageContext
                            .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                    }
                    // 消费状态统计
                    ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                        .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
                    // 返回消费消息结果,是否进行持续消费
                    continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                } else {
                    // 未取到消息结束本次循环
                    continueConsume = false;
                }
            }
        } else {
            if (this.processQueue.isDropped()) {
                log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }
            // 尝试加锁,并延迟提交请求,在进行拉取消息
            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
        }
    }
}

消息消费进度存储

进行消费进度的更新,其他和并发消息一样,采用ConcurrentHashMap并发安全容器。

上一篇下一篇

猜你喜欢

热点阅读