rocketmq_顺序消息
2022-05-26 本文已影响0人
kele2018
Q:在rocketmq语境下,如何定义【顺序】这个词?
消费效果应该保持我们业务定义上的一种顺序;比如:一个订单的支付状态:待支付、已支付、已退款。
Q:为了保证这种效果,生产端应该如何做?
(1)保证消息的发送顺序;即先发待支付,再发已支付、最后再发已退款;
(2)让一个订单的消息进入同一个队列;比如,待支付消息进入1号队列,已支付消息进入2号队列,已退款消息进入3号队列,那么虽然它们发送的时候有先后顺序,但是到了队列中都是第一条消息,那么假如有三个消费者,很有可能后面发送的先消费;
Q:为了保证这种效果,消费端应该如何做?
(1)分布式锁:防止一个消费者组下的多个消费者同时消费一个队列;比如,1号队列现在负载给了消费者A,过一会儿之后又负载给了B;(因为大都用同一套负载均衡算法,只要消费者数量不变,这种情况就不会发生)。
(2)本地排他锁:防止两个任务并行,后面的消息先消费;比如,1号队列现在负载给了消费者A,A拉取一批消息,提交到线程池;接着又拉下一批消息,有提交到线程池;因为线程池中有多个线程,所以有可能下一批消息和上一批消息同时消费,甚至早于后者。
Q:生产端如何让一个订单的消息进入同一个队列?
使用队列选择器,即MessageQueueSelector,可以自己定义,也可以用现成的;
// hashKey为订单号
SendResult sendResult = this.producer.send(rocketMsg, this.messageQueueSelector, hashKey, timeout);
Q:消费端如何实现分布式锁?
消费端在拉取消息前,需要先构造拉取请求,即PullRequest对象;rocketmq会在构造这个对象前,向broker请求锁定当前对象中的队列;
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
......
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
} // 锁队列:即向broker发送一条消息,告诉他我要锁哪个队列,broker在自己的记账本上记录一下
}
}
}
Q:如何实现本地锁?
前面我们看到消费者拉取一批消息后会做成一个任务,提交到线程池;rocketmq会在任务中加锁,所以即使多个任务并行,因为大家拿的是同一把锁,所以同一时刻只会有一个任务在跑,其他任务等待;
public void run() {
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); // 每个队列都会有一把锁
synchronized (objLock) {
for (boolean continueConsume = true; continueConsume; ) {
if (!msgs.isEmpty()) {
try {
this.processQueue.getLockConsume().lock(); // 再次加锁,即使外层的对象锁没有锁住,这里的锁也可以保证有序
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
}
catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
}
finally {
this.processQueue.getLockConsume().unlock();
}
}
}
}
}