SpringCloud集群整合RabbitMQ、延迟队列、Spring Cloud Stream
2021-05-31 本文已影响0人
笔记本一号
代码github地址:https://github.com/tzb1017432592/springcloud-lean
SpringCloud高可用集群的搭建在我之前的博客中已经介绍过。这里我启动了三个服务中心、三个服务网关、两个服务提供客户端、一个消费端和一个nginx服务。我们在此集群之上整合RabbitMQ,并额外启动一个mq的生产者端和一个mq的消费者端。首先引入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
yml配置:eureka-rabbitmq-producer是消息的生产者端(profile为mq01),eureka-rabbitmq-consumer是消息的消费者端(profile为mq02)。
# Producer instance: this document is tied to profile mq01.
server:
  port: 8951
eureka:
  client:
    service-url:
      defaultZone: http://eureka-cluster01:8761/eureka/,http://eureka-cluster02:8762/eureka/
spring:
  application:
    name: eureka-rabbitmq-producer
  profiles: mq01
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: cloudtest
---
# Consumer instance: this document is tied to profile mq02.
# It points at the same broker and virtual host as the producer.
server:
  port: 8952
eureka:
  client:
    service-url:
      defaultZone: http://eureka-cluster01:8761/eureka/,http://eureka-cluster02:8762/eureka/
spring:
  application:
    name: eureka-rabbitmq-consumer
  profiles: mq02
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: cloudtest
配置好idea的启动参数,根据不同的参数启动不同的环境(例如用 --spring.profiles.active=mq01 启动生产者端,用 --spring.profiles.active=mq02 启动消费者端)。
代码创建服务生产者端的exchange、queue、binding、routingkey
/**
 * RabbitMQ topology for the plain (non-delayed) messaging example.
 *
 * <p>Declares a durable topic exchange, a queue, and the binding that routes
 * messages whose routing key matches {@link #ROUTINGKEY_TEST} into that queue.
 * Each bean is only registered when the {@code mq01} profile is active.
 */
@Configuration
public class RabbitmqConfig {

    /** Exchange name; also used as the name of the exchange bean. */
    public static final String EXCHANGE_TEST = "exchange_test";

    /** Queue name; also used as the name of the queue bean. */
    public static final String QUEUE_TEST = "queue_test";

    /** Topic pattern used to bind the queue to the exchange. */
    public static final String ROUTINGKEY_TEST = "cloudtest.*";

    /**
     * Builds the durable topic exchange.
     *
     * @return the exchange named {@link #EXCHANGE_TEST}
     */
    @Bean(EXCHANGE_TEST)
    @Profile("mq01")
    public Exchange exchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_TEST).durable(true).build();
    }

    /**
     * Builds the queue that receives the routed messages.
     *
     * @return the queue named {@link #QUEUE_TEST}
     */
    @Bean(QUEUE_TEST)
    @Profile("mq01")
    public Queue queue() {
        Queue testQueue = new Queue(QUEUE_TEST);
        return testQueue;
    }

    /**
     * Binds the queue to the exchange using {@link #ROUTINGKEY_TEST}.
     *
     * @param topicExchange the exchange bean named {@link #EXCHANGE_TEST}
     * @param testQueue     the queue bean named {@link #QUEUE_TEST}
     * @return the binding, without extra arguments
     */
    @Bean
    @Profile("mq01")
    public Binding binding(@Qualifier(EXCHANGE_TEST) Exchange topicExchange,
                           @Qualifier(QUEUE_TEST) Queue testQueue) {
        return BindingBuilder.bind(testQueue)
                .to(topicExchange)
                .with(ROUTINGKEY_TEST)
                .noargs();
    }
}
发送消息
/**
 * REST entry point of the producer service (profile {@code mq01}) that
 * publishes plain messages to {@link RabbitmqConfig#EXCHANGE_TEST}.
 */
@RestController
@RequestMapping("/rabbitmq/producer/")
@Profile("mq01")
public class TestController {

