Java大数据

消息中间件--RabbitMQ总结

2021-12-23  本文已影响0人  Java弟中弟
消息中间件--RabbitMQ总结

基本知识

为什么使用消息中间件

消息中间件--RabbitMQ总结

在有多个系统组成的应用中,常常出现A系统会影响B、C、D数据的情况。通常做法是在A中调用其他系统的接口,这样系统之间的依赖太大,如果后续添加新的系统,就需要在A中添加相应的逻辑。这样做耦合程度太大,不利于维护。

消息中间件--RabbitMQ总结

加入MQ后,A系统中不用添加其他系统的调用,只需要发送消息,其他系统监听消息,在自己系统中处理。新增或者删除也不需要改动A系统的代码,只需要在自己中取消该类型的消息监听就行。

很多时候涉及多服务之间调用的情况,客户端发起请求,A中回去调用B、C、D的接口,最后再将执行结果返回到客户端,这样一个流程中A接口的执行时间,收到其他服务的影响,是他们执行时间的总和,如果A不关心B、C、D他们的执行情况,就可以使用MQ。A发送消息后直接返回,从而提升接口的响应时间。

消息中间件--RabbitMQ总结

当系统面临大量请求时,会对数据库造成很大压力,引入MQ后,你可以根据数据库的实际处理能力,每次从MQ中拿一定数量的数据处理,处理完从中取。

消息中间件--RabbitMQ总结

生产者/消费者

RabbitMQ 的安装

普通安装

直接去官网下载安装包。

https://www.rabbitmq.com/

消息中间件--RabbitMQ总结

docker安装

// 拉去镜像
docker pull rabbitmq

// 启动容器
docker run -it --name rabbitmq  \ 
-e RABBITMQ_DEFAULT_USER=admin \ 
-e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672  \ 
-d  rabbitmq

// 进入容器开启管理界面
docker exec -it rabbitmq sh
//开启管理界面
rabbitmq-plugins enable rabbitmq_management

通过访问 http://localhost:15672/ 即可看到管理界面

消息中间件--RabbitMQ总结

在Springboot集成RabbitMQ

引入依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置队列和交换机

@Configuration
public class RabbitmqConfig {

@Bean
    public Queue msgQueue(){
        /**
         * name: 队列名称
         * durable 是否持久化
         * exclusive 是否是排他队列 只有创建者可以使用
         * autoDelete 声明此队列为临时队列,最后一个消费者使用完自动删除
         */
        return new Queue("MSG_MQ", true, false, false);
    }
@Bean
    public DirectExchange msgExchange(){
        return new DirectExchange("MSG_ECHANGE", true, false);
    }
    @Bean
    public Binding mailBinding(){
        return BindingBuilder
                .bind(mailQueue())
                .to(msgExchange())
                .with("MSG_ROUTING");
    }
}

生产者发送消息

@RequestMapping("/v1/demo")
@RestController
public class DemoController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sed_queue")
    public void sendMsg(){
         rabbitTemplate.convertAndSend("MSG_ECHANGE", "MSG_ROUTING", "你好惨:"+ System.currentTimeMillis());
    }
}

消费者接受消息

使用 @RabbitListener 去监听消息队列,队列中有消息了就去消费

@Component
public class RabbitListner {
    @RabbitListener(queuesToDeclare = @Queue("MSG_MQ"))
    public void handleMsg(String msg){
        System.out.println("msg-"+ msg);
    }
}

queues 和queuesToDeclare 不同点 :使用queuesToDeclare时,服务启动时回去MQ中检测监听的队列是否存在,没有这个队列会就会去创建

RabbitMQ的组成

四种交换机

@Bean
    public Queue faQueue1(){
        /**
         * name: 队列名称
         * durable 是否持久化
         * exclusive 是否是排他队列 只有创建者可以使用
         * autoDelete 声明此队列为临时队列,最后一个消费者使用完自动删除
         */
        return new Queue("fa.queue1", true, false, false);
    }
 @Bean
    public Queue faQueue2(){
        /**
         * name: 队列名称
         * durable 是否持久化
         * exclusive 是否是排他队列 只有创建者可以使用
         * autoDelete 声明此队列为临时队列,最后一个消费者使用完自动删除
         */
        return new Queue("fa.queue2", true, false, false);
    }

    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanout.exchange", true, false);
    }

    @Bean
    public Binding bindingFanoutExchange(){
        return BindingBuilder
                .bind(faQueue1())
                .to(fanoutExchange());
    }

    @Bean
    public Binding bindingFanoutExchange1(){
        return BindingBuilder
                .bind(faQueue2())
                .to(fanoutExchange());
    }
