Java-分布式框架-rocketmq

2021-07-26  本文已影响0人  蓝色_笔记本

一、消息中间件对比

kafka RocketMQ RabbitMQ
定位 设计定位 系统间的数据流管道,实时数据处理。例如常规的消息系统、监控数据、日志收集 可靠的消息传输,例如消息推送 可靠的消息传输,与RocketMQ类似。
开发语言 Scala Java Erlang
客户端语言 Java,Python,C Java Java,Python,C
注册中心 Zookeeper namespace
选举方式 自动选举 不支持自动选举
数据可靠性 很好。支持同步刷盘,同步复制,但性能差。 很好,支持同、异步刷盘,同步双写,异步复制
消息写入性能 非常好,每条10个字符测试:百万条/s,Topic数量60个左右后性能会下降 很好,每条10个字符测试:单机单broker 7w/s,单机3broker 12w/s,Topic数量支持5W条左右 好,2W/s左右
性能稳定性 队列、分区多的时候性能不稳定,明显下降,消息堆积时性能稳定 队列多的时候,消息堆积时性能稳定 消息堆积时性能不稳定
消息堆积能力 非常好 非常好 一般
消息获取 pull pull,push pull,push
顺序消费 支持 支持,局部有序 支持
定时消息 支持不好 支持,开源只支持指定级别的延迟 支持不好
事务消息 不支持 支持 不支持
消息查询 不支持 支持 不支持

二、RocketMQ架构分布图

image.png

Apache RocketMQ是一个分布式消息传递和流媒体平台,具有低延迟,高性能和可靠性, 万亿级容量和灵活的可伸缩性。 它由四个部分组成:nameserver,broker,生产者和使用者。 它们中的每一个都可以水平扩展,而没有单个故障点。

三、RocketMQ环境

环境变量
#java环境
export JAVA_HOME=/usr/local/jdk
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH
#rocketmq环境
export ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq‐all‐4.1.0‐incubating
export PATH=$ROCKETMQ_HOME/bin:$PATH
broker配置
#rocketmq‐name服务地址,多个地址用;分开,不配置默认为localhost:9876 
namesrvAddr = 192.168.241.198:9876 
brokerClusterName = DefaultCluster 
brokerName = broker‐a 
brokerId = 0 
deleteWhen = 04 
fileReservedTime = 48 
#主从角色SYNC_MASTER,ASYNC_MASTER,SLAVE 
brokerRole = SYNC_MASTER 
flushDiskType = ASYNC_FLUSH 
#允许自动创建主题topic 
autoCreateTopicEnable=true 
#broker监听端口 
listenPort=10911 
#数据存储位置 
storePathRootDir=/root/rocketmq/store
内存的设置

rocketmq集群内存的设置是针对注册中心namesrv与broker内存的设置,分别设置rocketmq bin目录下的runserver.sh与runbroker.sh(或者runserver.cmd与runbroker.cmd)。

JAVA_OPT="${JAVA_OPT} ‐server ‐Xms256m ‐Xmx256m ‐Xmn128m ‐XX:MetaspaceSi ze=64m ‐XX:MaxMetaspaceSize=128m"
JAVA_OPT="${JAVA_OPT} ‐server ‐Xms256m ‐Xmx256m ‐Xmn128m ‐XX:MetaspaceSi ze=64m ‐XX:MaxMetaspaceSize=128m"
单机运行
#启动注册中心
nohup sh bin/mqnamesrv ‐n 192.168.241.198:9876
#启动broker
nohup sh bin/mqbroker ‐n 192.168.241.198:9876 ‐c conf/broker.conf  &

注意:启动注册中心或者broker的时候最好指定一下IP,防止在多网卡或者dockers的环境下,IP使用错误。

多机集群部署

在主目录下的conf文件夹下提供了多种broker配置模式,分别有:2m-2s-async,2m-2s- sync,2m-noslave。若目前2台机器,分别部署1个 NameServer,同时分别部署一个Master和一个Slave,互为主备。

  1. master
#broker节点注册到多个注册中心
namesrvAddr = 192.168.241.198:9876;192.168.241.199:9876
#主节点
brokerId = 0
#SYNC_MASTER或者ASYNC_MASTER
brokerRole = SYNC_MASTER
  1. slave
