Stream使用入门

2021-07-21  本文已影响0人  文景大大

一、Stream介绍

SpringCloud Stream是在Spring Messaging和Spring Integration这两个项目的基础上发展而来的,我们在项目中可能使用多个MQ,或者在项目的某个阶段需要更换MQ,如果使用了Stream,我们只要更改配置即可,完全不用修改消息发送和接收的代码。而且Stream定义的消息收发模型使得我们代码更加简洁易懂。

详细的介绍文档可以参考:
Spring Cloud Stream 体系及原理介绍 (qq.com)

二、入门案例

首先我们从start.spring.io下载一个全新的项目,需要的依赖有web、lombok,下载好了之后再自行引入stream-rocketmq的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    <version>2021.1</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

然后,我们开始创建生产者:

@Slf4j
@Component
@EnableBinding(Source.class)
public class MyMessageProducer {

    @Autowired
    private Source source;

    public void send(String message){
        log.info("生产者开始发送消息!");
        source.output().send(MessageBuilder.withPayload(message).build());
    }

}

我们使用Controller来出发生产者发送消息:

@RestController
public class ProducerController {

    @Autowired
    private MyMessageProducer producer;

    @GetMapping("/sendMessage")
    public String sendMessage(){
        producer.send("hello spring cloud stream!");
        return "OK";
    }

}

随后,我们创建消费者:

@Slf4j
@Component
@EnableBinding(Sink.class)
public class MyMessageConsumer {

    @StreamListener(Sink.INPUT)
    public void receive(String message){
        log.info("监听到消息内容为:{}", message);
    }
    
}

最后,是我们最重要的配置文件内容:

server:
  port: 8081

spring:
  application:
    name: stream-demo
  cloud:
    stream:
      bindings:
        output:
          destination: myRocketMQTopic
          group: test-stream
        input:
          destination: myRocketMQTopic
          group: test-stream
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876

在启动MQ服务的前提下,启动我们的项目,就能成功运行一个Stream-Rocketmq的快速入门案例了。我们浏览器访问链接,使得生产者发送消息,随后消费者就能监听到这个消息。

2021-07-16 09:35:26.805  INFO 16920 --- [nio-8081-exec-1] c.e.s.producer.MyMessageProducer         : 生产者开始发送消息!
2021-07-16 09:35:49.737  INFO 16920 --- [MessageThread_1] c.e.s.consumer.MyMessageConsumer         : 监听到消息内容为:hello spring cloud stream!
2021-07-16 09:36:14.371  INFO 16920 --- [nio-8081-exec-3] c.e.s.producer.MyMessageProducer         : 生产者开始发送消息!
2021-07-16 09:36:14.379  INFO 16920 --- [MessageThread_1] c.e.s.consumer.MyMessageConsumer         : 监听到消息内容为:hello spring cloud stream!

三、自定义Channel

在如上的入门案例中,我们使用的是Stream自带的output、input通道,我们完全可以仿照它们自定义自己的Channel。

public interface MyOutputChannel {

    @Output("myOutput")
    MessageChannel myOutput();

}
public interface MyInputChannel {

    @Input("myInput")
    SubscribableChannel myInput();

}

然后我们新建一个生产者,使用自定义的output通道。

@Slf4j
@Component
@EnableBinding(MyOutputChannel.class)
public class MyMessageProducer2 {

    @Autowired
    private MyOutputChannel myOutputChannel;

    public void send(String message){
        log.info("MyMessageProducer2生产者开始发送消息!");
        myOutputChannel.myOutput().send(MessageBuilder.withPayload(message).build());
    }

}

新建一个消费者,监听自定义的input通道。

@Slf4j
@Component
@EnableBinding(MyInputChannel.class)
public class MyMessageConsumer2 {

    @StreamListener("myInput")
    public void receive(String message){
        log.info("myInput监听到消息内容为:{}", message);
    }

}

再新建一个controller来触发发送消息:

@RestController
public class ProducerController2 {

    @Autowired
    private MyMessageProducer2 producer;

    @GetMapping("/sendMessage2")
    public String sendMessage(){
        producer.send("hello spring cloud stream using customize channel!");
        return "OK";
    }

}

最后,配置文件中需要增加我们自定义的output和input通道:

