MQ

RocketMQ 消息发送方式 -- 同步(reliable s

2018-07-05  本文已影响24人  又语

本文展示 RocketMQ 同步发送消息的 Java 代码示例。

所有示例使用 Maven 工程构建,需要添加 rocketmq-client 依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.2.0</version>
</dependency>

Java 代码

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;

public class SyncProducer {
    
    public static void main(String[] args)
        throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException,
        MQBrokerException {
        // 初始化消息生产者,需要指定消息生产者组名称
        DefaultMQProducer producer = new DefaultMQProducer("synchronous-group");
        // 显示设置 NameServer 的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动消息生产者实例
        producer.start();
        // 循环发送消息10次
        for (int i = 0; i < 10; i++) {
            // 创建消息实例,指定 Topic 、 Tag 和消息体
            Message msg = new Message("TopicDemo", "TagSynchronous", ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 调用消息生产者实例发送消息给消息代理
            SendResult sendResult = producer.send(msg);
            // 打印消息发送结果
            System.out.printf("%s%n", sendResult);
        }
        // 关闭不再使用消息生产者
        producer.shutdown();
    }
}

运行代码前先在 RocketMQ Console 上查看一下 Topic 列表,没有名称为 TopicDemo 的 Topic。

运行代码,日志打印如下:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD74B0000, offsetMsgId=0A00000700002A9F000000000002C5BA, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD7680001, offsetMsgId=0A00000700002A9F000000000002C675, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD76B0002, offsetMsgId=0A00000700002A9F000000000002C730, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD7730003, offsetMsgId=0A00000700002A9F000000000002C7EB, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD7760004, offsetMsgId=0A00000700002A9F000000000002C8A6, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD7790005, offsetMsgId=0A00000700002A9F000000000002C961, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=2], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD77D0006, offsetMsgId=0A00000700002A9F000000000002CA1C, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=3], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD7800007, offsetMsgId=0A00000700002A9F000000000002CAD7, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=0], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD7830008, offsetMsgId=0A00000700002A9F000000000002CB92, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=1], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD7870009, offsetMsgId=0A00000700002A9F000000000002CC4D, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=2], queueOffset=2]

刷新 RocketMQ Console 的 Topic 列表,可以看到刚刚发送的消息

点击 STATUS 可以查看当前消息状态


测试环境使用单主(Master)部署,只启动了一个 NameServer 和一个 Broker,从 TopicDemo STATUS 中可以看到,一个 Topic 会被分为一个或多个 Message Queue,此处便分为了 4 个 Message Queue

上一篇:RocketMQ Console 的安装及运行
下一篇:

上一篇 下一篇

猜你喜欢

热点阅读