    /** Routing key attached to every message sent by {@link #producer1(String)}. */
    public static final String ROUTING_KEY1 = "cloudtest.test1";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes the path segment as a message body.
     *
     * @param message text taken from the request path
     * @return the literal {@code "success"} once the send call has returned
     */
    @GetMapping("producer1/{message}")
    public String producer1(@PathVariable("message") String message) {
        final String targetExchange = RabbitmqConfig.EXCHANGE_TEST;
        rabbitTemplate.convertAndSend(targetExchange, ROUTING_KEY1, message);
        return "success";
    }
}
服务消费者端监听队列
/**
 * Listener of the consumer service (profile {@code mq02}) for
 * {@link RabbitmqConfig#QUEUE_TEST}.
 */
@Slf4j
@Profile("mq02")
@Component
public class RabbitmqConsumer {

    /**
     * Logs the body and the message properties of each received message.
     *
     * @param payload the converted message body
     * @param message the raw message, used here only for its properties
     */
    @RabbitListener(queues = RabbitmqConfig.QUEUE_TEST)
    public void Consumer1(String payload, Message message) {
        String properties = message.getMessageProperties().toString();
        log.info("payload:【{}】,message:【{}】", payload, properties);
    }
}
配置好网关
# Gateway instance: this document is tied to profile gw01.
server:
  port: 8851
eureka:
  client:
    service-url:
      defaultZone: http://eureka-cluster01:8761/eureka/,http://eureka-cluster02:8762/eureka/
spring:
  application:
    name: eureka-gateway
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true
      routes:
        - id: eureka-client01
          uri: lb://eureka-client01
          predicates:
            - Path=/client01/**
        - id: eureka-consumer
          uri: lb://eureka-consumer
          predicates:
            - Path=/consumer/**
        - id: eureka-rabbitmq-producer
          uri: lb://eureka-rabbitmq-producer
          predicates:
            - Path=/rabbitmq/producer/**
        # NOTE(review): this pattern also covers /rabbitmq/producer/**; it presumably
        # relies on the more specific producer route above being evaluated first - confirm.
        - id: eureka-rabbitmq-consumer
          uri: lb://eureka-rabbitmq-consumer
          predicates:
            - Path=/rabbitmq/**
  profiles: gw01
启动服务,服务已经启动好了
发送消息
延迟队列
延迟队列有许多应用场景如:自动收货、自动取消订单、自动发布文章等,实现延迟队列需要下载插件
下载地址:https://www.rabbitmq.com/community-plugins.html
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
代码
/**
 * RabbitMQ topology for the delayed-message example.
 *
 * <p>Declares a durable, delay-enabled topic exchange, a queue, and the
 * binding between them. Each bean is only registered when the {@code mq01}
 * profile is active.
 */
@Configuration
public class RabbitmqDelayConfig {

    /** Delayed exchange name; also used as the name of the exchange bean. */
    public static final String DELAY_EXCHANGE_TEST = "delay_exchange_test";

    /** Delayed queue name; also used as the name of the queue bean. */
    public static final String DELAY_QUEUE_TEST = "delay_queue_test";

    /** Topic pattern used to bind the delayed queue to the delayed exchange. */
    public static final String ROUTINGKEY_DELAY = "delay_cloudtest.*";

    /**
     * Builds the topic exchange with delayed-message support switched on.
     *
     * @return the exchange named {@link #DELAY_EXCHANGE_TEST}
     */
    @Bean(DELAY_EXCHANGE_TEST)
    @Profile("mq01")
    public Exchange exchange() {
        return ExchangeBuilder.topicExchange(DELAY_EXCHANGE_TEST)
                .delayed() // enable delayed-message support
                .durable(true)
                .build();
    }

    /**
     * Creates the queue that receives the delayed messages.
     *
     * @return the queue named {@link #DELAY_QUEUE_TEST}
     */
    @Bean(DELAY_QUEUE_TEST)
    @Profile("mq01")
    public Queue queue() {
        Queue delayQueue = new Queue(DELAY_QUEUE_TEST);
        return delayQueue;
    }

