RabbitMQ官方教程4--Routing

2020-05-29  本文已影响0人  亼珏

说明

      在第三个教程中我们创建了一个简单的日志系统,可以实现将日志消息广播到多个接收者中。在本教程中我们将向其中添加功能:仅订阅消息的一个子集。比如能够只将关键的错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有的日志消息。

绑定

      在之前的教程中已经创建过绑定了,代码如下:channel.queueBind(queueName, EXCHANGE_NAME, "");。绑定是交换机和队列之间的关系,可以简单理解为队列对来自这个交换机的消息感兴趣。绑定可以提供额外的routingKey参数,为了避免与basic_publish参数混淆,我们将其称为binding_key。创建一个带有key的绑定:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

      绑定key的意义取决于交换机的类型。像之前使用的fanout类型的交换机就只是简单的忽略了它的值。

直接交换机(Direct exchange)

      在上一个教程中,日志系统将所有的消息广播给所有的消费者,我们希望扩展该功能,允许根据消息的严重性对消息进行过滤。比如希望将日志消息写入磁盘的程序只接收严重的错误,而不会记录warning或者info的日志在磁盘上。之前使用的扇出(fanout)类型的交换机并没有给我们带来很大的灵活性,它只能进行无意识的广播。
      我们使用direct交换机来替代fanout。direct交换机背后的路由算法很简单--消息传递到其绑定键(binding key)与消息的路由键(route key)完全匹配的队列。


      从上图中我们可以看出,direct交换机X绑定了两个队列:一个队列使用orange绑定key,另一个队列有两个绑定key分别是blankgreen。在这种设置下,带有orange路由key的消息会发布到Q1队列中,带有blankgreen路由key的消息会发布到Q2队列中,其他的消息就会被丢弃。

多重绑定

      使用相同的绑定键(binding key)来绑定多个队列是完全合法的。


      在上面的示例中,使用blackXQ1之间添加了绑定,也在XQ2之间添加了绑定。这种情况下,direct交换机和fanout交换机类似,都会将消息广播到所有匹配的队列中。带有black路由键的消息也会被同时传递给Q1Q2两个队列。

完整的代码

生产者

      将消息发送到direct的交换机中,将日志的严重级别设置为routing key

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
    }
  }
  //..
}

消费者

      接收信息的方式与之前相同,除了需要为每一个感兴趣的严重级别设置一个新的绑定。

import com.rabbitmq.client.*;

public class ReceiveLogsDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1) {
        System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
        System.exit(1);
    }

    for (String severity : argv) {
        channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" +
            delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}
上一篇 下一篇

猜你喜欢

热点阅读