server:
  port: 8081

spring:
  application:
    name: stream-demo
  cloud:
    stream:
      bindings:
        output:
          destination: myRocketMQTopic
          group: test-stream
        myOutput:
          destination: myRocketMQTopic2
          group: test-stream2
        input:
          destination: myRocketMQTopic
          group: test-stream
        myInput:
          destination: myRocketMQTopic2
          group: test-stream2
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876

如此,我们两个例子中的channel可以同时工作了。

2021-07-16 10:16:46.495  INFO 18580 --- [nio-8081-exec-3] c.e.s.producer.MyMessageProducer2        : MyMessageProducer2生产者开始发送消息!
2021-07-16 10:16:52.220  INFO 18580 --- [nio-8081-exec-2] c.e.s.producer.MyMessageProducer2        : MyMessageProducer2生产者开始发送消息!
2021-07-16 10:16:52.808  INFO 18580 --- [nio-8081-exec-4] c.e.s.producer.MyMessageProducer2        : MyMessageProducer2生产者开始发送消息!
2021-07-16 10:16:55.610  INFO 18580 --- [nio-8081-exec-5] c.e.s.producer.MyMessageProducer         : 生产者开始发送消息!
2021-07-16 10:16:55.729  INFO 18580 --- [MessageThread_1] c.e.s.consumer.MyMessageConsumer         : 监听到消息内容为:hello spring cloud stream!
2021-07-16 10:16:56.306  INFO 18580 --- [MessageThread_1] c.e.s.consumer.MyMessageConsumer2        : myInput监听到消息内容为:hello spring cloud stream using customize channel!
2021-07-16 10:16:56.306  INFO 18580 --- [MessageThread_1] c.e.s.consumer.MyMessageConsumer2        : myInput监听到消息内容为:hello spring cloud stream using customize channel!
2021-07-16 10:16:58.249  INFO 18580 --- [nio-8081-exec-6] c.e.s.producer.MyMessageProducer         : 生产者开始发送消息!
2021-07-16 10:16:58.256  INFO 18580 --- [MessageThread_1] c.e.s.consumer.MyMessageConsumer         : 监听到消息内容为:hello spring cloud stream!
2021-07-16 10:16:59.310  INFO 18580 --- [MessageThread_1] c.e.s.consumer.MyMessageConsumer2        : myInput监听到消息内容为:hello spring cloud stream using customize channel!

四、消息过滤

有两种消息过滤的方法,第一种是设置Cosumer的消费Tag,指定的Tag才会被消费,未指定的Tag的消息则被过滤;另外一种是根据自定义header内容进行过滤。

为了区别以上的例子,我们重新创建一个Channel:

public interface NewChannel {

    String INPUT = "newInput";
    String OUTPUT = "newOutput";

    @Output(NewChannel.INPUT)
    MessageChannel output();

    @Input(NewChannel.INPUT)
    SubscribableChannel input();
}

然后分别是生产者和消费者:

@Slf4j
@Component
@EnableBinding(NewChannel.class)
public class MyMessageProducer3 {

    @Autowired
    private NewChannel newChannel;

    public void send(Person person){
        log.info("MyMessageProducer3生产者开始发送消息!");
        Message<Person> message1 = MessageBuilder.withPayload(person)
                // 指定ROcketMQ中消息的TAG
                .setHeader(RocketMQHeaders.TAGS, "animal")
                // 自定义header用于consumer监听器进行过滤
                .setHeader("MyTag", "TAG1")
                .build();

        Message<Person> message2 = MessageBuilder.withPayload(person)
                .setHeader(RocketMQHeaders.TAGS, "person")
                .setHeader("MyTag", "TAG2")
                .build();

        Message<Person> message3 = MessageBuilder.withPayload(person)
                .setHeader(RocketMQHeaders.TAGS, "person")
                .setHeader("MyTag", "TAG3")
                .build();
        newChannel.output().send(message1);
        newChannel.output().send(message2);
        newChannel.output().send(message3);
    }

}
@Slf4j
@Component
@EnableBinding(NewChannel.class)
public class MyMessageConsumer3 {