    /**
     * Binds the delayed queue to the delayed exchange.
     *
     * @param delayQueue    the queue bean named {@link #DELAY_QUEUE_TEST}
     * @param delayExchange the exchange bean named {@link #DELAY_EXCHANGE_TEST}
     * @return the binding keyed by {@link #ROUTINGKEY_DELAY}, without extra arguments
     */
    @Bean
    @Profile("mq01")
    public Binding delayBinding(@Qualifier(DELAY_QUEUE_TEST) Queue delayQueue,
                                @Qualifier(DELAY_EXCHANGE_TEST) Exchange delayExchange) {
        return BindingBuilder.bind(delayQueue)
                .to(delayExchange)
                .with(ROUTINGKEY_DELAY)
                .noargs();
    }
}
发送延迟消息
/**
 * REST entry point of the producer service (profile {@code mq01}) that
 * publishes delayed messages to {@link RabbitmqDelayConfig#DELAY_EXCHANGE_TEST}.
 */
@Slf4j
@RestController
@RequestMapping("/rabbitmq/producer")
@Profile("mq01")
public class TestController {

    public static final String ROUTING_KEY1 = "cloudtest.test1";

    /** Routing key attached to every delayed message. */
    public static final String DELAY_ROUTING_KEY1 = "delay_cloudtest.test1";

    /** Delay applied to each message, in milliseconds. */
    private static final int DELAY_MILLIS = 5000;

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes the path segment as a persistent message delayed by
     * {@link #DELAY_MILLIS} milliseconds, then logs the send time.
     *
     * @param message text taken from the request path
     * @return the literal {@code "success"} once the send call has returned
     */
    @GetMapping("delayproducer1/{message}")
    public String delayproducer1(@PathVariable("message") String message) {
        final String targetExchange = RabbitmqDelayConfig.DELAY_EXCHANGE_TEST;
        rabbitTemplate.convertAndSend(targetExchange, DELAY_ROUTING_KEY1, message, outgoing -> {
            // Mark the message as persistent.
            outgoing.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            // Delay for the message, in milliseconds.
            outgoing.getMessageProperties().setDelay(DELAY_MILLIS);
            return outgoing;
        });
        Date sentAt = new Date();
        log.info("发送延迟消息成功【{}】", sentAt);
        return "success";
    }
}
接收延迟消息
/**
 * Listener of the consumer service (profile {@code mq02}) for
 * {@link RabbitmqDelayConfig#DELAY_QUEUE_TEST}.
 */
@Slf4j
@Profile("mq02")
@Component
public class RabbitmqConsumer {

    /**
     * Logs the body and properties of each delayed message, followed by the
     * time at which it was received.
     *
     * @param payload the converted message body
     * @param message the raw message, used here only for its properties
     */
    @RabbitListener(queues = RabbitmqDelayConfig.DELAY_QUEUE_TEST)
    public void Consumer2(String payload, Message message) {
        String properties = message.getMessageProperties().toString();
        log.info("payload:【{}】,message:【{}】", payload, properties);

        Date receivedAt = new Date();
        log.info("接收延迟消息成功【{}】", receivedAt);
    }
}
访问延迟队列发送接口
我们代码中设置的延迟时间是5秒,从日志可以看到消息的发送时间与接收时间正好相差5秒。
Spring Cloud Stream
引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
生产者端
生产者端 yml
# Producer instance with a Spring Cloud Stream output binding (profile mq01).
server:
  port: 8951
eureka:
  client:
    service-url:
      defaultZone: http://eureka-cluster01:8761/eureka/,http://eureka-cluster02:8762/eureka/
spring:
  application:
    name: eureka-rabbitmq-producer
  profiles: mq01
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: cloudtest
  cloud:
    stream:
      bindings:
        # Producer (output) channel
        myOutput:
          # Name of the exchange the channel publishes to
          destination: stream_exchange
代码
定义生产者通道
/**
 * Spring Cloud Stream binding interface for the producer side.
 *
 * <p>It carries no {@code @Component} annotation: classpath scanning does not
 * register interfaces as beans, and the binding is created through the
 * {@code @EnableBinding(OutputStreamChannel.class)} declaration on the
 * producer service.
 */
@Profile("mq01")
public interface OutputStreamChannel {

