springcloudSpringCloudspring cloud

49 Spring Cloud Stream 入门案例

2022-01-15  本文已影响0人  木子教程

准备工作

案例中通过rabbitMQ作为消息中间件,完成SpringCloud Stream的案例。需要自行安装

消息生产者

(1)创建工程引入依赖

   <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

(2)定义bingding

发送消息时需要定义一个接口,不同的是接口方法的返回对象是 MessageChannel,下面是 Spring Cloud Stream 内置的接口:

public interface Source {
    String OUTPUT = "output";
    
    @Output("output")
    MessageChannel output();
}

这就接口声明了一个 binding 命名为 “output”。这个binding 声明了一个消息输出流,也就是消息的生 产者。

(3)配置application.yml

spring:
 cloud:
   stream:
     bindings:
       output:
         destination: muziwk-default
         contentType: text/plain

(4)测试发送消息

@SpringBootApplication
@EnableBinding(Source.class)
public class Application implements CommandLineRunner {
    @Autowired
    @Qualifier("output")
    MessageChannel output;
    @Override
    public void run(String... strings) throws Exception {
        //发送MQ消息
        output.send(MessageBuilder.withPayload("hello world").build());
   }
    
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
   }
}

消息消费者

(1)创建工程引入依赖

<dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

(2)定义bingding

同发送消息一致,在Spring Cloud Stream中接受消息,需要定义一个接口,如下是内置的一个接口。

public interface Sink {
    String INPUT = "input";
    @Input("input")
    SubscribableChannel input();
}

注释 @Input 对应的方法,需要返回 SubscribableChannel ,并且参入一个参数值。 这就接口声明了一个 binding 命名为 “input” 。

(3)配置application.yml

spring:
 cloud:
   stream:
     bindings:
       input:
         destination: muziwk-default 

destination:指定了消息获取的目的地,对应于MQ就是 exchange,这里的exchange就是 muziwk-default

(4) 测试

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
    // 监听 binding 为 Sink.INPUT 的消息
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("监听收到:" + message.getPayload());
   }
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
   }
}


上一篇下一篇

猜你喜欢

热点阅读