JAVA后端架构

消息队列实践(二)——面对多个Consumer时的RabbitM

2019-07-23  本文已影响70人  瑞瑞余之

上一节介绍了针对一个producer、一个broker、一个consumer时,消息的发布、入队、获取的过程。本文将话题延深一步:其它因素不变,当存在多个consumer时,broker会如何处理。本篇的思路同于官网的Work Queue,可作为其中文解读版本。

本文要实现的效果很简单:

面对以上的实现效果,容易让人产生这样的疑虑:

  1. 这种情况有点类似于分布式当中的主从结构,那么RabbitMQ分配任务给consumer,会不会有负载均衡的效果,也就是说它是否会自动计算各个consumer的负载,进行科学的分配?
  2. 另外,一旦一个consumer挂掉,它所负责的队列任务会丢失掉,还是转移到其它的consumer?
  3. 如果RabbitMQ挂掉或者重启,那还未分配的任务会被丢掉吗?

为了解决上面的疑惑,我们分3步来进行本节的实践:
首先,建立1Producer + 1broker + 2*Consumer的代码结构,观察系统在默认情况下是如何分配任务,处理Consumer宕机和处理RabbitMQ宕机的;
其次,通过修改各个Consumer的执行时间,模拟不同节点的处理任务能力,看系统会不会负载均衡;
最后,对于不满足工程实践的漏洞,我们看看官方推荐的解决方案。

代码实践

  1. 创建队列任务
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "a_task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);

        String message = String.join(" ", argv); //将数组转化为一个由空格分隔的字符串

        channel.basicPublish("", TASK_QUEUE_NAME,
                null,
                message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
    }
  }

}
  1. 创建执行任务的Consumer * 2
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Worker {

  private static final String TASK_QUEUE_NAME = "a_task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
            doWork(message);
        } finally {
            System.out.println(" [x] Done");
        }
    };
    channel.basicConsume(TASK_QUEUE_NAME, true, deliverCallback, consumerTag -> { });
  }

//任务字符串中,每出现一个“.”,延迟一秒,用来模拟consumer处理需要的耗时
  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
        if (ch == '.') {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
  }
}
  1. 打开三个terminal,其中两个分别启动Work.java,作为两个consumer;之后再执行NewTask.java
    之前需要通过以下方式安装依赖,在NewTask.java和Worker.java的同级目录执行:
wget https://repo1.maven.org/maven2/com/rabbitmq/amqp-client/5.6.0/amqp-client-5.6.0.jar
wget https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.25/slf4j-api-1.7.25.jar
wget https://repo1.maven.org/maven2/org/slf4j/slf4j-simple/1.7.25/slf4j-simple-1.7.25.jar

配置环境变量:

export CP=.:amqp-client-5.6.0.jar:slf4j-api-1.7.25.jar:slf4j-simple-1.7.25.jar

编译NewTask.java和Worker.java生成字节码文件:

//再次提醒:要保证依赖和源文件在同一级目录
javac -cp $CP NewTask.java Worker.java
文件都在同一级目录
此时我们让Worker跑起来,它们会等待任务发布,在terminal1和terminal2中分别执行:java -cp $CP Worker,在terminal3中执行java -cp $CP NewTask Message1 一直到 java -cp $CP NewTask Message5
NewTask&Worker
可以发现,RabbitMQ会交替的将任务给到下辖的Worker进行处理,我们可以看以下此时RabbitMQ的队列情况
a_task_queue

工程实践

可以看到一旦Worker接收,队列中的message会被删除。这里可能存在一个隐患,如果NewTask在发送过程中,将其中一个Worker干掉,看这条message是否会丢失,此时我们重新执行Worker,且修改输入参数为: java -cp $CP NewTask Message5......,我们注意到在private方法doWork中,每一个“.”代表处理延迟一秒,这用来模拟系统在处理message的时候花费的时间

//任务字符串中,每出现一个“.”,延迟一秒,用来模拟consumer处理需要的耗时
  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
        if (ch == '.') {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
  }

我们在NewTask打印了“[x] Sent 'Message num.........'”日志之后,关闭正在处理的Worker终端,如下图所示:

