RabbitMQ(六)路由

2016-11-04  本文已影响136人  薛晨

RabbitMQ官网中文版教程:

http://rabbitmq.mr-ping.com/tutorials_with_python/[4]Routing.html

上述教程示例为Python版,Java版及相应解释如下:

生产者

package com.xc.rabbit.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Created by xc.
 *
 * Producer for the routing example: declares the {@code direct_logs} exchange with type
 * {@code "direct"} and publishes one message per routing key in {@link #routingKeys}.
 */
public class RoutingSendDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    // Routing keys; each one is used as the routing key of exactly one published message.
    private static final String[] routingKeys = new String[] {"info", "warning", "error"};

    /**
     * Connects to the broker on localhost:5672, publishes one message per routing key and
     * then closes the channel and the connection.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the exchange, publishing or closing fails
     */
    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("rabbit");
        factory.setPassword("carrot");
        // Create a new connection
        Connection connection = factory.newConnection();
        try {
            // Create a channel
            Channel channel = connection.createChannel();
            try {
                // Declare the exchange
                channel.exchangeDeclare(EXCHANGE_NAME, "direct");
                // Send the messages
                for (String severity : routingKeys) {
                    String message = "Send the message level : " + severity;
                    // Encode explicitly as UTF-8: the consumers decode the body with "UTF-8",
                    // so relying on the platform default charset could corrupt the payload.
                    channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
                    System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
                }
            } finally {
                // Runs even when declaring or publishing throws, so the channel is not leaked.
                channel.close();
            }
        } finally {
            // Runs even when channel creation or use throws, so the connection is not leaked.
            connection.close();
        }
    }
}

消费者1

package com.xc.rabbit.routing;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Created by xc.
 *
 * First consumer of the routing example: binds an anonymous queue to the
 * {@code direct_logs} exchange once per entry in {@link #routingKey} and prints every
 * delivery together with the routing key it arrived with.
 */
public class ReceiveLogsDirect1 {

    // Exchange name
    private static final String EXCHANGE_NAME = "direct_logs";
    // Routing keys this consumer binds its queue with
    private static final String[] routingKey = new String[]{"info", "warning", "error"};

    /**
     * Declares the exchange, binds a freshly declared queue under each routing key and
     * starts consuming from it. The connection is left open so deliveries keep arriving.
     *
     * @param args unused
     * @throws Exception if any broker call fails
     */
    public static void main(String[] args) throws Exception {
        final Channel channel = openChannel();
        // Declare the exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // Ask the broker for an anonymous queue and remember its name
        final String queueName = channel.queueDeclare().getQueue();

        // Bind the same queue once per routing key (multiple bindings)
        for (int i = 0; i < routingKey.length; i++) {
            String bindingKey = routingKey[i];
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
            String summary = "ReceiveLogsDirect1 exchange : " + EXCHANGE_NAME +
                    ", queue : " + queueName + ", BindRoutingKey : " + bindingKey;
            System.out.println(summary);
        }
        System.out.println("ReceiveLogsDirect1 [*] Waiting for messages. To exit press CTRL + C");

        // Second argument is passed as true, as in the original tutorial code.
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode the payload as UTF-8 text before printing it
                String text = new String(body, "UTF-8");
                String routedBy = envelope.getRoutingKey();
                System.out.println(" [x] Received '" + routedBy + "':'" + text + "'");
            }
        });
    }

    /** Builds the connection settings, opens a new connection and returns a channel on it. */
    private static Channel openChannel() throws Exception {
        ConnectionFactory settings = new ConnectionFactory();
        settings.setHost("localhost");
        settings.setPort(5672);
        settings.setUsername("rabbit");
        settings.setPassword("carrot");
        // Create a new connection, then a channel on top of it
        Connection link = settings.newConnection();
        return link.createChannel();
    }
}

消费者2

package com.xc.rabbit.routing;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Created by xc.
 *
 * Second consumer of the routing example: binds an anonymous queue to the
 * {@code direct_logs} exchange for the routing keys in {@link #routingKey} (only
 * {@code "error"}) and prints every delivery with the routing key it arrived with.
 */
public class ReceiveLogsDirect2 {

    // Exchange name
    private static final String EXCHANGE_NAME = "direct_logs";
    // Routing keys this consumer binds its queue with
    private static final String[] routingKey = new String[]{"error"};

    /**
     * Declares the exchange, binds a freshly declared queue under each routing key and
     * starts consuming from it. The connection is left open so deliveries keep arriving.
     *
     * @param args unused
     * @throws Exception if any broker call fails
     */
    public static void main(String[] args) throws Exception {
        final Channel channel = openChannel();
        // Declare the exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // Ask the broker for an anonymous queue and remember its name
        final String queueName = channel.queueDeclare().getQueue();

        // Bind the queue once per routing key
        for (int i = 0; i < routingKey.length; i++) {
            String bindingKey = routingKey[i];
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
            String summary = "ReceiveLogsDirect2 exchange : " + EXCHANGE_NAME +
                    ", queue : " + queueName + ", BindRoutingKey : " + bindingKey;
            System.out.println(summary);
        }
        System.out.println("ReceiveLogsDirect2 [*] Waiting for messages. To exit press CTRL + C");

        // Second argument is passed as true, as in the original tutorial code.
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode the payload as UTF-8 text before printing it
                String text = new String(body, "UTF-8");
                String routedBy = envelope.getRoutingKey();
                System.out.println(" [x] Received '" + routedBy + "':'" + text + "'");
            }
        });
    }

    /** Builds the connection settings, opens a new connection and returns a channel on it. */
    private static Channel openChannel() throws Exception {
        ConnectionFactory settings = new ConnectionFactory();
        settings.setHost("localhost");
        settings.setPort(5672);
        settings.setUsername("rabbit");
        settings.setPassword("carrot");
        // Create a new connection, then a channel on top of it
        Connection link = settings.newConnection();
        return link.createChannel();
    }
}

运行结果如下:

由图可知,生产者发出的消息,根据不同的路由,发送到不同的队列,进而被不同的消费者接收。

先跑消费者程序,再跑生产者程序。否则,生产者的消息到达交换器之后,如果没有队列连上交换器, 则消息被直接丢弃。

注意:

  1. Bindings can take an extra routingKey parameter. To avoid the confusion with a basic_publish parameter, we're going to call it a binding key.
    binding key和routing key是一回事,为了避免概念重复,channel.queueBind时叫binding key, channel.basicPublish时叫routing key。

  2. The routing algorithm behind a direct exchange is simple - a message goes to the queues whose binding key exactly matches the routing key of the message.
    direct交换器的路由规则很简单,消息会路由到binding key与routing key相同的队列。

上一篇下一篇

猜你喜欢

热点阅读