RocketMQ 与 Spring Boot整合(五、广播消费)
在上述的示例中,我们看到的都是使用集群消费。而在一些场景下,我们需要使用广播消费。
广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。
例如说,在应用中,缓存了数据字典等配置表在内存中,可以通过 RocketMQ 广播消费,实现每个应用节点都消费消息,刷新本地内存的缓存。
又例如说,我们基于 WebSocket 实现了 IM 聊天,在我们给用户主动发送消息时,因为我们不知道用户连接的是哪个提供 WebSocket 的应用,所以可以通过 RocketMQ 广播消费,每个应用判断当前用户是否是和自己提供的 WebSocket 服务连接,如果是,则推送消息给用户。
下面,我们开始本小节的示例。
5.1 Demo05Message
package com.ebadagang.springboot.rocketmq.message;
/**
* 示例 05 的 Message 消息
*/
public class Demo05Message {
public static final String TOPIC = "DEMO_05";
/**
* 编号
*/
private Integer id;
public Demo05Message setId(Integer id) {
this.id = id;
return this;
}
public Integer getId() {
return id;
}
@Override
public String toString() {
return "Demo05Message{" +
"id=" + id +
'}';
}
}
5.2 Demo05Producer
创建 [Demo04Producer]类,它会使用 RocketMQ-Spring 封装提供的 RocketMQTemplate ,实现同步发送消息。代码如下:
package com.ebadagang.springboot.rocketmq.producer;
import com.ebadagang.springboot.rocketmq.message.Demo05Message;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Demo05Producer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult syncSend(Integer id) {
// 创建 Demo05Message 消息
Demo05Message message = new Demo05Message();
message.setId(id);
// 同步发送消息
return rocketMQTemplate.syncSend(Demo05Message.TOPIC, message);
}
}
5.3 Demo05Consumer
创建 [Demo05Consumer]类,实现 Rocket-Spring 定义的 RocketMQListener 接口,消费消息。代码如下:
package com.ebadagang.springboot.rocketmq.consumer;
import com.ebadagang.springboot.rocketmq.message.Demo05Message;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(
topic = Demo05Message.TOPIC,
consumerGroup = "demo05-consumer-group-" + Demo05Message.TOPIC,
messageModel = MessageModel.BROADCASTING // 设置为广播消费
)
public class Demo05Consumer implements RocketMQListener<Demo05Message> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void onMessage(Demo05Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}
- 主要是 @RocketMQMessageListener 注解,通过设置了 messageModel = MessageModel.BROADCASTING ,表示使用广播消费。
5.4 简单测试
创建 [Demo05ProducerTest]测试类,用于测试广播消费。代码如下:
package com.ebadagang.springboot.rocketmq.producer;
import com.ebadagang.springboot.rocketmq.Application;
import org.apache.rocketmq.client.producer.SendResult;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.CountDownLatch;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo05ProducerTest {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private Demo05Producer producer;
@Test
public void test() throws InterruptedException {
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}
@Test
public void testSyncSend() throws InterruptedException {
int id = (int) (System.currentTimeMillis() / 1000);
SendResult result = producer.syncSend(id);
logger.info("[testSyncSend][发送编号:[{}] 发送结果:[{}]]", id, result);
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}
}
5.4.1 首先
执行#test()
测试方法,先启动一个消费者分组 "demo05-consumer-group-DEMO_05" 的 Consumer 节点。
5.4.2 然后
执行#testSyncSend()
测试方法,先启动一个消费者分组 "demo05-consumer-group-DEMO_05"
的Consumer
节点。同时,该测试方法,调用 Demo05ProducerTest#syncSend(id)
方法,同步发送了一条消息。控制台输出如下:
5.4.3 #testSyncSend() 方法对应的控制台
# Producer 同步发送消息成功
2020-08-04 21:56:34.739 INFO 10824 --- [ main] c.e.s.r.producer.Demo05ProducerTest : [testSyncSend][发送编号:[1596549394] 发送结果:[SendResult [sendStatus=SEND_OK, msgId=240884E30114A66731EF8A0EAAFD768A2A4818B4AAC2142870A50000, offsetMsgId=6585E30D00002A9F0000000000039CC7, messageQueue=MessageQueue [topic=DEMO_05, brokerName=broker-a, queueId=0], queueOffset=0]]]
# Demo05Consumer 消费了该消息
2020-08-04 21:56:34.771 INFO 10824 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo05Consumer : [onMessage][线程编号:174 消息内容:Demo05Message{id=1596549394}]
5.4.4 #test() 方法对应的控制台
# 另外一个 Demo05Consumer 也消费了该消息
2020-08-04 21:56:34.755 INFO 15504 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo05Consumer : [onMessage][线程编号:184 消息内容:Demo05Message{id=1596549394}]
消费者分组 "demo05-consumer-group-DEMO_05" 的两个 Consumer 节点,都消费了这条发送的消息。符合广播消费的预期。
底线
本文源代码使用 Apache License 2.0开源许可协议,这里是本文源码Gitee地址,可通过命令git clone+地址
下载代码到本地,也可直接点击链接通过浏览器方式查看源代码。