(十四)顺序消费如何保证消息消费
集群模式下顺序消费的时候,是通过加锁的方式对队列进行占有。
其实在单个消费者单个队列的情况下,加锁感觉是多余的。但是如果要支持消费者集群,那自然免不了加锁。
顺序消费的方式,其实多个消费者还是要等,那么多个消费者的目的,应该是高可用。
先来看看几个上锁以及解锁的机制。
负载服务对于队列的锁定
public class RebalanceService extends ServiceThread {
private static long waitInterval =
Long.parseLong(System.getProperty(
"rocketmq.client.rebalance.waitInterval", "20000"));
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
}
负载服务对于消息队列的锁定大概是20秒一次。对重新分配到的队列申请上锁。
Broker端上锁
public class RebalanceLockManager {
LockEntry用于记录每个客户端的上锁时间
static class LockEntry {
private String clientId;
private volatile long lastUpdateTimestamp = System.currentTimeMillis();
public boolean isExpired() {
boolean expired =
(System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
return expired;
}
}
}
REBALANCE_LOCK_MAX_LIVE_TIME为60秒。即每次上锁的有效时间为60秒。
消费者内部对于队列的锁
只有开始消费该队列的消息时才会开始上锁,代码如下
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
class ConsumeRequest implements Runnable {
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;
public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
this.processQueue = processQueue;
this.messageQueue = messageQueue;
}
public ProcessQueue getProcessQueue() {
return processQueue;
}
public MessageQueue getMessageQueue() {
return messageQueue;
}
@Override
public void run() {
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
........
}
}
在对队列进行消费之前,又会上锁。这里是以队列作为锁的粒度,保证在消费者中,队列同一时刻只能被一个线程消费。
开始消费要进行加锁
class ConsumeRequest implements Runnable {
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;
public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
this.processQueue = processQueue;
this.messageQueue = messageQueue;
}
public ProcessQueue getProcessQueue() {
return processQueue;
}
public MessageQueue getMessageQueue() {
return messageQueue;
}
@Override
public void run() {
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
boolean hasException = false;
try {
表明开始消费
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
} finally {
this.processQueue.getLockConsume().unlock();
}
}
}
对队列添加消费的锁,表明已经开始消费。
从源码的结构看,总共是有三把锁,用于保证顺序消费。
首先是Broker端,用于保证不被其他消费者消费。总共还是很好理解的。
其次,对于consumer中队列的锁,保证处理队列同一时刻只会被一个线程所消费。
那么处理队列的消费锁,是用来做什么的?
那么处理队列的消费锁,是用来做什么的?
其实有一种情况是,当某个队列的消息正在被消费中,如果这个时候由于负载服务,导致队列重新分配,然后将队列让出去的话,那么就无法保证顺序消费了。所以负载服务对于正在消费中的处理队列是不会移除的。
代码如下
public abstract class RebalanceImpl {
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {
if (!mqSet.contains(mq)) {
pq.setDropped(true);
只有当这个方法返回true的时候,处理队列才会被移除。
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
只有当这个方法返回true的时候,处理队列才会被移除。
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
。。。省略部分代码
}
@Override
public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
&& MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
try {
如果拿得到消费锁,说明消费完了
if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
try {
需要判断是否还有消息没有消费完
return this.unlockDelay(mq, pq);
} finally {
pq.getLockConsume().unlock();
}
} else {
log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",
mq,
pq.getTryUnlockTimes());
pq.incTryUnlockTimes();
}
} catch (Exception e) {
log.error("removeUnnecessaryMessageQueue Exception", e);
}
拿不到锁,说明正在消费,会直接返回false
return false;
}
return true;
}
}
从上面的代码可以看出,如果拿不到消费锁,说明正在消费,直接返回false, 处理队列不会被移除。
那如果拿得到锁,说明已经消费完,那么是不是需要判断是否还有消息没有消费完?
public class RebalancePushImpl extends RebalanceImpl {
private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {
如果还有消息遗留,最后还是要去释放broker端的锁,只不过延时20秒后去执行。
if (pq.hasTempMessage()) {
log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);
this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {
@Override
public void run() {
log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq);
RebalancePushImpl.this.unlock(mq, true);
}
}, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);
} else {
this.unlock(mq, true);
}
return true;
}
从代码看的话,如果拿到消费锁,还有消息遗留,说明消费很多次了,这个消息还没有消费完,那么为什么消息并没有被完全消费完还要去释放锁呢?
因为对于集群模式下的顺序消费而言,如果sendback失败,则会一直在内部进行重试。会导致有消息无法被完全消费。如果一直有消息无法被完全消费就让消费者一直持有锁,会导致类似死锁的情况。所以这里rocketmq在判断已经消费完,还有消息遗留的情况下,会去强制释放broker端的队列锁。
如何去延长锁的持有时间
那么如果消费持续时间很长,消费的过程会不会被打断呢?被打断是有可能导致消息无法顺序消费的。那么如何保证?
这里也与设计层面有关。
首先像上面的代码,如果队列处于消费中状态的话,处理队列是不会被移除的。
然后负载服务会不断的对处理队列对应的队列去申请上锁。
下面还是看看broker端对于上锁的处理。
public class RebalanceLockManager {
LockEntry用于记录每个客户端的上锁时间
static class LockEntry {
private String clientId;
private volatile long lastUpdateTimestamp = System.currentTimeMillis();
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public long getLastUpdateTimestamp() {
return lastUpdateTimestamp;
}
public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
this.lastUpdateTimestamp = lastUpdateTimestamp;
}
public boolean isLocked(final String clientId) {
boolean eq = this.clientId.equals(clientId);
return eq && !this.isExpired();
}
public boolean isExpired() {
boolean expired =
(System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
return expired;
}
}
用于记录每个队列的锁的持有者
private final ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);
用于判断某个客户端是否持有该队列的锁,支持重入---更新持有锁的时间
private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
if (groupValue != null) {
LockEntry lockEntry = groupValue.get(mq);
if (lockEntry != null) {
boolean locked = lockEntry.isLocked(clientId);
if (locked) {
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
}
return locked;
}
}
return false;
}
某个组的消费者尝试对队列进行上锁
public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
final String clientId) {
Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());
for (MessageQueue mq : mqs) {
if (this.isLocked(group, mq, clientId)) {
保存已经上锁的消息队列
lockedMqs.add(mq);
} else {
保存获取不到锁的消息队列
notLockedMqs.add(mq);
}
}
对获取不到锁的消息队列的处理
if (!notLockedMqs.isEmpty()) {
try {
this.lock.lockInterruptibly();
try {
如果获取不到该组的任何信息,先进行初始化
ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
if (null == groupValue) {
groupValue = new ConcurrentHashMap<>(32);
this.mqLockTable.put(group, groupValue);
}
for (MessageQueue mq : notLockedMqs) {
LockEntry lockEntry = groupValue.get(mq);
如果不存在该队列的持有者信息,直接上锁,说明没有被上锁
if (null == lockEntry) {
lockEntry = new LockEntry();
lockEntry.setClientId(clientId);
groupValue.put(mq, lockEntry);
}
已经持有了,进行重入,更新上锁时间
if (lockEntry.isLocked(clientId)) {
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
lockedMqs.add(mq);
continue;
}
String oldClientId = lockEntry.getClientId();
如果持有者id与当前消费者id不一样,但是原先持有者已经过期了,直接进行上锁
if (lockEntry.isExpired()) {
lockEntry.setClientId(clientId);
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
lockedMqs.add(mq);
continue;
}
}
} finally {
最后解锁
this.lock.unlock();
}
} catch (InterruptedException e) {
log.error("putMessage exception", e);
}
}
返回已经上锁的消息队列
return lockedMqs;
}
}
从代码来看,broker端是允许锁去重入的,其实就是去更新锁的持有时间。
如果一个消费者消费持续消费了某个队列很久,那么消费服务会不断的去申请上锁,就可以一直保持对该队列的持有,其他消费者也只能等待了。