2021-10-11_如何通过rabbitmq实现前后台的多系统

2021-10-13  本文已影响0人  kikop

20211011_如何通过rabbitmq实现前后台的多系统实时通信

1概述

本文基于SpringBoot(v2.2.1)介绍如何通过rabbitmq(v3.7.8)实现前后台的多系统实时通信,具体实现思路有2种方式,本节主要实现1.1.2。系统架构如下:

image-20211013223446871.png

项目结构:

image-20211013220457366.png

涉及的知识点如下:

  1. rabbitmq的生产投递
  2. rabbitmq消费手动确认应答及消费限流。
  3. rabbitmq的数据json序列化。
  4. aqs互斥锁及条件变量(Lock+Condition)的运用。
  5. 前端Web系统、后台系统A、后台系统B、rabbitmq的多系统的实时通信。

1.1实现思路

1.1.1rabbitmq整合websocket(未实现)

  1. 后台系统A,开启webSocket服务监听。
  2. 后台系统A开启对rabbitmq结果任务队列的消费监听。
  3. 前端Web系统作为websocket的客户端,连接webSocket服务(webSocket服务缓存所有连接的客户端),并进行消息订阅
  4. 前端Web系统发起请求到后台系统A后一直处于等待状态,后台系统A将请求发给rabbitmq服务中的请求任务队列。
  5. 后台系统B消费rabbitmq请求队列中的任务,消费完成后,将结果推送到rabbitmq中的结果任务队列。
  6. 后台系统A将监听到的结果,进行反推请求发起者,最终将结果通过webSocket推送到前端Web系统。

1.1.2rabbitmq整合aqs

  1. 后台系统A开启对rabbitmq结果任务队列的消费监听。
  2. 前端Web系统作为websocket的客户端,连接webSocket服务(webSocket服务缓存所有连接的客户端),并进行消息订阅
  3. 前端Web系统发起请求到后台系统A后一直处于等待状态,后台系统A以线程的方式将请求发给rabbitmq服务中的请求任务队列,同时进行future.get()同步阻塞等待,线程中通过Lock+condition实现条件等待await。
  4. 后台系统B消费rabbitmq请求队列中的任务,消费完成后,将结果推送到rabbitmq中的结果任务队列。
  5. 后台系统A将监听到的结果,进行反推请求发起者,唤醒指定的condition,从而将结束同步阻塞中的任务。

1.2时序错误场景分析

// lock对应以个condition可能引起的唤醒时序错误场景
1.前端Web系统发起请求任务1、前端Web系统发起请求任务2
2.rabbitmq请求任务队列:
请求任务1、请求任务2
2.处理完成后,结果任务队列可能的情况:
请求任务2、请求任务1
3.rabbitmq消费顺序:
消费任务2-->signal2WaitQueueBySequenceTo1(错误)
消费任务1-->signal2WaitQueueBySequenceTo2(错误)

2代码实现

2.1配置

2.1.1yml配置

spring:
 profiles:
  active: dev

# 配置 RabbitMQ的基本信息
 rabbitmq:
  host: 127.0.0.1
  port: 5672
  username: guest
  password: guest
  virtual-host: /
#  生产投递机制(1.事务、2.Confirm、3.异步监听Return)
  # 开启Exchange消息发送确认功能
  publisher-confirm-type: correlated
  # 开启Queue失败退回功能
  publisher-returns: true
  listener:
    type: direct
    direct:
#    手动确认
      acknowledge-mode: manual
#      basicQos
      prefetch: 1
#      消息拒绝是否重写入队
      default-requeue-rejected: true
#      重试配置
      retry:
        enabled: true
        max-attempts: 3
server.port=8085
server.servlet.context-path=/myrabbitwebrequest

2.1.2生产端配置

package com.kikop.config;

import com.kikop.ConstRabbit;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.locks.ReentrantLock;

