MQSpring Cloud

RocketMQ 与 Spring Cloud Stream整合

2020-08-22  本文已影响0人  梅西爱骑车

一、 概述

本文我们来学习 Spring Cloud Alibaba 提供的 Spring Cloud Stream RocketMQ 组件,基于 Spring Cloud Stream 的编程模型,接入 RocketMQ 作为消息中间件,实现消息驱动的微服务。
在开始本文之前,要对 RocketMQ 进行简单的学习。可以阅读本系列前面的[RocketMQ]文章,在本机搭建一个 RocketMQ 服务。

二、Spring Cloud Stream 介绍

Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架,使用 Spring Integration 与 Broker 进行连接。

一般来说,消息队列中间件都有一个 Broker Server(代理服务器),消息中转角色,负责存储消息、转发消息。

例如说在 RocketMQ 中,Broker 负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。另外,Broker 也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

Spring Cloud Stream 提供了消息中间件的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。

Spring Cloud Stream 内部有两个概念:BinderBinding

Binder,跟消息中间件集成的组件,用来创建对应的 Binding。各消息中间件都有自己的 Binder 具体实现。

public interface Binder<T, 
 C extends ConsumerProperties, // 消费者配置
 P extends ProducerProperties> { // 生产者配置

 // 创建消费者的 Binding
 Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);

 // 创建生产者的 Binding
 Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);

}

Binding,包括 Input Binding 和 Output Binding。Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。

最终整体交互如下图所示: Spring Cloud Stream Application binder&bindings

三、 快速入门

本小节,我们一起来快速入门下,会创建 2 个项目,分别作为生产者和消费者。最终项目如下图所示: producer和consumer项目

3.1 搭建生产者

3.1.1 引入依赖

