RocketMQ(五)消息类型--普通消息
2020-11-27 本文已影响0人
我犟不过你
Rocketmq支持四种消息类型:普通消息、顺序消息、定时/延时消息、分布式事务消息。
您在调用SDK收发消息时需注意,消息队列RocketMQ版提供的四种消息类型所对应的Topic不能混用,例如,您创建的普通消息的Topic只能用于收发普通消息,不能用于收发其他类型的消息;同理,事务消息的Topic也只能收发事务消息,不能用于收发其他类型的消息,以此类推。
后面代码都是基于springboot-rocketmq-stater。
一、普通消息
无特性的消息,区别于有特性的定时和延时消息、顺序消息和事务消息。
普通消息又分为同步消息,异步消息,单向消息。
1.1、同步消息
每向rocketmq服务发送一条消息会收到同步响应。
![](https://img.haomeiwen.com/i16830368/fd404a2d8c43d9e3.png)
下面通过代码实现下同步发送:
测试控制器:
@Autowired
private RocketMqProducer rocketMqProducer;
/**
* 普通消息同步发送
*/
@RequestMapping("/send/sync")
public void sendMsg(){
rocketMqProducer.sendSync("普通消息-同步发送","test_sync");
}
生产者:
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送普通消息(同步发送)
*/
public void sendSync(String msgBody,String topic) {
for (int i = 0; i < 10; i++) {
try{
SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build());
if (ObjectUtils.isNotEmpty(sendResult)){
//sendResult不空则表示消息发送成功
log.info("send success , send msg = {}, messageId = {}",msgBody, sendResult.getMsgId());
}
}catch (Exception e){
log.info("send failed, msg = {}", e);
}
}
}
结果:
2020-11-27 10:33:24.157 INFO 12988 --- [nio-8085-exec-9] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-同步发送, messageId = AC10020832BC18B4AAC288293D9F001E
2020-11-27 10:33:24.186 INFO 12988 --- [nio-8085-exec-9] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-同步发送, messageId = AC10020832BC18B4AAC288293DBD0020
2020-11-27 10:33:24.215 INFO 12988 --- [nio-8085-exec-9] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-同步发送, messageId = AC10020832BC18B4AAC288293DDA0023
2020-11-27 10:33:24.243 INFO 12988 --- [nio-8085-exec-9] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-同步发送, messageId = AC10020832BC18B4AAC288293DF70026
2020-11-27 10:33:24.272 INFO 12988 --- [nio-8085-exec-9] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-同步发送, messageId = AC10020832BC18B4AAC288293E140029
2020-11-27 10:33:24.301 INFO 12988 --- [nio-8085-exec-9] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-同步发送, messageId = AC10020832BC18B4AAC288293E30002C
2020-11-27 10:33:24.330 INFO 12988 --- [nio-8085-exec-9] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-同步发送, messageId = AC10020832BC18B4AAC288293E4D002F
2020-11-27 10:33:24.359 INFO 12988 --- [nio-8085-exec-9] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-同步发送, messageId = AC10020832BC18B4AAC288293E6A0032
2020-11-27 10:33:24.390 INFO 12988 --- [nio-8085-exec-9] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-同步发送, messageId = AC10020832BC18B4AAC288293E880035
2020-11-27 10:33:24.417 INFO 12988 --- [nio-8085-exec-9] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-同步发送, messageId = AC10020832BC18B4AAC288293EA60038
以上步骤没有消费者,下面演示增加消费者的情况:
消费者代码:
/**
* RocketMqProducer
* @date: 2020/11/26
* @author weirx
* @version 3.0
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "test_sync", selectorExpression = "*", consumerGroup = "test")
public class SimpleMessageListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
log.info("receive sync message:{}", msg);
}
}
启动后会立即消费刚才发送的消息:
2020-11-27 10:39:49.543 INFO 45668 --- [MessageThread_2] c.c.b.m.r.c.SimpleMessageListener : receive sync message:普通消息-同步发送
2020-11-27 10:39:49.543 INFO 45668 --- [essageThread_20] c.c.b.m.r.c.SimpleMessageListener : receive sync message:普通消息-同步发送
2020-11-27 10:39:49.544 INFO 45668 --- [MessageThread_1] c.c.b.m.r.c.SimpleMessageListener : receive sync message:普通消息-同步发送
2020-11-27 10:39:49.544 INFO 45668 --- [essageThread_16] c.c.b.m.r.c.SimpleMessageListener : receive sync message:普通消息-同步发送
2020-11-27 10:39:49.544 INFO 45668 --- [essageThread_13] c.c.b.m.r.c.SimpleMessageListener : receive sync message:普通消息-同步发送
2020-11-27 10:39:49.543 INFO 45668 --- [MessageThread_4] c.c.b.m.r.c.SimpleMessageListener : receive sync message:普通消息-同步发送
2020-11-27 10:39:49.544 INFO 45668 --- [MessageThread_6] c.c.b.m.r.c.SimpleMessageListener : receive sync message:普通消息-同步发送
2020-11-27 10:39:49.543 INFO 45668 --- [MessageThread_5] c.c.b.m.r.c.SimpleMessageListener : receive sync message:普通消息-同步发送
2020-11-27 10:39:49.544 INFO 45668 --- [MessageThread_8] c.c.b.m.r.c.SimpleMessageListener : receive sync message:普通消息-同步发送
2020-11-27 10:39:49.544 INFO 45668 --- [essageThread_18] c.c.b.m.r.c.SimpleMessageListener : receive sync message:普通消息-同步发送
再次发送的结果,发送一条消费一条,符合同步响应的模型:
2020-11-27 10:41:23.837 INFO 45668 --- [nio-8085-exec-1] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-同步发送, messageId = AC100208B26418B4AAC288308F520029
2020-11-27 10:41:23.839 INFO 45668 --- [essageThread_11] c.c.b.m.r.c.SimpleMessageListener : receive sync message:普通消息-同步发送
2020-11-27 10:41:23.868 INFO 45668 --- [nio-8085-exec-1] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-同步发送, messageId = AC100208B26418B4AAC288308F7E002B
2020-11-27 10:41:23.869 INFO 45668 --- [essageThread_16] c.c.b.m.r.c.SimpleMessageListener : receive sync message:普通消息-同步发送
2020-11-27 10:41:23.896 INFO 45668 --- [nio-8085-exec-1] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-同步发送, messageId = AC100208B26418B4AAC288308F9C0031
2020-11-27 10:41:23.897 INFO 45668 --- [MessageThread_9] c.c.b.m.r.c.SimpleMessageListener : receive sync message:普通消息-同步发送
2020-11-27 10:41:23.925 INFO 45668 --- [nio-8085-exec-1] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-同步发送, messageId = AC100208B26418B4AAC288308FB80037
2020-11-27 10:41:23.926 INFO 45668 --- [MessageThread_5] c.c.b.m.r.c.SimpleMessageListener : receive sync message:普通消息-同步发送
2020-11-27 10:41:23.954 INFO 45668 --- [nio-8085-exec-1] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-同步发送, messageId = AC100208B26418B4AAC288308FD6003E
2020-11-27 10:41:23.954 INFO 45668 --- [essageThread_10] c.c.b.m.r.c.SimpleMessageListener : receive sync message:普通消息-同步发送
2020-11-27 10:41:23.985 INFO 45668 --- [nio-8085-exec-1] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-同步发送, messageId = AC100208B26418B4AAC288308FF20043
2020-11-27 10:41:23.986 INFO 45668 --- [essageThread_17] c.c.b.m.r.c.SimpleMessageListener : receive sync message:普通消息-同步发送
2020-11-27 10:41:24.015 INFO 45668 --- [nio-8085-exec-1] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-同步发送, messageId = AC100208B26418B4AAC2883090120049
2020-11-27 10:41:24.016 INFO 45668 --- [MessageThread_4] c.c.b.m.r.c.SimpleMessageListener : receive sync message:普通消息-同步发送
2020-11-27 10:41:24.043 INFO 45668 --- [nio-8085-exec-1] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-同步发送, messageId = AC100208B26418B4AAC28830902F004F
2020-11-27 10:41:24.044 INFO 45668 --- [essageThread_18] c.c.b.m.r.c.SimpleMessageListener : receive sync message:普通消息-同步发送
2020-11-27 10:41:24.070 INFO 45668 --- [nio-8085-exec-1] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-同步发送, messageId = AC100208B26418B4AAC28830904B0055
2020-11-27 10:41:24.072 INFO 45668 --- [MessageThread_6] c.c.b.m.r.c.SimpleMessageListener : receive sync message:普通消息-同步发送
2020-11-27 10:41:24.099 INFO 45668 --- [nio-8085-exec-1] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-同步发送, messageId = AC100208B26418B4AAC288309067005B
2020-11-27 10:41:24.100 INFO 45668 --- [MessageThread_2] c.c.b.m.r.c.SimpleMessageListener : receive sync message:普通消息-同步发送
1.2、异步消息
![](https://img.haomeiwen.com/i16830368/e76f929eb9dcd25c.png)
测试控制器:
/**
* 普通消息异步步发送
*/
@RequestMapping("/send/async")
public void sendAsync(){
rocketMqProducer.sendAsync("普通消息-异步发送","test_async");
}
生产者:
/**
* 发送普通消息(异步发送)
*/
public void sendAsync(String msgBody, String topic) {
for (int i = 0; i < 10; i++) {
rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("send success , send msg = {}, messageId = {}", msgBody, sendResult.getMsgId());
}
@Override
public void onException(Throwable throwable) {
log.info("send failed, msg = {}", throwable);
}
});
}
}
消费者:
/**
* RocketMqProducer
* @date: 2020/11/26
* @author weirx
* @version 3.0
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "test_async", selectorExpression = "*", consumerGroup = "test_async")
public class SimpleAsyncMessageListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
log.info("receive async message:{}", msg);
}
}
结果,没有像同步一样发送一条消费一条:
2020-11-27 11:09:00.483 INFO 25416 --- [ublicExecutor_7] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-异步发送, messageId = AC100208634818B4AAC28849D3C00005
2020-11-27 11:09:00.483 INFO 25416 --- [ublicExecutor_8] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-异步发送, messageId = AC100208634818B4AAC28849D3C00001
2020-11-27 11:09:00.483 INFO 25416 --- [ublicExecutor_4] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-异步发送, messageId = AC100208634818B4AAC28849D3C00002
2020-11-27 11:09:00.483 INFO 25416 --- [ublicExecutor_6] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-异步发送, messageId = AC100208634818B4AAC28849D3C00006
2020-11-27 11:09:00.483 INFO 25416 --- [ublicExecutor_5] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-异步发送, messageId = AC100208634818B4AAC28849D3C00004
2020-11-27 11:09:00.483 INFO 25416 --- [ublicExecutor_1] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-异步发送, messageId = AC100208634818B4AAC28849D3C00000
2020-11-27 11:09:00.483 INFO 25416 --- [ublicExecutor_8] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-异步发送, messageId = AC100208634818B4AAC28849D3C00003
2020-11-27 11:09:00.490 INFO 25416 --- [MessageThread_6] c.c.b.m.r.c.SimpleAsyncMessageListener : receive async message:普通消息-异步发送
2020-11-27 11:09:00.490 INFO 25416 --- [MessageThread_4] c.c.b.m.r.c.SimpleAsyncMessageListener : receive async message:普通消息-异步发送
2020-11-27 11:09:00.490 INFO 25416 --- [MessageThread_5] c.c.b.m.r.c.SimpleAsyncMessageListener : receive async message:普通消息-异步发送
2020-11-27 11:09:00.490 INFO 25416 --- [MessageThread_1] c.c.b.m.r.c.SimpleAsyncMessageListener : receive async message:普通消息-异步发送
2020-11-27 11:09:00.490 INFO 25416 --- [MessageThread_2] c.c.b.m.r.c.SimpleAsyncMessageListener : receive async message:普通消息-异步发送
2020-11-27 11:09:00.490 INFO 25416 --- [MessageThread_3] c.c.b.m.r.c.SimpleAsyncMessageListener : receive async message:普通消息-异步发送
2020-11-27 11:09:00.509 INFO 25416 --- [ublicExecutor_6] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-异步发送, messageId = AC100208634818B4AAC28849D6A00010
2020-11-27 11:09:00.509 INFO 25416 --- [ublicExecutor_4] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-异步发送, messageId = AC100208634818B4AAC28849D3C00007
2020-11-27 11:09:00.509 INFO 25416 --- [ublicExecutor_5] c.c.b.m.r.producer.RocketMqProducer : send success , send msg = 普通消息-异步发送, messageId = AC100208634818B4AAC28849D6A00011
2020-11-27 11:09:00.510 INFO 25416 --- [MessageThread_7] c.c.b.m.r.c.SimpleAsyncMessageListener : receive async message:普通消息-异步发送
2020-11-27 11:09:00.510 INFO 25416 --- [MessageThread_8] c.c.b.m.r.c.SimpleAsyncMessageListener : receive async message:普通消息-异步发送
2020-11-27 11:09:00.510 INFO 25416 --- [MessageThread_9] c.c.b.m.r.c.SimpleAsyncMessageListener : receive async message:普通消息-异步发送
2020-11-27 11:09:00.519 INFO 25416 --- [essageThread_10] c.c.b.m.r.c.SimpleAsyncMessageListener : receive async message:普通消息-异步发送
1.3、单向发送
![](https://img.haomeiwen.com/i16830368/7bda5df7c0bf9052.png)
控制器:
/**
* 普通消息单向发送
*/
@RequestMapping("/send/oneway")
public void sendOneway(){
rocketMqProducer.sendOneWay("普通消息-单向发送","test_oneway");
}
生产者:
/**
* 发送普通消息(单向发送)
*/
public void sendOneWay(String msgBody,String topic) {
for (int i = 0; i < 10; i++) {
//没有返回值和回调方法
rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(msgBody).build());
}
}
消费者
/**
* RocketMqProducer
* @date: 2020/11/26
* @author weirx
* @version 3.0
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "test_oneway", selectorExpression = "*", consumerGroup = "test_oneway")
public class SimpleOnewayMessageListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody();
String msg = new String(body);
log.info("receive async message:{}", msg);
}
}
消费者接收结果:
2020-11-27 11:16:35.700 INFO 14364 --- [MessageThread_5] c.c.b.m.r.c.SimpleOnewayMessageListener : receive async message:普通消息-单向发送
2020-11-27 11:16:35.700 INFO 14364 --- [MessageThread_3] c.c.b.m.r.c.SimpleOnewayMessageListener : receive async message:普通消息-单向发送
2020-11-27 11:16:35.700 INFO 14364 --- [MessageThread_4] c.c.b.m.r.c.SimpleOnewayMessageListener : receive async message:普通消息-单向发送
2020-11-27 11:16:35.700 INFO 14364 --- [MessageThread_1] c.c.b.m.r.c.SimpleOnewayMessageListener : receive async message:普通消息-单向发送
2020-11-27 11:16:35.700 INFO 14364 --- [MessageThread_6] c.c.b.m.r.c.SimpleOnewayMessageListener : receive async message:普通消息-单向发送
2020-11-27 11:16:35.700 INFO 14364 --- [MessageThread_2] c.c.b.m.r.c.SimpleOnewayMessageListener : receive async message:普通消息-单向发送
2020-11-27 11:16:35.729 INFO 14364 --- [MessageThread_7] c.c.b.m.r.c.SimpleOnewayMessageListener : receive async message:普通消息-单向发送
2020-11-27 11:16:35.729 INFO 14364 --- [MessageThread_9] c.c.b.m.r.c.SimpleOnewayMessageListener : receive async message:普通消息-单向发送
2020-11-27 11:16:35.730 INFO 14364 --- [MessageThread_8] c.c.b.m.r.c.SimpleOnewayMessageListener : receive async message:普通消息-单向发送
2020-11-27 11:16:35.731 INFO 14364 --- [essageThread_10] c.c.b.m.r.c.SimpleOnewayMessageListener : receive async message:普通消息-单向发送