工作队列之轮询分发

2019-04-07  本文已影响0人  寂静的春天1988

直接上代码
生产者1

package com.demo.controller;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.demo.util.RabbitMQ;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class RabbitMQSendMq1 {
    //2、工作队列之轮询分发
    /**
     *    生产者1
     * @throws IOException
     * @throws TimeoutException
     * @throws InterruptedException 
     */
    public void sendMq1() throws IOException, TimeoutException, InterruptedException {
        //创建连接
        Connection  connection=RabbitMQ.getConnection();
        //得到通道
        Channel channel =connection.createChannel();
        //得到队列
        channel.queueDeclare("queue1", false, false, false, null);
        
        for (int i = 0; i < 20; i++) {
            String msg="hello world"+i;
            channel.basicPublish("", "queue1", null, msg.getBytes());
            Thread.sleep(i*10);
        }
        channel.close();
        connection.close();
    }
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        RabbitMQSendMq1 c=new RabbitMQSendMq1();
        c.sendMq1();
    }
}

消费者1

package com.demo.controller;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.demo.util.RabbitMQ;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class RabbitMQGetMq1 {
    /**
     *    消费者1
     * @throws IOException
     * @throws TimeoutException
     */
    public void getMq1() throws IOException, TimeoutException {
        //创建连接
        Connection  connection=RabbitMQ.getConnection();
        //得到通道
        Channel channel =connection.createChannel();
        //得到队列
        channel.queueDeclare("queue1", false, false, false, null);
        DefaultConsumer consumer=new DefaultConsumer(channel) {
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("1==="+msg);
            };
        };
        channel.basicConsume("queue1", true, consumer);
    }
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        RabbitMQGetMq1 c=new RabbitMQGetMq1();
        c.getMq1();
    }
}
package com.demo.controller;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.demo.util.RabbitMQ;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class RabbitMQGetMq2 {
    //2、工作队列之轮询分发
    /**
     *    消费者2
     * @throws IOException
     * @throws TimeoutException
     */
    public void getMq2() throws IOException, TimeoutException {
        //创建连接
        Connection  connection=RabbitMQ.getConnection();
        //得到通道
        Channel channel =connection.createChannel();
        //得到队列
        channel.queueDeclare("queue1", false, false, false, null);
        DefaultConsumer consumer=new DefaultConsumer(channel) {
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("2==="+msg);
            };
        };
        channel.basicConsume("queue1", true, consumer);
    }
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        RabbitMQGetMq2 c=new RabbitMQGetMq2();
        c.getMq2();
    }
}

上述代码运行可以发现,即使消费者二处理的时间更快,但是消费者二并没有消费更多的消息,而是你
一个我一个的方式进行处理!

上一篇下一篇

猜你喜欢

热点阅读