RocketMQ 消息的类型

2019-02-07  本文已影响56人  黄靠谱

参考资料

阿里云官方英文、最新的Demo和Guidence
http://rocketmq.apache.org/docs/transaction-example/

阿里云的帮助文档啊,超级详细而且有Demo
https://help.aliyun.com/document_detail/29551.html

阿里云在github上的Demos(包括整合Spring 和简单TCP的形式)
https://github.com/AliwareMQ/mq-demo

消息的类型

http://rocketmq.apache.org/docs/simple-example/

按照发送的特点分:
https://help.aliyun.com/document_detail/29547.html?spm=a2c4g.11186623.2.13.496e2379jnaAlt

  1. 同步消息
  2. 异步消息
  3. 单向消息

按照使用功能特点分:

  1. 普通消息(订阅)
  2. 顺序消息
  3. 广播消息
  4. 延时消息
  5. 批量消息
  6. 事务消息

同步发送(可靠)

  1. 同步发送,线程阻塞,投递completes阻塞结束
  2. 如果发送失败,会在默认的超时时间3秒内进行重试,最多重试2次
  3. 投递completes不代表投递成功,要check SendResult.sendStatus来判断是否投递成功
//Send message in synchronous mode. This method returns only when the sending procedure totally completes.
SendResult sendResult = producer.send(msg);
  1. SendResult里面有发送状态的枚举:SendStatus,同步的消息投递有一个状态返回值的
public enum SendStatus {
    SEND_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
}
  1. retry的实现原理:只有ack的SendStatus=SEND_OK才会停止retry
 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
  for (; times < timesTotal; times++) {
        trySend();
        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
        switch (communicationMode) {
                case ASYNC:
                    return null;
                case ONEWAY:
                    return null;
                case SYNC:
                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                        if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                            continue;
                        }
                    }
                    return sendResult;
                default:
                    break;
            }
    }

注意事项:发送同步消息且Ack为SEND_OK,只代表该消息成功的写入了MQ当中,并不代表该消息成功的被Consumer消费了

异步发送(可靠)

  1. 异步调用的话,当前线程一定要等待异步线程回调结束再关闭producer啊,因为是异步的,不会阻塞,提前关闭producer会导致未回调链接就断开了
  2. 异步消息不retry,投递失败回调onException()方法,只有同步消息才会retry,源码参考 DefaultMQProducerImpl.class
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
  1. 异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
    producer.send(msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.printf("%-10d OK %s %n", index,sendResult.getMsgId());
        }
        @Override
        public void onException(Throwable e) {
            System.out.printf("%-10d Exception %s %n", index, e);
            e.printStackTrace();
        }
    });               

Oneway(不可靠,类似于UPD)

  1. 消息不可靠,性能高,只负责往服务器发送一条消息,不会重试也不关心是否发送成功
  2. 此方式发送消息的过程耗时非常短,一般在微秒级别
    for (int i = 0; i < 50; i++) {
         Message msg = new Message("TopicTest2" ,"TagA" ,("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));
         producer.sendOneway(msg);
    }

延迟发送

艿艿的博客
https://blog.csdn.net/github_38592071/article/details/72230984
延迟的机制是在 服务端实现的,也就是Broker收到了消息,但是经过一段时间以后才发送
服务器按照1-N定义了如下级别: “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;若要发送定时消息,在应用层初始化Message消息对象之后,调用Message.setDelayTimeLevel(int level)方法来设置延迟级别,按照序列取相应的延迟级别,例如level=2,则延迟为5s

 msg.setDelayTimeLevel(2);
 SendResult sendResult = producer.send(msg);

实现原理:

  1. 发送消息的时候如果消息设置了DelayTimeLevel,那么该消息会被丢到ScheduleMessageService.SCHEDULE_TOPIC这个Topic里面
  2. 根据DelayTimeLevel选择对应的queue
  3. 再把真实的topic和queue信息封装起来,set到msg里面
  4. 然后每个SCHEDULE_TOPIC_XXXX的每个DelayTimeLevelQueue,有定时任务去刷新,是否有待投递的消息
  5. 每 10s 定时持久化发送进度

源码参考 :rocketmq-store.4.4.0.jar
CommitLog.putMessage()

  1. 存储延迟消息
if (msg.getDelayTimeLevel() > 0) {
    topic = ScheduleMessageService.SCHEDULE_TOPIC;
    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

    // Backup real topic, queueId
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

    msg.setTopic(topic);
    msg.setQueueId(queueId);
 }

result = mappedFile.appendMessage(msg, this.appendMessageCallback);
  1. MQ对SCHEDULE_TOPIC_XXXX 每条消费队列对应单独一个定时任务进行轮询,发送 到达投递时间【计划消费时间】 的消息。
    ScheduleMessageService.java类里面有个内部类 DeliverDelayedMessageTimerTask,定时执行check待执行消息的功能,每个类有delayLevel、offset2个变量
  class DeliverDelayedMessageTimerTask extends TimerTask {
        private final int delayLevel;
        private final long offset;

        public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
            this.delayLevel = delayLevel;
            this.offset = offset;
        }

        @Override
        public void run() {
                this.executeOnTimeup();
        }
}

批量发送

    String topic = "TopicTest";
    List<Message> messages = new ArrayList<>();
    messages.add(new Message(topic, "TagA", "Order1", "Hello world 0".getBytes()));
    messages.add(new Message(topic, "TagA", "Order2", "Hello world 1".getBytes()));
    messages.add(new Message(topic, "TagA", "Order3", "Hello world 2".getBytes()));
    producer.send(messages);

广播、集群订阅(只需要对接收端做控制即可)

通过在Consumer做配置,实现广播消息,或者集群订阅消息(默认)

consumer.setMessageModel(MessageModel.CLUSTERING); //默认
consumer.setMessageModel(MessageModel.BROADCASTING);//广播模式
上一篇下一篇

猜你喜欢

热点阅读