RabbitMQ消费端的限流策略

2019-03-20  本文已影响0人  若兮缘
为什么需要消费端的限流?
消费端限流机制

RabbitMQ提供了一种qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息 (通过基于consume或者channel设置Qos的值) 未被确认前,不进行消费新的消息。
需要注意:
1.不能设置自动签收功能(autoAck = false)
2.如果消息没被确认,就不会到达消费端,目的就是给消费端减压

限流相关API
限流设置 - BasicQos()

void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
prefetchSize: 单条消息的大小限制,消费端通常设置为0,表示不做限制
prefetchCount: 一次最多能处理多少条消息,通常设置为1
global: 是否将上面设置应用于channel,false代表consumer级别

注意事项

prefetchSizeglobal这两项,rabbitmq没有实现,暂且不研究
prefetchCountautoAck=false 的情况下生效,即在自动应答的情况下这个值是不生效的

手工ACK - basicAck()

void basicAck(Integer deliveryTag,boolean multiple)
手工ACK,调用这个方法就会主动回送给Broker一个应答,表示这条消息我处理完了,你可以给我下一条了。参数multiple表示是否批量签收,由于我们是一次处理一条消息,所以设置为false

限流演示
生产端

生产端就是正常的逻辑

public class Producer {

    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchange = "test_qos_exchange";
        String routingKey = "qos.save";
        
        String msg = "Hello RabbitMQ QOS Message";
        //发送消息
        for(int i =0; i<5; i ++){
            channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
        }
    }
}
自定义消费者

在这里可以进行消息的手工ACK

public class MyConsumer extends DefaultConsumer {

    //接收channel
    private Channel channel ;
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        //System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
        //手工ACK,参数multiple表示不批量签收
        //channel.basicAck(envelope.getDeliveryTag(), false);   
    }
}
消费端

关闭autoACK,进行限流设置

public class Consumer {

    public static void main(String[] args) throws Exception {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.43.157");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2 获取Connection
        Connection connection = connectionFactory.newConnection();
        //3 通过Connection创建一个新的Channel
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_qos_exchange";
        String queueName = "test_qos_queue";
        String routingKey = "qos.#";
        //4 声明交换机和队列,然后进行绑定设置路由Key
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //进行参数设置:单条消息的大小限制,一次最多能处理多少条消息,是否将上面设置应用于channel
        channel.basicQos(0, 1, false);
        
        //限流: autoAck设置为 false
        channel.basicConsume(queueName, false, new MyConsumer(channel));
    }
}
运行说明

我们先注释掉手工ACK方法,然后启动消费端和生产端,此时消费端只打印了一条消息

-----------consume message----------
consumerTag: amq.ctag-vtsQsdK17o1Z3BWeGvZKRA
envelope: Envelope(deliveryTag=1, redeliver=false, exchange=test_qos_exchange, routingKey=qos.save)
body: Hello RabbitMQ QOS Message

这是因为我们设置了手工签收,并且设置了一次只处理一条消息,当我们没有回送ack应答时,Broker端就认为消费端还没有处理完这条消息,基于这种限流机制就不会给消费端发送新的消息了,所以消费端只打印了一条消息。
通过管控台也可以看到队列总共收到了5条消息,有一条消息没有ack。

将手工签收代码取消注释,再次运行消费端,此时就会打印5条消息的内容。

上一篇 下一篇

猜你喜欢

热点阅读