RabbitMQ的QoS流控

2018-04-24  本文已影响0人  杰森斯坦sen

RabbitMQ可以设置basicQoS(Consumer Prefetch)来对consumer进行流控,从而限制未ack的消息数量。

示例

每个consumer单独流控

channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

多个consumer共享流控

channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true);  // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

分析

官方Java客户端提供了DefaultConsumer和QueueingConsumer。 其中QueueingConsumer内部维护了一个阻塞队列BlockingQueue,此队列就是用来缓存从queue获取的message。

private final BlockingQueue<QueueingConsumer.Delivery> _queue;

Spring amqp提供了类似的BlockingQueueConsumer,但是默认的prefetchCount是1。

链接

Consumer Prefetch
Some queuing theory: throughput, latency and bandwidth

上一篇 下一篇

猜你喜欢

热点阅读