SpringBoot整合中间件RabbitMQ(六)

2019-05-06  本文已影响0人  Invi

使用AmqpAdmin创建,删除Queue,Exchanges,Binding。

1.注入 AmqpAdmin

    @Autowired
    AmqpAdmin amqpAdmin;

AmqpAdmin是由RabbitAutoConfiguration 自动注入的。

Path:org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration

2.创建交换器Exchange

比如创建:DirectExchange

调用amqpAdmin的declareExchange方法。

package com.invi;

import com.invi.bean.Book;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

@RunWith(SpringRunner.class)
@SpringBootTest
public class Springboot02AmqpApplicationTests {

    //使用*AmqpAdmin*创建,删除Queue,Exchanges,Binding。

    @Autowired
    AmqpAdmin amqpAdmin;

    @Test
    public void createExchange() {
        DirectExchange directExchange = new DirectExchange("AmqpAdmin-DirectExchange");
        amqpAdmin.declareExchange(directExchange);
    }
}

amqpAdmin.declareXXXX() 都是创建组件的方法哦。

接口: org.springframework.amqp.core.Exchange

实现类:

  • AbstractExchange (org.springframework.amqp.core)
    • DirectExchange (org.springframework.amqp.core)
    • FanoutExchange (org.springframework.amqp.core)
    • CustomExchange (org.springframework.amqp.core)
    • TopicExchange (org.springframework.amqp.core)
    • HeadersExchange (org.springframework.amqp.core)

DirectExchange 可以通过构造方法定制:

org.springframework.amqp.core.DirectExchange#DirectExchange(java.lang.String)

package org.springframework.amqp.core;

import java.util.Map;

public class DirectExchange extends AbstractExchange {

  public static final DirectExchange DEFAULT = new DirectExchange("");


  public DirectExchange(String name) {
      super(name);
  }

  public DirectExchange(String name, boolean durable, boolean autoDelete) {
      super(name, durable, autoDelete);
  }

  public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
      super(name, durable, autoDelete, arguments);
  }

  @Override
  public final String getType() {
      return ExchangeTypes.DIRECT;
  }

}

运行测试方法:

Name Type Features Message rate in Message rate out
(AMQP default) direct D
AmqpAdmin-DirectExchange direct D
amq.direct direct D
amq.fanout fanout D
amq.headers headers D
amq.match headers D
amq.rabbitmq.trace topic D I
amq.topic topic D
exchange.direct direct D 0.00/s 0.00/s

发现已经创建好新的Exchange:AmqpAdmin-DirectExchange ,类型 direct !

3.创建队列Queue

编码:


    @Autowired
    AmqpAdmin amqpAdmin;

    @Test
    public void createQueue() {
        //通过构造方法 定制队列属性
        Queue queue = new Queue("AmqpAdmin-Queue", true);
        amqpAdmin.declareQueue(queue);
    }

Queue可以通过构造方法定制:

package org.springframework.amqp.core;

import java.util.Map;

import org.springframework.util.Assert;

public class Queue extends AbstractDeclarable {

  private final String name;

  private final boolean durable;

  private final boolean exclusive;

  private final boolean autoDelete;

  private final java.util.Map<java.lang.String, java.lang.Object> arguments;

 
  public Queue(String name) { this(name, true, false, false); }

 
  public Queue(String name, boolean durable) { this(name, durable, false, false, null); }
    

  /**
   * 构造一个新的队列,给定一个名称、持久性、排他性和自动删除标志。
     * @param name 指定队列的名称。
     * @param durable 为true如果我们声明一个耐久队列(该队列将在服务器重启后存活)
     * @param exclusive 为true如果我们声明了一个独占队列(队列只会被声明者使用连接)
     * @param autoDelete为true 如果服务器在队列不再使用时应该删除队列, 
   */
  public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)  {
      this(name, durable, exclusive, autoDelete, null);
  }

  ......

}


运行测试方法:

Overview Messages Message rates +/-
Name Features State Ready Unacked Total incoming deliver / get ack
AmqpAdmin-Queue D idle 0 0 0
unionpaysmart D idle 1 0 1 0.00/s 0.00/s 0.00/s
unionpaysmart.news D idle 1 0 1 0.00/s 0.00/s 0.00/s
invi D idle 1 0 1 0.00/s 0.00/s 0.00/s
invi.emps D idle 1 0 1 0.00/s 0.00/s 0.00/s
invi.idea D idle 1 0 1 0.00/s 0.00/s 0.00/s
invi.news D idle 0 0 0 0.00/s 0.00/s 0.00/s

队列 AmqpAdmin-Queue 已经被创建。

4.将交换器和队列绑定。

编码:


    @Autowired
    AmqpAdmin amqpAdmin; 

    @Test
    public void createBinding() {
        Binding binding
                = new Binding("AmqpAdmin-Queue", Binding.DestinationType.QUEUE, "AmqpAdmin-DirectExchange", "AmqpAdmin-routingKey", null);
        amqpAdmin.declareBinding(binding);
    }

构造Binding:

  • String destination 目的地

    • 和那个队列绑定:AmqpAdmin-Queue ,......
  • DestinationType destinationType 目的地类型( QUEUE, EXCHANGE;)

    • Binding.DestinationType.QUEUE
    • Binding.DestinationType.EXCHANGE
  • String exchange 交换器名

  • String routingKey 路由键 自定义名字

  • Map<String, Object> arguments 参数头信息 ,没有的话 写NULL

源码Binding类:

package org.springframework.amqp.core;

import java.util.Map;

public class Binding extends AbstractDeclarable {

    public enum DestinationType {
        QUEUE, EXCHANGE;
    }

    private final String destination;

    private final String exchange;

    private final String routingKey;

    private final Map<String, Object> arguments;

    private final DestinationType destinationType;

    public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,
            Map<String, Object> arguments) {
        this.destination = destination;
        this.destinationType = destinationType;
        this.exchange = exchange;
        this.routingKey = routingKey;
        this.arguments = arguments;
    }

    public String getDestination() {
        return this.destination;
    }

    public DestinationType getDestinationType() {
        return this.destinationType;
    }

    public String getExchange() {
        return this.exchange;
    }

    public String getRoutingKey() {
        return this.routingKey;
    }

    public Map<String, Object> getArguments() {
        return this.arguments;
    }

    public boolean isDestinationQueue() {
        return DestinationType.QUEUE.equals(this.destinationType);
    }

    @Override
    public String toString() {
        return "Binding [destination=" + this.destination + ", exchange=" + this.exchange + ", routingKey="
                    + this.routingKey + "]";
    }

}

Exchange: AmqpAdmin-DirectExchange 默认

... no bindings ...

运行测试方法:

To Routing key Arguments
AmqpAdmin-Queue AmqpAdmin-routingKey

至此已经完成了:AmqpAdmin-DirectExchange 和 AmqpAdmin-Queue的绑定。

上一篇下一篇

猜你喜欢

热点阅读