RabbitMQ官方教程4--Routing
说明
在第三个教程中我们创建了一个简单的日志系统,可以实现将日志消息广播到多个接收者中。在本教程中我们将向其中添加功能:仅订阅消息的一个子集。比如能够只将关键的错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有的日志消息。
绑定
在之前的教程中已经创建过绑定了,代码如下:channel.queueBind(queueName, EXCHANGE_NAME, "");
。绑定是交换机和队列之间的关系,可以简单理解为队列对来自这个交换机的消息感兴趣。绑定可以提供额外的routingKey
参数,为了避免与basic_publish
参数混淆,我们将其称为binding_key
。创建一个带有key的绑定:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
绑定key的意义取决于交换机的类型。像之前使用的fanout类型的交换机就只是简单的忽略了它的值。
直接交换机(Direct exchange)
在上一个教程中,日志系统将所有的消息广播给所有的消费者,我们希望扩展该功能,允许根据消息的严重性对消息进行过滤。比如希望将日志消息写入磁盘的程序只接收严重的错误,而不会记录warning或者info的日志在磁盘上。之前使用的扇出(fanout)类型的交换机并没有给我们带来很大的灵活性,它只能进行无意识的广播。
我们使用direct
交换机来替代fanout
。direct交换机背后的路由算法很简单--消息传递到其绑定键(binding key)与消息的路由键(route key)完全匹配的队列。

从上图中我们可以看出,
direct
交换机X
绑定了两个队列:一个队列使用orange
绑定key,另一个队列有两个绑定key分别是blank
和green
。在这种设置下,带有orange
路由key的消息会发布到Q1
队列中,带有blank
和green
路由key的消息会发布到Q2
队列中,其他的消息就会被丢弃。
多重绑定
使用相同的绑定键(binding key)来绑定多个队列是完全合法的。

在上面的示例中,使用
black
在X
和Q1
之间添加了绑定,也在X
和Q2
之间添加了绑定。这种情况下,direct
交换机和fanout
交换机类似,都会将消息广播到所有匹配的队列中。带有black
路由键的消息也会被同时传递给Q1
和Q2
两个队列。
完整的代码

生产者
将消息发送到direct
的交换机中,将日志的严重级别设置为routing key
。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String severity = getSeverity(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
}
}
//..
}
消费者
接收信息的方式与之前相同,除了需要为每一个感兴趣的严重级别设置一个新的绑定。
import com.rabbitmq.client.*;
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for (String severity : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}