rocketmq_顺序消息

2022-05-26  本文已影响0人  kele2018
Q:在rocketmq语境下,如何定义【顺序】这个词?
消费效果应该保持我们业务定义上的一种顺序;比如:一个订单的支付状态:待支付、已支付、已退款。
Q:为了保证这种效果,生产端应该如何做?
  (1)保证消息的发送顺序;即先发待支付,再发已支付、最后再发已退款;
  (2)让一个订单的消息进入同一个队列;比如,待支付消息进入1号队列,已支付消息进入2号队列,已退款消息进入3号队列,那么虽然它们发送的时候有先后顺序,但是到了队列中都是第一条消息,那么假如有三个消费者,很有可能后面发送的先消费;
Q:为了保证这种效果,消费端应该如何做?
  (1)分布式锁:防止一个消费者组下的多个消费者同时消费一个队列;比如,1号队列现在负载给了消费者A,过一会儿之后又负载给了B;(因为大都用同一套负载均衡算法,只要消费者数量不变,这种情况就不会发生)。
  (2)本地排他锁:防止两个任务并行,后面的消息先消费;比如,1号队列现在负载给了消费者A,A拉取一批消息,提交到线程池;接着又拉下一批消息,有提交到线程池;因为线程池中有多个线程,所以有可能下一批消息和上一批消息同时消费,甚至早于后者。
Q:生产端如何让一个订单的消息进入同一个队列?
使用队列选择器,即MessageQueueSelector,可以自己定义,也可以用现成的;
// hashKey为订单号
SendResult sendResult = this.producer.send(rocketMsg, this.messageQueueSelector, hashKey, timeout);
Q:消费端如何实现分布式锁?

消费端在拉取消息前,需要先构造拉取请求,即PullRequest对象;rocketmq会在构造这个对象前,向broker请求锁定当前对象中的队列;

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
       ......
        List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
        for (MessageQueue mq : mqSet) {
            if (!this.processQueueTable.containsKey(mq)) {
                if (isOrder && !this.lock(mq)) {
                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                    continue;
                } // 锁队列:即向broker发送一条消息,告诉他我要锁哪个队列,broker在自己的记账本上记录一下
           }    
        } 
    }

Q:如何实现本地锁?

前面我们看到消费者拉取一批消息后会做成一个任务,提交到线程池;rocketmq会在任务中加锁,所以即使多个任务并行,因为大家拿的是同一把锁,所以同一时刻只会有一个任务在跑,其他任务等待;

public void run() {
            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); // 每个队列都会有一把锁
            synchronized (objLock) {
                    for (boolean continueConsume = true; continueConsume; ) {                    
                        if (!msgs.isEmpty()) {                
                            try {
                                this.processQueue.getLockConsume().lock(); // 再次加锁,即使外层的对象锁没有锁住,这里的锁也可以保证有序
                                if (this.processQueue.isDropped()) {
                                    log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                        this.messageQueue);
                                    break;
                                }
                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                            } 
                            catch (Throwable e) {
                                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                                    RemotingHelper.exceptionSimpleDesc(e),
                                    ConsumeMessageOrderlyService.this.consumerGroup,
                                    msgs,
                                    messageQueue);
                                hasException = true;
                            } 
                            finally {
                                this.processQueue.getLockConsume().unlock();
                            } 
                            
                            
                    } 
                } 
               
        }
}       
上一篇 下一篇

猜你喜欢

热点阅读