2021-10-11_如何通过rabbitmq实现前后台的多系统
2021-10-13 本文已影响0人
kikop
20211011_如何通过rabbitmq实现前后台的多系统实时通信
1概述
本文基于SpringBoot(v2.2.1)介绍如何通过rabbitmq(v3.7.8)实现前后台的多系统实时通信,具体实现思路有2种方式,本节主要实现1.1.2。系统架构如下:
image-20211013223446871.png项目结构:
image-20211013220457366.png涉及的知识点如下:
- rabbitmq的生产投递
- rabbitmq消费手动确认应答及消费限流。
- rabbitmq的数据json序列化。
- aqs互斥锁及条件变量(Lock+Condition)的运用。
- 前端Web系统、后台系统A、后台系统B、rabbitmq的多系统的实时通信。
1.1实现思路
1.1.1rabbitmq整合websocket(未实现)
- 后台系统A,开启webSocket服务监听。
- 后台系统A开启对rabbitmq结果任务队列的消费监听。
- 前端Web系统作为websocket的客户端,连接webSocket服务(webSocket服务缓存所有连接的客户端),并进行消息订阅
- 前端Web系统发起请求到后台系统A后一直处于等待状态,后台系统A将请求发给rabbitmq服务中的请求任务队列。
- 后台系统B消费rabbitmq请求队列中的任务,消费完成后,将结果推送到rabbitmq中的结果任务队列。
- 后台系统A将监听到的结果,进行反推请求发起者,最终将结果通过webSocket推送到前端Web系统。
1.1.2rabbitmq整合aqs
- 后台系统A开启对rabbitmq结果任务队列的消费监听。
- 前端Web系统作为websocket的客户端,连接webSocket服务(webSocket服务缓存所有连接的客户端),并进行消息订阅
- 前端Web系统发起请求到后台系统A后一直处于等待状态,后台系统A以线程的方式将请求发给rabbitmq服务中的请求任务队列,同时进行future.get()同步阻塞等待,线程中通过Lock+condition实现条件等待await。
- 后台系统B消费rabbitmq请求队列中的任务,消费完成后,将结果推送到rabbitmq中的结果任务队列。
- 后台系统A将监听到的结果,进行反推请求发起者,唤醒指定的condition,从而将结束同步阻塞中的任务。
1.2时序错误场景分析
// lock对应以个condition可能引起的唤醒时序错误场景
1.前端Web系统发起请求任务1、前端Web系统发起请求任务2
2.rabbitmq请求任务队列:
请求任务1、请求任务2
2.处理完成后,结果任务队列可能的情况:
请求任务2、请求任务1
3.rabbitmq消费顺序:
消费任务2-->signal2WaitQueueBySequenceTo1(错误)
消费任务1-->signal2WaitQueueBySequenceTo2(错误)
2代码实现
2.1配置
2.1.1yml配置
spring:
profiles:
active: dev
# 配置 RabbitMQ的基本信息
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
# 生产投递机制(1.事务、2.Confirm、3.异步监听Return)
# 开启Exchange消息发送确认功能
publisher-confirm-type: correlated
# 开启Queue失败退回功能
publisher-returns: true
listener:
type: direct
direct:
# 手动确认
acknowledge-mode: manual
# basicQos
prefetch: 1
# 消息拒绝是否重写入队
default-requeue-rejected: true
# 重试配置
retry:
enabled: true
max-attempts: 3
server.port=8085
server.servlet.context-path=/myrabbitwebrequest
2.1.2生产端配置
package com.kikop.config;
import com.kikop.ConstRabbit;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author kikop
* @version 1.0
* @project Name: myrabbitwebrequest
* @file Name: RabbitProducerConfig
* @desc mq生产端
* @date 2021/10/10
* @time 8:00
* @by IDE: IntelliJ IDEA
*/
@Configuration
public class RabbitProducerConfig implements InitializingBean {
@Autowired
public RabbitTemplate rabbitTemplate;
/**
* 设置一个简单的队列2
*/
@Bean(name = ConstRabbit.QUEUE_WEBREQUEST_DIRECT)
public Queue queue2() {
/*
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列,一直保留
* 参数5:队列其它参数
*/
return new Queue(ConstRabbit.QUEUE_WEBREQUEST_DIRECT,
true, false, false, null);
}
/**
* 设置一个简单的队列2
*/
@Bean(name = ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT)
public Exchange exchange2() {
return ExchangeBuilder.topicExchange(ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT).durable(true).build();
}
/*
1. 知道哪个队列
2. 知道哪个交换机
3. routing key
*/
@Bean
public Binding bindQueueExchange2(@Qualifier(ConstRabbit.QUEUE_WEBREQUEST_DIRECT) Queue queue,
@Qualifier(ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT) Exchange exchange) {
// import org.springframework.amqp.core.Exchange;
return BindingBuilder.bind(queue).to(exchange).with(ConstRabbit.ROUTINGKEY_WEBREQUEST_DIRECT).noargs();
}
/**
* 设置一个简单的队列2
*/
@Bean(name = ConstRabbit.QUEUE_WEBREQUEST_DIRECT_RESULT)
public Queue queue3() {
/*
* 参数1:队列名称
* 参数2:是否定义持久化队列
* 参数3:是否独占本次连接
* 参数4:是否在不使用的时候自动删除队列,一直保留
* 参数5:队列其它参数
*/
return new Queue(ConstRabbit.QUEUE_WEBREQUEST_DIRECT_RESULT,
true, false, false, null);
}
/**
* 设置一个简单的队列2
*/
@Bean(name = ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT_RESULT)
public Exchange exchange3() {
return ExchangeBuilder.topicExchange(ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT_RESULT).durable(true).build();
}
/*
1. 知道哪个队列
2. 知道哪个交换机
3. routing key
*/
@Bean
public Binding bindQueueExchange3(@Qualifier(ConstRabbit.QUEUE_WEBREQUEST_DIRECT_RESULT) Queue queue,
@Qualifier(ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT_RESULT) Exchange exchange) {
// import org.springframework.amqp.core.Exchange;
return BindingBuilder.bind(queue).to(exchange).with(ConstRabbit.ROUTINGKEY_WEBREQUEST_DIRECT_RESULT)
.noargs();
}
// @Bean(name = "myMqCondition")
// public Condition myMqCondition(@Qualifier("myMqLock") ReentrantLock reentrantLock) {
// Condition condition = reentrantLock.newCondition();
// return condition;
// }
@Bean
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
@Override
public void afterPropertiesSet() throws Exception {
// 生产端序列化
rabbitTemplate.setMessageConverter(converter());
}
}
2.1.3消费端配置
package com.kikop.config;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
/**
* @author kikop
* @version 1.0
* @project Name: myrabbitwebrequest
* @file Name: RabbitConfig
* @desc mq消费端
* @date 2021/10/10
* @time 8:00
* @by IDE: IntelliJ IDEA
*/
@Configuration
public class RabbitConsumerConfig implements RabbitListenerConfigurer {
// 注意,引入包的类型
// org.springframework.amqp.rabbit.connection
// connectionFactory instance CachingConnectionFactory
@Autowired
public ConnectionFactory connectionFactory;
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
// 这里的转换器设置实现了 通过 @Payload 注解 自动反序列化message body
factory.setMessageConverter(new MappingJackson2MessageConverter());
return factory;
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
}
2.1.4独占锁配置
package com.kikop.config;
import com.kikop.ConstRabbit;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author kikop
* @version 1.0
* @project Name: myrabbitwebrequest
* @file Name: ReentrantLockConfig
* @desc mq生产端
* @date 2021/10/10
* @time 8:00
* @by IDE: IntelliJ IDEA
*/
@Configuration
public class ReentrantLockConfig {
/**
* 向SpringIoc容器中注入可重入锁
* 一个任务一个条件对象Condition,每个Condition只关联一个等待节点(是不是很浪费,你们说呢)
* 主要为了解决:
* 生产、消费的数据不一致性,请求响应数据错乱问题
* @return
*/
@Bean(name = "myMqLock")
public ReentrantLock myMqLock() {
ReentrantLock myMqLock = new ReentrantLock();
return myMqLock;
}
}
2.1.4常量配置
package com.kikop;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Condition;
/**
* @author kikop
* @version 1.0
* @project Name: myrabbitwebrequest
* @file Name: ConstRabbit
* @desc 配置类
* @date 2021/10/10
* @time 16:59
* @by IDE: IntelliJ IDEA
*/
public class ConstRabbit {
// 正常交换机
public static final String EXCHANGE_WEBREQUEST_DIRECT = "ex_webrequest";
// 正常路由
public static final String ROUTINGKEY_WEBREQUEST_DIRECT = "rk_webrequest";
// 正常队列
public static final String QUEUE_WEBREQUEST_DIRECT = "queue_webrequest";
// 正常交换机
public static final String EXCHANGE_WEBREQUEST_DIRECT_RESULT = "ex_webrequest_result";
// 正常路由
public static final String ROUTINGKEY_WEBREQUEST_DIRECT_RESULT = "rk_webrequest_result";
// 正常队列
public static final String QUEUE_WEBREQUEST_DIRECT_RESULT = "queue_webrequest_result";
/**
* 缓存所有的条件对象
* 一个任务一个条件对象Condition,每个Condition只关联一个等待节点(是不是很浪费,你们说呢)
* 主要为了解决:
* 生产、消费的数据不一致性,请求响应数据错乱问题
*/
public static final ConcurrentHashMap<String, Condition> conditionCache = new ConcurrentHashMap<String, Condition>();
}
2.2消息请求体
package com.kikop.model;
/**
* @author kikop
* @version 1.0
* @project Name: myrabbitwebrequest
* @file Name: MqRequest
* @desc mq消息请求体(8:00,13:30,19:30,21:00)
* @date 2021/10/13
* @time 8:00
* @by IDE: IntelliJ IDEA
*/
public class MqRequest {
public String reqId;
public String reqInfo;
public String getReqId() {
return reqId;
}
public void setReqId(String reqId) {
this.reqId = reqId;
}
public String getReqInfo() {
return reqInfo;
}
public void setReqInfo(String reqInfo) {
this.reqInfo = reqInfo;
}
}
2.3web层
package com.kikop.controller;
import com.alibaba.fastjson.JSONObject;
import com.kikop.ConstRabbit;
import com.kikop.handler.MyMqRequestTask;
import com.kikop.model.MqRequest;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import javax.annotation.PreDestroy;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author kikop
* @version 1.0
* @project Name: myrabbitwebrequest
* @file Name: OrderPayController
* @desc
* @date 2021/10/10
* @time 8:00
* @by IDE: IntelliJ IDEA
*/
@RestController
@RequestMapping("/orderpay")
public class OrderPayController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ReentrantLock myMqLock;
private ExecutorService executorService = Executors.newFixedThreadPool(10);
/**
* communicateWithMq
*
* @return
*/
@RequestMapping(value = "communicateWithMq", method = {RequestMethod.GET, RequestMethod.POST})
@ResponseBody
public JSONObject communicateWithMq(String task_uuid) {
// http://localhost:8085/myrabbitwebrequest/aform/communicateWithMq?task_uuid=1
// 1.后台请求
System.out.println("------------开始后台系统A请求:" + task_uuid);
JSONObject result = new JSONObject();
result.put("success", false);
Future<String> stringFuture = executorService.submit(new MyMqRequestTask(
rabbitTemplate, myMqLock, task_uuid));
try {
String strMqResult = null;
// 2.同步等待后台处理,但执行时在其它系统中完成
strMqResult = stringFuture.get(5, TimeUnit.MINUTES);
result.put("data", strMqResult);
result.put("success", true);
} catch (TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
// 3.gc释放
if (ConstRabbit.conditionCache.contains(task_uuid)) {
Condition currentCndition = ConstRabbit.conditionCache.remove(task_uuid);
if (null != currentCndition) {
currentCndition = null; // for gc
}
}
System.out.println("------------结束后台系统A请求:" + task_uuid);
return result;
}
/**
* communicateWithMq
* RequestBody作用:序列化json发送
*
* @return
*/
@RequestMapping(value = "communicateWithMqByReqObj", method = {RequestMethod.GET, RequestMethod.POST})
@ResponseBody
public JSONObject communicateWithMqByReqObj(@RequestBody MqRequest mqRequest) {
// 1.后台请求
String task_uuid = mqRequest.getReqId();
System.out.println("------------前端Web系统开始后台系统A请求:" + task_uuid);
JSONObject result = new JSONObject();
result.put("success", false);
Future<String> stringFuture = executorService.submit(new MyMqRequestTask(
rabbitTemplate, myMqLock, mqRequest));
try {
String strMqResult = null;
// 2.同步等待后台处理,但执行时在其它系统中完成
strMqResult = stringFuture.get(5, TimeUnit.MINUTES);
result.put("data", strMqResult);
result.put("success", true);
} catch (TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
// 3.gc释放
if (ConstRabbit.conditionCache.contains(task_uuid)) {
Condition currentCndition = ConstRabbit.conditionCache.remove(task_uuid);
if (null != currentCndition) {
currentCndition = null; // for gc
}
}
System.out.println("------------前端Web系统结束后台系统A请求:" + task_uuid);
return result;
}
@PreDestroy
public void destroy() {
executorService.shutdown();
}
}
2.4业务线程
package com.kikop.handler;
import com.kikop.ConstRabbit;
import com.kikop.model.MqRequest;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author kikop
* @version 1.0
* @project Name: myrabbitwebrequest
* @file Name: MyMqRequestTask
* @desc 线程池任务处理器(mq请求任务)
* @date 2021/10/10
* @time 8:00
* @by IDE: IntelliJ IDEA
*/
public class MyMqRequestTask implements Callable<String> {
private RabbitTemplate rabbitTemplate;
private ReentrantLock myMqLock;
private Condition myMqCondition;
private MqRequest mqRequest;
public MyMqRequestTask(RabbitTemplate rabbitTemplate, ReentrantLock myMqLock, String task_uuid) {
}
public MyMqRequestTask(RabbitTemplate rabbitTemplate, ReentrantLock myMqLock, MqRequest mqRequest) {
this.rabbitTemplate = rabbitTemplate;
this.myMqLock = myMqLock;
Condition condition = this.myMqLock.newCondition();
this.mqRequest = mqRequest;
ConstRabbit.conditionCache.put(this.mqRequest.getReqId(), condition);
this.myMqCondition = condition;
}
@Override
public String call() throws Exception {
try {
System.out.println("------------业务线程发送到rabbitmq服务中的请求队列,开始获取锁:" + this.mqRequest.getReqId());
myMqLock.lock();
System.out.println("------------业务线程发送到rabbitmq服务中的请求队列,获取锁成功:" + this.mqRequest.getReqId());
// 发送的对象 Object:String类型
rabbitTemplate.convertAndSend(ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT,
ConstRabbit.ROUTINGKEY_WEBREQUEST_DIRECT,
this.mqRequest.getReqId());
// 等待mq处理结果
System.out.println("------------业务线程发送到rabbitmq服务中的请求队列,await等待结果:" + this.mqRequest.getReqId() + ",准备唤醒AQS节点中下一个锁");
myMqCondition.await(); // release aqs state,node-->condition queue
return "result_" + this.mqRequest.getReqId();
} catch (Exception ex) {
ex.printStackTrace();
} finally {
System.out.println("------------业务线程开始释放锁:" + this.mqRequest.getReqId());
myMqLock.unlock(); // 释放锁
}
return "";
}
}
2.5mq消费端
package com.kikop.listener;
import com.fasterxml.jackson.databind.ser.std.RawSerializer;
import com.kikop.ConstRabbit;
import com.kikop.model.MqRequest;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author kikop
* @version 1.0
* @project Name: myrabbitwebrequest
* @file Name: MyRabbitListener
* @desc
* @date 2021/10/10
* @time 8:00
* @by IDE: IntelliJ IDEA
*/
@Slf4j
@Component
public class MyRabbitListener {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ReentrantLock myMqLock;
private Condition myMqCondition;
private static Random sleepRandom;
static {
sleepRandom = new Random(System.currentTimeMillis());
}
// 1.Java原生数据类型消费
// @RabbitListener(queues = ConstRabbit.QUEUE_WEBREQUEST_DIRECT_RESULT)
// public void helloRabbitMq(Message message, Channel channel) throws IOException {
// 2.序列化Json数据消费
@RabbitHandler
@RabbitListener(queues = ConstRabbit.QUEUE_WEBREQUEST_DIRECT_RESULT)
public void helloRabbitMq(Message message, @Payload MqRequest mqRequest, Channel channel) throws IOException {
MessageProperties messageProperties = message.getMessageProperties();
log.info(messageProperties.toString());
try {
// 1.队列结果
log.info(message.toString());
// body:payLoad负载
log.info(new String(message.getBody()));
byte[] messageBody = message.getBody();
// 这个task_uuid能是sync队列中的第一节点吗
// 不一定
String task_uuid = mqRequest.getReqId();
// 2.注意:
// 1.手动应答模式需要,消息中带:getDeliveryTag,用于重写投递
// listener:
// simple:
// # manual 手动确认
// acknowledge-mode: manual
// 2.报错信息
// Channel shutdown: channel error; protocol method: #method<channel.close>
// (reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1,
// class-id=60, method-id=80)
channel.basicAck(messageProperties.getDeliveryTag(), false);
// 3.推送处理结果到-->mq
// rabbitTemplate.convertAndSend(ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT_RESULT,
// ConstRabbit.ROUTINGKEY_WEBREQUEST_DIRECT_RESULT,
// message.getBody());
// 模拟XXX系统处理耗时(理论上应该在XXX系统中)
// int sleepTimes = sleepRandom.nextInt(60) + 1;
// System.out.println("------------task_uuid:" + task_uuid + ",sleepTimes:" + sleepTimes);
// TimeUnit.SECONDS.sleep(sleepTimes);
// 4.条件变量通知
try {
System.out.println("------------Mq消费者(后台系统A中)解析XXX系统理结果,开始获取锁:" + task_uuid);
myMqLock.lock();
System.out.println("------------Mq消费者(后台系统A中)解析XXX系统理结果,获取锁成功:" + task_uuid);
System.out.println("------------Mq消费者解析XXX系统处理结果,激活信号:" + task_uuid);
// 按顺序唤醒条件队列中的节点(11,22),和task_uuid没有直接的关系绑定(22,11)
Condition condition = ConstRabbit.conditionCache.get(task_uuid);
this.myMqCondition = condition;
if (this.myMqCondition != null) {
myMqCondition.signal(); // node-->aqs
} else {
System.out.println("------------Mq消费者(后台系统A中)解析XXX系统处理结果,无效的条件:" + task_uuid);
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
System.out.println("------------Mq消费者(后台系统A中)开始释放锁:" + task_uuid);
myMqLock.unlock(); // 唤醒aqs节点
}
} catch (Exception ex) {
ex.printStackTrace();
// begin_手动消费方式是启用
if (messageProperties.getRedelivered()) {
// 当前的消息是否重新投递的消息,也就是该消息是重新回到队列里的消息
// 主要防止死循环消费
log.info("------------Mq消费者消息已重复处理失败,拒绝再次接收...");
// 拒绝消息
channel.basicReject(messageProperties.getDeliveryTag(), false);
} else {
log.info("------------Mq消费者消息即将再次返回队列处理...");
channel.basicNack(messageProperties.getDeliveryTag(), false, true);
}
// end_手动消费方式是启用
}
}
}
2.6启动类
package com.kikop;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author kikop
* @version 1.0
* @project Name: myrabbitwebrequest
* @file Name: MyRabbitWebRequestApplication
* @desc
* @date 2021/10/11
* @time 8:00
* @by IDE: IntelliJ IDEA
*/
// 默认组件扫描当前包空间
@SpringBootApplication
public class MyRabbitWebRequestApplication {
public static void main(String[] args) {
SpringApplication.run(MyRabbitWebRequestApplication.class, args);
}
}
2.7后台系统B
package com.kikop;
import com.alibaba.fastjson.JSONObject;
import com.kikop.model.MqRequest;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.stream.IntStream;
/**
* @author kikop
* @version 1.0
* @project Name: myrabbitreliableproducer
* @file Name: ProducerTest
* @desc
* @date 2021/10/10
* @time 8:00
* @by IDE: IntelliJ IDEA
*/
//@SpringBootTest
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@RunWith(SpringRunner.class)
public class RabbitWebRequestTest {
// 引入 SpringBootTest
// 模拟 rest请求
@Autowired
private TestRestTemplate testRestTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 传输字符串
* get请求
*/
@Test
public void testConsurrentSend() {
// http://localhost:8085/myrabbitwebrequest/aform/communicateWithMq?task_uuid=1
// 本机CPU为4核,同时并发最多4个
// IntStream.range(0, 3).parallel().forEach(i ->
// {
// MqRequest mqRequest = new MqRequest();
// mqRequest.setReqId(String.valueOf(i + 1));
// mqRequest.setReqInfo("info_" + String.valueOf(i + 1));
//
// JSONObject result = testRestTemplate.getForObject("/orderpay/communicateWithMqByReqObj?task_uuid={task_uuid}",
// JSONObject.class, mqRequest);
// System.out.println(result.toJSONString());
// }
// );
}
/**
* 传输字符串
* post请求
*/
@Test
public void testConsurrentSendByObj() {
// 本机CPU为4核,同时并发最多4个
MqRequest mqRequest = new MqRequest();
mqRequest.setReqId(String.valueOf(1));
mqRequest.setReqInfo("info_" + String.valueOf(1));
JSONObject result = testRestTemplate.postForObject("/orderpay/communicateWithMqByReqObj",
mqRequest,
JSONObject.class, "");
System.out.println(result.toJSONString());
}
@Test
public void testProcResult() {
MqRequest mqRequest = new MqRequest();
mqRequest.setReqId(String.valueOf(1));
mqRequest.setReqInfo("info_" + String.valueOf(1));
System.out.println("------------模拟后台系统B进行请求处理");
rabbitTemplate.convertAndSend(ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT_RESULT,
ConstRabbit.ROUTINGKEY_WEBREQUEST_DIRECT_RESULT,
mqRequest);
}
}
2.8postman测试
image-20211013215913972.png image-20211013215947627.png// 后台系统A
2021-10-13 21:58:11,271 [INFO] [http-nio-8085-exec-1] [org.springframework.web.servlet.DispatcherServlet:547] [] Completed initialization in 5 ms
------------前端Web系统开始后台系统A请求:1
------------业务线程发送到rabbitmq服务中的请求队列,开始获取锁:1
------------业务线程发送到rabbitmq服务中的请求队列,获取锁成功:1
------------业务线程发送到rabbitmq服务中的请求队列,await等待结果:1,准备唤醒AQS节点中下一个锁
// 后台系统B
------------模拟后台系统B进行请求处理
// 后台系统A
2021-10-13 22:00:20,063 [INFO] [pool-1-thread-4] [com.kikop.listener.MyRabbitListener:63] [] MessageProperties [headers={__TypeId__=com.kikop.model.MqRequest}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=ex_webrequest_result, receivedRoutingKey=rk_webrequest_result, deliveryTag=1, consumerTag=amq.ctag-j7dC0hzytf8DF2_SQ3Jhsw, consumerQueue=queue_webrequest_result]
2021-10-13 22:00:20,064 [INFO] [pool-1-thread-4] [com.kikop.listener.MyRabbitListener:68] [] (Body:'{"reqId":"1","reqInfo":"info_1"}' MessageProperties [headers={__TypeId__=com.kikop.model.MqRequest}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=ex_webrequest_result, receivedRoutingKey=rk_webrequest_result, deliveryTag=1, consumerTag=amq.ctag-j7dC0hzytf8DF2_SQ3Jhsw, consumerQueue=queue_webrequest_result])
2021-10-13 22:00:20,064 [INFO] [pool-1-thread-4] [com.kikop.listener.MyRabbitListener:70] [] {"reqId":"1","reqInfo":"info_1"}
------------Mq消费者(后台系统A中)解析XXX系统理结果,开始获取锁:1
------------Mq消费者(后台系统A中)解析XXX系统理结果,获取锁成功:1
------------Mq消费者解析XXX系统处理结果,激活信号:1
------------Mq消费者(后台系统A中)开始释放锁:1
------------业务线程开始释放锁:1
------------前端Web系统结束后台系统A请求:1