创建 [pom.xml]文件中,引入 Spring Cloud Alibaba RocketMQ 相关依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.erbadagang.springcloud.stream</groupId>
    <artifactId>sc-stream-rocketmq-producer</artifactId>
    <version>0.0.1</version>

    <properties>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
        <spring.boot.version>2.2.4.RELEASE</spring.boot.version>
        <spring.cloud.version>Hoxton.SR1</spring.cloud.version>
        <spring.cloud.alibaba.version>2.2.0.RELEASE</spring.cloud.alibaba.version>
    </properties>

    <!--
        引入 Spring Boot、Spring Cloud、Spring Cloud Alibaba 三者 BOM 文件,进行依赖版本的管理,防止不兼容。
        在 https://dwz.cn/mcLIfNKt 文章中,Spring Cloud Alibaba 开发团队推荐了三者的依赖关系
     -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring.cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>${spring.cloud.alibaba.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- 引入 SpringMVC 相关依赖,并实现对其的自动配置 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- 引入 Spring Cloud Alibaba Stream RocketMQ 相关依赖,将 RocketMQ 作为消息队列,并实现对其的自动配置 -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>
    </dependencies>

</project>

3.1.2 配置文件

[application.yaml]

spring:
  application:
    name: stream-rocketmq-producer-application
  cloud:
    # Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
    stream:
      # Binding 配置项,对应 BindingProperties Map
      bindings:
        erbadagang-output:
          destination: ERBADAGANG-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
          content-type: application/json # 内容格式。这里使用 JSON

        trek-output:
          destination: TREK-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
          content-type: application/json # 内容格式。这里使用 JSON
      # Spring Cloud Stream RocketMQ 配置项
      rocketmq:
        # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
        binder:
          name-server: 101.133.227.13:9876 # RocketMQ Namesrv 地址
        # RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
        bindings:
          erbadagang-output:
            # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
            producer:
              group: test # 生产者分组
              sync: true # 是否同步发送消息,默认为 false 异步。

server:
  port: 18080

同时我们设置了2个binding,模拟2个topic情形。

spring.cloud.stream 为 Spring Cloud Stream 配置项,对应 BindingServiceProperties 类。配置的层级有点深,我们一层一层来看看。

spring.cloud.stream.bindings 为 Binding 配置项,对应 BindingProperties Map。其中,key 为 Binding 的名字。要注意,虽然说 Binding 分成 Input 和 Output 两种类型,但是在配置项中并不会体现出来,而是要在稍后搭配 @Input 还是 @Output 注解,才会有具体的区分。

这里,我们配置了一个名字为 erbadagang-outputtrek-output 的 Binding。从命名上,我们的意图是想作为 Output Binding,用于生产者发送消息。

spring.cloud.stream.rocketmq 为 Spring Cloud Stream RocketMQ 配置项。

spring.cloud.stream.rocketmq.binder 为 RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类。

spring.cloud.stream.rocketmq.bindings 为 RocketMQ 自定义 Binding 配置项,用于对通用的spring.cloud.stream.bindings 配置项的增强,实现 RocketMQ Binding 独特的配置。该配置项对应 RocketMQBindingProperties Map,其中 key 为 Binding 的名字,需要对应上噢。

这里,我们对名字为 erbadagang-output 的 Binding 进行增强,进行 Producer 的配置。其中,producer 为 RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类。

MySource.java

package com.erbadagang.springcloudalibaba.stream.rocketmq.producer.message;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface MySource {

    @Output("erbadagang-output")
    MessageChannel erbadagangOutput();

    @Output("trek-output")
    MessageChannel trekOutput();

}

这里,我们通过 @Output 注解,声明了一个名字为 erbadagang-output 的 Output Binding。注意,这个名字要和我们配置文件中的 spring.cloud.stream.bindings 配置项对应上。

同时,@Output 注解的方法的返回结果为 MessageChannel 类型,可以使用它发送消息。

3.1.4 Demo01Message

创建 [Demo01Message]类,示例 Message 消息。代码如下:

public class Demo01Message {

 /**
 * 编号
 */
 private Integer id;

 // ... 省略 setter/getter/toString 方法

}

3.1.5 Demo01Controller

创建 [Demo01Controller]类,提供发送消息的 HTTP 接口。代码如下:

package com.erbadagang.springcloudalibaba.stream.rocketmq.producer.controller;

import com.erbadagang.springcloudalibaba.stream.rocketmq.producer.message.Demo01Message;
import com.erbadagang.springcloudalibaba.stream.rocketmq.producer.message.MySource;
import org.apache.rocketmq.common.message.MessageConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Random;

@RestController
@RequestMapping("/demo01")
public class Demo01Controller {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private MySource mySource;//<1>

    @GetMapping("/send")
    public boolean send() {
        // <2>创建 Message
        Demo01Message message = new Demo01Message()
                .setId(new Random().nextInt());
        // <3>创建 Spring Message 对象
        Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
                .build();
        // <4>发送消息
        return mySource.erbadagangOutput().send(springMessage);
    }

    @GetMapping("/sendTrek")
    public boolean sendTrek() {
        // <2>创建 Message
        Demo01Message message = new Demo01Message()
                .setId(new Random().nextInt());
        // <3>创建 Spring Message 对象
        Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
                .build();
        // <4>发送消息
        return mySource.trekOutput().send(springMessage);
    }

    @GetMapping("/send_delay")
    public boolean sendDelay() {
        // 创建 Message
        Demo01Message message = new Demo01Message()
                .setId(new Random().nextInt());
        // 创建 Spring Message 对象
        Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
                .setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3") // 设置延迟级别为 3,10 秒后消费。
                .build();
        // 发送消息
        boolean sendResult = mySource.erbadagangOutput().send(springMessage);
        logger.info("[sendDelay][发送消息完成, 结果 = {}]", sendResult);
        return sendResult;
    }

    @GetMapping("/send_tag")
    public boolean sendTag() {
        for (String tag : new String[]{"trek", "specialized", "look"}) {
            // 创建 Message
            Demo01Message message = new Demo01Message()
                    .setId(new Random().nextInt());
            // 创建 Spring Message 对象
            Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
                    .setHeader(MessageConst.PROPERTY_TAGS, tag) // 设置 Tag
                    .build();
            // 发送消息
            mySource.erbadagangOutput().send(springMessage);
        }
        return true;
    }

}

3.1.6 ProducerApplication

创建 [ProducerApplication]类,启动应用。代码如下:

package com.erbadagang.springcloudalibaba.stream.rocketmq.producer;

import com.erbadagang.springcloudalibaba.stream.rocketmq.producer.message.MySource;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding(MySource.class)
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

}

使用 @EnableBinding 注解,声明指定接口开启 Binding 功能,扫描其 @Input@Output 注解。这里,我们设置为 MySource 接口。

3.2 搭建消费者

创建项目,作为消费者。

3.2.1 引入依赖

创建 [pom.xml],引入 Spring Cloud Alibaba RocketMQ 相关依赖。

友情提示:和「3.1.1 引入依赖」基本一样。

3.2.2 配置文件

创建 [application.yaml]配置文件,添加 Spring Cloud Alibaba RocketMQ 相关配置。

