Spring Cloud 学习笔记 - No.7 消息驱动 St

2018-07-18  本文已影响134人  专职跑龙套

请先阅读之前的内容:

Spring Cloud Stream

Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架,为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并且引入了发布-订阅消费组以及消息分区这三个核心概念。
简单的说,Spring Cloud Stream 本质上就是整合了 Spring Boot 和 Spring Integration,实现了一套轻量级的消息驱动的微服务框架。
通过使用 Spring Cloud Stream,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。目前为止 Spring Cloud Stream 只支持下面两个消息中间件的自动化配置:

构建一个 Spring Cloud Stream 消费者

我们利用之前创建的 eureka-consumer 项目。
首先在 pom.xml 中添加如下的依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

其中 spring-cloud-starter-stream-rabbit 是 Spring Cloud Stream 对 RabbitMQ 支持的封装,其中包含了对 RabbitMQ 的自动化配置等内容。

随后创建用于接收来自 RabbitMQ 消息的消费者 SinkReceiver

@EnableBinding(Sink.class)
public class SinkReceiver {

    private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class);

    @StreamListener(Sink.INPUT)
    public void receive(Object payload) {
        logger.info("Received: " + payload);
    }

}

重启项目,从日志中可以看到声明了一个名为 input.anonymous.cWlqMyH9Tm--INXERE6nhQ 的队列,并通过 RabbitMessageChannelBinder 将自己绑定为它的消费者。

c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.cWlqMyH9Tm--INXERE6nhQ, bound to: input

这些信息我们也能在 RabbitMQ 的控制台中发现它们:


RabbitMQ 的控制台

点击进去,通过 Publish Message 功能来发送一条消息到该队列中:


通过 Publish Message 功能来发送一条消息到该队列中

从下面的日志可以看出 SinkReceiver 读取了消息队列中的内容,由于我们没有对消息进行序列化,所以输出的只是该对象的引用:

