玩转SpringCloud Stream
背景及痛点
现如今消息中间件(MQ)在互联网项目中被广泛的应用,特别是大数据行业应用的特别的多,现在市面上也流行这多个消息中间件框架,比如ActiveMQ
、RabbitMQ
、RocketMQ
、Kafka
等,这些消息中间件各有各的优劣,但是想要解决的问题都基本相同。由于每个框架都有它自己的使用方式,这无疑是增加了开发者的学习成本以及添加相同的业务复杂度。框架的变更或者多个中间件的混合使用使得业务逻辑代码中中间件的切换、项目的维护和开发都会变得更加繁琐。
有没有一种技术让我们不再需要关注具体MQ的使用细节,我们只需要专注业务逻辑的开发,让程序根据实际项目的使用自己去适配绑定,自动在各种MQ内切换呢?springcloud stream
便为此而生。
关于stream
我们用一句话来描述stream就是:屏蔽底层消息中间件的差异,降低切换版本,统一消息的编程模型
官方定义SpringCloud Stream
是一个构建消息驱动微服务的框架,应用通过inputs
或者outputs
来与SpringCloud Stream
中的binder
对象交互,我们通过配置来绑定消息中间件,而SpringCloud Stream
的binder
对象负责与消息中间件交互,所以我们只需要搞清楚如何与SpringCloud Stream
交互即可方便的使用消息中间件。
SpringCloud Stream
通过Spring Integration
来连接消息代理中间件以实现消息事件驱动,它提供了个性化的自动化配置,引用了发布订阅
、消费组
、分区
的三个核心概念,但是目前仅支持RabbitMQ
和Kafka
设计思想
在此之前
以前的架构生产者和消费者通过消息媒介(queue等)传递信息内容(Message),消息必须通过特定的通道(MessageChannel),通过消息的发布与订阅来决定消息的发送和消费(publish/subscrib
)。
引入中间件
现在假如我们用到了RabbitMQ
和Kafka
,由于这两个消息中间件的架构上的不同,像RabbitMQ
有Exchange
,而Kafka
有topiche
和Partitions
分区
(binder中,input对于消费者,output对应生产者。)
这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,但是后面因为业务需求,需要改用另外一种消息队列进行迁移,这时候无疑就是一 个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream
给我们提供了一种解耦合的方式。
屏蔽底层差异
在没有绑定器(Builder
)这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,通过定义绑定器作为中间件,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道, 使得应用程序不需要再考虑各种不同的消息中间件实现。
通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
处理架构
Stream对消息中间件的进一步封装可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。
处理架构其遵循了发布-订阅模式,主要使用的就是Topic主题进行广播,RabbitMQ就是Exchange,在Kafka中就是Topic
通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
stream流程
stream流程-
Binder
:很方便的连接中间件,屏蔽差异 -
Channel
:通道是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过对Channel对队列进行配置 -
Source和Sink
:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入
常用api和注解
常用api和注解使用示例
基本环境
-
注册中心
:Eureka,可以是其他。 -
消息中间件
:RabbitMQrabbitmq: host: localhost port: 5672 username: guest password: guest
生产端
依赖
<!--stream rabbit -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--eureka client-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
配置文件
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
# stream 配置
stream:
binders: # 配置绑定的消息中间件的服务信息
defaultRabbit: # 自定义的一个名称,用来下面 bindings 绑定
type: rabbit # 消息组件的类型
environment: #相关环境配置,设置rabbitmq的环境
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 通道名称
destination: testExchange # 定义要使用的Exchange的名称
content-type: application/json # 设置消息类型,对象为json,文本是text/plain
binder: defaultRabbit # 设置要绑定的服务的具体设置,就是我们上面配置的defaultRabbit
eureka:
client:
#表示是否将自己注册进EurekaServer默认为true
register-with-eureka: true
#是否从EurekaServer抓取已有的注册消息,默认为true,单节点无所谓,集群必须设置为true才能配合ribbon使用负载均衡
fetch-registry: true
service-url:
#单机版
defaultZone: http://localhost:8080/eureka/
instance:
prefer-ip-address: true
instance-id: sender01
定义接口
这里需要定义一个接口并实现它,方便其他业务调用。
public interface IMessageProvider {
/**
* 发送接口
* @param msg
* @return
*/
public String send(String msg);
}
接口实现
接口实现中需要添加
@EnableBinding
注解,并引入Source.class
,为什么引入Source.class
呢?因为它是生产者,我们参考stream流程图就可以知道
import com.martain.study.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
@EnableBinding(Source.class)
public class MessageProvider implements IMessageProvider {
/**
* 注入消息发送管道
*/
@Resource
private MessageChannel output;
@Override
public String send(String msg) {
output.send(MessageBuilder.withPayload(msg).build());
System.out.println("******send message:"+msg);
return msg;
}
}
定义测试controller
@RestController
public class TestController {
@Autowired
IMessageProvider messageProvider;
@GetMapping("/sendMsg")
public String sendMsg(){
String msg = UUID.randomUUID().toString();
return messageProvider.send(msg);
}
}
启动类
@SpringBootApplication
public class StreamProviderApplication8801 {
public static void main(String[] args) {
SpringApplication.run(StreamProviderApplication8801.class,args);
}
}
服务启动之后,多次请求/sendMsg
,发送了多条消息。
消费端
依赖
<!--stream rabbit -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--eureka client-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
配置文件
与生产者类似,只是bindings中的output改成了input
server:
port: 8802
spring:
application:
name: cloud-stream-consume
cloud:
# stream 配置
stream:
binders: # 配置绑定的消息中间件的服务信息
defaultRabbit: # 自定义的一个名称,用来下面 bindings 绑定
type: rabbit # 消息组件的类型
environment: #相关环境配置,设置rabbitmq的环境
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 通道名称
destination: testExchange # 定义要使用的Exchange的名称
content-type: application/json # 设置消息类型,对象为json,文本是text/plain
binder: defaultRabbit # 设置要绑定的服务的具体设置,就是我们上面配置的defaultRabbit
eureka:
client:
#表示是否将自己注册进EurekaServer默认为true
register-with-eureka: true
#是否从EurekaServer抓取已有的注册消息,默认为true,单节点无所谓,集群必须设置为true才能配合ribbon使用负载均衡
fetch-registry: true
service-url:
#单机版
defaultZone: http://localhost:8080/eureka/
instance:
prefer-ip-address: true
instance-id: recover01
接收服务
接收服务只需要再类名前添加
@EnableBinding()
注解,并引入Sink.class
类,而实际接收的方法中需要添加@StreamListener(Sink.INPUT)
注解。
package com.martain.study.controller;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
/**
* 获取本服务的端口
*/
@Value("${server.port}")
private String serverPort;
/**
* 这里表示监听sink的input
* @param message
*/
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
System.out.println("**** recv msg :"+message.getPayload()+" in port "+serverPort);
}
}
启动类
@SpringBootApplication
public class StreamConsumerApplication8802 {
public static void main(String[] args) {
SpringApplication.run(StreamConsumerApplication8802.class,args);
}
}
启动生产服务后,在启动消费服务,多次请求生产服务发送消息,我们可以发现消费者能很快的消费这些消息。
消费者消费消息消息分组
当我们有多个
消费者
时,这个时候生产者生产一条消息,会发现所有的消费者都会消费这个消息。比如在一些订单系统的场景中,如果一个订单被多个处理服务一起获取到,就容易造成数据错误,那我们如何避免这种情况呢?这时我们就可以使用Stream的消息分组
来解决重复消费问题。
如何实现Stream的消息分组呢?我们只要简单的在yml文件中配置spring.cloud.stream.bindings.input.group
即可。示例如下:
...
spring:
application:
name: cloud-stream-consume
cloud:
# stream 配置
stream:
binders: # 配置绑定的消息中间件的服务信息
defaultRabbit: # 自定义的一个名称,用来下面 bindings 绑定
type: rabbit # 消息组件的类型
environment: #相关环境配置,设置rabbitmq的环境
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 通道名称
destination: testExchange # 定义要使用的Exchange的名称
content-type: application/json # 设置消息类型,对象为json,文本是text/plain
binder: defaultRabbit # 设置要绑定的服务的具体设置,就是我们上面配置的defaultRabbit
group: groupA # 配置分组
...
如果没有设置该属性,当消费服务启动时,会有个随机的组名
。
如果我们将所有的消费服务的group
熟悉都设置成一致的话,这些服务就会在同一个组里面,从而能够保证消息只被应用消费一次。
同一组的消费者是竞争关系,不可以重复消费。
消息持久化
当生产者在持续生产消息,消费服务突然挂了,使得拥有许多消息并没有被消费,如果消费没有配置分组的话,消费服务重启是无法消费未消费的消息的,如果配置了分组的话,当消费服务重启之后可以自动去消费未消费的数据。