<三>RabbitMQ单发多收(工作队列)
2017-04-25 本文已影响0人
者薄
单发多理解---打虎基本功
单发多收简单理解就是:一个生产者生产,多个消费者消费.生产者将生产的消息放入队列当中,由多个消费者从消息队列中取出消息进行消费.这样能提高系统的吞吐能力
示例场景---四面八方
一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息.实现原理图如下:
图片.png
项目构建步骤---一步一个脚丫子
1.实现生产者
在上篇文章中创建的项目中创建一对多任务的生产者(Provider)
package com.rabbitmq.task;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
/**
* 消息队列的任务处理提供者P
*
* @author panyuanyuan
*
*/
public class TaskP {
private static final String QUEUE_NAME = "task_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for(int i = 0; i < 5; i++) {
String message = "Hello World! " + i;
//注意,这里第一个参数为""发送到了匿名交换器上,第二个参数为路由线索====>当时匿名交换器的时候,会发消息发送到和路由线索一样的消息队列上
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
//关闭资源,否则一直会链接到rabbitmq服务器
channel.close();
connection.close();
}
}
2.实现消费者1(Consumer1)
package com.rabbitmq.task;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
/**
* 任务队列消费者1
*
* @author panyuanyuan
*
*/
public class TaskC1 {
private static final String QUEUE_NAME = "task_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("Worker1 [*] Waiting for messages. To exit press CTRL+C");
//每次从消息队列中获取数量
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)throws IOException {
String message = new String(body,"UTF-8");
System.out.println("Worker1 [x] Received '" + message + "'");
try {
doWork(message);
} catch (Exception e) {
e.printStackTrace();
} finally {
//消息处理完成确认
System.out.println("Worker1 [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//消息消费确认完成
channel.basicConsume(QUEUE_NAME, false, consumer);
}
/**
* 处理任务
*
* @param task
*/
private static void doWork(String task) {
try {
Thread.sleep(1000); // 暂停1秒钟
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
3.实现消费者2(Consumer2)
package com.rabbitmq.task;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
/**
* 任务队列消费者1
*
* @author panyuanyuan
*
*/
public class TaskC2 {
private static final String QUEUE_NAME = "task_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
//第二个参数为true说明这个队列需持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("Worker2 [*] Waiting for messages. To exit press CTRL+C");
//每次从消息队列中获取数量---负载均衡,将消息任务均衡的发送到业务服务器
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)throws IOException {
String message = new String(body,"UTF-8");
System.out.println("Worker2 [x] Received '" + message + "'");
try {
doWork(message);
} catch (Exception e) {
e.printStackTrace();
} finally {
//消息处理完成确认
System.out.println("Worker2 [x] Done");
//回复ack非常重要,只有回复了,消息队列才会删除消息从队列中,负责可能造成队列爆满
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//消息消费确认完成
channel.basicConsume(QUEUE_NAME, false, consumer);
}
/**
* 处理任务
*
* @param task
*/
private static void doWork(String task) {
try {
Thread.sleep(1000); // 暂停1秒钟
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
运行测试---万无一失
这里省略启动RabbitMQ服务,上篇中已经介绍
1.运行客户端1(消费者Consumer1)
运行结果如下:
Worker1 [*] Waiting for messages. To exit press CTRL+C
2.运行客户端2(消费者Consumer2)
运行结果如下:
Worker2 [*] Waiting for messages. To exit press CTRL+C
3.运行生产者(生产者Provider)
运行结果如下:
[x] Sent 'Hello World! 0'
[x] Sent 'Hello World! 1'
[x] Sent 'Hello World! 2'
[x] Sent 'Hello World! 3'
[x] Sent 'Hello World! 4'
4.查看消费者1窗口(消费者Consumer1)
查看结果如下:
Worker1 [*] Waiting for messages. To exit press CTRL+C
Worker1 [x] Received 'Hello World! 0'
Worker1 [x] Done
Worker1 [x] Received 'Hello World! 2'
Worker1 [x] Done
5.查看消费者2窗口(消费者Consumer2)
查看结果如下:
Worker2 [*] Waiting for messages. To exit press CTRL+C
Worker2 [x] Received 'Hello World! 1'
Worker2 [x] Done
Worker2 [x] Received 'Hello World! 3'
Worker2 [x] Done
Worker2 [x] Received 'Hello World! 4'
Worker2 [x] Done
注意总结---万剑归宗
当RabbitMQ为一对多任务的时候,生产者生产消息放入队列,队列分消息给消费者消费,这里消费者获取消息是随机的,但是消费者消费的总和等于生产者生产的.
致谢
至此RabbitMQ的单发多收(工作)模式已经完成,感谢大大的微笑的博客,参照博客链接 > http://blog.csdn.net/chwshuang/article/details/50521708