/**
 * @author kikop
 * @version 1.0
 * @project Name: myrabbitwebrequest
 * @file Name: RabbitProducerConfig
 * @desc mq生产端
 * @date 2021/10/10
 * @time 8:00
 * @by IDE: IntelliJ IDEA
 */
@Configuration
public class RabbitProducerConfig implements InitializingBean {


    @Autowired
    public RabbitTemplate rabbitTemplate;


    /**
     * 设置一个简单的队列2
     */
    @Bean(name = ConstRabbit.QUEUE_WEBREQUEST_DIRECT)
    public Queue queue2() {

        /*
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列,一直保留
         * 参数5:队列其它参数
         */
        return new Queue(ConstRabbit.QUEUE_WEBREQUEST_DIRECT,
                true, false, false, null);
    }


    /**
     * 设置一个简单的队列2
     */
    @Bean(name = ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT)
    public Exchange exchange2() {

        return ExchangeBuilder.topicExchange(ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT).durable(true).build();
    }


    /*
        1. 知道哪个队列
        2. 知道哪个交换机
        3. routing key
     */
    @Bean
    public Binding bindQueueExchange2(@Qualifier(ConstRabbit.QUEUE_WEBREQUEST_DIRECT) Queue queue,
                                      @Qualifier(ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT) Exchange exchange) {
        // import org.springframework.amqp.core.Exchange;
        return BindingBuilder.bind(queue).to(exchange).with(ConstRabbit.ROUTINGKEY_WEBREQUEST_DIRECT).noargs();
    }


    /**
     * 设置一个简单的队列2
     */
    @Bean(name = ConstRabbit.QUEUE_WEBREQUEST_DIRECT_RESULT)
    public Queue queue3() {

        /*
         * 参数1:队列名称
         * 参数2:是否定义持久化队列
         * 参数3:是否独占本次连接
         * 参数4:是否在不使用的时候自动删除队列,一直保留
         * 参数5:队列其它参数
         */
        return new Queue(ConstRabbit.QUEUE_WEBREQUEST_DIRECT_RESULT,
                true, false, false, null);
    }


    /**
     * 设置一个简单的队列2
     */
    @Bean(name = ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT_RESULT)
    public Exchange exchange3() {

        return ExchangeBuilder.topicExchange(ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT_RESULT).durable(true).build();
    }

    /*
        1. 知道哪个队列
        2. 知道哪个交换机
        3. routing key
     */
    @Bean
    public Binding bindQueueExchange3(@Qualifier(ConstRabbit.QUEUE_WEBREQUEST_DIRECT_RESULT) Queue queue,
                                      @Qualifier(ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT_RESULT) Exchange exchange) {
        // import org.springframework.amqp.core.Exchange;
        return BindingBuilder.bind(queue).to(exchange).with(ConstRabbit.ROUTINGKEY_WEBREQUEST_DIRECT_RESULT)
                .noargs();
    }




//    @Bean(name = "myMqCondition")
//    public Condition myMqCondition(@Qualifier("myMqLock") ReentrantLock reentrantLock) {
//        Condition condition = reentrantLock.newCondition();
//        return condition;
//    }


    @Bean
    public Jackson2JsonMessageConverter converter() {
        return new Jackson2JsonMessageConverter();
    }


    @Override
    public void afterPropertiesSet() throws Exception {
        // 生产端序列化
        rabbitTemplate.setMessageConverter(converter());
    }
}

2.1.3消费端配置

package com.kikop.config;

import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

/**
 * @author kikop
 * @version 1.0
 * @project Name: myrabbitwebrequest
 * @file Name: RabbitConfig
 * @desc mq消费端
 * @date 2021/10/10
 * @time 8:00
 * @by IDE: IntelliJ IDEA
 */
@Configuration
public class RabbitConsumerConfig implements RabbitListenerConfigurer {


