(十四)顺序消费如何保证消息消费

2021-08-04  本文已影响0人  guessguess

集群模式下顺序消费的时候,是通过加锁的方式对队列进行占有。
其实在单个消费者单个队列的情况下,加锁感觉是多余的。但是如果要支持消费者集群,那自然免不了加锁。
顺序消费的方式,其实多个消费者还是要等,那么多个消费者的目的,应该是高可用。

先来看看几个上锁以及解锁的机制。

负载服务对于队列的锁定
public class RebalanceService extends ServiceThread {
    private static long waitInterval =
        Long.parseLong(System.getProperty(
            "rocketmq.client.rebalance.waitInterval", "20000"));
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }
        log.info(this.getServiceName() + " service end");
    }
}

负载服务对于消息队列的锁定大概是20秒一次。对重新分配到的队列申请上锁。

Broker端上锁
public class RebalanceLockManager {
    LockEntry用于记录每个客户端的上锁时间
    static class LockEntry {
        private String clientId;
        private volatile long lastUpdateTimestamp = System.currentTimeMillis();
        public boolean isExpired() {
            boolean expired =
                (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;

            return expired;
        }
    }
}

REBALANCE_LOCK_MAX_LIVE_TIME为60秒。即每次上锁的有效时间为60秒。

消费者内部对于队列的锁

只有开始消费该队列的消息时才会开始上锁,代码如下

public class ConsumeMessageOrderlyService implements ConsumeMessageService {
    class ConsumeRequest implements Runnable {
        private final ProcessQueue processQueue;
        private final MessageQueue messageQueue;

        public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
            this.processQueue = processQueue;
            this.messageQueue = messageQueue;
        }

        public ProcessQueue getProcessQueue() {
            return processQueue;
        }

        public MessageQueue getMessageQueue() {
            return messageQueue;
        }

        @Override
        public void run() {
            if (this.processQueue.isDropped()) {
                log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }

            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            synchronized (objLock) {
            ........
            }
}

在对队列进行消费之前,又会上锁。这里是以队列作为锁的粒度,保证在消费者中,队列同一时刻只能被一个线程消费。

开始消费要进行加锁
    class ConsumeRequest implements Runnable {
        private final ProcessQueue processQueue;
        private final MessageQueue messageQueue;

        public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
            this.processQueue = processQueue;
            this.messageQueue = messageQueue;
        }

        public ProcessQueue getProcessQueue() {
            return processQueue;
        }

        public MessageQueue getMessageQueue() {
            return messageQueue;
        }

        @Override
        public void run() {
            if (this.processQueue.isDropped()) {
                log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                return;
            }

            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            synchronized (objLock) {
                            boolean hasException = false;
                            try {
                                表明开始消费
                                this.processQueue.getLockConsume().lock();
                                if (this.processQueue.isDropped()) {
                                    log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                        this.messageQueue);
                                    break;
                                }

                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                            } catch (Throwable e) {
                                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                                    RemotingHelper.exceptionSimpleDesc(e),
                                    ConsumeMessageOrderlyService.this.consumerGroup,
                                    msgs,
                                    messageQueue);
                                hasException = true;
                            } finally {
                                this.processQueue.getLockConsume().unlock();
                            }
            }
}

对队列添加消费的锁,表明已经开始消费。

从源码的结构看,总共是有三把锁,用于保证顺序消费。
首先是Broker端,用于保证不被其他消费者消费。总共还是很好理解的。
其次,对于consumer中队列的锁,保证处理队列同一时刻只会被一个线程所消费。
那么处理队列的消费锁,是用来做什么的?

那么处理队列的消费锁,是用来做什么的?

其实有一种情况是,当某个队列的消息正在被消费中,如果这个时候由于负载服务,导致队列重新分配,然后将队列让出去的话,那么就无法保证顺序消费了。所以负载服务对于正在消费中的处理队列是不会移除的。
代码如下

public abstract class RebalanceImpl {
    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
        final boolean isOrder) {
        boolean changed = false;

        Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<MessageQueue, ProcessQueue> next = it.next();
            MessageQueue mq = next.getKey();
            ProcessQueue pq = next.getValue();

            if (mq.getTopic().equals(topic)) {
                if (!mqSet.contains(mq)) {
                    pq.setDropped(true);
                    只有当这个方法返回true的时候,处理队列才会被移除。
                    if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                        it.remove();
                        changed = true;
                        log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                    }
                } else if (pq.isPullExpired()) {
                    switch (this.consumeType()) {
                        case CONSUME_ACTIVELY:
                            break;
                        case CONSUME_PASSIVELY:
                            pq.setDropped(true);
                            只有当这个方法返回true的时候,处理队列才会被移除。
                            if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                                it.remove();
                                changed = true;
                                log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                    consumerGroup, mq);
                            }
                            break;
                        default:
                            break;
                    }
                }
            }
        }
        。。。省略部分代码
    }

    @Override
    public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
        this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
        this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
        if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
            && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
            try {
                如果拿得到消费锁,说明消费完了
                if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
                    try {
                        需要判断是否还有消息没有消费完
                        return this.unlockDelay(mq, pq);
                    } finally {
                        pq.getLockConsume().unlock();
                    }
                } else {
                    log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",
                        mq,
                        pq.getTryUnlockTimes());

                    pq.incTryUnlockTimes();
                }
            } catch (Exception e) {
                log.error("removeUnnecessaryMessageQueue Exception", e);
            }
            拿不到锁,说明正在消费,会直接返回false
            return false;
        }
        return true;
    }

}