#broker节点注册到多个注册中心
namesrvAddr = 192.168.241.198:9876;192.168.241.199:9876
#非0表示从节点唯一标志
brokerId = 1
#表明从节点
brokerRole = SLAVE
环境验证
sh mqadmin clusterlist ‐n 192.168.241.198:9876;192.168.241.199:9876
export NAMESRV_ADDR=192.168.241.198:9876;192.168.241.199:9876 
测试发送端 
    > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer 
测试消费端 
    > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
rocketmq console

进入rocketmq-externals项目的GitHub地址,如下图,可看到RocketMQ项目的诸多扩展项目,其中就包含我们需要下载的rocketmq-console。
rocketmq-console是一个springboot项目,跑之前修改下配置。

四、基本概念

消息模型

RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责 生产消息,Consumer 负责消费消息,Broker 负责存储消息。

消息对象
主题(Topic)

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。

代理服务器(Broker Server)

消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收 从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相 关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

注册中心服务(Name Server)

注册中心服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的 Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。

客户端消费
消息(Message)

消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。 系统提供了通过Message ID和Key查询消息的功能。

顺序消息
标签(Tag)

为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息, 可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连 贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消 费逻辑,实现更好的扩展性。

image.png

注意1:每个broker中都会有一个commitlog,由于记录生产者发送的消息。
注意2:每个broker中有多个Topic,每个Topic中默认有4个queue队列,每个queue对应一个持久化文件。
注意3:每个broker中会对应一个consumerOffset.json文件,用于记录队列消费的节点到哪了。
注意4:consumer、producer与broker间的通信基于Netty来实现的,默认为Netty中的epoll模式,若系统不支持epoll模式,才使用nio模式。
注意5:producer在发送消息的时候,会以轮循的方式放置于队列中(比如图上broker-master-1与broker-master-2共8个),若有顺序消息的话,会保证所有顺序消息放在同一个队列中。

没开始使用的broker内部的文件。


image.png

已经使用的broker内部的文件。


image.png

config内部结构


image.png

store:存储commitlog文件,每个broker对应一个commitlog,commitlog中存储的是topic真正的内容数据。
index:索引。
consumequeue:存储每个主题下的队列,默认每个主题4个队列,这边存储的主要是消息的tag、消息对应在commitlog的地址、空间大小等,。
topic.json: 存储所有topic的信息,主要为topic的属性信息。
consumerOffset.json:消费者偏移量信息,对应了每个主题@每个消费群组{队列1:偏移量,队列2:偏移量,队列3:偏移量,队列4:偏移量}

名称 作用
broker broker模块:c和p端消息存储逻辑
client 客户端api:produce、consumer端 接受与发送api
common 公共组件:常量、基类、数据结构
tools 运维tools:命令行工具模块
store 存储模块:消息、索引、commitlog存储
namesrv 服务管理模块:服务注册topic等信息存储
remoting 远程通讯模块:netty+fastjson
logappender 日志适配模块
example Demo列子
filtersrv 消息过滤器模块
srvutil 辅助模块
filter 过滤模块:消息过滤模块
distribution 部署、运维相关zip包中的代码
openmessaging 兼容openmessaging分布式消息模块

五、使用

1. 同步、异步、一次性

生产者

同步

public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("tl_msg_student_group");

        producer.setNamesrvAddr("192.168.241.198:9876");
        //producer.setSendMsgTimeout(10000);

        producer.start();
        Message msg = new Message("TopicStudent"  ,
                "TagStudent"  ,
                 "tag" ,
                ("Hello tuling RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET)
        );
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);
        producer.shutdown();
    }

异步

public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("tl_message_group");
        producer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
        producer.start();

        //设置发送失败重试机制
        producer.setRetryTimesWhenSendAsyncFailed(5);

        int messageCount = 1;
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            final int index = i;
            Message msg = new Message("TopicTest",
                    "TagSendOne",
                    "OrderID188",
                    "I m sending msg content is yangguo".getBytes(RemotingHelper.DEFAULT_CHARSET));
            //消息发送成功后,执行回调函数
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    countDownLatch.countDown();
                    System.out.printf("%-10d OK %s %n", index,
                            sendResult.getMsgId());
                }
                @Override
                public void onException(Throwable e) {
                    countDownLatch.countDown();
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        }
        //防止回调未回,producer就已经删除
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }

一次性

