RabbitMQ的动态创建交换机、队列、绑定、死信队列,延迟队列

2020-08-20  本文已影响0人  thinking2019

​---实践是检验真理的唯一标准---

yml参数配置

这次我使用的是RabbitTemplate

rabbitmq:
    host: 192.168.225.136
    port: 5672
    username: thinking
    password: 123
    virtual-host: host1
    publisher-returns: true
    # 事务模式下这行需要删除
    publisher-confirm-type: correlated
    template:
      # 找不到路由规则的消息 是否保留
      mandatory: true

为什么Template不需要定义configuration文件来接收yml文件的参数?

这是个常识问题,我这里做个记录。。。

我都能忘记昨天吃了东西的,好在我喜欢做笔记。。。

springboot中何时加载Template,可以仔细看看自动装配注解:EnableAutoConfiguration
这类Template模板的初始化有个Properties文件,不如:
RabbitProperties
RedisProperties
方法中注解:ConfigurationProperties  指定了默认取得yml格式内容
    至于具体的属性可以找set方法

我们的demo都是基于RabbitTemplate来写。。。

初始化数据

通过枚举ExchangeEnum、QueueEnum、BindingEnum动态维护和创建
1.初始化交换机

@Bean("createExchange")
public Object createExchange(RabbitAdmin rabbitAdmin) {
    // 遍历交换机枚举
    ExchangeEnum.toList().forEach(exchangeEnum -> {
        // 根据交换机模式 生成不同的交换机
        switch (exchangeEnum.getType()) {
            case fanout:
                rabbitAdmin.declareExchange(new FanoutExchange(exchangeEnum.getExchangeName(),
                        exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));
                break;
            case topic:
                rabbitAdmin.declareExchange(new TopicExchange(exchangeEnum.getExchangeName(),
                        exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));
                break;
            case direct:
                rabbitAdmin.declareExchange(new DirectExchange(exchangeEnum.getExchangeName(),
                        exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));
                break;
        }
    });
    return null;
}

2.初始化队列

@Bean("createQueue")
public Object createQueue(RabbitAdmin rabbitAdmin) {
    // 遍历队列枚举 将队列注册到spring bean工厂 让spring实现队列的管理
    QueueEnum.toList().forEach(queueEnum -> {
        rabbitAdmin.declareQueue(new Queue(queueEnum.getName(),
                queueEnum.isDurable(), queueEnum.isExclusive(), queueEnum.isAutoDelete(), queueEnum.getArguments()));
    });
    return null;
}

3.交换机和队列绑定

@Bean("createBinding")
public Object createBinding(RabbitAdmin rabbitAdmin) {
    // 遍历队列枚举 将队列绑定到指定交换机
    BindingEnum.toList().forEach(bindingEnum -> {
        // 交换机
        ExchangeEnum exchangeEnum = bindingEnum.getExchangeEnum();
        // queue
        QueueEnum queueEnum = bindingEnum.getQueueEnum();
        // 绑定
        rabbitAdmin.declareBinding(new Binding(
                // queue名称
                queueEnum.getName(),
                Binding.DestinationType.QUEUE,
                // exchange名称
                exchangeEnum.getExchangeName(),
                // queue的routingKey
                queueEnum.getRoutingKey(),
                // 绑定的参数
                bindingEnum.getArguments()));
    });
    return null;
}

延迟队列

1.定义队列

/**
 * 超时队列---不需要定义RabbitListener方法
 */
deal_queue("deal_queue", "deal.queue", true, false, false, dealParams()),
/**
 * 超时接收队列
 */
reply_queue("reply_queue", "reply.queue", true, false, false, null),
public static Map<String, Object> dealParams(){
      // reply_to 队列
      Map<String,Object> map = new HashMap<>();
      //设置消息的过期时间 单位毫秒
      map.put("x-message-ttl",10000);
      //设置附带的死信交换机
      map.put("x-dead-letter-exchange","reply_exchange");
      //指定重定向的路由建 消息作废之后可以决定需不需要更改他的路由建 如果需要 就在这里指定
      map.put("x-dead-letter-routing-key","reply.queue");
      return map;
  }

2.定义交换机

/**
 * 超时交换机
 */
deal_exchange("deal_exchange", ExchangeTypeEnum.topic, true, false),
/**
 * 超时接收交换机
 */
reply_exchange("reply_exchange", ExchangeTypeEnum.topic, true, false),

3.交换机和队列绑定

deal_binding(ExchangeEnum.deal_exchange, QueueEnum.deal_queue, null),
reply_binding(ExchangeEnum.reply_exchange, QueueEnum.reply_queue, null)

4.不定义超时队列的@RabbitListener,只定义超时接收队列的@RabbitListener

@RabbitListener(queues = {"reply_queue"})
@RabbitHandler
public void reply_queue(Message message, Channel channel) throws Exception {
    System.err.println("消费端-reply: " + new String(message.getBody(), "UTF-8"));
    Long deliveryTag = message.getMessageProperties().getDeliveryTag();
    channel.basicAck(deliveryTag, false);
}

测试:

/**
 * 延迟队列测试
 */