从上面的代码可以看出,如果拿不到消费锁,说明正在消费,直接返回false, 处理队列不会被移除。
那如果拿得到锁,说明已经消费完,那么是不是需要判断是否还有消息没有消费完?

public class RebalancePushImpl extends RebalanceImpl {
    private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {
        如果还有消息遗留,最后还是要去释放broker端的锁,只不过延时20秒后去执行。
        if (pq.hasTempMessage()) {
            log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);
            this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {
                @Override
                public void run() {
                    log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq);
                    RebalancePushImpl.this.unlock(mq, true);
                }
            }, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);
        } else {
            this.unlock(mq, true);
        }
        return true;
    }

从代码看的话,如果拿到消费锁,还有消息遗留,说明消费很多次了,这个消息还没有消费完,那么为什么消息并没有被完全消费完还要去释放锁呢?
因为对于集群模式下的顺序消费而言,如果sendback失败,则会一直在内部进行重试。会导致有消息无法被完全消费。如果一直有消息无法被完全消费就让消费者一直持有锁,会导致类似死锁的情况。所以这里rocketmq在判断已经消费完,还有消息遗留的情况下,会去强制释放broker端的队列锁。

如何去延长锁的持有时间

那么如果消费持续时间很长,消费的过程会不会被打断呢?被打断是有可能导致消息无法顺序消费的。那么如何保证?
这里也与设计层面有关。
首先像上面的代码,如果队列处于消费中状态的话,处理队列是不会被移除的。
然后负载服务会不断的对处理队列对应的队列去申请上锁。
下面还是看看broker端对于上锁的处理。

public class RebalanceLockManager {
    LockEntry用于记录每个客户端的上锁时间
    static class LockEntry {
        private String clientId;
        private volatile long lastUpdateTimestamp = System.currentTimeMillis();

        public String getClientId() {
            return clientId;
        }

        public void setClientId(String clientId) {
            this.clientId = clientId;
        }

        public long getLastUpdateTimestamp() {
            return lastUpdateTimestamp;
        }

        public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
            this.lastUpdateTimestamp = lastUpdateTimestamp;
        }

        public boolean isLocked(final String clientId) {
            boolean eq = this.clientId.equals(clientId);
            return eq && !this.isExpired();
        }

        public boolean isExpired() {
            boolean expired =
                (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;

            return expired;
        }
    }
    用于记录每个队列的锁的持有者
    private final ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
        new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);
   
    用于判断某个客户端是否持有该队列的锁,支持重入---更新持有锁的时间
    private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
        ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
        if (groupValue != null) {
            LockEntry lockEntry = groupValue.get(mq);
            if (lockEntry != null) {
                boolean locked = lockEntry.isLocked(clientId);
                if (locked) {
                    lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                }
                return locked;
            }
        }
        return false;
    }

    某个组的消费者尝试对队列进行上锁
    public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
        final String clientId) {
        Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
        Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());

        for (MessageQueue mq : mqs) {
            if (this.isLocked(group, mq, clientId)) {
                保存已经上锁的消息队列
                lockedMqs.add(mq);
            } else {
                保存获取不到锁的消息队列
                notLockedMqs.add(mq);
            }
        }

        对获取不到锁的消息队列的处理
        if (!notLockedMqs.isEmpty()) {
            try {
                this.lock.lockInterruptibly();
                try {
                    如果获取不到该组的任何信息,先进行初始化
                    ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
                    if (null == groupValue) {
                        groupValue = new ConcurrentHashMap<>(32);
                        this.mqLockTable.put(group, groupValue);
                    }
                    
                    for (MessageQueue mq : notLockedMqs) {
                        LockEntry lockEntry = groupValue.get(mq);
                        如果不存在该队列的持有者信息,直接上锁,说明没有被上锁
                        if (null == lockEntry) {
                            lockEntry = new LockEntry();
                            lockEntry.setClientId(clientId);
                            groupValue.put(mq, lockEntry);
                        }
                        已经持有了,进行重入,更新上锁时间
                        if (lockEntry.isLocked(clientId)) {
                            lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                            lockedMqs.add(mq);
                            continue;
                        }

                        String oldClientId = lockEntry.getClientId();
                        如果持有者id与当前消费者id不一样,但是原先持有者已经过期了,直接进行上锁
                        if (lockEntry.isExpired()) {
                            lockEntry.setClientId(clientId);
                            lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                            lockedMqs.add(mq);
                            continue;
                        }
                    }
                } finally {
                    最后解锁
                    this.lock.unlock();
                }
            } catch (InterruptedException e) {
                log.error("putMessage exception", e);
            }
        }
       返回已经上锁的消息队列
        return lockedMqs;
    }

}

从代码来看,broker端是允许锁去重入的,其实就是去更新锁的持有时间。
如果一个消费者消费持续消费了某个队列很久,那么消费服务会不断的去申请上锁,就可以一直保持对该队列的持有,其他消费者也只能等待了。

上一篇下一篇

猜你喜欢

热点阅读