工作队列之轮询分发
2019-04-07 本文已影响0人
寂静的春天1988
直接上代码
生产者1
package com.demo.controller;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.demo.util.RabbitMQ;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class RabbitMQSendMq1 {
//2、工作队列之轮询分发
/**
* 生产者1
* @throws IOException
* @throws TimeoutException
* @throws InterruptedException
*/
public void sendMq1() throws IOException, TimeoutException, InterruptedException {
//创建连接
Connection connection=RabbitMQ.getConnection();
//得到通道
Channel channel =connection.createChannel();
//得到队列
channel.queueDeclare("queue1", false, false, false, null);
for (int i = 0; i < 20; i++) {
String msg="hello world"+i;
channel.basicPublish("", "queue1", null, msg.getBytes());
Thread.sleep(i*10);
}
channel.close();
connection.close();
}
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
RabbitMQSendMq1 c=new RabbitMQSendMq1();
c.sendMq1();
}
}
消费者1
package com.demo.controller;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.demo.util.RabbitMQ;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class RabbitMQGetMq1 {
/**
* 消费者1
* @throws IOException
* @throws TimeoutException
*/
public void getMq1() throws IOException, TimeoutException {
//创建连接
Connection connection=RabbitMQ.getConnection();
//得到通道
Channel channel =connection.createChannel();
//得到队列
channel.queueDeclare("queue1", false, false, false, null);
DefaultConsumer consumer=new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
String msg=new String(body);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("1==="+msg);
};
};
channel.basicConsume("queue1", true, consumer);
}
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
RabbitMQGetMq1 c=new RabbitMQGetMq1();
c.getMq1();
}
}
package com.demo.controller;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.demo.util.RabbitMQ;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class RabbitMQGetMq2 {
//2、工作队列之轮询分发
/**
* 消费者2
* @throws IOException
* @throws TimeoutException
*/
public void getMq2() throws IOException, TimeoutException {
//创建连接
Connection connection=RabbitMQ.getConnection();
//得到通道
Channel channel =connection.createChannel();
//得到队列
channel.queueDeclare("queue1", false, false, false, null);
DefaultConsumer consumer=new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
String msg=new String(body);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("2==="+msg);
};
};
channel.basicConsume("queue1", true, consumer);
}
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
RabbitMQGetMq2 c=new RabbitMQGetMq2();
c.getMq2();
}
}
上述代码运行可以发现,即使消费者二处理的时间更快,但是消费者二并没有消费更多的消息,而是你
一个我一个的方式进行处理!