    @StreamListener(value = NewChannel.INPUT, condition = "headers['MyTag']=='TAG2'")
    public void receive(Message<Person> message){
        log.info("MyMessageConsumer3监听到消息内容为:{}", message);
        log.info("MyMessageConsumer3消息的payload:{}", message.getPayload());
        log.info("MyMessageConsumer3消息的header:{}", message.getHeaders().get("MyTag"));
    }

}

消息的实体类为:

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Person {

    private String name;
    private Integer age;
}

新建一个Controller用于发送消息:

@RestController
public class ProducerController3 {

    @Autowired
    private MyMessageProducer3 producer;

    @GetMapping("/sendMessage3")
    public String sendMessage(){
        Person tom = new Person("tom", 21);
        producer.send(tom);
        return "OK";
    }

}

配置文件修改为:

server:
  port: 8081

spring:
  application:
    name: stream-demo
  cloud:
    stream:
      bindings:
        newInput:
          destination: newRocketMQTopic
          group: test-stream3
        newOutput:
          destination: newRocketMQTopic
          group: test-stream3
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
          enable-msg-trace: true
        bindings:
          newInput:
            consumer:
              # 仅接受指定tags的消息
              tags: person || plant

此时重启应用,我们访问controller的接口,就能同时发送三条消息出去。

在配置文件中,我们指定消费者只消费tags为person和plant的消息,对于tags为animal的消息则会显示CONSUMED_BUT_FILTERED

在消费者消费配置代码中,我们的注解@StreamListener指定了condition为只消费headers['MyTag']=='TAG2'的消息;而TAG3的消息也被CONSUMED,只不过没有找到对应的消费者,所以控制台显示:

Cannot find a @StreamListener matching for message with id: 48260ac4-772b-54b8-2ae0-8b91635729a9

五、函数式编程

自从SpringCloud升级到3.1版本以上,如上示例中命令式编程注解@EnableBinding、@Output、@Input等都已经被标记为废弃了,废弃说明如下:

@deprecated as of 3.1 in favor of functional programming model

那么我们该如何使用函数式编程改造如上的示例呢。

5.1 定时驱动模型

我们需要重新创建一个工程,父工程stream-function,并且引入如下的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/com.alibaba.cloud/spring-cloud-starter-stream-rocketmq -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    <version>2021.1</version>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

然后创建一个子工程consumer,新建一个Producer类:

@Slf4j
@Component
public class FunctionConsumer {

    @Bean
    public Consumer<Date> mySink(){
        return (message) -> {
            log.info("收到的消息为:{}", message);
        };
    }

}

增加配置文件:

server:
  port: 8082

spring:
  application:
    name: stream-demo-consumer
  cloud:
    stream:
      bindings:
        # 格式:方法名-类型-序号
        # 方法名表示消费消息的方法的名称
        # 类型in表示消息接收,out表示消息发送
        # 序号用来区分不同的消费者
        mySink-in-0:
          # topic
          destination: functionRocketMQTopic
          group: test-stream4
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
          enable-msg-trace: true
    function:
      # 消费者处理消息的方法名称
      definition: mySink

至此,消费者就完成了,可以启动运行,就会开始监听functionRocketMQTopic这个主题的消息了。

然后我们还需要再创建一个子工程producer,新建一个Producer类:

@Slf4j
@Component
public class FunctionProducer {

    @Bean
    public Supplier<Date> mySource(){
        return () -> {
            log.info("当前发送消息开始!");
            return new Date();
        };
    }

}

然后增加配置文件如下:

server:
  port: 8081

spring:
  application:
    name: stream-demo-producer
  cloud:
    stream:
      bindings:
        # 格式:方法名-类型-序号
        # 方法名表示消费消息的方法的名称
        # 类型in表示消息接收,out表示消息发送
        # 序号用来区分不同的消费者
        mySource-out-0:
          # topic
          destination: functionRocketMQTopic
          group: test-stream4
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
          enable-msg-trace: true
    function:
      # 生产者产生消息的方法名称
      definition: mySource

如此,我们生产者也配置好了,启动就可以运行了。

5.2 streamBridge模型

该模型主要是为了满足由业务动作触发消息发送的场景。我们只需要新建一个Producer类:

@Slf4j
@Service
public class StreamBridgeProducer {

    @Autowired
    private StreamBridge streamBridge;

