产生消息
优点:解耦,削峰、数据分发
缺点:
系统可用性降低(mq高可用)、
系统复杂度提高(消息丢失、消息顺序、保证没有重复消费)、
一致性问题
RabbitMQ:erlang、万级(单机吞吐量)、us级(时效性)、高(主从架构)(可用性)、基于erlang开发,所以并发能力很强,性能及其好,延时很低,管理界面较丰富
RocketMQ:java、10万级、ms级、非常高(分布式架构)、MQ功能比较完备,扩展性佳
Kafka:scala、10万级、ms级以内、非常高(分布式架构)、只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广
notepad安装NPPFTP插件可以直接去修改linux中的文件
进入运行的容器
# 首先使用下面的命令,查看容器ID(CONTAINER ID):
docker ps -a
# 然后用下面的命令进入容器,就可以使用bash命令浏览容器里的文件:
docker exec -it [CONTAINER ID] bash
# 有的镜像没有bash命令,可以用对应的shell,比如sh
docker exec -it [CONTAINER ID] sh
角色介绍
Producer:消息的发送者
Consumer:消息的接收者
Broker:暂存和传输消息,邮局
NameServer:管理Broker,邮局管理机构
Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消费的接收者可以订阅一个或者多个Topic消息
Message Queue:相当于是Topic的分区,用于并行发送和接收消息
集群
Producer、Consumer、NameServer没有数据同步关系,较为简单
Brocker集群读写、主从
Master可以部署多台,一个Master可以对应多个Slavor、但是一个Slavor只能对应一个Master
集群模式
单Master模式:风险较大,本地测试可以
多Master模式:配置简单
多Master多Slave模式(异步):Master宕机,丢失少量消息
多Master多Slave模式(同步):性能较低10%,安全性高使用这个模式
消息发送者步骤分析
- 创建消息生产者Producer,并制定生产者组名
- 指定NameServer
- 启动producer
- 创建消息对象,指定主题Topic,Tag和消息体
- 发送消息
- 关闭生产者producer
消息消费者步骤分析
- 创建消费者Consumer,制定消费者组名
- 自定NameServer地址
- 订阅主题Topic和Tag
- 设置回调参数,处理消息
- 启动消费者consumer
发送同步消息
package com.suntong.myshop.service.rocketmq.provider.tongbu;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
/**
* 发送同步消息
*/
public class SyncProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
// 1. 创建消息生产者Producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2. 指定NameServer
producer.setNamesrvAddr("192.168.79.130:9876");
// 3. 启动producer
producer.start();
// 4. 创建消息对象,指定主题Topic,Tag和消息体
for (int i=0;i<10;i++){
//参数一:消息主题topic
//参数二:消息tag
//参数三:消息内容
Message message = new Message("base","Tag1",("Hello world"+i).getBytes());
// 5. 发送消息
SendResult result = producer.send(message);
//发送状态
SendStatus sendStatus = result.getSendStatus();
//消息Id
String msgId = result.getMsgId();
//消息接受队列Id
int queueId = result.getMessageQueue().getQueueId();
System.out.println("发送状态: "+ result + ", 消息Id" + msgId + ", 队列" + queueId);
//睡一秒再发送下一个
TimeUnit.SECONDS.sleep(1);
}
// 6. 关闭生产者producer
producer.shutdown();
}
}
打印
发送状态: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C26A418B4AAC2793FDC640000, offsetMsgId=C0A84F8200002A9F0000000000000000, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=1], queueOffset=0], 消息IdC0A82B9C26A418B4AAC2793FDC640000, 队列1
发送状态: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C26A418B4AAC2793FE0670001, offsetMsgId=C0A84F8200002A9F00000000000000A9, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=2], queueOffset=0], 消息IdC0A82B9C26A418B4AAC2793FE0670001, 队列2
发送状态: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C26A418B4AAC2793FE4520002, offsetMsgId=C0A84F8200002A9F0000000000000152, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=3], queueOffset=0], 消息IdC0A82B9C26A418B4AAC2793FE4520002, 队列3
发送状态: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C26A418B4AAC2793FE83E0003, offsetMsgId=C0A84F8200002A9F00000000000001FB, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=0], queueOffset=0], 消息IdC0A82B9C26A418B4AAC2793FE83E0003, 队列0
发送状态: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C26A418B4AAC2793FEC2A0004, offsetMsgId=C0A84F8200002A9F00000000000002A4, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=1], queueOffset=1], 消息IdC0A82B9C26A418B4AAC2793FEC2A0004, 队列1
发送状态: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C26A418B4AAC2793FF0170005, offsetMsgId=C0A84F8200002A9F000000000000034D, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=2], queueOffset=1], 消息IdC0A82B9C26A418B4AAC2793FF0170005, 队列2
发送状态: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C26A418B4AAC2793FF4020006, offsetMsgId=C0A84F8200002A9F00000000000003F6, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=3], queueOffset=1], 消息IdC0A82B9C26A418B4AAC2793FF4020006, 队列3
发送状态: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C26A418B4AAC2793FF7EF0007, offsetMsgId=C0A84F8200002A9F000000000000049F, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=0], queueOffset=1], 消息IdC0A82B9C26A418B4AAC2793FF7EF0007, 队列0
发送状态: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C26A418B4AAC2793FFBDD0008, offsetMsgId=C0A84F8200002A9F0000000000000548, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=1], queueOffset=2], 消息IdC0A82B9C26A418B4AAC2793FFBDD0008, 队列1
发送状态: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C26A418B4AAC2793FFFC90009, offsetMsgId=C0A84F8200002A9F00000000000005F1, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=2], queueOffset=2], 消息IdC0A82B9C26A418B4AAC2793FFFC90009, 队列2
13:03:58.404 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:10909] result: true
13:03:58.409 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:10911] result: true
13:03:58.409 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:9876] result: true
消息来了
异步消息
package com.suntong.myshop.service.rocketmq.provider.yibu;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
/**
* 发送异步消息
*/
public class AsyncProducer {
public static void main(String[] args) throws InterruptedException, MQClientException, RemotingException, MQBrokerException {
// 1. 创建消息生产者Producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2. 指定NameServer
producer.setNamesrvAddr("192.168.79.130:9876");
// 3. 启动producer
producer.start();
// 4. 创建消息对象,指定主题Topic,Tag和消息体
for (int i=0;i<10;i++){
//参数一:消息主题topic
//参数二:消息tag
//参数三:消息内容
Message message = new Message("base","Tag2",("Hello world"+i).getBytes());
// 5. 发送异步消息
producer.send(message, new SendCallback(){
/**
* 发送成功回调函数
* @param sendResult
*/
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送结果: "+ sendResult);
}
/**
* 发送失败回调参数
* @param throwable
*/
@Override
public void onException(Throwable throwable) {
System.out.println("发送异常: "+ throwable);
}
});
//睡一秒再发送下一个
TimeUnit.SECONDS.sleep(1);
}
// 6. 关闭生产者producer
producer.shutdown();
}
}
结果
E:\java\Java8\bin\java.exe "-javaagent:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\lib\idea_rt.jar=64399:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\bin" -Dfile.encoding=UTF-8 -classpath E:\java\Java8\jre\lib\charsets.jar;E:\java\Java8\jre\lib\deploy.jar;E:\java\Java8\jre\lib\ext\access-bridge-64.jar;E:\java\Java8\jre\lib\ext\cldrdata.jar;E:\java\Java8\jre\lib\ext\dnsns.jar;E:\java\Java8\jre\lib\ext\jaccess.jar;E:\java\Java8\jre\lib\ext\jfxrt.jar;E:\java\Java8\jre\lib\ext\localedata.jar;E:\java\Java8\jre\lib\ext\nashorn.jar;E:\java\Java8\jre\lib\ext\sunec.jar;E:\java\Java8\jre\lib\ext\sunjce_provider.jar;E:\java\Java8\jre\lib\ext\sunmscapi.jar;E:\java\Java8\jre\lib\ext\sunpkcs11.jar;E:\java\Java8\jre\lib\ext\zipfs.jar;E:\java\Java8\jre\lib\javaws.jar;E:\java\Java8\jre\lib\jce.jar;E:\java\Java8\jre\lib\jfr.jar;E:\java\Java8\jre\lib\jfxswt.jar;E:\java\Java8\jre\lib\jsse.jar;E:\java\Java8\jre\lib\management-agent.jar;E:\java\Java8\jre\lib\plugin.jar;E:\java\Java8\jre\lib\resources.jar;E:\java\Java8\jre\lib\rt.jar;E:\java2\IdeaProjects\microservice-my-shop\myshop-service-rocketmq-provider\target\classes;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-web\2.0.6.RELEASE\spring-boot-starter-web-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter\2.0.6.RELEASE\spring-boot-starter-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot\2.0.6.RELEASE\spring-boot-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-autoconfigure\2.0.6.RELEASE\spring-boot-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-logging\2.0.6.RELEASE\spring-boot-starter-logging-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-to-slf4j\2.10.0\log4j-to-slf4j-2.10.0.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-api\2.10.0\log4j-api-2.10.0.jar;E:\java2\maven\bos_repository\org\slf4j\jul-to-slf4j\1.7.25\jul-to-slf4j-1.7.25.jar;E:\java2\maven\bos_repository\javax\annotation\javax.annotation-api\1.3.2\javax.annotation-api-1.3.2.jar;E:\java2\maven\bos_repository\org\yaml\snakeyaml\1.19\snakeyaml-1.19.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-json\2.0.6.RELEASE\spring-boot-starter-json-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-databind\2.9.7\jackson-databind-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-core\2.9.7\jackson-core-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.9.7\jackson-datatype-jdk8-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.9.7\jackson-datatype-jsr310-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.9.7\jackson-module-parameter-names-2.9.7.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-tomcat\2.0.6.RELEASE\spring-boot-starter-tomcat-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-core\8.5.34\tomcat-embed-core-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-el\8.5.34\tomcat-embed-el-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-websocket\8.5.34\tomcat-embed-websocket-8.5.34.jar;E:\java2\maven\bos_repository\org\hibernate\validator\hibernate-validator\6.0.13.Final\hibernate-validator-6.0.13.Final.jar;E:\java2\maven\bos_repository\javax\validation\validation-api\2.0.1.Final\validation-api-2.0.1.Final.jar;E:\java2\maven\bos_repository\org\jboss\logging\jboss-logging\3.3.2.Final\jboss-logging-3.3.2.Final.jar;E:\java2\maven\bos_repository\com\fasterxml\classmate\1.3.4\classmate-1.3.4.jar;E:\java2\maven\bos_repository\org\springframework\spring-web\5.0.10.RELEASE\spring-web-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-beans\5.0.10.RELEASE\spring-beans-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-webmvc\5.0.10.RELEASE\spring-webmvc-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-aop\5.0.10.RELEASE\spring-aop-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-context\5.0.10.RELEASE\spring-context-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-expression\5.0.10.RELEASE\spring-expression-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-actuator\2.0.6.RELEASE\spring-boot-starter-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator-autoconfigure\2.0.6.RELEASE\spring-boot-actuator-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator\2.0.6.RELEASE\spring-boot-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\io\micrometer\micrometer-core\1.0.7\micrometer-core-1.0.7.jar;E:\java2\maven\bos_repository\org\hdrhistogram\HdrHistogram\2.1.10\HdrHistogram-2.1.10.jar;E:\java2\maven\bos_repository\org\latencyutils\LatencyUtils\2.0.3\LatencyUtils-2.0.3.jar;E:\java2\maven\bos_repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;E:\java2\maven\bos_repository\org\objenesis\objenesis\2.6\objenesis-2.6.jar;E:\java2\maven\bos_repository\org\springframework\spring-core\5.0.10.RELEASE\spring-core-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-jcl\5.0.10.RELEASE\spring-jcl-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-starter-stream-rocketmq\0.2.1.RELEASE\spring-cloud-starter-stream-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream-binder-rocketmq\0.2.1.RELEASE\spring-cloud-stream-binder-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream\2.0.1.RELEASE\spring-cloud-stream-2.0.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-validation\2.0.6.RELEASE\spring-boot-starter-validation-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-messaging\5.0.10.RELEASE\spring-messaging-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-core\5.0.9.RELEASE\spring-integration-core-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tx\5.0.10.RELEASE\spring-tx-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\io\projectreactor\reactor-core\3.1.10.RELEASE\reactor-core-3.1.10.RELEASE.jar;E:\java2\maven\bos_repository\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-jmx\5.0.9.RELEASE\spring-integration-jmx-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tuple\1.0.0.RELEASE\spring-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\com\esotericsoftware\kryo-shaded\3.0.3\kryo-shaded-3.0.3.jar;E:\java2\maven\bos_repository\com\esotericsoftware\minlog\1.3.0\minlog-1.3.0.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-tuple\1.0.0.RELEASE\spring-integration-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\retry\spring-retry\1.2.2.RELEASE\spring-retry-1.2.2.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-client\4.3.1\rocketmq-client-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-common\4.3.1\rocketmq-common-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-remoting\4.3.1\rocketmq-remoting-4.3.1.jar;E:\java2\maven\bos_repository\com\alibaba\fastjson\1.2.29\fastjson-1.2.29.jar;E:\java2\maven\bos_repository\io\netty\netty-all\4.1.29.Final\netty-all-4.1.29.Final.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-logging\4.3.1\rocketmq-logging-4.3.1.jar;E:\java2\maven\bos_repository\io\netty\netty-tcnative-boringssl-static\1.1.33.Fork26\netty-tcnative-boringssl-static-1.1.33.Fork26.jar;E:\java2\maven\bos_repository\org\apache\commons\commons-lang3\3.7\commons-lang3-3.7.jar com.suntong.myshop.service.rocketmq.provider.yibu.AsyncProducer
发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C14CC18B4AAC2794847D60001, offsetMsgId=C0A84F8200002A9F0000000000000743, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=2], queueOffset=3]
发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C14CC18B4AAC2794847D60002, offsetMsgId=C0A84F8200002A9F000000000000069A, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=1], queueOffset=3]
发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C14CC18B4AAC2794847D60000, offsetMsgId=C0A84F8200002A9F00000000000007EC, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=1], queueOffset=4]
发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C14CC18B4AAC279484AB80003, offsetMsgId=C0A84F8200002A9F0000000000000895, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=3], queueOffset=2]
发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C14CC18B4AAC279484EA10004, offsetMsgId=C0A84F8200002A9F000000000000093E, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=2], queueOffset=4]
发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C14CC18B4AAC27948528A0005, offsetMsgId=C0A84F8200002A9F00000000000009E7, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=0], queueOffset=2]
发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C14CC18B4AAC2794856720006, offsetMsgId=C0A84F8200002A9F0000000000000A90, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=3], queueOffset=3]
发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C14CC18B4AAC279485A5A0007, offsetMsgId=C0A84F8200002A9F0000000000000B39, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=1], queueOffset=5]
发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C14CC18B4AAC279485E420008, offsetMsgId=C0A84F8200002A9F0000000000000BE2, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=0], queueOffset=3]
发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C14CC18B4AAC27948622B0009, offsetMsgId=C0A84F8200002A9F0000000000000C8B, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=2], queueOffset=5]
13:13:07.878 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:9876] result: true
13:13:07.883 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:10911] result: true
13:13:07.883 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:10909] result: true
消息也来了
单项消息
package com.suntong.myshop.service.rocketmq.provider.danxiang;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
/**
* 发送单向消息
*/
public class OneWayProducer {
public static void main(String[] args) throws RemotingException, MQClientException, InterruptedException {
// 1. 创建消息生产者Producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2. 指定NameServer
producer.setNamesrvAddr("192.168.79.130:9876");
// 3. 启动producer
producer.start();
// 4. 创建消息对象,指定主题Topic,Tag和消息体
for (int i=0;i<10;i++){
//参数一:消息主题topic
//参数二:消息tag
//参数三:消息内容
Message message = new Message("base","Tag3",("Hello world 单项消息 "+i).getBytes());
// 5. 发送单向消息
producer.sendOneway(message);
//睡一秒再发送下一个
TimeUnit.SECONDS.sleep(1);
}
// 6. 关闭生产者producer
producer.shutdown();
}
}
打印
E:\java\Java8\bin\java.exe "-javaagent:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\lib\idea_rt.jar=64562:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\bin" -Dfile.encoding=UTF-8 -classpath E:\java\Java8\jre\lib\charsets.jar;E:\java\Java8\jre\lib\deploy.jar;E:\java\Java8\jre\lib\ext\access-bridge-64.jar;E:\java\Java8\jre\lib\ext\cldrdata.jar;E:\java\Java8\jre\lib\ext\dnsns.jar;E:\java\Java8\jre\lib\ext\jaccess.jar;E:\java\Java8\jre\lib\ext\jfxrt.jar;E:\java\Java8\jre\lib\ext\localedata.jar;E:\java\Java8\jre\lib\ext\nashorn.jar;E:\java\Java8\jre\lib\ext\sunec.jar;E:\java\Java8\jre\lib\ext\sunjce_provider.jar;E:\java\Java8\jre\lib\ext\sunmscapi.jar;E:\java\Java8\jre\lib\ext\sunpkcs11.jar;E:\java\Java8\jre\lib\ext\zipfs.jar;E:\java\Java8\jre\lib\javaws.jar;E:\java\Java8\jre\lib\jce.jar;E:\java\Java8\jre\lib\jfr.jar;E:\java\Java8\jre\lib\jfxswt.jar;E:\java\Java8\jre\lib\jsse.jar;E:\java\Java8\jre\lib\management-agent.jar;E:\java\Java8\jre\lib\plugin.jar;E:\java\Java8\jre\lib\resources.jar;E:\java\Java8\jre\lib\rt.jar;E:\java2\IdeaProjects\microservice-my-shop\myshop-service-rocketmq-provider\target\classes;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-web\2.0.6.RELEASE\spring-boot-starter-web-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter\2.0.6.RELEASE\spring-boot-starter-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot\2.0.6.RELEASE\spring-boot-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-autoconfigure\2.0.6.RELEASE\spring-boot-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-logging\2.0.6.RELEASE\spring-boot-starter-logging-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-to-slf4j\2.10.0\log4j-to-slf4j-2.10.0.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-api\2.10.0\log4j-api-2.10.0.jar;E:\java2\maven\bos_repository\org\slf4j\jul-to-slf4j\1.7.25\jul-to-slf4j-1.7.25.jar;E:\java2\maven\bos_repository\javax\annotation\javax.annotation-api\1.3.2\javax.annotation-api-1.3.2.jar;E:\java2\maven\bos_repository\org\yaml\snakeyaml\1.19\snakeyaml-1.19.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-json\2.0.6.RELEASE\spring-boot-starter-json-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-databind\2.9.7\jackson-databind-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-core\2.9.7\jackson-core-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.9.7\jackson-datatype-jdk8-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.9.7\jackson-datatype-jsr310-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.9.7\jackson-module-parameter-names-2.9.7.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-tomcat\2.0.6.RELEASE\spring-boot-starter-tomcat-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-core\8.5.34\tomcat-embed-core-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-el\8.5.34\tomcat-embed-el-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-websocket\8.5.34\tomcat-embed-websocket-8.5.34.jar;E:\java2\maven\bos_repository\org\hibernate\validator\hibernate-validator\6.0.13.Final\hibernate-validator-6.0.13.Final.jar;E:\java2\maven\bos_repository\javax\validation\validation-api\2.0.1.Final\validation-api-2.0.1.Final.jar;E:\java2\maven\bos_repository\org\jboss\logging\jboss-logging\3.3.2.Final\jboss-logging-3.3.2.Final.jar;E:\java2\maven\bos_repository\com\fasterxml\classmate\1.3.4\classmate-1.3.4.jar;E:\java2\maven\bos_repository\org\springframework\spring-web\5.0.10.RELEASE\spring-web-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-beans\5.0.10.RELEASE\spring-beans-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-webmvc\5.0.10.RELEASE\spring-webmvc-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-aop\5.0.10.RELEASE\spring-aop-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-context\5.0.10.RELEASE\spring-context-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-expression\5.0.10.RELEASE\spring-expression-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-actuator\2.0.6.RELEASE\spring-boot-starter-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator-autoconfigure\2.0.6.RELEASE\spring-boot-actuator-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator\2.0.6.RELEASE\spring-boot-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\io\micrometer\micrometer-core\1.0.7\micrometer-core-1.0.7.jar;E:\java2\maven\bos_repository\org\hdrhistogram\HdrHistogram\2.1.10\HdrHistogram-2.1.10.jar;E:\java2\maven\bos_repository\org\latencyutils\LatencyUtils\2.0.3\LatencyUtils-2.0.3.jar;E:\java2\maven\bos_repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;E:\java2\maven\bos_repository\org\objenesis\objenesis\2.6\objenesis-2.6.jar;E:\java2\maven\bos_repository\org\springframework\spring-core\5.0.10.RELEASE\spring-core-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-jcl\5.0.10.RELEASE\spring-jcl-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-starter-stream-rocketmq\0.2.1.RELEASE\spring-cloud-starter-stream-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream-binder-rocketmq\0.2.1.RELEASE\spring-cloud-stream-binder-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream\2.0.1.RELEASE\spring-cloud-stream-2.0.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-validation\2.0.6.RELEASE\spring-boot-starter-validation-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-messaging\5.0.10.RELEASE\spring-messaging-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-core\5.0.9.RELEASE\spring-integration-core-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tx\5.0.10.RELEASE\spring-tx-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\io\projectreactor\reactor-core\3.1.10.RELEASE\reactor-core-3.1.10.RELEASE.jar;E:\java2\maven\bos_repository\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-jmx\5.0.9.RELEASE\spring-integration-jmx-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tuple\1.0.0.RELEASE\spring-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\com\esotericsoftware\kryo-shaded\3.0.3\kryo-shaded-3.0.3.jar;E:\java2\maven\bos_repository\com\esotericsoftware\minlog\1.3.0\minlog-1.3.0.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-tuple\1.0.0.RELEASE\spring-integration-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\retry\spring-retry\1.2.2.RELEASE\spring-retry-1.2.2.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-client\4.3.1\rocketmq-client-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-common\4.3.1\rocketmq-common-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-remoting\4.3.1\rocketmq-remoting-4.3.1.jar;E:\java2\maven\bos_repository\com\alibaba\fastjson\1.2.29\fastjson-1.2.29.jar;E:\java2\maven\bos_repository\io\netty\netty-all\4.1.29.Final\netty-all-4.1.29.Final.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-logging\4.3.1\rocketmq-logging-4.3.1.jar;E:\java2\maven\bos_repository\io\netty\netty-tcnative-boringssl-static\1.1.33.Fork26\netty-tcnative-boringssl-static-1.1.33.Fork26.jar;E:\java2\maven\bos_repository\org\apache\commons\commons-lang3\3.7\commons-lang3-3.7.jar com.suntong.myshop.service.rocketmq.provider.danxiang.OneWayProducer
13:25:25.124 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:10909] result: true
13:25:25.128 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:10911] result: true
13:25:25.128 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:9876] result: true
没有结果接受,适合发日志啥的
消息也来了