    // 注意,引入包的类型
    // org.springframework.amqp.rabbit.connection
//    connectionFactory instance CachingConnectionFactory
    @Autowired
    public ConnectionFactory connectionFactory;


    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {

        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        // 这里的转换器设置实现了 通过 @Payload 注解 自动反序列化message body
        factory.setMessageConverter(new MappingJackson2MessageConverter());
        return factory;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }

}

2.1.4独占锁配置

package com.kikop.config;

import com.kikop.ConstRabbit;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.locks.ReentrantLock;

/**
 * @author kikop
 * @version 1.0
 * @project Name: myrabbitwebrequest
 * @file Name: ReentrantLockConfig
 * @desc mq生产端
 * @date 2021/10/10
 * @time 8:00
 * @by IDE: IntelliJ IDEA
 */
@Configuration
public class ReentrantLockConfig  {

    /**
     * 向SpringIoc容器中注入可重入锁
     * 一个任务一个条件对象Condition,每个Condition只关联一个等待节点(是不是很浪费,你们说呢)
     * 主要为了解决:
     * 生产、消费的数据不一致性,请求响应数据错乱问题
     * @return
     */
    @Bean(name = "myMqLock")
    public ReentrantLock myMqLock() {
        ReentrantLock myMqLock = new ReentrantLock();
        return myMqLock;
    }

}

2.1.4常量配置

package com.kikop;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Condition;

/**
 * @author kikop
 * @version 1.0
 * @project Name: myrabbitwebrequest
 * @file Name: ConstRabbit
 * @desc 配置类
 * @date 2021/10/10
 * @time 16:59
 * @by IDE: IntelliJ IDEA
 */
public class ConstRabbit {

    // 正常交换机
    public static final String EXCHANGE_WEBREQUEST_DIRECT = "ex_webrequest";
    // 正常路由
    public static final String ROUTINGKEY_WEBREQUEST_DIRECT = "rk_webrequest";
    // 正常队列
    public static final String QUEUE_WEBREQUEST_DIRECT = "queue_webrequest";

    // 正常交换机
    public static final String EXCHANGE_WEBREQUEST_DIRECT_RESULT = "ex_webrequest_result";
    // 正常路由
    public static final String ROUTINGKEY_WEBREQUEST_DIRECT_RESULT = "rk_webrequest_result";
    // 正常队列
    public static final String QUEUE_WEBREQUEST_DIRECT_RESULT = "queue_webrequest_result";

    /**
     * 缓存所有的条件对象
     * 一个任务一个条件对象Condition,每个Condition只关联一个等待节点(是不是很浪费,你们说呢)
     * 主要为了解决:
     * 生产、消费的数据不一致性,请求响应数据错乱问题
     */
    public static final ConcurrentHashMap<String, Condition> conditionCache = new ConcurrentHashMap<String, Condition>();
}

2.2消息请求体

package com.kikop.model;


/**
 * @author kikop
 * @version 1.0
 * @project Name: myrabbitwebrequest
 * @file Name: MqRequest
 * @desc mq消息请求体(8:00,13:30,19:30,21:00)
 * @date 2021/10/13
 * @time 8:00
 * @by IDE: IntelliJ IDEA
 */
public class MqRequest {
    public String reqId;
    public String reqInfo;

    public String getReqId() {
        return reqId;
    }

    public void setReqId(String reqId) {
        this.reqId = reqId;
    }

    public String getReqInfo() {
        return reqInfo;
    }

    public void setReqInfo(String reqInfo) {
        this.reqInfo = reqInfo;
    }
}

2.3web层

package com.kikop.controller;


import com.alibaba.fastjson.JSONObject;
import com.kikop.ConstRabbit;
import com.kikop.handler.MyMqRequestTask;
import com.kikop.model.MqRequest;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import javax.annotation.PreDestroy;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;


/**
 * @author kikop
 * @version 1.0
 * @project Name: myrabbitwebrequest
 * @file Name: OrderPayController
 * @desc
 * @date 2021/10/10
 * @time 8:00
 * @by IDE: IntelliJ IDEA
 */