@RabbitListener(queuesToDeclare = @Queue("fa.queue1"))
    public void faQueue1(String msg){
        System.out.println("faQueue1-"+msg);
    }

    @RabbitListener(queuesToDeclare = @Queue("fa.queue2"))
    public void faQueue2(String msg){
        System.out.println("faQueue2-"+msg);
    }
@GetMapping("/sed_fanout")
    public void sendFanoutMsg(){
        rabbitTemplate.convertAndSend("fanout.exchange", null, "fanoutExchange:"+ System.currentTimeMillis());
    }
@Bean
    public Queue topicQueue1(){
        /**
         * name: 队列名称
         * durable 是否持久化
         * exclusive 是否是排他队列 只有创建者可以使用
         * autoDelete 声明此队列为临时队列,最后一个消费者使用完自动删除
         */
        return new Queue("topic.queue1", true, false, false);
    }

    @Bean
    public Queue topicQueue2(){
        /**
         * name: 队列名称
         * durable 是否持久化
         * exclusive 是否是排他队列 只有创建者可以使用
         * autoDelete 声明此队列为临时队列,最后一个消费者使用完自动删除
         */
        return new Queue("topic.queue2", true, false, false);
    }

    @Bean
    public TopicExchange topicExchange1(){
        return new TopicExchange("topic.exchange1", true, false);
    }

    @Bean
    public Binding topicBinding1(){
        return BindingBuilder
                .bind(topicQueue1())
                .to(topicExchange1())
                .with("top.*");
    }

    @Bean
    public Binding topicBinding2(){
        return BindingBuilder
                .bind(topicQueue2())
                .to(topicExchange1())
                .with("top.#");
    }
@RabbitListener(queuesToDeclare = @Queue("topic.queue1"))
    public void topicQueue1(String msg){
        System.out.println("topicQueue1-"+msg);
    }

    @RabbitListener(queuesToDeclare = @Queue("topic.queue2"))
    public void topicQueue2(String msg){
        System.out.println("topicQueue2-"+msg);
    }
@GetMapping("/sed_topic")
    public void sendFanoutMsg(String key){
        rabbitTemplate.convertAndSend("topic.exchange1", key, "TopicExchange:"+ System.currentTimeMillis());
    }
@Bean
    public Queue headQueue(){
        /**
         * name: 队列名称
         * durable 是否持久化
         * exclusive 是否是排他队列 只有创建者可以使用
         * autoDelete 声明此队列为临时队列,最后一个消费者使用完自动删除
         */
        return new Queue("head.queue1", true, false, false);
    }

    @Bean
    public Queue headQueue1(){
        /**
         * name: 队列名称
         * durable 是否持久化
         * exclusive 是否是排他队列 只有创建者可以使用
         * autoDelete 声明此队列为临时队列,最后一个消费者使用完自动删除
         */
        return new Queue("head.queue2", true, false, false);
    }

    @Bean
    public HeadersExchange headersExchange(){
        return new HeadersExchange("head.exchange", true, false);
    }

    @Bean
    public Binding headBinding(){
        Map<String, Object> headers = new HashMap<>();
        headers.put("abk", "asd");
        return BindingBuilder
                .bind(headQueue())
                .to(headersExchange())
                .whereAll(headers)
                .match();
    }

    @Bean
    public Binding headBinding1(){
        Map<String, Object> headers = new HashMap<>();
        headers.put("abk", "ack");
        return BindingBuilder
                .bind(headQueue1())
                .to(headersExchange())
                .whereAll(headers)
                .match();
    }
