RabbitMQ Shovel

2019-06-19  本文已影响0人  PC_Repair

与 Federation 具备的数据转发功能类似, Shovel 能够可靠、持续地从一个Broker 中的队列(作为源端,即source )拉取数据并转发至另一个Broker 中的交换器(作为目的端,即destination )。
作为源端的队列和作为目的端的交换器可以同时位于同一个 Broker 上,也可以位于不同的 Broker 上。

Shovel 的原理

Shovel的结构.PNG

==通常情况下,使用 Shovel 时配置队列作为源端,交换器作为目的端==,如上图所示。

Shovel 的使用

Shovel 插件默认也在 RabbitMQ 的发布包中,执行 rabbitmq-plugins enable rabbitmq_shovel命令可以开启 Shovel 功能。

开启 rabbitmq shovel management 插件之后, 在 RabbitMQ 的管理界面中" Admin "的右侧会多出"Shovel Status" 和" Shovel Management " 两个 Tab 页。rabbitmq-plugins enable rabbitmq_shovel_management

开启shovel_management.PNG

Shovel 既可以部署在源端,也可以部署在目的端。有两种方式可以部署 Shovel:

RabbitMQ Web 管理界面配置:

add_shovel.PNG

注意:URI 的 写法,%2f 用来转义

amqp://username:passwd@IP:端口/%2fVhost

测试代码:

public class RabbitMQSender {
    private final static String QUEUE_NAME = "shovel.queue1";
    private final static String DUHFMQ01 = "10.224.162.189";
    
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setUsername("jiaflu");
        factory.setPassword("123456");
        factory.setVirtualHost("/vhost_jiaflu");

        // 创建连接
        try {
            Address[] address = new Address[]{new Address(DUHFMQ01)};
            Connection connection = factory.newConnection(address);
            // 创建信息管道
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            String message;

            for (int i = 0; i < 10; i++) {
                message = System.currentTimeMillis() + " hello " + i;
                System.out.println(i + " send " + message);
                channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                Thread.sleep(30);
            }
            channel.close();
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e){
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class RabbitMQReceiver {
    private final static String QUEUE_NAME = "shovel.queue1";
    private final static String EXCHANGE_NAME = "shovel.test.exchange1";
    private final static String ROUTING_KEY = "shovel.test.exchange";
    private final static String DUHFMQ01 = "10.225.20.237";
    private final static String DUHFMQ02 = "10.225.20.231";

    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setUsername("jiaflu");
        factory.setPassword("123456");
        factory.setVirtualHost("/vhost_jiaflu");

        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(2);

        // 创建连接
        try {
            Address[] address = new Address[]{new Address(DUHFMQ01), new Address(DUHFMQ02)};
            Connection connection = factory.newConnection(address);
            // 创建信息管道
            Channel channel = connection.createChannel();
            // queue
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, false, null);
            // bind
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
            System.out.println("Queue Receiver Start!");

            // 定义一个消费者
            Consumer consumer = new DefaultConsumer(channel) {
                // 消息到达 触发方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println("Recv msg: " + msg);
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            boolean autoAck = true;
            while (true) {
                channel.basicConsume(QUEUE_NAME, autoAck, consumer);
                break;
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e){
            e.printStackTrace();
        }
    }
}

配置双向 shovel 测试:

总结

案例:消息堆积的治理

当某个队列中的消息堆积严重时,比如超过某个设定的阈值,就可以通过 Shovel 将队列中的消息移交给另一个集群。

消息堆积的治理.PNG
上一篇 下一篇

猜你喜欢

热点阅读