@RestController
@RequestMapping("/orderpay")
public class OrderPayController {


    @Autowired
    private RabbitTemplate rabbitTemplate;


    @Autowired
    private ReentrantLock myMqLock;


    private ExecutorService executorService = Executors.newFixedThreadPool(10);

    /**
     * communicateWithMq
     *
     * @return
     */
    @RequestMapping(value = "communicateWithMq", method = {RequestMethod.GET, RequestMethod.POST})
    @ResponseBody
    public JSONObject communicateWithMq(String task_uuid) {

        // http://localhost:8085/myrabbitwebrequest/aform/communicateWithMq?task_uuid=1

        // 1.后台请求
        System.out.println("------------开始后台系统A请求:" + task_uuid);

        JSONObject result = new JSONObject();
        result.put("success", false);

        Future<String> stringFuture = executorService.submit(new MyMqRequestTask(
                rabbitTemplate, myMqLock, task_uuid));
        try {
            String strMqResult = null;
            // 2.同步等待后台处理,但执行时在其它系统中完成
            strMqResult = stringFuture.get(5, TimeUnit.MINUTES);
            result.put("data", strMqResult);
            result.put("success", true);

        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        // 3.gc释放
        if (ConstRabbit.conditionCache.contains(task_uuid)) {
            Condition currentCndition = ConstRabbit.conditionCache.remove(task_uuid);
            if (null != currentCndition) {
                currentCndition = null; // for gc
            }
        }
        System.out.println("------------结束后台系统A请求:" + task_uuid);
        return result;
    }


    /**
     * communicateWithMq
     * RequestBody作用:序列化json发送
     *
     * @return
     */
    @RequestMapping(value = "communicateWithMqByReqObj", method = {RequestMethod.GET, RequestMethod.POST})
    @ResponseBody
    public JSONObject communicateWithMqByReqObj(@RequestBody MqRequest mqRequest) {

        // 1.后台请求
        String task_uuid = mqRequest.getReqId();
        System.out.println("------------前端Web系统开始后台系统A请求:" + task_uuid);
        JSONObject result = new JSONObject();
        result.put("success", false);

        Future<String> stringFuture = executorService.submit(new MyMqRequestTask(
                rabbitTemplate, myMqLock, mqRequest));
        try {
            String strMqResult = null;

            // 2.同步等待后台处理,但执行时在其它系统中完成
            strMqResult = stringFuture.get(5, TimeUnit.MINUTES);
            result.put("data", strMqResult);
            result.put("success", true);

        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        // 3.gc释放
        if (ConstRabbit.conditionCache.contains(task_uuid)) {
            Condition currentCndition = ConstRabbit.conditionCache.remove(task_uuid);
            if (null != currentCndition) {
                currentCndition = null; // for gc
            }
        }
        System.out.println("------------前端Web系统结束后台系统A请求:" + task_uuid);
        return result;
    }

    @PreDestroy
    public void destroy() {
        executorService.shutdown();
    }

}

2.4业务线程

package com.kikop.handler;

import com.kikop.ConstRabbit;
import com.kikop.model.MqRequest;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

import java.util.concurrent.Callable;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;


/**
 * @author kikop
 * @version 1.0
 * @project Name: myrabbitwebrequest
 * @file Name: MyMqRequestTask
 * @desc 线程池任务处理器(mq请求任务)
 * @date 2021/10/10
 * @time 8:00
 * @by IDE: IntelliJ IDEA
 */
public class MyMqRequestTask implements Callable<String> {


    private RabbitTemplate rabbitTemplate;


    private ReentrantLock myMqLock;


    private Condition myMqCondition;

    private MqRequest mqRequest;

    public MyMqRequestTask(RabbitTemplate rabbitTemplate, ReentrantLock myMqLock, String task_uuid) {
    }

    public MyMqRequestTask(RabbitTemplate rabbitTemplate, ReentrantLock myMqLock, MqRequest mqRequest) {
        this.rabbitTemplate = rabbitTemplate;
        this.myMqLock = myMqLock;
        Condition condition = this.myMqLock.newCondition();
        this.mqRequest = mqRequest;
        ConstRabbit.conditionCache.put(this.mqRequest.getReqId(), condition);
        this.myMqCondition = condition;
    }

    @Override
    public String call() throws Exception {

        try {
            System.out.println("------------业务线程发送到rabbitmq服务中的请求队列,开始获取锁:" + this.mqRequest.getReqId());
            myMqLock.lock();

            System.out.println("------------业务线程发送到rabbitmq服务中的请求队列,获取锁成功:" + this.mqRequest.getReqId());



            // 发送的对象 Object:String类型
            rabbitTemplate.convertAndSend(ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT,
                    ConstRabbit.ROUTINGKEY_WEBREQUEST_DIRECT,
                    this.mqRequest.getReqId());

            // 等待mq处理结果
            System.out.println("------------业务线程发送到rabbitmq服务中的请求队列,await等待结果:" + this.mqRequest.getReqId() + ",准备唤醒AQS节点中下一个锁");
            myMqCondition.await(); // release aqs state,node-->condition queue

            return "result_" + this.mqRequest.getReqId();

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            System.out.println("------------业务线程开始释放锁:" + this.mqRequest.getReqId());
            myMqLock.unlock(); // 释放锁
        }
        return "";
    }
}

2.5mq消费端

package com.kikop.listener;

import com.fasterxml.jackson.databind.ser.std.RawSerializer;
import com.kikop.ConstRabbit;
import com.kikop.model.MqRequest;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;


/**
 * @author kikop
 * @version 1.0
 * @project Name: myrabbitwebrequest
 * @file Name: MyRabbitListener
 * @desc
 * @date 2021/10/10
 * @time 8:00
 * @by IDE: IntelliJ IDEA
 */
@Slf4j
@Component
public class MyRabbitListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ReentrantLock myMqLock;

    private Condition myMqCondition;

    private static Random sleepRandom;

    static {
        sleepRandom = new Random(System.currentTimeMillis());
    }

// 1.Java原生数据类型消费
//    @RabbitListener(queues = ConstRabbit.QUEUE_WEBREQUEST_DIRECT_RESULT)
//    public void helloRabbitMq(Message message, Channel channel) throws IOException {

    // 2.序列化Json数据消费
    @RabbitHandler
    @RabbitListener(queues = ConstRabbit.QUEUE_WEBREQUEST_DIRECT_RESULT)
    public void helloRabbitMq(Message message, @Payload MqRequest mqRequest, Channel channel) throws IOException {

        MessageProperties messageProperties = message.getMessageProperties();
        log.info(messageProperties.toString());

        try {

            // 1.队列结果
            log.info(message.toString());
            // body:payLoad负载
            log.info(new String(message.getBody()));
            byte[] messageBody = message.getBody();
            // 这个task_uuid能是sync队列中的第一节点吗
            // 不一定
            String task_uuid = mqRequest.getReqId();

            // 2.注意:
            // 1.手动应答模式需要,消息中带:getDeliveryTag,用于重写投递
            //            listener:
            //            simple:
            //            # manual 手动确认
            //                acknowledge-mode: manual

            // 2.报错信息
            //    Channel shutdown: channel error; protocol method: #method<channel.close>
            //    (reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1,
            //        class-id=60, method-id=80)
            channel.basicAck(messageProperties.getDeliveryTag(), false);


            // 3.推送处理结果到-->mq
//            rabbitTemplate.convertAndSend(ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT_RESULT,
//                    ConstRabbit.ROUTINGKEY_WEBREQUEST_DIRECT_RESULT,
//                    message.getBody());


            // 模拟XXX系统处理耗时(理论上应该在XXX系统中)
//            int sleepTimes = sleepRandom.nextInt(60) + 1;
//            System.out.println("------------task_uuid:" + task_uuid + ",sleepTimes:" + sleepTimes);
//            TimeUnit.SECONDS.sleep(sleepTimes);


            // 4.条件变量通知
            try {
                System.out.println("------------Mq消费者(后台系统A中)解析XXX系统理结果,开始获取锁:" + task_uuid);
                myMqLock.lock();

                System.out.println("------------Mq消费者(后台系统A中)解析XXX系统理结果,获取锁成功:" + task_uuid);

                System.out.println("------------Mq消费者解析XXX系统处理结果,激活信号:" + task_uuid);

                // 按顺序唤醒条件队列中的节点(11,22),和task_uuid没有直接的关系绑定(22,11)

                Condition condition = ConstRabbit.conditionCache.get(task_uuid);
                this.myMqCondition = condition;
                if (this.myMqCondition != null) {
                    myMqCondition.signal();  // node-->aqs
                } else {
                    System.out.println("------------Mq消费者(后台系统A中)解析XXX系统处理结果,无效的条件:" + task_uuid);
                }

            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                System.out.println("------------Mq消费者(后台系统A中)开始释放锁:" + task_uuid);
                myMqLock.unlock(); // 唤醒aqs节点
            }

        } catch (Exception ex) {
            ex.printStackTrace();

            // begin_手动消费方式是启用
            if (messageProperties.getRedelivered()) {
                // 当前的消息是否重新投递的消息,也就是该消息是重新回到队列里的消息

                // 主要防止死循环消费
                log.info("------------Mq消费者消息已重复处理失败,拒绝再次接收...");
                // 拒绝消息
                channel.basicReject(messageProperties.getDeliveryTag(), false);
            } else {
                log.info("------------Mq消费者消息即将再次返回队列处理...");
                channel.basicNack(messageProperties.getDeliveryTag(), false, true);
            }
            // end_手动消费方式是启用
        }
    }
}

2.6启动类

package com.kikop;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


/**
 * @author kikop
 * @version 1.0
 * @project Name: myrabbitwebrequest
 * @file Name: MyRabbitWebRequestApplication
 * @desc
 * @date 2021/10/11
 * @time 8:00
 * @by IDE: IntelliJ IDEA
 */

// 默认组件扫描当前包空间
@SpringBootApplication
public class MyRabbitWebRequestApplication {