    public void sendDateMessage(){
        Message<Date> message = MessageBuilder.withPayload(new Date()).build();
        log.info("开始发送消息!");
        streamBridge.send("mySource-out-1", message);
    }

}

然后新建一个Controller供调用触发消息的发送:

@RestController
public class SendController {

    @Autowired
    private StreamBridgeProducer streamBridgeProducer;

    @GetMapping("/sendMessage")
    public String sendMessage(){
        streamBridgeProducer.sendDateMessage();
        return "OK";
    }

}

最后,配置文件改为如下:

server:
  port: 8081

spring:
  application:
    name: stream-demo-producer
  cloud:
    stream:
      bindings:
        mySource-out-1:
          destination: functionRocketMQTopic
          group: test-stream4
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
          enable-msg-trace: true

如此,我们启动该生产者,在调用接口的时候,消息就能成功发送了。

5.3 多通道配置

我们可以在一个项目中同时配置多个生产者和多个消费者,甚至是既有生产者,也有消费者。

这里只给出配置文件了,其余内容可以参考上述已经给出的例子。

server:
  port: 8081

spring:
  application:
    name: stream-demo-producer
  cloud:
    stream:
      bindings:
        # 多个生产者通道时,定时消息模式destination可以相同,streamBridge模式destination不能相同
        mySource-out-0:
          destination: functionRocketMQTopic0
        mySource-out-1:
          destination: functionRocketMQTopic1
        mySource-out-2:
          destination: functionRocketMQTopic2
        mySource2-out-3:
          destination: functionRocketMQTopic3
        # 消费者通道  
        mySink-in-0:
          destination: functionRocketMQTopic0
          group: test-stream4
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
          enable-msg-trace: true
    function:
      definition: mySource;mySource2;mySink

六、消息分区

SpringCloudStream消息分区是在某些场景下确保具有相同特征的消息被同一个消费者消费,它能帮助某些不支持消息分区的消息队列实现消息分区的功能。

生产者的配置文件改为:

server:
  port: 8084

spring:
  application:
    name: stream-demo-producer
  cloud:
    stream:
      bindings:
        mySource-out-0:
          destination: functionRocketMQTopic
          producer:
            # 指定分区键的表达式规则
            partition-key-expresion: payload
            # 指定分区数量
            partition-count: 2
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
          enable-msg-trace: true
    function:
      definition: mySource

多个消费者的配置文件改为:

server:
  port: 8082

spring:
  application:
    name: stream-demo-consumer2
  cloud:
    stream:
      # 指定当前消费者分区数量
      instance-count: 2
      # 指定当前消费者的索引
      instance-index: 0
      bindings:
        mySink-in-0:
          destination: functionRocketMQTopic
          group: test-stream4
          consumer:
            # 开启分区
            partitioned: true
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
          enable-msg-trace: true
    function:
      definition: mySink
server:
  port: 8083

spring:
  application:
    name: stream-demo-consumer2
  cloud:
    stream:
      # 指定当前消费者分区数量
      instance-count: 2
      # 指定当前消费者的索引
      instance-index: 1
      bindings:
        mySink-in-0:
          destination: functionRocketMQTopic
          group: test-stream4
          consumer:
            # 开启分区
            partitioned: true
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
          enable-msg-trace: true
    function:
      definition: mySink

待实验验证。

七、多消息队列

我们可以使用Stream配置多个不同类型的MQ,具体实例待补充

SpringCloudStream自学文档 - 知乎 (zhihu.com)

八、其它特性

关于Stream的用法其实已经讲的差不多了,剩下的主要就是RocketMQ Binder的一些配置属性,比如:

等等,其它特性都可以参考github上rocketmq的文档,本文就不再一一列举了。

RocketMQ en · alibaba/spring-cloud-alibaba Wiki · GitHub

九、补充

有的时候,发送MQ消息显示消息没有成功发送,出现了很奇怪的问题,网上找的方案是,需要关闭vip通道。

spring:
 cloud:
   stream:
      rocketmq:
          default:
            producer:
              # 关闭vip channel,否则消息会发送失败
              vipChannelEnabled: false

尝试之后,确实有效。

上一篇下一篇

猜你喜欢

热点阅读