<三>RabbitMQ单发多收(工作队列)

2017-04-25  本文已影响0人  者薄

单发多理解---打虎基本功

单发多收简单理解就是:一个生产者生产,多个消费者消费.生产者将生产的消息放入队列当中,由多个消费者从消息队列中取出消息进行消费.这样能提高系统的吞吐能力

示例场景---四面八方

一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息.实现原理图如下:


图片.png

项目构建步骤---一步一个脚丫子

1.实现生产者
在上篇文章中创建的项目中创建一对多任务的生产者(Provider)

package com.rabbitmq.task;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

/**
 * 消息队列的任务处理提供者P
 * 
 * @author panyuanyuan
 *
 */
public class TaskP {
    
    private static final String QUEUE_NAME = "task_queue";
    
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //创建队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        for(int i = 0; i < 5; i++) {
            String message = "Hello World! " + i;
            //注意,这里第一个参数为""发送到了匿名交换器上,第二个参数为路由线索====>当时匿名交换器的时候,会发消息发送到和路由线索一样的消息队列上
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
        
        //关闭资源,否则一直会链接到rabbitmq服务器
        channel.close();
        connection.close();
        
    }
}

2.实现消费者1(Consumer1)

package com.rabbitmq.task;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

/**
 * 任务队列消费者1
 * 
 * @author panyuanyuan
 *
 */
public class TaskC1 {
    private static final String QUEUE_NAME = "task_queue";
    public static void main(String[] args) throws Exception {
        
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println("Worker1 [*] Waiting for messages. To exit press CTRL+C");
        
        //每次从消息队列中获取数量
        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)throws IOException {
            
                String message = new String(body,"UTF-8");
                System.out.println("Worker1 [x] Received '" + message + "'");
                try {
                    doWork(message);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    //消息处理完成确认
                    System.out.println("Worker1 [x] Done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        
        //消息消费确认完成
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
    
    /**
     * 处理任务
     * 
     * @param task
     */
    private static void doWork(String task) {
        try {
            Thread.sleep(1000); // 暂停1秒钟
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}

3.实现消费者2(Consumer2)

package com.rabbitmq.task;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

/**
 * 任务队列消费者1
 * 
 * @author panyuanyuan
 *
 */
public class TaskC2 {
    private static final String QUEUE_NAME = "task_queue";
    public static void main(String[] args) throws Exception {
        
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        
        //第二个参数为true说明这个队列需持久化
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println("Worker2 [*] Waiting for messages. To exit press CTRL+C");
        
        //每次从消息队列中获取数量---负载均衡,将消息任务均衡的发送到业务服务器
        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)throws IOException {
            
                String message = new String(body,"UTF-8");
                System.out.println("Worker2 [x] Received '" + message + "'");
                try {
                    doWork(message);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    //消息处理完成确认
                    System.out.println("Worker2 [x] Done");
                    //回复ack非常重要,只有回复了,消息队列才会删除消息从队列中,负责可能造成队列爆满
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        
        //消息消费确认完成
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
    
    /**
     * 处理任务
     * 
     * @param task
     */
    private static void doWork(String task) {
        try {
            Thread.sleep(1000); // 暂停1秒钟
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}

运行测试---万无一失

这里省略启动RabbitMQ服务,上篇中已经介绍
1.运行客户端1(消费者Consumer1)
运行结果如下:

Worker1 [*] Waiting for messages. To exit press CTRL+C

2.运行客户端2(消费者Consumer2)
运行结果如下:

Worker2 [*] Waiting for messages. To exit press CTRL+C

3.运行生产者(生产者Provider)
运行结果如下:

 [x] Sent 'Hello World! 0'
 [x] Sent 'Hello World! 1'
 [x] Sent 'Hello World! 2'
 [x] Sent 'Hello World! 3'
 [x] Sent 'Hello World! 4'

4.查看消费者1窗口(消费者Consumer1)
查看结果如下:

Worker1 [*] Waiting for messages. To exit press CTRL+C
Worker1 [x] Received 'Hello World! 0'
Worker1 [x] Done
Worker1 [x] Received 'Hello World! 2'
Worker1 [x] Done

5.查看消费者2窗口(消费者Consumer2)
查看结果如下:

Worker2 [*] Waiting for messages. To exit press CTRL+C
Worker2 [x] Received 'Hello World! 1'
Worker2 [x] Done
Worker2 [x] Received 'Hello World! 3'
Worker2 [x] Done
Worker2 [x] Received 'Hello World! 4'
Worker2 [x] Done

注意总结---万剑归宗

当RabbitMQ为一对多任务的时候,生产者生产消息放入队列,队列分消息给消费者消费,这里消费者获取消息是随机的,但是消费者消费的总和等于生产者生产的.

致谢

至此RabbitMQ的单发多收(工作)模式已经完成,感谢大大的微笑的博客,参照博客链接 > http://blog.csdn.net/chwshuang/article/details/50521708

上一篇下一篇

猜你喜欢

热点阅读