    public static void main(String[] args) {
        SpringApplication.run(MyRabbitWebRequestApplication.class, args);
    }

}

2.7后台系统B

package com.kikop;


import com.alibaba.fastjson.JSONObject;
import com.kikop.model.MqRequest;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.stream.IntStream;


/**
 * @author kikop
 * @version 1.0
 * @project Name: myrabbitreliableproducer
 * @file Name: ProducerTest
 * @desc
 * @date 2021/10/10
 * @time 8:00
 * @by IDE: IntelliJ IDEA
 */
//@SpringBootTest
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@RunWith(SpringRunner.class)

public class RabbitWebRequestTest {


    // 引入 SpringBootTest
    // 模拟 rest请求
    @Autowired
    private TestRestTemplate testRestTemplate;


    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 传输字符串
     * get请求
     */
    @Test
    public void testConsurrentSend() {

        // http://localhost:8085/myrabbitwebrequest/aform/communicateWithMq?task_uuid=1

        // 本机CPU为4核,同时并发最多4个
//        IntStream.range(0, 3).parallel().forEach(i ->
//                {
//                    MqRequest mqRequest = new MqRequest();
//                    mqRequest.setReqId(String.valueOf(i + 1));
//                    mqRequest.setReqInfo("info_" + String.valueOf(i + 1));
//
//                    JSONObject result = testRestTemplate.getForObject("/orderpay/communicateWithMqByReqObj?task_uuid={task_uuid}",
//                            JSONObject.class, mqRequest);
//                    System.out.println(result.toJSONString());
//                }
//        );
    }

