RabbitMQ in Practice: TTL and Dead-Letter Queues

2021-03-10  jianyi1000

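1. Requirements

Using RabbitMQ's TTL and dead-letter queues, implement delayed payment with manual compensation in Spring Boot.

1. After placing an order, the user sees a waiting-for-payment page.

2. If the user clicks the pay button on that page before the timeout, the payment-success page is shown.

3. If the timeout has passed, the user is redirected to the order history to view the order that was cancelled because payment timed out.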

2. Approach

2.1 Architecture

image.png

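2.2 Prerequisites

Install RabbitMQ.

2.3 Using a delay queue

Neither the AMQP protocol nor RabbitMQ itself defines or implements delay queues. One can be emulated with RabbitMQ's dead-letter mechanism: publish the message to a queue that has a message TTL and no consumer; when the TTL elapses, the message is dead-lettered to a dead-letter exchange and routed from there to a queue that is consumed.

The rabbitmq_delayed_message_exchange plugin is an alternative way to get delayed delivery. The code in this article uses the TTL plus dead-letter approach and does not depend on that plugin.

Run the following command on the virtual machine. Note that it enables the management plugin (the web console, useful for inspecting the queues during the demo), not the delayed-message plugin: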

rabbitmq-plugins enable rabbitmq_management
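
To make the TTL plus dead-letter idea concrete, here is a minimal in-memory sketch in plain Java. It is not RabbitMQ code: the class TtlDeadLetterSketch, its methods, and the simulated clock passed in as nowMillis are all assumptions made for illustration. It only mimics the behaviour this article relies on, namely that a message left unconsumed for longer than the queue's TTL is moved to the dead-letter side.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy in-memory model of a TTL queue with a dead-letter target (illustrative only). */
class TtlDeadLetterSketch {

    /** A message body plus the simulated time at which it expires. */
    private static final class Entry {
        final String body;
        final long expiresAtMillis;

        Entry(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final long ttlMillis;
    private final Deque<Entry> ttlQueue = new ArrayDeque<>();
    private final List<String> deadLetterQueue = new ArrayList<>();

    TtlDeadLetterSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Enqueues a message; nobody consumes from this queue, so it can only expire. */
    void publish(String body, long nowMillis) {
        ttlQueue.addLast(new Entry(body, nowMillis + ttlMillis));
    }

    /** Moves every message at the head whose TTL has elapsed to the dead-letter queue. */
    void expire(long nowMillis) {
        while (!ttlQueue.isEmpty() && ttlQueue.peekFirst().expiresAtMillis <= nowMillis) {
            deadLetterQueue.add(ttlQueue.pollFirst().body);
        }
    }

    /** Returns a copy of the dead-lettered message bodies, oldest first. */
    List<String> deadLetters() {
        return new ArrayList<>(deadLetterQueue);
    }

    /** Number of messages still waiting in the TTL queue. */
    int pending() {
        return ttlQueue.size();
    }

    public static void main(String[] args) {
        TtlDeadLetterSketch queue = new TtlDeadLetterSketch(10_000);
        queue.publish("order-1", 0);
        queue.expire(10_000);
        System.out.println("dead-lettered: " + queue.deadLetters());
    }
}
```

With a 10 000 ms TTL, a message published at time 0 stays in the TTL queue until the simulated clock reaches 10 000 and only then appears among the dead letters, which is the delay the order-cancellation flow depends on.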

2.4 Source code walkthrough

pom.xml

<dependencies>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-web</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-thymeleaf</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-test</artifactId>
 <scope>test</scope>
 </dependency>
 <dependency>
 <groupId>org.springframework.amqp</groupId>
 <artifactId>spring-rabbit-test</artifactId>
 <scope>test</scope>
 </dependency>
 <!-- Lombok -->
 <dependency>
 <groupId>org.projectlombok</groupId>
 <artifactId>lombok</artifactId>
 <version>1.18.4</version>
 <scope>provided</scope>
 </dependency>
 </dependencies>

Configuration file application.properties

spring.application.name=payDemo
spring.rabbitmq.host=192.168.31.204
spring.rabbitmq.virtual-host=test
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672

# Enable publisher confirms and returns, and acknowledge consumed messages manually

spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.listener.direct.acknowledge-mode=manual

spring.thymeleaf.prefix=classpath:/templates/
spring.thymeleaf.check-template-location=true
spring.thymeleaf.suffix=.html

The virtual host used here is test; the default, spring.rabbitmq.virtual-host=/, works as well.

Configuration class RabbitMqConfig.java

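import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration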
@EnableRabbit
@ComponentScan("com.idstaa")
public class RabbitMqConfig {
 /**
 * Order message queue
 */
 @Bean
 public Queue orderQueue() {
 return QueueBuilder.durable("q.order").build();
 }

 /**
 * TTL queue that holds the order ID while the user has time to pay.
 * A message not consumed within 10 s expires and is dead-lettered
 * to exchange ex.dlx with routing key key.dlx.
 */
 @Bean
 public Queue ttlQueue() {
 Map<String, Object> args = new HashMap<>();
 args.put("x-message-ttl", 10000);
 args.put("x-dead-letter-exchange", "ex.dlx");
 args.put("x-dead-letter-routing-key", "key.dlx");
 return new Queue("ttl.order", true, false, false, args);
 }

 /**
 * Dead-letter queue, used to cancel the user's order
 */
 @Bean
 public Queue dlxQueue() {
 Map<String, Object> args = new HashMap<>();
 return new Queue("q.dlx", true, false, false, args);
 }

 /**
 * Order exchange
 */
 @Bean
 public Exchange orderExchange() {
 Map<String, Object> args = new HashMap<>();
 DirectExchange exchange = new DirectExchange("ex.order",
 true,
 false,
 args);
 return exchange;
 }

 /**
 * TTL exchange
 */
 @Bean
 public Exchange ttlExchange() {
 Map<String, Object> args = new HashMap<>();
 DirectExchange exchange = new DirectExchange("ex.ttl",
 true,
 false,
 args);
 return exchange;
 }

 /**
 * Dead-letter exchange
 */
 @Bean
 public Exchange dlxExchange() {
 Map<String, Object> args = new HashMap<>();
 DirectExchange exchange = new DirectExchange("ex.dlx",
 true,
 false,
 args);
 return exchange;
 }

 /**
 * Binding for the order message, the MQ leg of the distributed transaction
 */
 @Bean
 public Binding orderBinding() {
 return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("key.order").noargs();
 }

 /**
 * Binding for the delay queue that waits for the user to pay
 */
 @Bean
 public Binding ttlBinding() {
 return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("key.ttl").noargs();
 }


 /**
 * Binding for the dead-letter queue that cancels orders whose payment timed out
 */
 @Bean
 public Binding dlxBinding() {
 return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("key.dlx").noargs();
 }

 @Bean
 public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
 return new RabbitAdmin(connectionFactory);
 }

 @Bean(name="rabbitMessageListenerContainer")
 public DirectMessageListenerContainer listenerContainer(ConnectionFactory connectionFactory){
 DirectMessageListenerContainer container = new DirectMessageListenerContainer(connectionFactory);
 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 container.setPrefetchCount(5);
 container.setConsumersPerQueue(5);
 container.setMessagesPerAck(1);

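 ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
 taskExecutor.setCorePoolSize(10);
 taskExecutor.setMaxPoolSize(20);
 // The executor is created by hand rather than as a Spring bean, so initialize it explicitly
 taskExecutor.initialize();
 // Setting this property allows the concurrency to be tuned flexibly
 container.setTaskExecutor(taskExecutor);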
 return container;
 }

 @Bean
 public MessageConverter messageConverter(){
 return new Jackson2JsonMessageConverter();
 }


}
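
The routing declared by this configuration can be summarised in a small plain-Java model. This is only a sketch for reading the topology: the class TopologySketch and its methods are hypothetical, the exchange, routing-key and queue names are copied from RabbitMqConfig, and a direct exchange is modelled as an exact match on the routing key.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative model of the bindings and dead-letter settings in RabbitMqConfig. */
class TopologySketch {

    // Binding table of the three direct exchanges: "exchange|routingKey" -> queue
    private static final Map<String, String> BINDINGS = new HashMap<>();

    // Dead-letter arguments per queue: queue -> "exchange|routingKey"
    private static final Map<String, String> DEAD_LETTER = new HashMap<>();

    static {
        BINDINGS.put("ex.order|key.order", "q.order");
        BINDINGS.put("ex.ttl|key.ttl", "ttl.order");
        BINDINGS.put("ex.dlx|key.dlx", "q.dlx");
        DEAD_LETTER.put("ttl.order", "ex.dlx|key.dlx");
    }

    /** Queue that a message published to the given exchange and key lands in, or null. */
    static String route(String exchange, String routingKey) {
        return BINDINGS.get(exchange + "|" + routingKey);
    }

    /** Queue that a message ends up in after expiring in the given queue, or null. */
    static String routeAfterExpiry(String queue) {
        String target = DEAD_LETTER.get(queue);
        return target == null ? null : BINDINGS.get(target);
    }

    public static void main(String[] args) {
        String first = route("ex.ttl", "key.ttl");
        System.out.println("published to: " + first);
        System.out.println("after TTL:    " + routeAfterExpiry(first));
    }
}
```

Read this way, the order ID sent to ex.ttl with key.ttl first waits in ttl.order and, once its TTL has elapsed, arrives in q.dlx, where a consumer could cancel the unpaid order.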

Order controller class OrderController.java

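import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;

@Controller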
public class OrderController {
 @Autowired
 private RabbitTemplate rabbitTemplate;

 @RequestMapping("/createOrder")
 public String createOrder(Model model) throws ExecutionException, InterruptedException {
 Order order = new Order();
 order.setOrderId(UUID.randomUUID().toString().substring(0,10));
 order.setStatus("Pending payment");
 order.setUserId("jianyi");
 OrderDetail detail = new OrderDetail();
 detail.setItemId(UUID.randomUUID().toString().substring(0,5));
 detail.setItemName("");
 detail.setItemPrice(100d);
 detail.setNum(2);
 ArrayList<OrderDetail> detailList = new ArrayList<>();
 detailList.add(detail);
 order.setDetail(detailList);
 CorrelationData correlationData = new CorrelationData();
 rabbitTemplate.convertAndSend("ex.order",
 "key.order",
 order,
 correlationData);
 CorrelationData.Confirm confirm = correlationData.getFuture().get();
 boolean ack = confirm.isAck();
 if(!ack){
 return "failOrder";
 }
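 // HH gives the 24-hour clock; the original hh pattern printed 12-hour times without AM/PM
 System.out.println("Delayed cancel message sent; the order is cancelled if not paid within 10 s, current time " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));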
 rabbitTemplate.convertAndSend("ex.ttl","key.ttl",order.getOrderId());
 model.addAttribute("orderId",order.getOrderId());
 return "order";
 }

 @RequestMapping("/failOrder/{orderId}")
 public String failOrder(@PathVariable  String orderId,Model model) throws ExecutionException, InterruptedException {
 // Update the order status (this demo only logs the order ID)
 System.out.println(orderId);
 model.addAttribute("orderId",orderId);
 return "fail";
 }

 @RequestMapping("/pay")
 public String pay(String orderId,Model model) throws ExecutionException, InterruptedException {
 // Update the order status (this demo only logs it)
 System.out.println("Order " + orderId + " is now paid");
 model.addAttribute("orderId",orderId);
 return "success";
 }

 @RequestMapping("/cancelOrderView")
 public String cancelOrderView(String orderId,Model model) throws ExecutionException, InterruptedException {
 // Update the order status (this demo only logs it)
 System.out.println("Order " + orderId + " is now cancelled");
 model.addAttribute("orderId",orderId);
 return "cancelOrderView";
 }
}
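
In createOrder, correlationData.getFuture().get() waits for the broker's confirm without any time limit, so a lost confirm would block the request thread. Since the returned future implements java.util.concurrent.Future, the timed get(timeout, unit) is available as well. The sketch below shows that pattern in plain Java; the class ConfirmWaitSketch, the helper awaitAck, and the use of a Future<Boolean> as a stand-in for the confirm (true meaning ack) are assumptions for illustration, not part of the original project.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Illustrative helper: wait for a confirm for a bounded time only. */
class ConfirmWaitSketch {

    /**
     * Returns true only if the confirm arrives within the timeout and is an ack.
     * A timeout, a failed future, or an interrupt is treated as "not confirmed".
     */
    static boolean awaitAck(Future<Boolean> confirm, long timeoutMillis) {
        try {
            return Boolean.TRUE.equals(confirm.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException | ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // A confirm that has already arrived as an ack
        System.out.println(awaitAck(CompletableFuture.completedFuture(true), 100));
        // A confirm that never arrives
        System.out.println(awaitAck(new CompletableFuture<Boolean>(), 100));
    }
}
```

In the controller, a false result would map to the same branch that currently returns the failOrder view.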

Order entity classes Order.java and OrderDetail.java

import java.util.ArrayList;

import lombok.Data;

@Data
public class Order {
 private String orderId;

 private String userId;

 private String status;

 private ArrayList<OrderDetail> detail;
}

import lombok.Data;

@Data
public class OrderDetail {
 private String itemId;

 private String itemName;

 private double itemPrice;

 private int num;
}
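
The controller methods only log the status change, and the status field of Order is a free-form String. One way the transitions could be made explicit, and safe when a payment and a timeout arrive at almost the same moment, is sketched below. Everything in it is an assumption for illustration: the class OrderStatusSketch, the Status enum, and the in-memory map standing in for a database are not part of the original project.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Illustrative in-memory order status store with atomic transitions. */
class OrderStatusSketch {

    enum Status { PENDING_PAYMENT, PAID, CANCELLED }

    private final ConcurrentMap<String, Status> orders = new ConcurrentHashMap<>();

    /** Registers a new order that is waiting for payment. */
    void create(String orderId) {
        orders.put(orderId, Status.PENDING_PAYMENT);
    }

    /** Marks the order paid; succeeds only if it is still waiting for payment. */
    boolean pay(String orderId) {
        return orders.replace(orderId, Status.PENDING_PAYMENT, Status.PAID);
    }

    /** Cancels the order on timeout; succeeds only if it is still waiting for payment. */
    boolean cancelOnTimeout(String orderId) {
        return orders.replace(orderId, Status.PENDING_PAYMENT, Status.CANCELLED);
    }

    /** Current status, or null for an unknown order. */
    Status statusOf(String orderId) {
        return orders.get(orderId);
    }

    public static void main(String[] args) {
        OrderStatusSketch store = new OrderStatusSketch();
        store.create("order-1");
        System.out.println("pay succeeded:    " + store.pay("order-1"));
        System.out.println("cancel succeeded: " + store.cancelOnTimeout("order-1"));
    }
}
```

Because both transitions are compare-and-replace operations that start from PENDING_PAYMENT, whichever of pay and cancelOnTimeout runs first wins, and the later one becomes a no-op. A dead-lettered message for an order that was already paid is then harmless.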

Page template order.html

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
 <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
 <meta name="viewport" content="width=device-width, initial-scale=1,shrink-to-fit=no">
 <title>Payment</title>
 <link th:href="@{/css/bootstrap.min.css}" rel="stylesheet">
 <link th:href="@{/css/signin.css}" rel="stylesheet">
 <script src="../static/js/jquery-1.10.2.js"></script>
</head>
<script language="javascript">
 var num = 9; // seconds left in the countdown
 var URL = "/cancelOrderView?orderId=[[${orderId}]]";
 var id = window.setInterval(doUpdate, 1000);
 function doUpdate() {
 document.getElementById('page_div').innerHTML = 'Time left to pay: ' + num + ' s';
 if(num == 0) {
 window.clearInterval(id);
 window.location = URL;
 }
 num --;
 }
</script>
<body>
<div  th:text="${orderId}"></div>
<div>Order created successfully</div>
<div class="time"> <p id="page_div">10 s left to pay; please pay promptly</p>
<a th:href="@{/pay(orderId=${orderId})}">Pay now</a>
</div>
</body>
</html>
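
The countdown in order.html starts from a hard-coded 9 on every page load, so reloading the page restarts it even though the TTL on the broker keeps running. A possible refinement, sketched here under assumptions, is to compute the remaining time on the server from the order's creation time and pass it to the template. The class PaymentDeadlineSketch and the method remainingSeconds are hypothetical, and the Order class shown in this article does not store a creation time.

```java
import java.time.Duration;
import java.time.Instant;

/** Illustrative helper for deriving the countdown from the order's creation time. */
class PaymentDeadlineSketch {

    /** Whole seconds left before the payment window closes, never negative. */
    static long remainingSeconds(Instant createdAt, Instant now, Duration ttl) {
        Duration left = Duration.between(now, createdAt.plus(ttl));
        return left.isNegative() ? 0 : left.getSeconds();
    }

    public static void main(String[] args) {
        Instant createdAt = Instant.now();
        Duration ttl = Duration.ofSeconds(10); // matches x-message-ttl = 10000 ms
        System.out.println(remainingSeconds(createdAt, createdAt.plusSeconds(3), ttl));
    }
}
```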

Other pages

cancelOrderView.html

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
 <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
 <meta name="viewport" content="width=device-width, initial-scale=1,shrink-to-fit=no">
 <title>Order cancelled</title>
 <link th:href="@{/css/bootstrap.min.css}" rel="stylesheet">
 <link th:href="@{/css/signin.css}" rel="stylesheet">
 <script src="../static/js/jquery-1.10.2.js"></script>
</head>
<body>
<div  th:text="${orderId}"></div>Order cancelled
</body>
</html>

failOrder.html

<!DOCTYPE html>
<html lang="en">
<head>
 <meta charset="UTF-8">
 <title>Title</title>
</head>
<body>
 failorder
</body>
</html>

index.html

<!DOCTYPE html>
<html lang="en">
<head>
 <meta charset="UTF-8">
 <title>Title</title>
</head>
<body>
 <a href="/createOrder">Place order</a>
</body>
</html>

success.html

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
 <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
 <meta name="viewport" content="width=device-width, initial-scale=1,shrink-to-fit=no">
 <title>Payment successful</title>
 <link th:href="@{/css/bootstrap.min.css}" rel="stylesheet">
 <link th:href="@{/css/signin.css}" rel="stylesheet">
 <script src="../static/js/jquery-1.10.2.js"></script>
</head>
<body>
<div  th:text="${orderId}"></div>Payment successful
</body>
</html>

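3. Demo

Preparation: start RabbitMQ and create a virtual host named test.

1. Start the Spring Boot application and open the index page.

image.png

2. Click the order button and do not pay.

image.png
image.png

The page counts down; check the application log.

image.png

After 10 s the order is cancelled automatically and the browser is redirected to the order-cancelled page.

image.png

3. Click the order button and pay.

image.png

Check the application log.

image.png

4. You can also watch how the RabbitMQ queues change during these steps.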

image.png

Appendix: source code on GitHub

https://github.com/xjdm/rabbitmq_paydemo