[m--INXERE6nhQ-1] com.example.SinkReceiver                 : Received: [B@beb7ce8

在上面的操作中,我们并没有手动去配置 RabbitMQ 的信息,比如 IP,端口等等,这是基于 Spring Boot 的设计理念,提供了对 RabbitMQ 默认的自动化配置。当然,我们可以手动在 application.properties 文件中去配置,例如:

spring.cloud.stream.bindings.input.destination=my_destination

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root

编写消费消息的单元测试用例

@RunWith(SpringRunner.class)
@EnableBinding(value = {SinkReceiverTests.SinkSender.class})
public class SinkReceiverTests {

    @Autowired
    private SinkSender sinkSender;

    @Test
    public void sinkSenderTester() {
        sinkSender.output().send(MessageBuilder.withPayload("Testing Message").build());
    }

    public interface SinkSender {

        String OUTPUT = "input";

        @Output(SinkSender.OUTPUT)
        MessageChannel output();

    }

}

在上面的单元测试中,我们通过 @Output(SinkSender.OUTPUT) 定义了一个输出通过,而该输出通道的名称为 input,与前文中的 Sink 中定义的消费通道同名,所以这里的单元测试与前文的消费者程序组成了一对生产者与消费者。
运行该单元测试,日志可以看出 SinkReceiver 读取了消息队列中的内容:

[m--INXERE6nhQ-1] com.example.SinkReceiver                 : Received: [B@89040a9

Spring Cloud Stream 应用模型

图片引自:https://docs.spring.io/spring-cloud-stream/docs/Fishtown.BUILD-SNAPSHOT/reference/htmlsingle/

Spring Cloud Stream 应用模型

绑定器

Spring Cloud Stream 构建的应用程序与消息中间件之间是通过绑定器: Binder 相关联的,绑定器对于应用程序而言起到了隔离作用,它使得不同消息中间件的实现细节对应用程序来说是透明的。
当我们需要升级消息中间件,或是更换其他消息中间件产品时,我们要做的就是更换它们对应的 Binder 绑定器而不需要修改任何Spring Boot的应用逻辑。

所以对于每一个 Spring Cloud Stream 的应用程序来说,它不需要知晓消息中间件的通信细节,它只需要知道 Binder 对应用程序提供的概念去实现即可,而这个概念就是消息通道:Channel

发布-订阅模式

消息会通过共享的 Topic 主题进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。

这里所提到的 Topic 主题是 Spring Cloud Stream 中的一个抽象概念,用来代表发布共享消息给消费者的地方。
在不同的消息中间件中,Topic 可能对应着不同的概念,比如:在 RabbitMQ 中的它对应了 Exchange、而在 Kakfa 中则对应了 Kafka 中的 Topic。

在上面的例子中,应用启动的时候,在 RabbitMQ 的 Exchange 中也创建了一个名为 input 的 Exchange交换器。例如我们分别以 3001 和 3002 两个端口启动 eureka-consumer 项目。
可以看到 Queues 中有两个 Queue:

Queues 中有两个 Queue

可以看到 Channels 中有两个 Channel:


Channels 中有两个 Channel

可以看出 Exchanges 中只有一个名称为 input 的 Exchange,即 Topic 主题。但是点进去,可以看出这个名称为 input 的 Exchange 有绑定了两个消息队列:

Exchanges 中只有一个名称为 input 的 Exchange
这个名称为 input 的 Exchange 有两个 Bindings
如果我们通过 Exchange 页面的 Publish Message 来发布消息,可以发现两个启动的应用程序都输出了消息内容。

图片引自 http://blog.didispace.com/spring-cloud-starter-dalston-7-2/

发布-订阅模式
相对于点对点队列实现的消息通信来说,Spring Cloud Stream 采用的发布-订阅模式可以有效的降低消息生产者与消费者之间的耦合,当我们需要对同一类消息增加一种处理方式时,只需要增加一个应用程序并将输入通道绑定到既有的 Topic 中就可以实现功能的扩展,而不需要改变原来已经实现的任何内容。

消费组

很多情况下,消息生产者发送消息给某个具体微服务时,只希望被消费一次,但是上面我们启动两个应用(3001 和 3002 两个端口),这个消息出现了被重复消费两次的情况。
为了解决这个问题,在 Spring Cloud Stream 中提供了消费组的概念。

如果在同一个主题上的应用需要启动多个实例的时候,我们可以通过spring.cloud.stream.bindings.<channelName>.group 属性为应用指定一个组名,这样这个应用的多个实例在接收到消息的时候,只会有一个成员真正的收到消息并进行处理。

例如,我们在 eureka-consumer 项目的配置中增加:

spring.cloud.stream.bindings.input.group=eureka-consumer-input-group

重启两个端口的实例,随后通过 Exchange 页面的 Publish Message 来发布消息,可以发现只有一个启动的应用程序都输出了消息内容。并且有时候是 3001 端口的实例处理,有时候是 3002 端口的实例处理。
也就是说,对于同一条消息,它多次到达之后可能是由不同的实例进行消费的。

消息分区

在上面的实验中可以看到,消费组并无法控制消息具体被哪个实例消费。但是对于一些业务场景,就需要对于一些具有相同特征的消息每次都可以被同一个消费实例处理。比如:一些用于监控服务,为了统计某段时间内消息生产者发送的报告内容,监控服务需要在自身内容聚合这些数据,那么消息生产者可以为消息增加一个固有的特征 ID 来进行分区,使得拥有这些 ID 的消息每次都能被发送到一个特定的实例上实现累计统计的效果,否则这些数据就会分散到各个不同的节点导致监控结果不一致的情况。

而消息分区概念的引入就是为了解决这样的问题:当生产者将消息数据发送给多个消费者实例时,保证拥有共同特征的消息数据始终是由同一个消费者实例接收和处理。

例如,我们在 eureka-consumer 项目的配置中增加:

spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0

Spring Cloud Stream VS Spring Cloud Bus

我们在 Spring Cloud 学习笔记 - No.3 分布式配置 Config 中使用了 Spring Cloud Bus(结合了 RabbitMQ),那么 Stream 和 Bus 的区别是什么?

RabbitMQ 负载均衡

在上面的例子中,我们始终只有一个 RabbitMQ 实例。在生产环境中,我们可能需要多个 RabbitMQ 实例来实现高并发和高可用。
参见:


引用:
程序猿DD Spring Cloud基础教程
Spring Cloud构建微服务架构:消息驱动的微服务(入门)【Dalston版】
Spring Cloud构建微服务架构:消息驱动的微服务(核心概念)【Dalston版】
Spring Cloud Dalston中文文档

上一篇下一篇

猜你喜欢

热点阅读