浅析rocketmq顺序消息

2022-02-16  本文已影响0人  秃秃少年小猪

背景

在一个支付场景中,需要先支开通会员,才可以短信收到开通会员对应的权益,假设开通会员的短信是在获取权益之前的,大概的流程图如下

image-20220216210723046

一但会员功能开通,为了降低系统的耦合性(高内聚,低耦合),系统可能会立马发出两个消息,一个是通知用户开通(A),第二个是通知权益到账(B),再由下游MSG系统来负责对接对接对应的渠道商。在这过程中,可能下游系统第二个比第一个消费的快,就导致权益到账先于会员开通提醒。那有没有什么好的办法来解决呢?

解决办法

问题查找

产生以上问题的原因大概率的是这两条消息不在同一个队列中,导致了并排消费,切B早于A

利用rocketmq的顺序消息

在rmq中,有一个顺序消息的概念,可以保证让发送的消息发送到一个消息队列中去(MessageQueue)

实现原理

image-20220216211622252

在rmq中的实现

生产者api

 SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

在我们发送一条消息的时候,我们可以指定一个 MessageQueueSelector (队列选择器),来指明消息需要发送到哪个队列中去

参数解释

参数名称 解释 备注
msg 消息内容
selector 队列路由规则
arg 规则参数 一般是一个不变的key

示例代码

  SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Integer id = (Integer) arg;
                int index = id % mqs.size();
                return mqs.get(index);
            }
            }, orderId); //orderId 为定值

MessageQueueSelector代码解析

1.只需要实现 MessageQueueSelector 这个接口就好了,系统也提供了一个默认的实现

public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

2.默认的实现

image-20220216212621579
实现名称 说明 备注
哈希 通过hash算法路由
机房 通过机房路由
随机 随机路由

几种实现

哈希
public class SelectMessageQueueByHash implements MessageQueueSelector {

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode() % mqs.size();
        if (value < 0) {
            value = Math.abs(value);
        }
        return mqs.get(value);
    }
}
机房
public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
    private Set<String> consumeridcs;

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        return null;
    }

    public Set<String> getConsumeridcs() {
        return consumeridcs;
    }

    public void setConsumeridcs(Set<String> consumeridcs) {
        this.consumeridcs = consumeridcs;
    }
}
随机
public class SelectMessageQueueByRandom implements MessageQueueSelector {
    private Random random = new Random(System.currentTimeMillis());

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = random.nextInt(mqs.size());
        return mqs.get(value);
    }
}

总结

rmq通过对指定的key进行规则路由,然后选取一个指定队列,把需要发送的消息发送到同一个队列中去,根据队列的FIFO特性,做到顺序消费。

上一篇下一篇

猜你喜欢

热点阅读