spring:
  application:
    name: erbadagang-consumer-application
  cloud:
    # Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
    stream:
      # Binding 配置项,对应 BindingProperties Map
      bindings:
        erbadagang-input:
          destination: ERBADAGANG-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
          content-type: application/json # 内容格式。这里使用 JSON
          group: erbadagang-consumer-group-ERBADAGANG-TOPIC-01 # 消费者分组,命名规则:组名+topic名

        trek-input:
          destination: TREK-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
          content-type: application/json # 内容格式。这里使用 JSON
          group: trek-consumer-group-TREK-TOPIC-01 # 消费者分组,命名规则:组名+topic名
      # Spring Cloud Stream RocketMQ 配置项
      rocketmq:
        # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
        binder:
          name-server: 101.133.227.13:9876 # RocketMQ Namesrv 地址
        # RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
        bindings:
          erbadagang-input:
            # RocketMQ Consumer 配置项,对应 RocketMQConsumerProperties 类
            consumer:
              enabled: true # 是否开启消费,默认为 true
              broadcasting: false # 是否使用广播消费,默认为 false 使用集群消费,如果要使用广播消费值设成true。

server:
  port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

总体来说,和「3.1.2 配置文件」是比较接近的,所以我们只说差异点噢。

spring.cloud.stream.bindings 为 Binding 配置项。

这里,我们配置了一个名字为 erbadagang-inputtrek-input的 Binding。从命名上,我们的意图是想作为 Input Binding,用于消费者消费消息。

spring.cloud.stream.rocketmq.bindings 为 RocketMQ 自定义 Binding 配置项。

这里,我们对名字为 erbadagang-input 的 Binding 进行增强,进行 Consumer 的配置。其中,consumer 为 RocketMQ Producer 配置项,对应 RocketMQConsumerProperties 类。

这里一点要注意!!!加了三个感叹号,一定要理解集群消费和广播消费的差异。我们来举个例子,以有两个消费者分组 A 和 B 的场景举例子:

通过集群消费的机制,我们可以实现针对相同 Topic ,不同消费者分组实现各自的业务逻辑。例如说:用户注册成功时,发送一条 Topic 为 "USER_REGISTER" 的消息。然后,不同模块使用不同的消费者分组,订阅该 Topic ,实现各自的拓展逻辑:

这样,我们就可以将注册成功后的业务拓展逻辑,实现业务上的解耦,未来也更加容易拓展。同时,也提高了注册接口的性能,避免用户需要等待业务拓展逻辑执行完成后,才响应注册成功。

同时,相同消费者分组的多个实例,可以实现高可用,保证在一个实例意外挂掉的情况下,其它实例能够顶上。并且,多个实例都进行消费,能够提升消费速度

3.2.3 MySink

创建 [MySink]接口,声明名字为 Input Binding。代码如下:

package com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.listener;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MySink {

    String ERBADAGANG_INPUT = "erbadagang-input";
    String TREK_INPUT = "trek-input";

    @Input(ERBADAGANG_INPUT)
    SubscribableChannel demo01Input();

    @Input(TREK_INPUT)
    SubscribableChannel trekInput();

}

这里,我们通过 @Input 注解,声明了一个名字为 erbadagang-inputtrek-input的 Input Binding。注意,这个名字要和我们配置文件中的 spring.cloud.stream.bindings 配置项对应上。

同时,@Input 注解的方法的返回结果为 SubscribableChannel 类型,可以使用它订阅消息来消费。MessageChannel 提供的订阅消息的方法如下:

public interface SubscribableChannel extends MessageChannel {

 boolean subscribe(MessageHandler handler); // 订阅

 boolean unsubscribe(MessageHandler handler); // 取消订阅

}

那么,我们是否要实现 MySink 接口呢?和MySource一样,答案也是不需要,还是全部交给 Spring Cloud Stream 的 BindableProxyFactory 大兄弟来解决。BindableProxyFactory 会通过动态代理,自动实现 MySink 接口。 而 @Input注解的方法的返回值,BindableProxyFactory 会扫描带有 @Input 注解的方法,自动进行创建。

例如说,#demo01Input() 方法被自动创建返回结果为 DirectWithAttributesChannel,它也是 SubscribableChannel 的子类。

3.2.4 Demo01Message

创建 [Demo01Message]类,示例 Message 消息。

友情提示:和[3.1.4 Demo01Message]基本一样。

3.2.5 Demo01Consumer

创建 [Demo01Consumer] 类,消费消息。代码如下:

package com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.listener;

import com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.message.Demo01Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class Demo01Consumer {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @StreamListener(MySink.ERBADAGANG_INPUT)
    public void onMessage(@Payload Demo01Message message) {
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }

    @StreamListener(MySink.TREK_INPUT)
    public void onTrekMessage(@Payload Demo01Message message) {
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }
}