public void deal_queue_test() {
    ExchangeEnum exchangeEnum = BindingEnum.deal_binding.getExchangeEnum();
    QueueEnum queueEnum = BindingEnum.deal_binding.getQueueEnum();
    // 消息
    String message = "11111111111111111111111111111111111111";
    MessageProperties messageProperties = getMessageProperties();
    // 发送
    rabbitTemplate.convertSendAndReceive(
            exchangeEnum.getExchangeName(),
            queueEnum.getRoutingKey(),
            new Message(message.getBytes(), messageProperties));
}

异步队列

1.AsyncRabbitTemplate定义

/**
 * 异步队列
 * @param rabbitTemplate
 * @return
 */
@Bean
public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate){
    AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);
    asyncRabbitTemplate.setReceiveTimeout(50000);
    return asyncRabbitTemplate;
}

2.测试

public void async() {
    System.err.println("---------------async--------------start---------");
    AsyncRabbitTemplate.RabbitConverterFuture<Object> future = asyncRabbitTemplate.convertSendAndReceive("reply_exchange", "reply.queue", "123123123");
    // 配置下面代码时 如果 队列监听中没有返回值时会报错
    future.addCallback(new ListenableFutureCallback<Object>() {
        @Override
        public void onFailure(Throwable ex) {
            ex.printStackTrace();
        }
        @Override
        public void onSuccess(Object result) {
            System.out.println("回调收到结果=> " + result);
        }
    });
    System.err.println("---------------async--------------end---------");
}

3.监听方法

@RabbitListener(queues = {"async_queue"})
@RabbitHandler
public Object async_queue(Message message, Channel channel) throws Exception {
    System.err.println("消费端-async: " + new String(message.getBody(), "UTF-8"));
    return "ok";
}

Java api

1.消息回退:

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

long deliveryTag:消息唯一标识,这是RabbitMQ自动生成的,不需要人为管理,只需要从message.getMessageProperties().getDeliveryTag() 就可以获得。
boolean multiple:是否批量退回,不开启就使用false,开启批量退回需要增加自己的业务判断逻辑(比如:攒够几条再批量回退,或者设置等待间隔等等)
boolean requeue:是否退回到消息队列,退回就使用true,就是交给其他消费者处理。

2.拒绝消息

void basicReject(long deliveryTag, boolean requeue) throws IOException;

deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,它在channel范围内是唯一的
requeue:表示如何处理这条消息,为true表示重新放入RabbitMQ的发送队列中,为false表示通知RabbitMQ销毁该消息

3.确认ack

void basicAck(long deliveryTag, boolean multiple) throws IOException;

deliveryTag:该消息的index
multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。

4.创建一个队列

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
​
durable:true、false true:在服务器重启时,能够存活
exclusive :是否为当前连接的专用队列,在连接断开后,会自动删除该队列,生产环境中应该很少用到吧。
autodelete:当没有任何消费者使用时,自动删除该队列

5.启动一个消费者,并返回服务端生成的消费者标识

/**
 * queue:队列名
 * autoAck:true 接收到传递过来的消息后acknowledged(应答服务器),false 接收到消息后不应答服务器
 * consumerTag:客户端生成的一个消费者标识
 * nolocal:如果服务器不应将在此通道连接上发布的消息传递给此使用者,则为true;请注意RabbitMQ服务器上不支持此标记
 * exclusive: 如果是单个消费者,则为true
 * arguments:消费的一组参数
 * deliverCallback: 当一个消息发送过来后的回调接口
 * cancelCallback:当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法
 * shutdownSignalCallback: 当channel/connection 关闭后回调
 */
channel.basicConsume(QUEUE_NAME, true, ctag, false, false, arguments, deliverCallback, consumerTag -> {}, (consumerTag, sig) -> {});

6.取消消费者订阅

/**
* 取消消费者对队列的订阅关系
* consumerTag:服务器端生成的消费者标识
**/
void basicCancel(String consumerTag)

7.主动拉取队列中的一条消息

/**
 * 从消息队列中取出第一条消息;整个方法的执行过程是首先消费队列,然后检索第一条消息,然后再取消订阅
 */
GetResponse response = channel.basicGet(QUEUE_NAME, true);
System.out.println("消费者接收到的消息是:"+new String(response.getBody(), "UTF-8"));

参数介绍

1.队列参数

x-dead-letter-exchange 死信交换机
x-dead-letter-routing-key 死信消息重定向路由键
x-expires 队列在指定毫秒数后被删除
x-ha-policy 创建HA队列
x-ha-nodes HA队列的分布节点
x-max-length 队列的最大消息数
x-message-ttl 毫秒为单位的消息过期时间,队列级别
x-max-priority 最大优先值为255的队列优先排序功能

2.消息参数

content-type 消息体的MIME类型,如application/json
content-encoding 消息的编码类型
message-id 消息的唯一性标识,由应用进行设置
correlation-id 一般用做关联消息的message-id,常用于消息的响应
timestamp 消息的创建时刻,整形,精确到秒

完整项目地址在微信公众中,谢谢大家支持

Java技术学习笔记
上一篇 下一篇

猜你喜欢

热点阅读