DefaultMQProducer producer = new DefaultMQProducer("tl_message_group");
// Specify name server addresses.
producer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
producer.setSendMsgTimeout(10000);
producer.start();
for (int i = 0; i < 1; i++) {
    Message msg = new Message("TopicTest" /* Topic */,
                    "TagSendOne" /* Tag */,
                    "OrderID198",
                    ("Hello RocketMQ test i " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            producer.sendOneway(msg);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
消费者
// tl_msg_student_group
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tl_student_group");
// ;192.168.241.199:9876
consumer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicStudent", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs){
                System.out.println(new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

2. 广播消息

生产者
DefaultMQProducer producer = new DefaultMQProducer("consumer_model_group");
producer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
producer.start();

for (int i = 0; i < 4; i++){
    Message msg = new Message("TopicTest",
            "TagA",
            "OrderID188",
            ("Hello world"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    SendResult sendResult = producer.send(msg);
    System.out.printf("%s%n", sendResult);
}

producer.shutdown();
消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_model_group");
consumer.setNamesrvAddr("192.168.241.198:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//广播,全量消费
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) {
        for (MessageExt ext : msgs){
            System.out.printf(Thread.currentThread().getName() + " Receive New Message: " + new String(ext.getBody()) + "%n");
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

consumer.start();
System.out.printf("Broadcast Consumer Started.%n");

3. 批量消息

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的 topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应 超过4MB。rocketmq建议每次批量消息大小大概在1MB。 当消息大小超过4MB时,需要将消息进行分割。

生产者
public class ListSplitter implements Iterator<List<Message>> {

    private final int SIZE_LIMIT = 1000 * 1000 * 1;//1MB
    private final List<Message> messages;
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        //遍历消息准备拆分
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; //for log overhead
            if (tmpSize > SIZE_LIMIT) {

                if (nextIndex - currIndex == 0) {
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > SIZE_LIMIT) {
                break;
            } else {
                totalSize += tmpSize;
            }

        }
        List<Message> subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }
}
/**
 * rocketMq 支持消息批量发送
 * 同一批次的消息应具有:相同的主题,相同的waitStoreMsgOK,并且不支持定时任务。
 * <strong> 同一批次消息建议大小不超过~1M </strong>,消息最大不能超过4M,需要
 * 对msg进行拆分
 */
DefaultMQProducer producer = new DefaultMQProducer("batch_group");
producer.setNamesrvAddr("192.168.241.198:9876");
producer.start();

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));

ListSplitter splitter = new ListSplitter(messages);

/**
 * 对批量消息进行拆分
 */
while (splitter.hasNext()) {
    try {
        List<Message>  listItem = splitter.next();
        producer.send(listItem);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

producer.shutdown();
消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch_group");

// ;192.168.241.199:9876
consumer.setNamesrvAddr("192.168.241.198:9876");

consumer.subscribe("BatchTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs){
            System.out.println("queueId=" + msg.getQueueId() + "," + new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

consumer.start();

System.out.printf("Consumer Started.%n");

4. 过滤消息

RocketMq在消息过滤这块做得很强大,它可以通过Tag过滤消息,可以通过SQL表达式筛选消息,它也可以支持java脚本过滤。

其中通过SQL表达式筛选 和 java脚本过滤 需要在broker的配置文件中把对应的配置打开。

enablePropertyFilter=true

Topic 与 Tag 都是业务上用来归类的标识,区分在于 Topic 是一级分类,而 Tag 可以说是二级分类,关系如图所示。


image.png
生产者
/***
 * TAG-FILTER-1000 ---> 布隆过滤器
 * 过滤掉的那些消息。直接就跳过了么。下次就不会继续过滤这些了。是么。
 * @param args
 * @throws Exception
 */
public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("filter_sample_group");
    producer.setNamesrvAddr("192.168.241.198:9876");
    producer.start();

    for (int i = 0; i < 3; i++) {
        Message msg = new Message("TopicFilter",
                "TAG-FILTER",
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
        );
        msg.putUserProperty("a",String.valueOf(i));
        if(i % 2 == 0){
            msg.putUserProperty("b","yangguo");
        }else{
            msg.putUserProperty("b","xiaolong girl");
        }
        producer.send(msg);
    }

    producer.shutdown();
}
消费者
public static void main(String[] args) throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_sample_group");
    /**
     * 注册中心
     */
    consumer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
    /**
     * 订阅主题
     * 一种资源去换取另外一种资源
     */
    consumer.subscribe("TopicFilter", MessageSelector.bySql("a between 0 and 3 and b = 'yangguo'"));
    /**
     * 注册监听器,监听主题消息
     */
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs){
                try {
                    System.out.println("consumeThread=" + Thread.currentThread().getName()
                            + ", queueId=" + msg.getQueueId() + ", content:"
                            + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();

    System.out.printf("Filter Consumer Started.%n");
}

5. 延迟消息

定时消息是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点 或者等待特定的时间后才能被消费。
使用场景:如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的 状态,如果还是未付款就取消订单释放库存。
当前支持的延迟时间

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 
分别对应级别
1 2 3....................
生产者
public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("ExampleConsumer");
    //;192.168.241.199:9876
    producer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
    producer.start();
    int totalMessagesToSend = 3;
    for (int i = 0; i < totalMessagesToSend; i++) {
        Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
        //延时消费
        message.setDelayTimeLevel(6);
        // Send the message
        producer.send(message);
    }

    System.out.printf("message send is completed .%n");
    producer.shutdown();
}
消费者
public static void main(String[] args) throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
    //;192.168.241.199:9876
    consumer.setNamesrvAddr("192.168.241.198:9876");
    consumer.subscribe("TestTopic", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
            for (MessageExt message : messages) {
                // Print approximate delay time period
                System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                        + "message content is :" + new String(message.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    //System.out.printf("Consumer Started.%n");
}

注意1:延迟消息发送到broker的时候,broker会专门新建一个中转主题SCHEDULED_TOPIC_XXXX来存放消息,目前开原版只支持18个级别,相当于中转主题下存在18个队列文件分别存储这18个级别。同时broker后台开启个线程,只要延迟消息的时间到了,才会把延迟消息放置于真正的topic下。

注意2:开源版下的延迟消息并不适合高并发的延迟消息,若业务存在高并发的延迟消息,需要考虑使用商业版的RocketMQ。

注意3:客户端集群消息的消费来源于pullRequestQueue,pullRequestQueue中的消息来源在于客户端中存在一个线程从broker中主动pull。

注意4:客户端从namesrv同步信息周期30s,客户端与broker心跳周期30s,客户端心跳消费偏移量同步周期5s。

注意5:客户端执行失败的消息,客户端会发回到broker中,broker端会新建一个RETRY_TOPIC_XXXX来存储,大概10S后会再次发给客户端消费,默认16次。

6. 顺序消息

生产者
public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("ordered_group_name");
    producer.setNamesrvAddr("192.168.241.198:9876");
    producer.start();
    String[] tags = new String[]{"TagA", "TagC", "TagD"};

    // 订单列表
    List<OrderStep> orderList = buildOrders();

    Date date = new Date();
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    String dateStr = sdf.format(date);
    for (int i = 0; i < 10; i++) {
        // 加个时间前缀
        String body = dateStr + " Hello RocketMQ "+ i + " " + orderList.get(i);
        Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                body.getBytes(RemotingHelper.DEFAULT_CHARSET));

        SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Long id = (Long) arg;  //根据订单id选择发送queue
                long index = id % mqs.size();
                return mqs.get((int) index);
            }
        }, orderList.get(i).getOrderId());//订单id

        System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                sendResult.getSendStatus(),
                sendResult.getMessageQueue().getQueueId(),
                body));
    }
    producer.shutdown();
}
消费者
public static void main(String[] args) throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_group_name");
    consumer.setNamesrvAddr("192.168.241.198:9876");
    /**
     * 设置消费位置
     */
   consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("TopicTest", "*");
    consumer.registerMessageListener(new MessageListenerOrderly() {
        AtomicLong consumeTimes = new AtomicLong(0);
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,ConsumeOrderlyContext context) {
            context.setAutoCommit(true);
            for (MessageExt msg : msgs) {
                // 可以看到每个queue有唯一的consume来消费, 订单对每个queue(分区)有序
                try {
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + ", queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

7. 事务消息

半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列 MQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记。半事务消息会单独存储在HALF_TOPIC中。

消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确 认丢失,消息队列 MQ 服务端通过扫描发现某条消息长期处于“半事务消息”时,需要 主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即 消息回查。

注意:事务消息中的实现在于product端与broker端是双向通信的,互为客户端和服务端

image.png
生产者
private void testTransaction() throws MessagingException {
    String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
    for (int i = 0; i < 10; i++) {
        try {

            Message msg = MessageBuilder.withPayload("Hello RocketMQ " + i).
                    setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build();
            /**
             * TX_PGROUP_NAME 必须同 {@link TransactionListenerImpl} 类的注解 txProducerGroup
             * @RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup")
             */
            SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME,
                    springTransTopic + ":" + tags[i % tags.length], msg, null);
            System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n",
                    msg.getPayload(), sendResult.getSendStatus());

            Thread.sleep(10);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
监听
@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<String, Integer>();

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String transId = (String)msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID);
        System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n",
                transId);
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(transId, status);
        if (status == 0) {
            // 事务提交
            System.out.printf("    # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());
            return RocketMQLocalTransactionState.COMMIT;
        }

        if (status == 1) {
            // 本地事务回滚
            System.out.printf("    # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());
            return RocketMQLocalTransactionState.ROLLBACK;
        }

        // 事务状态不确定,待Broker发起 ASK 回查本地事务状态
        System.out.printf("    # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n");
        return RocketMQLocalTransactionState.UNKNOWN;
    }

    /**
     * 在{@link TransactionListenerImpl#executeLocalTransaction(org.springframework.messaging.Message, java.lang.Object)}
     * 中执行本地事务时可能失败,或者异步提交,导致事务状态暂时不能确定,broker在一定时间后
     * 将会发起重试,broker会向producer-group发起ask回查,
     * 这里producer->相当于server端,broker相当于client端,所以由此可以看出broker&producer-group是
     * 双向通信的。
     * @param msg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String transId = (String)msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID);
        RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
        Integer status = localTrans.get(transId);
        if (null != status) {
            switch (status) {
                case 0:
                    retState = RocketMQLocalTransactionState.UNKNOWN;
                    break;
                case 1:
                    retState = RocketMQLocalTransactionState.COMMIT;
                    break;
                case 2:
                    retState = RocketMQLocalTransactionState.ROLLBACK;
                break;
            }
        }
        System.out.printf("------ !!! checkLocalTransaction is executed once," +
                        " msgTransactionId=%s, TransactionState=%s status=%s %n",
                transId, retState, status);
        return retState;
    }
}

七、消息存储整体架构

image.png

消息存储架构图中主要有下面三个跟消息存储相关的文件构成。

消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容, 消息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始 偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一 个文件;

消息消费队列,引入的目的主要是提高消息消费的性能,由于 RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历 commitlog文件中根据topic检索消息是非常低效的。Consumer即可根据 ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset, 消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的 commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层 组织结构,具体存储路径为: $HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文 件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节 的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访 问每一个条目,每个ConsumeQueue文件大小约5.72M;

IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方 法。Index文件的存储位置是:HOME \store\index{fileName},文件名fileName是以 创建时的时间戳命名的,固定的单个IndexFile文件大小:40+500Wx4+2000Wx20= 420000040个字节大小,约为400M,一个IndexFile可以保存 2000W个索引,IndexFile 的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现 为hash索引。

零拷贝刷盘

以文件下载为例,服务端的主要任务是:将服务端主机磁盘中的文件不做修改地从已连接的socket发出去。操作系统底层I/O过程如下图所示:


image.png

过程共产生了四次数据拷贝,在此过程中,我们没有对文件内容做任何修改,那么在内核空 间和用户空间来回拷贝数据无疑就是一种浪费,而零拷贝主要就是为了解决这种低效性。

什么是零拷贝技术?

零拷贝主要的任务就是避免CPU将数据从一块存储拷贝到另外一块存储,主要就是利用各种零拷贝技术,避免让CPU做大量的数据拷贝任务,减少不必要的拷贝,或者让别的组件 来做这一类简单的数据传输任务,让CPU解脱出来专注于别的任务。这样就可以让系统资源的利用更加有效。

原理是磁盘上的数据会通过DMA被拷贝的内核缓冲区,接着操作系统会把这段内核缓冲 区与应用程序共享,这样就不需要把内核缓冲区的内容往用户空间拷贝。应用程序再调用 write(),操作系统直接将内核缓冲区的内容拷贝到socket缓冲区中,这一切都发生在内核 态,最后,socket缓冲区再把数据发到网卡去。

image.png

注意:连续的磁盘空间才不用经过用户空间的整合,而直接实现页缓存与socket缓冲区的共享,从而减少了内核空间到用户空间状态的转换,并且减少了2次内核空间与用户空间复制操作,进而提高了整个系统的性能。这就是rocketmq开辟磁盘空间的时候为什么选择直接开启足够大的磁盘空间文件进行存储消息的原因(CommitLog IndexFile)。

上一篇 下一篇

猜你喜欢

热点阅读