Rocketmq的消息tag适用场景分析

2023-02-19  本文已影响0人  天草二十六_简村人

一、背景

类似于消息的灰度标识,消息本来是所有节点都会消费的,但是打上了灰度标识的消息,只有灰度节点才能消费。这样,就使用tag机制将消息隔离开来。

借鉴这种思路,我们在websocket集群开发过程中,ws消息发送给建立通道的节点,故对消息增加tag标签,仅让建立通道的节点才消费。

二、部署示意图

image.png

老师和学生A连接通道节点1,学生B连接的通道是节点2。
现在,老师需要发送ws消息给A和B,经查找,A和老师在同一个节点,找到A的通道,直接发送ws消息即可。但是,B在其他节点,则需要通过mq消息转发。

mq消息想要只让节点2消费,而不能被节点1等其他节点消费掉。

三、问题描述

下面的场景,都是两个节点订阅的tag不同的前提。

1、消费组相同的情况下,使用集群模式。

老师给学生A和B发送10条消息,A全部能收到;B收到的消息数量不定,总是会丢失消息。(有时4条,有时5条,有时6条)

image.png

2、消费组不同的情况下,使用集群模式。

老师给学生A和B发送10条消息,A和B都全部能收到。

3、消费组相同的情况下,使用广播模式。

messageModel默认是集群模式。

老师给学生A和B发送10条消息,A和B都全部能收到。

image.png
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void send(OutMessage message, String tag) {
        log.info("发送mq消息:{}, tag={}", JSON.toJSONString(message), tag);
        rocketMQTemplate.convertAndSend("test-topic" + ":" + tag, message);
    }
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;

@Component
@Slf4j
@RocketMQMessageListener(topic = "test-topic",
        nameServer = "${rocketmq.nameServer}",
        consumerGroup = "${rocketmq.consumer.group}",
        messageModel = MessageModel.BROADCASTING)
public class MQReceiver implements RocketMQListener<OutMessage> {
    @Override
    public void onMessage(OutMessage outMessage) {
       log.info("收到mq消息:{}", JSON.toJSONString(outMessage));
    }
}

修改注解@RocketMQMessageListener中的属性selectorExpression:

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Map;

/**
 * 因为RocketMQMessageListener不提供动态配置功能
 * springboot初始化后rocket容器初始化前利用反射动态改变
 * RocketMQMessageListener注解selectorExpression的值
 */
@Component
public class ChangeSelectorExpressionBeforeMqStart implements InitializingBean {

    @Override
    public void afterPropertiesSet() throws Exception {
        RocketMQMessageListener annoTable =
                MQReceiver.class.getAnnotation(RocketMQMessageListener.class);
        // 获取代理处理器
        InvocationHandler invocationHandler = Proxy.getInvocationHandler(annoTable);
        // 获取私有 memberValues 属性
        Field f = invocationHandler.getClass().getDeclaredField("memberValues");
        f.setAccessible(true);
        // 获取实例的属性map
        Map<String, Object> memberValues = (Map<String, Object>)
                f.get(invocationHandler);
        // 修改属性值
        memberValues.put("selectorExpression", "abc");
    }

}

四、问一问chatGPT

rocketmq之消费消息.png

五、总结

回到我们的方案选择上,topic和消费组都相同,只tag不一样,要实现预期 的效果,则消息模式要指定为广播模式。

而我们原本的程序设计,也是采用rabbitmq发送广播,让所有的节点去消费一遍。这样,无法减少每个节点处理mq消息的数量,不利于通道节点的水平扩容。

如果同一topic下由多个消费组去消费,也就是说N个节点,就N个消费组。随着节点的水平扩容,消费组的数量也随着增加。消费组的名称可以取机器的Mac地址,保证其唯一性。

对于这个方法,至少损失了服务的无状态性,不是很好理解,感觉是为了做而做,比较鸡肋。

消费组的初衷是让不同的服务在订阅同一个topic而设计。

参考链接

上一篇 下一篇

猜你喜欢

热点阅读