    /** Binding name; must match the {@code myOutput} key in the stream bindings configuration. */
    String OUTPUT = "myOutput";

    /**
     * Returns the channel used to publish messages.
     *
     * @return the output channel bound under {@link #OUTPUT}
     */
    @Output(OUTPUT)
    MessageChannel output();
}
定义生产者生产消息逻辑
/**
 * Producer-side service (profile {@code mq01}) that publishes messages
 * through the {@link OutputStreamChannel} binding.
 */
@Slf4j
@Service
@Profile("mq01")
@EnableBinding(OutputStreamChannel.class)
public class ProducerStreamServiceImpl implements ProducerStreamService {

    @Autowired
    private OutputStreamChannel outputChannels;

    /**
     * Wraps the text in a message and sends it on the output channel.
     *
     * @param msg the message payload
     * @return the result reported by the channel's {@code send} call
     */
    @Override
    public boolean rabbitMQSend(String msg) {
        boolean sent = outputChannels.output().send(MessageBuilder.withPayload(msg).build());
        return sent;
    }
}
生产者控制类
/**
 * REST entry point of the producer service (profile {@code mq01}) that
 * publishes messages through {@link ProducerStreamService}.
 */
@Slf4j
@RestController
@RequestMapping("/rabbitmq/producer")
@Profile("mq01")
public class TestController {

    @Resource
    private ProducerStreamService producerStreamService;

    /**
     * Sends the path segment through the stream producer.
     *
     * @param message text taken from the request path
     * @return {@code "success"} when the service reports the send as
     *         successful, otherwise {@code "fail"}
     */
    @GetMapping("streamproducer1/{message}")
    public String streamproducer1(@PathVariable("message") String message) {
        if (producerStreamService.rabbitMQSend(message)) {
            return "success";
        }
        return "fail";
    }
}
消费者端
yml
# Consumer instance with a Spring Cloud Stream input binding (profile mq02).
server:
  port: 8952
eureka:
  client:
    service-url:
      defaultZone: http://eureka-cluster01:8761/eureka/,http://eureka-cluster02:8762/eureka/
spring:
  application:
    name: eureka-rabbitmq-consumer
  profiles: mq02
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: cloudtest
  cloud:
    stream:
      bindings:
        # Consumer (input) channel
        myInput:
          # Name of the exchange the channel consumes from
          destination: stream_exchange
消费者通道
/**
 * Spring Cloud Stream binding interface for the consumer side.
 *
 * <p>It carries no {@code @Component} annotation: classpath scanning does not
 * register interfaces as beans, and the binding is created through the
 * {@code @EnableBinding(InputStreamChannel.class)} declaration on the
 * consumer service.
 */
@Profile("mq02")
public interface InputStreamChannel {

    /** Binding name; must match the {@code myInput} key in the stream bindings configuration. */
    String INPUT = "myInput";

    /**
     * Returns the channel from which messages are received.
     *
     * @return the input channel bound under {@link #INPUT}
     */
    @Input(INPUT)
    SubscribableChannel input();
}
定义消费者消费消息逻辑
/**
 * Consumer-side service (profile {@code mq02}) that listens on the
 * {@link InputStreamChannel} binding.
 */
@Slf4j
@Service
@Profile("mq02")
@EnableBinding(InputStreamChannel.class)
public class ConsumerStreamServiceImpl implements ConsumerStreamService {

    /**
     * Logs every message received on {@link InputStreamChannel#INPUT}.
     *
     * @param msg the received payload
     */
    @StreamListener(InputStreamChannel.INPUT)
    @Override
    public void rabbitMQreceive(String msg) {
        final String received = msg;
        log.info("stream消费到的消息:【{}】", received);
    }
}
启动项目发送消息
消息成功发送,并且已经被消费到