rabbitMQ

RabbitMQ学习(五)消费端削峰限流

2018-11-28  本文已影响0人  kobe0429

1.MQ的作用

1)解耦:在项目启动之初是很难预测未来会遇到什么困难的,消息中间件在处理过程中插入了一个隐含的,基于数据的接口层,两边都实现这个接口,这样就允许独立的修改或者扩展两边的处理过程,只要两边遵守相同的接口约束即可。
2)冗余(存储):在某些情况下处理数据的过程中会失败,消息中间件允许把数据持久化知道他们完全被处理
扩展性:消息中间件解耦了应用的过程,所以提供消息入队和处理的效率是很容易的,只需要增加处理流程就可以了。
3)削峰:在访问量剧增的情况下,但是应用仍然需要发挥作用,但是这样的突发流量并不常见。而使用消息中间件采用队列的形式可以减少突发访问压力,不会因为突发的超时负荷要求而崩溃
4)可恢复性:当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合性,当一个处理消息的进程挂掉后,加入消息中间件的消息仍然可以在系统恢复后重新处理
5)顺序保证:在大多数场景下,处理数据的顺序也很重要,大部分消息中间件支持一定的顺序性
6)缓冲:消息中间件通过一个缓冲层来帮助任务最高效率的执行
7)异步通信:通过把把消息发送给消息中间件,消息中间件并不立即处。

本文只讨论削峰填谷的应用场景:

举个业务场景的栗子,秒杀业务:
上游发起下单操作
下游完成秒杀业务逻辑(库存检查,库存冻结,余额检查,余额冻结,订单生成,余额扣减,库存扣减,生成流水,余额解冻,库存解冻)
上游下单业务简单,每秒发起了10000个请求,下游秒杀业务复杂,每秒只能处理2000个请求,很有可能上游不限速的下单,导致下游系统被压垮,引发雪崩。
为了避免雪崩,常见的优化方案有两种:
1)业务上游队列缓冲,限速发送
2)业务下游队列缓冲,限速执行

本文只讨论下游队列,就是消费端的限速执行

rabbitmq提供了一种服务质量保障功能,即在非自动确认消息的前提下,如果一定数目的消息未被确认,不进行消费新的消息。
使用 basicqos方法:
在消费端进行使用。 0 1 false
prefetSize:0
prefetCount:这个值一般在设置为非自动ack的情况下生效,一般大小为1
global: true是channel级别, false是消费者级别
注意:我们要使用非自动ack
消费者代码:

package com.bfxy.rabbitmq.api.limit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class Consumer {

    
    public static void main(String[] args) throws Exception {
        
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.136.197.244");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        
        String exchangeName = "test_qos_exchange";
        String queueName = "test_qos_queue";
        String routingKey = "qos.#";
        
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //1 限流方式  第一件事就是 autoAck设置为 false
        
        channel.basicQos(0,3,false);
        channel.basicConsume(queueName,false,new MyConsumer(channel));
    }
}

自定义消费者代码:

package com.bfxy.rabbitmq.api.limit;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class MyConsumer extends DefaultConsumer {


    private Channel channel ;
    
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
        
        channel.basicAck(envelope.getDeliveryTag(), false);
        
    }


}

生产者代码:

package com.bfxy.rabbitmq.api.limit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

    
    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.136.197.244");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("admin");


        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchange = "test_qos_exchange";
        String routingKey = "qos.save";
        
        String msg = "Hello RabbitMQ QOS Message";
        
        for(int i =0; i<5; i ++){
            channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
        }
        
    }
}

调试步骤:
1)启动消费者类,效果如图:

消费者启动mq交换机信息.JPG
消费者启动mq队列信息.JPG
2)在自定义消费者类中注释掉channel.basicAck(envelope.getDeliveryTag(), false);
启动生产者类,mq管控台信息
管控台信息.JPG
可以看到1个待确认的,4个准备好的消息,
3)放开代码channel.basicAck(envelope.getDeliveryTag(), false);
启动生产者类,mq管控台信息
管控台信息.JPG
总结:消费者消费成功一个消息后,需要设置成手动确认,当返回确认成功后,再去消费下一个消息,这样可以实现消费端的削峰限流,不至于让消费端服务崩溃。
到这里是不是以为结束了呢,其实还有一个知识点,就是消费端对没有消费成功的消息,可以不进行确认,让其重回队列,再次消费,与上面的代码相比,只需修改自定义的消费者,设置如果满足我们自己设置的条件就认为是没有消费成功,让其重回队列,这个时候broker端会再此发出这条消息。
修改如下:
重回队列.JPG
启动生产者和消费者,消费者控制台信息如下:
重复消费未确认的消息.JPG
上一篇下一篇

猜你喜欢

热点阅读