    /**
     * 传输字符串
     * post请求
     */
    @Test
    public void testConsurrentSendByObj() {

        // 本机CPU为4核,同时并发最多4个
        MqRequest mqRequest = new MqRequest();
        mqRequest.setReqId(String.valueOf(1));
        mqRequest.setReqInfo("info_" + String.valueOf(1));

        JSONObject result = testRestTemplate.postForObject("/orderpay/communicateWithMqByReqObj",
                mqRequest,
                JSONObject.class, "");
        System.out.println(result.toJSONString());

    }

    @Test
    public void testProcResult() {
        MqRequest mqRequest = new MqRequest();
        mqRequest.setReqId(String.valueOf(1));
        mqRequest.setReqInfo("info_" + String.valueOf(1));

        System.out.println("------------模拟后台系统B进行请求处理");
        rabbitTemplate.convertAndSend(ConstRabbit.EXCHANGE_WEBREQUEST_DIRECT_RESULT,
                ConstRabbit.ROUTINGKEY_WEBREQUEST_DIRECT_RESULT,
                mqRequest);
    }
}

2.8postman测试

image-20211013215913972.png image-20211013215947627.png
// 后台系统A
2021-10-13 21:58:11,271 [INFO] [http-nio-8085-exec-1] [org.springframework.web.servlet.DispatcherServlet:547] [] Completed initialization in 5 ms
------------前端Web系统开始后台系统A请求:1
------------业务线程发送到rabbitmq服务中的请求队列,开始获取锁:1
------------业务线程发送到rabbitmq服务中的请求队列,获取锁成功:1
------------业务线程发送到rabbitmq服务中的请求队列,await等待结果:1,准备唤醒AQS节点中下一个锁
// 后台系统B
------------模拟后台系统B进行请求处理
// 后台系统A
2021-10-13 22:00:20,063 [INFO] [pool-1-thread-4] [com.kikop.listener.MyRabbitListener:63] [] MessageProperties [headers={__TypeId__=com.kikop.model.MqRequest}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=ex_webrequest_result, receivedRoutingKey=rk_webrequest_result, deliveryTag=1, consumerTag=amq.ctag-j7dC0hzytf8DF2_SQ3Jhsw, consumerQueue=queue_webrequest_result]
2021-10-13 22:00:20,064 [INFO] [pool-1-thread-4] [com.kikop.listener.MyRabbitListener:68] [] (Body:'{"reqId":"1","reqInfo":"info_1"}' MessageProperties [headers={__TypeId__=com.kikop.model.MqRequest}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=ex_webrequest_result, receivedRoutingKey=rk_webrequest_result, deliveryTag=1, consumerTag=amq.ctag-j7dC0hzytf8DF2_SQ3Jhsw, consumerQueue=queue_webrequest_result])
2021-10-13 22:00:20,064 [INFO] [pool-1-thread-4] [com.kikop.listener.MyRabbitListener:70] [] {"reqId":"1","reqInfo":"info_1"}
------------Mq消费者(后台系统A中)解析XXX系统理结果,开始获取锁:1
------------Mq消费者(后台系统A中)解析XXX系统理结果,获取锁成功:1
------------Mq消费者解析XXX系统处理结果,激活信号:1
------------Mq消费者(后台系统A中)开始释放锁:1
------------业务线程开始释放锁:1
------------前端Web系统结束后台系统A请求:1

参考

1RabbitMQ笔记(七)-SimpleMessageListenerContainer和DirectMessageListenerContainer

https://blog.csdn.net/yingziisme/article/details/86418580

2postman发送json格式的post请求

https://www.cnblogs.com/shimh/p/6093229.html

上一篇下一篇

猜你喜欢

热点阅读