RabbitMQ Shovel
与 Federation 具备的数据转发功能类似, Shovel 能够可靠、持续地从一个Broker 中的队列(作为源端,即source )拉取数据并转发至另一个Broker 中的交换器(作为目的端,即destination )。
作为源端的队列和作为目的端的交换器可以同时位于同一个 Broker 上,也可以位于不同的 Broker 上。
Shovel 的原理
==通常情况下,使用 Shovel 时配置队列作为源端,交换器作为目的端==,如上图所示。
Shovel 的使用
Shovel 插件默认也在 RabbitMQ 的发布包中,执行 rabbitmq-plugins enable rabbitmq_shovel
命令可以开启 Shovel 功能。
开启 rabbitmq shovel management 插件之后, 在 RabbitMQ 的管理界面中" Admin "的右侧会多出"Shovel Status" 和" Shovel Management " 两个 Tab 页。rabbitmq-plugins enable rabbitmq_shovel_management
Shovel 既可以部署在源端,也可以部署在目的端。有两种方式可以部署 Shovel:
- 静态方式:在
rabbitmq.config
配置文件中设置 - 动态方式:通过 Runtime Parameter 设置
RabbitMQ Web 管理界面配置:
注意:URI 的 写法,%2f
用来转义
amqp://username:passwd@IP:端口/%2fVhost
测试代码:
- RabbitMQSender
public class RabbitMQSender {
private final static String QUEUE_NAME = "shovel.queue1";
private final static String DUHFMQ01 = "10.224.162.189";
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setPort(5672);
factory.setUsername("jiaflu");
factory.setPassword("123456");
factory.setVirtualHost("/vhost_jiaflu");
// 创建连接
try {
Address[] address = new Address[]{new Address(DUHFMQ01)};
Connection connection = factory.newConnection(address);
// 创建信息管道
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message;
for (int i = 0; i < 10; i++) {
message = System.currentTimeMillis() + " hello " + i;
System.out.println(i + " send " + message);
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
Thread.sleep(30);
}
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e){
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- RabbitMQReceiver
public class RabbitMQReceiver {
private final static String QUEUE_NAME = "shovel.queue1";
private final static String EXCHANGE_NAME = "shovel.test.exchange1";
private final static String ROUTING_KEY = "shovel.test.exchange";
private final static String DUHFMQ01 = "10.225.20.237";
private final static String DUHFMQ02 = "10.225.20.231";
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setPort(5672);
factory.setUsername("jiaflu");
factory.setPassword("123456");
factory.setVirtualHost("/vhost_jiaflu");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(2);
// 创建连接
try {
Address[] address = new Address[]{new Address(DUHFMQ01), new Address(DUHFMQ02)};
Connection connection = factory.newConnection(address);
// 创建信息管道
Channel channel = connection.createChannel();
// queue
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, false, null);
// bind
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
System.out.println("Queue Receiver Start!");
// 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
// 消息到达 触发方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("Recv msg: " + msg);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
boolean autoAck = true;
while (true) {
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
break;
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e){
e.printStackTrace();
}
}
}
配置双向 shovel 测试:
- 数据在两端之间来会发送
- 当消费时,可以正常消费完,不出现冗余
总结
- 在使用 shovel 插件时,只需要在 source 节点进行配置,destination 节点不需要配置;同理,只需要在 source 节点上使能 shovel 插件,destination 节点无需使能该插件;
- 在 shovel 正常工作时,对于 source 节点来说,增加了一条用于 consumer 的 TCP 连接;对于 destination 节点来说,增加了一条用于 producer 的 TCP 连接,和普通客户端的连接行为没什么不同;
案例:消息堆积的治理
当某个队列中的消息堆积严重时,比如超过某个设定的阈值,就可以通过 Shovel 将队列中的消息移交给另一个集群。
-
情形 1:当检测到当前运行集群 cluster1 中的队列 queue1 中有严重消息堆积,比如通过
/api/queues/vhost/ name
接口获取到队列的消息个数(messages) 超过2 千万或者消息占用大小(messages bytes) 超过10GB 时,就启用 shovel1 将队列 queue1 中的消息转发至备份集群 cluster2 中的队列queue2 。 -
情形 2 :紧随情形1,当检测到队列queue1 中的消息个数低于1 百万或者消息占用大小低于1GB 时就停止shovel1 ,然后让原本队列 queue1 中的消费者慢慢处理剩余的堆积。
-
情形 3:当检测到队列 queue1 中的消息个数低于10 万或者消息占用大小低于100MB时,就开启 shovel2 将队列 queue2 中暂存的消息返还给队列queue1 。
-
情形 4:紧随情形3 ,当检测到队列queuel 中的消息个数超过 1百万或者消息占用大小高于1GB 时就将shovel2 停掉。