Spring Cloud Stream 集成 RocketMQ
Spring Cloud Stream 是什么?
它是什么
Spring Cloud Stream 是一个构建高度可扩展的事件驱动微服务的框架,与共享消息系统相连。
该框架提供了一个灵活的编程模型,它建立在已经建立和熟悉的 Spring
用法和最佳实践之上,包括支持持久化的 pub/sub
语义、消费者组和有状态分区。
绑定的一些实现
Spring Cloud Stream支持多种绑定实现,下表包括了GitHub项目的链接。
- RabbitMQ
- Apache Kafka
- Kafka Streams
- Amazon Kinesis
- Google PubSub (partner maintained)
- Solace PubSub+ (partner maintained)
- Azure Event Hubs (partner maintained)
- Apache RocketMQ (partner maintained)
Spring Cloud Stream的核心构件是:
- Destination Binders: 负责提供与外部消息系统集成的组件。
- Destination Bindings: 作为消息中间件与应用程序的提供者和消费者之间的桥梁。
- Message: 生产者和消费者用于与目的地装订器沟通的典型数据结构(从而通过外部消息系统与其他应用程序进行通信的典型数据结构)。
为什么用 Cloud Stream?
- 解耦。使用了 SCS 之后,我们只需要在配置文件中配置下对应的中间件服务器地址等信息,然后就可使用,使得业务中不需要出现具体的消息中间件。
- 便于迁移。例如项目中一开始使用的是 rabbitmq,后期要想迁移成 kafka 的话,如果使用传统方式,在使用的地方使用具体消息中间件的话,那么迁移的成本会很高,而使用 SCS 的话,只需要更改配置文件即可。
如何使用?(集成 rocket mq )
配置 JAVA_HOME 路径(以下为 mac 环境下的配置)
-
chmod 777 /etc/profile
-
sudo vim /etc/profile
-
export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_162.jdk/Contents/Home
export PATH=$JAVA_HOME/bin:$PATH
-
source /etc/profile
安装启动 Rocket MQ
- 从官网 下载二进制文件
- 启动 nameserver
nohup sh bin/mqnamesrv &
- 启动 broker
nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
- 设置 nameserver 地址
export NAMESRV_ADDR=localhost:9876
- 生产者发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
- 消费者消费消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
- 关闭 broker
sh bin/mqshutdown broker
- 关闭 nameserver
sh bin/mqshutdown namesrv
提示:
如果没有执行
export NAMESRV_ADDR=localhost:9876
会导致
java.lang.IllegalStateException: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed
集成 SCS
-
通过 https://start.spring.io/ 创建一个初始化项目
-
这里贴出
pom
文件配置<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>org.example</groupId> <artifactId>mq</artifactId> <version>1.0-SNAPSHOT</version> <properties> <java.version>1.8</java.version> <spring-cloud.version>Hoxton.SR4</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> <version>2.2.1.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-test-support</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
-
创建
CustomerChannel
public interface CustomerChannel { /** * 这里的名称对应了spring.cloud.stream.rocketmq.bindings.<channelName> */ String OUTPUT = "my-output"; String INPUT = "my-input"; @Output(CustomerChannel.OUTPUT) MessageChannel output(); @Input(CustomerChannel.INPUT) SubscribableChannel input(); }
-
定义
TestController
@RestController @EnableBinding({CustomerChannel.class}) public class TestController { private final CustomerChannel customerChannel; public TestController(CustomerChannel customerChannel) { this.customerChannel = customerChannel; } /** * 使用一个controller断点模拟发送消息,可以在setHeader方法中设置header来实现消息过滤 */ @PostMapping("/message-send") public String testCustomInterfaceSendMsg() { Message<String> message = MessageBuilder.withPayload("send message") .setHeader(RocketMQHeaders.TAGS, "tag2") .setHeader("mytag", "my-tag") .build(); this.customerChannel.output().send(message); Message<String> message2 = MessageBuilder.withPayload("send message") .setHeader(RocketMQHeaders.TAGS, "tag3") .setHeader("mytag", "your-tag") .build(); this.customerChannel.output().send(message2); return "success"; } /** * 使用@StreamListener来监听消息 */ @StreamListener(value = CustomerChannel.INPUT, condition = "headers['mytag']=='my-tag'") public void testCustomListener(Message message) { System.out.println(message.getHeaders().get("TAGS") + " " + message.getPayload().toString()); } /** * 使用@StreamListener来监听消息 */ @StreamListener(value = CustomerChannel.INPUT, condition = "headers['mytag']=='your-tag'") public void testCustomListenerFilter(Message message) { System.out.println(message.getHeaders().get("TAGS") + " " + message.getPayload().toString()); } }
-
配置
application.yml
spring: cloud: stream: rocketmq: binder: name-server: localhost:9876 enable-msg-trace: true bindings: my-input: consumer: tags: tag2 || tag1 || tag3 || tag4 # tag 为 tag1/tag2/tag3/tag4 bindings: my-input: destination: my-stream-topic # 相当于 rocketmq 的 topic group: my-stream-group binder: rocketmq # consumer: instanceCount: 1 # 指定实例数量 my-output: destination: my-stream-topic # 相当于 rocketmq 的 topic
-
运行
MQApplication
,使用 POST 方法请求 localhost:8989/message-send
核心原理
消息发送和消费的流程:
-
消息通过
MessageChannel(output)
进行发送,AbstractMessageChannel
实现了MessageChannel
,AbstractSubscribableChannel
继承了AbstractMessageChannel
并且实现了SubscribableChannel
,重写了其中的subscribe
方法,subscribe()
指定了MessageHandler
,最终会调用RocketMQMessageHandler
发送消息 -
消息发出之后,对应的消息中间件内部会有通道适配器,将中间件特有的消息格式转换为 SpringMessage,然后发送到
MessageChannel(input)
-
StreamListener
订阅了对应的 input ,根据一定的条件,就能收到消费者发出的消息。
本文参考
https://github.com/alibaba/spring-cloud-alibaba/wiki/RocketMQ-en