干掉一个正在执行的Worker
问题出现了!RabbitMQ中的queue因为Worker1已经接收到了Message 5......所以把对应的message删除了,但是Worker1在处理这个message还没有完成的时候(未打印done,可以看一下上文的代码细节)被干掉了。此时Message 5......即没有重新出现在queue中也没有被转派给Worker2,造成了数据丢失!这就是我们在文初提到的第二个问题:一旦一个consumer挂掉,它所负责的队列任务会丢失掉,还是转移到其它的consumer **
目前看来答案是“丢失掉”,RabbitMQ对此进行了处理
(划重点啦!!!)**:
RabbitMQ提供了一种机制叫做—— message acknowledgments,我们不妨称它为消息认证,也就是说,当consumer收到了message,并且彻底处理完之后,会回传给broker一个ack标志,告诉broker可以从queue中删除对应的message了,如果broker发现有当和某个consumer断了之后,还没有收到message的ack,它就知道需要重新发送给其它的consumer(如果有的话)。这个功能实现起来并不复杂,修改Worker.java的对应代码就可以:
//关闭自动认证,改为手动认证(其默认为自动认证,也就是一接收到message就认证成功)
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");

  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
//最后,在打印完成之后,发出ack
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  }
};

在给出以上代码运行结果之前,我想展示一下,我在coding中遇到的一个小问题,大家要注意:我将autoAck = false,而忘记在日志后面手动发送认证,也就是做了上面第一处代码修改,而忘记修改第二处。这里导致了一个问题,因为系统不会自动返回ack,而我又没有手动添加,导致队列中unacknowledged的message越来越多,进入docker执行rabbitmqctl list_queues name messages_ready messages_unacknowleged看到即便已经处理过的message也没有被清除。如果这里不注意,会带来比较严重的后果,一是unack的message在与当前consumer断开连接后,它会重新发给其它consumer,对于我们的系统而言,它们其实已经被处理过,可能早上数据混乱;另一方面,不断增加的unack message会导致RabbitMQ吃掉更多内存。

未确认的message数
到目前为止,我们对broker(RabbitMQ)正常,Consumer异常而导致的数据丢失已经做了处理。那反过来思考一下:broker挂掉了那些还未分派的队列任务还会存在吗?
读者可以模拟上文我coding时犯的错误,使得rabbitmq中存在unack message,然后再配合rabbitmqctl stop_apprabbitmqctl start_app 看之前unack message是否还在。这里我们略过过程直接给出结果:rabbitmq中数据丢失!
rabbitmq数据丢失
问题来了,我们如何解决,最直接的一个想法,就是queue message持久化嘛!这里分为两个方面:
最基本的:rabbitmq中的队列不能丢失,比如上面演示中遇到的“队列不见了”,肯定要避免;
另外:queue中的message不能丢失;
对于这两个方面,Rabbitmq都有方式解决,首先,我们在声明队列的时候,需要将队列的duration设置为true,表示即便Rabbitmq挂掉后重启,该队列仍然存在!
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

此外,我们在发布channel(NewTask.java)的时候需要设置message的持久化:

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "a_task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

以上可以基本实现rabbitmq中数据不丢失,在这里回应开篇的第三个问题“如果RabbitMQ挂掉或者重启,那还未分配的任务会被丢掉吗?”
answer:会被丢掉,但是可通过配置队列和持久化message解决!

终于到了终极一问:“RabbitMQ分配任务给consumer,会不会有负载均衡的效果,也就是说它是否会自动计算各个consumer的负载,进行科学的分配?”
我们做这样一个试验,Worker1执行一个耗时较长的任务,比如

//NewTask.java to Worker1
java -cp $CP NewTask Message 9...............................................................................

Worker2执行耗时很短的任务:

//NewTask.java to Worker2
java -cp $CP NewTask Message 10..

在Worker1还未完成任务之前,我们立即发布两次新任务

//NewTask.java to some worker
java -cp $CP NewTask Message 11..
java -cp $CP NewTask Message 12..
fair dispatch

上图的结果,说明了问题:RabbitMQ默认会将队列任务均分到各个consumer,即便某一个consumer耗时较长,轮到它的任务不会负载均衡到其它节点,而是等待它执行完成,可以说RabbitMQ在分发任务时处于blindly dispatch(盲发)的状态。为了解决这个问题我们可以设置consumer与broker的chanel属性basicQos。比如

channel.basicQos(1);

意味着如果这个consumer存在一个或多个未完成的任务,包括正在执行的和unack的任务,则rabbitmq会去找下一个consumer,直到找到完全空置的consumer,才将任务分配给它。
到此为止,我们回答了开篇提出的所有问题,读者可以从github上找到对应的完整源码(NewTask.java Worker.java),不妨动手试一试!

上一篇下一篇

猜你喜欢

热点阅读