@RabbitListener(queuesToDeclare = @Queue("head.queue1"))
    public void headQueue1(String msg){
        System.out.println("headQueue1-"+msg);
    }

    @RabbitListener(queuesToDeclare = @Queue("head.queue2"))
    public void headQueue2(String msg){
        System.out.println("headQueue2-"+msg);
    }
@GetMapping("/sed_head_msg")
    public void sendHeaderMsg1(@RequestParam String msg,
                               @RequestBody Map<String, Object> map){

        MessageProperties messageProperties = new MessageProperties();
        //消息持久化
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        messageProperties.setContentType("UTF-8");
        //添加消息
        messageProperties.getHeaders().putAll(map);
        Message message = new Message(msg.getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("head.exchange", null, message);
    }

用postman调用请求

消息中间件--RabbitMQ总结

后台打印出

消息中间件--RabbitMQ总结

说明匹配到了 head.queue2

同理,设置head的值

消息中间件--RabbitMQ总结 消息中间件--RabbitMQ总结

消息可靠性

消息中间件--RabbitMQ总结

图中显示的是一条消息传递的整个过程,我们大致可以分析出那些环节会导致消息不可靠或者说消息丢失。

针对于以上的是三种情况,Rabbit为我们提供了对应的解决方案:持久化、confirm机制、ACK事务机制。

消息持久化

配置Exchange持久化和Queue持久化。

在创建Queue 和Exchange时设置 durable 为true

消息中间件--RabbitMQ总结

你也可以使用默认值,默认为true

消息中间件--RabbitMQ总结

交换机同样如此

消息中间件--RabbitMQ总结

消息确认机制

消息中间件--RabbitMQ总结

在生产者发送消息到MQ这段过程中,MQ挂了,导致消息丢失。Rabbit提供confirm和returnMessage方法去处理消息丢失。

springboot 添加配置

## 新版中使用 publisher-confirm-type 有三个参数
# none(禁用)
# correlated(触发confirm回调)
# simple(具有correlated的功能 同时可以使rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie)
# 旧版中 publisher-confirms 默认 false
spring.rabbitmq.publisher-confirm-type=simple
# 消息没有匹配到队列 触发returnMessage 回调
spring.rabbitmq.publisher-returns= true
# publisher-returns 和 mandatory 同时使用时优先使用 mandatory
spring.rabbitmq.template.mandatory= true

实现 RabbitTemplate.ConfirmCallback , RabbitTemplate.ReturnCallback

@Component
public class RabbitCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String error) {
       if(ack){
           System.out.println("消息发送成功");
       } else {
           System.out.println("消息发送失败");
       }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append("replyCode:").append(replyCode).append(",")
        .append("replyText:").append(replyText).append(",")
        .append("exchange:").append(exchange).append(",")
        .append("routingKey:").append(routingKey).append(",");
    }
}
消息中间件--RabbitMQ总结

没有匹配到路由触发returnMessage

消息中间件--RabbitMQ总结

找到交换机触发confirm

消息中间件--RabbitMQ总结

没有找到交换机和队列

消息中间件--RabbitMQ总结

ACK 事务机制

消息确认机制解决了消息发送MQ这个过程中的问题,ACK则是解决消费者处理过程中消息丢失的问题。

消息中间件--RabbitMQ总结

消费者接受消息,在处理过程中出现失败手动拒签,重新放回队列等待再次消费,消费成功后手动签收。

配置手动模式

### 开启手动模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
## 最小消费者数量
spring.rabbitmq.listener.simple.concurrency=1
## 最大消费者数量
spring.rabbitmq.listener.simple.max-concurrency=1

改造消费者

@RabbitListener(queuesToDeclare = @Queue("MSG_MQ"))
    public void handleMsg(String msg, Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        if("success".equals(msg)){
            channel.basicAck(deliveryTag, false);
        } else if("reply".equals(msg)) {
            // basicReject 和 basicNack的区别  basicReject不支持批量 basicNack不支持
            //  channel.basicReject(deliveryTag, true);
            channel.basicNack(deliveryTag, false, true);
        } else {
            channel.basicNack(deliveryTag, false, false);
        }
    }

basicAck : 成功确认消息

basicReject: 失败拒绝

ack带来的问题

上一篇 下一篇

猜你喜欢

热点阅读