Sprin Cloud Stream rabbit实战
2022-10-12 本文已影响0人
Mr培
架构图
Binder
Binder
- Destination Binder (目标绑定器) :与消息中间件通信的组件
- Destination Bindings (目标绑定) : Binding是连接应用程序跟消息中间件的桥梁 ,用于消息的消费和生产,由binder创建
- Message(消息)
springboot整合stream之生产者
- 加依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
- 写注解,在启动类上加
@SpringBootApplication(exclude= {DataSourceAutoConfiguration.class})
public class StreamProducerMain {
public static void main(String[] args){
SpringApplication.run(StreamProducerMain.class,args);
}
}
- 写配置
server:
port: 8081
spring:
application:
name: cloud-stream-producer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672 #查看rabbitmq Listening ports amqp 为 ip 端口为 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称
destination: xypspExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
- 测试方法
/**
* @author rp
*/
@AllArgsConstructor
@RestController
@RequestMapping("/message")
public class MessageProviderController {
private final MessageProviderServer messageProviderServer;
/**
* 发送消息
* */
@GetMapping("/sendMessage")
public Result sendMessage(){
String uuid = messageProviderServer.send();
return Result.success(uuid);
}
}
package com.xypsp.springcloud.server;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
import java.util.UUID;
/**
* @author rp
* 定义消息的推送管道
*/
@EnableBinding(Source.class)
public class MessageProviderServer {
/**
* 消息发送管道
*/
@Resource
private MessageChannel output;
public String send() {
String serial = UUID.randomUUID().toString();
//发送延迟消息
// output.send(MessageBuilder.withPayload(serial).setHeader("x-delay",5000).build());
output.send(MessageBuilder.withPayload(serial).build());
return serial;
}
}
springboot整合stream之消费者A
- 加依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
- 写配置
server:
port: 8082
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672 #查看rabbitmq Listening ports amqp 为 ip 端口为 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: xypspExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
group: xypspGroup # 消息的持久化 Consumer断开连接,队列仍然存在,相同group的消费同一个queue的时候是轮询的方式,每个实例一条轮着消费,避免重复消费。
- 消费方法
package com.xypsp.springcloud.server;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
/**
* @author rp
* 定义消息的接收管道
*/
@Slf4j
@Component
@EnableBinding(Sink.class)
public class MessageConsumerListener {
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
log.info("接收到消息: {}",message.getPayload());
}
}
springboot整合stream之消费者B
- 加依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
- 写配置
server:
port: 8083
spring:
application:
name: cloud-stream-consumer-b
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672 #查看rabbitmq Listening ports amqp 为 ip 端口为 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: xypspExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
group: xypspGroup # 消息的持久化 Consumer断开连接,队列仍然存在,相同group的消费同一个queue的时候是轮询的方式,每个实例一条轮着消费,避免重复消费。
- 消费方法
package com.xypsp.springcloud.server;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
/**
* @author rp
* 定义消息的接收管道
*/
@Slf4j
@Component
@EnableBinding(Sink.class)
public class MessageConsumerListener {
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
log.info("接收到消息: {}",message.getPayload());
}
}