在方法上,添加 @StreamListener 注解,声明对应的 Input Binding。这里,我们使用 MySink.ERBADAFANG_INPUT

又因为我们消费的消息是 POJO 类型,所以我们需要添加 @Payload 注解,声明需要进行反序列化成 POJO 对象。

3.2.6 ConsumerApplication

创建 [ConsumerApplication]类,启动应用。代码如下:

package com.erbadagang.springcloudalibaba.stream.rocketmq.consumer;

import com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.listener.MySink;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding(MySink.class)
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

}

使用 @EnableBinding 注解,声明指定接口开启 Binding 功能,扫描其 @Input@Output 注解。这里,我们设置为 MySink 接口。

3.3 测试单集群多实例的场景

本小节,我们会在一个消费者集群启动两个实例,测试在集群消费的情况下的表现。

① 执行 ConsumerApplication 两次,启动两个消费者的实例,从而实现在消费者分组 erbadagang-consumer-group-ERBADAGANG-TOPIC-01 下有两个消费者实例。

因为 IDEA 默认同一个程序只允许启动 1 次,所以我们需要配置 DemoProviderApplication 为 Allow parallel run。

② 执行 ProducerApplication,启动生产者的实例。
之后,请求 http://127.0.0.1:18080/demo01/send 接口三次,发送三条消息。此时在 IDEA 控制台看到消费者打印日志如下:

// ConsumerApplication 控制台 01
2020-08-05 09:39:29.073  INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer      : [onMessage][线程编号:78 消息内容:Demo01Message{id=-1682643477}]
2020-02-22 09:41:32.754  INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer      : [onMessage][线程编号:78 消息内容:Demo01Message{id=1890257867}]

// ConsumerApplication 控制台 02
2020-08-05 09:41:32.264  INFO 50534 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer      : [onMessage][线程编号:80 消息内容:Demo01Message{id=1401668556}]

符合预期。从日志可以看出,每条消息仅被消费一次。
访问http://localhost:18080/demo01/sendTrek也是同样效果,只不过是演示如何操作多个topic。

3.4 测试多消费集群多实例的场景

本小节,我们会在二个消费者集群启动两个实例,测试在集群消费的情况下的表现。

① 执行 ConsumerApplication 两次,启动两个消费者的实例,从而实现在消费者分组 derbadagangemo01-consumer-group-ERBADAGANG-TOPIC-01 下有两个消费者实例。

② 修改 sca-stream-rocketmq-consumer 项目的配置文件,修改 spring.cloud.stream.bindings.erbadagang-input.group 配置项,将消费者分组改成 NEW-erbadagang-consumer-group-ERBADAGANG-TOPIC-01

然后,执行 ConsumerApplication 两次,再启动两个消费者的实例,从而实现在消费者分组 NEW-erbadagang-consumer-group-ERBADAGANG-TOPIC-01 下有两个消费者实例。

③ 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send 接口三次,发送三条消息。此时在 IDEA 控制台看到消费者打印日志如下:

// 消费者分组 `demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制台 01
2020-08-06 10:17:07.886  INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer      : [onMessage][线程编号:78 消息内容:Demo01Message{id=-276398167}]
2020-08-06 10:17:08.237  INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer      : [onMessage][线程编号:78 消息内容:Demo01Message{id=-250975158}]

// 消费者分组 `demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制台 02
2020-02-22 10:17:08.710  INFO 50534 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer      : [onMessage][线程编号:80 消息内容:Demo01Message{id=412281482}]

// 消费者分组 `X-demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制台 01
2020-08-06 10:17:07.887  INFO 51092 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer      : [onMessage][线程编号:51 消息内容:Demo01Message{id=-276398167}]
2020-02-22 10:17:08.238  INFO 51092 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer      : [onMessage][线程编号:51 消息内容:Demo01Message{id=-250975158}]

// 消费者分组 `X-demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制台 02
2020-08-06 10:17:08.787  INFO 51096 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer      : [onMessage][线程编号:77 消息内容:Demo01Message{id=412281482}]

从日志可以看出,每条消息被每个消费者集群都进行了消费,且仅被消费一次。

3.5 小结

至此,我们已经完成了 Stream RocketMQ 的快速入门。回过头看看 Binder 和 Binding 的概念,是不是就清晰一些了。

底线


本文源代码使用 Apache License 2.0开源许可协议,这里是本文源码Gitee地址,可通过命令git clone+地址下载代码到本地,也可直接点击链接通过浏览器方式查看源代码。

本文源代码
上一篇 下一篇

猜你喜欢

热点阅读