Spring Boot Integration with RabbitMQ Middleware (Part 6)
Use AmqpAdmin to create and delete queues, exchanges, and bindings.
1. Inject AmqpAdmin
@Autowired
AmqpAdmin amqpAdmin;
The AmqpAdmin bean is registered automatically by RabbitAutoConfiguration.
Path: org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
2. Create an exchange
For example, to create a DirectExchange,
call the declareExchange method of amqpAdmin.
package com.invi;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class Springboot02AmqpApplicationTests {

    // Use AmqpAdmin to create and delete queues, exchanges, and bindings.
    @Autowired
    AmqpAdmin amqpAdmin;

    @Test
    public void createExchange() {
        // The single-argument constructor only sets the exchange name.
        DirectExchange directExchange = new DirectExchange("AmqpAdmin-DirectExchange");
        amqpAdmin.declareExchange(directExchange);
    }
}
Every amqpAdmin.declareXxx() method creates a component on the broker.
Interface: org.springframework.amqp.core.Exchange
Implementations:
- AbstractExchange (org.springframework.amqp.core)
- DirectExchange (org.springframework.amqp.core)
- FanoutExchange (org.springframework.amqp.core)
- CustomExchange (org.springframework.amqp.core)
- TopicExchange (org.springframework.amqp.core)
- HeadersExchange (org.springframework.amqp.core)
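The exchange types listed above differ mainly in how they compare a binding key with a message's routing key. The following is a minimal sketch of those matching rules in plain Java, under the assumption that the standard AMQP semantics apply. The class `ExchangeMatchSketch` and its methods are hypothetical and are not part of Spring AMQP. The headers exchange is left out because it matches on message headers rather than on the routing key.

```java
/** Hypothetical helper (not part of Spring AMQP): routing-key matching per exchange type. */
final class ExchangeMatchSketch {

    private ExchangeMatchSketch() {
    }

    /** direct: the binding key must equal the routing key exactly. */
    static boolean directMatches(String bindingKey, String routingKey) {
        return bindingKey.equals(routingKey);
    }

    /** fanout: every bound queue receives the message; both keys are ignored. */
    static boolean fanoutMatches(String bindingKey, String routingKey) {
        return true;
    }

    /** topic: dot-separated words; '*' matches exactly one word, '#' matches zero or more. */
    static boolean topicMatches(String pattern, String routingKey) {
        return match(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    // Recursive word-by-word comparison of pattern p (from index i) and key k (from index j).
    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }
}
```

With a DirectExchange, as used in this article, only the exact-match rule applies, so a message reaches a queue only when its routing key is identical to the binding's routing key.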
A DirectExchange can be customized through its constructors:
org.springframework.amqp.core.DirectExchange#DirectExchange(java.lang.String)
package org.springframework.amqp.core;

import java.util.Map;

public class DirectExchange extends AbstractExchange {

    public static final DirectExchange DEFAULT = new DirectExchange("");

    public DirectExchange(String name) {
        super(name);
    }

    public DirectExchange(String name, boolean durable, boolean autoDelete) {
        super(name, durable, autoDelete);
    }

    public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
        super(name, durable, autoDelete, arguments);
    }

    @Override
    public final String getType() {
        return ExchangeTypes.DIRECT;
    }
}
Run the test method, then check the exchange list in the RabbitMQ management UI:
Name | Type | Features | Message rate in | Message rate out |
---|---|---|---|---|
(AMQP default) | direct | D | ||
AmqpAdmin-DirectExchange | direct | D | ||
amq.direct | direct | D | ||
amq.fanout | fanout | D | ||
amq.headers | headers | D | ||
amq.match | headers | D | ||
amq.rabbitmq.trace | topic | D I | ||
amq.topic | topic | D | ||
exchange.direct | direct | D | 0.00/s | 0.00/s |
The new exchange AmqpAdmin-DirectExchange has been created, with type direct.
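A practical question at this point is what happens when the test runs a second time. On a RabbitMQ broker, declaring an exchange that already exists with the same attributes is a no-op, while declaring it again with different attributes is rejected with a PRECONDITION_FAILED channel error. The sketch below models that behavior in memory. `DeclareSemanticsSketch` is a hypothetical stand-in for the broker, not a Spring AMQP class, and the exception type it throws is an assumption chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for a broker's exchange registry; not a Spring AMQP class. */
final class DeclareSemanticsSketch {

    // Exchange name -> attributes recorded at the first declaration, e.g. "direct/durable=true".
    private final Map<String, String> exchanges = new HashMap<>();

    /** Returns true if the exchange was newly created, false if it already existed unchanged. */
    boolean declareExchange(String name, String type, boolean durable) {
        String attributes = type + "/durable=" + durable;
        String existing = exchanges.putIfAbsent(name, attributes);
        if (existing == null) {
            return true;
        }
        if (!existing.equals(attributes)) {
            // Models the broker refusing a redeclaration with different attributes.
            throw new IllegalStateException(
                    "PRECONDITION_FAILED: " + name + " is already declared as " + existing);
        }
        return false;
    }
}
```

Under this model, rerunning createExchange() unchanged is harmless, but changing the exchange type or durability under the same name requires deleting the old exchange first.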
3. Create a queue
Code:
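// Requires: import org.springframework.amqp.core.Queue;
@Autowired
AmqpAdmin amqpAdmin;

@Test
public void createQueue() {
    // Customize the queue through its constructor: name plus the durable flag.
    Queue queue = new Queue("AmqpAdmin-Queue", true);
    amqpAdmin.declareQueue(queue);
}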
A Queue can be customized through its constructors:
package org.springframework.amqp.core;

import java.util.Map;

import org.springframework.util.Assert;

public class Queue extends AbstractDeclarable {

    private final String name;
    private final boolean durable;
    private final boolean exclusive;
    private final boolean autoDelete;
    private final java.util.Map<java.lang.String, java.lang.Object> arguments;

    public Queue(String name) {
        this(name, true, false, false);
    }

    public Queue(String name, boolean durable) {
        this(name, durable, false, false, null);
    }

    /**
     * Construct a new queue, given a name, durability, exclusive and auto-delete flags.
     * @param name the name of the queue.
     * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
     * @param exclusive true if we are declaring an exclusive queue (the queue will only be used by the declarer's connection)
     * @param autoDelete true if the server should delete the queue when it is no longer in use
     */
    public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
        this(name, durable, exclusive, autoDelete, null);
    }

    ......
}
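The constructors quoted above chain into each other, so the shorter ones silently pick defaults for the flags they omit. As a rough illustration, the following sketch mirrors that chaining in a standalone class so the resulting flags can be printed. `QueueFlagsSketch` is hypothetical and only copies the defaults visible in the quoted source; it is not the real Queue class.

```java
/** Hypothetical value class mirroring the flag defaults of the quoted Queue constructors. */
final class QueueFlagsSketch {

    final String name;
    final boolean durable;
    final boolean exclusive;
    final boolean autoDelete;

    /** Name only: durable, not exclusive, not auto-deleted. */
    QueueFlagsSketch(String name) {
        this(name, true, false, false);
    }

    /** Name and durability: still not exclusive and not auto-deleted. */
    QueueFlagsSketch(String name, boolean durable) {
        this(name, durable, false, false);
    }

    QueueFlagsSketch(String name, boolean durable, boolean exclusive, boolean autoDelete) {
        this.name = name;
        this.durable = durable;
        this.exclusive = exclusive;
        this.autoDelete = autoDelete;
    }

    @Override
    public String toString() {
        return name + " [durable=" + durable + ", exclusive=" + exclusive
                + ", autoDelete=" + autoDelete + "]";
    }
}
```

For the queue declared in this article, `new QueueFlagsSketch("AmqpAdmin-Queue", true)` yields a durable, non-exclusive queue that is not auto-deleted, which is consistent with the D (durable) marker shown for it in the management UI.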
Run the test method, then check the queue list in the RabbitMQ management UI:
Name | Features | State | Messages ready | Messages unacked | Messages total | Incoming rate | Deliver / get rate | Ack rate |
---|---|---|---|---|---|---|---|---|
AmqpAdmin-Queue | D | idle | 0 | 0 | 0 | | | |
unionpaysmart | D | idle | 1 | 0 | 1 | 0.00/s | 0.00/s | 0.00/s |
unionpaysmart.news | D | idle | 1 | 0 | 1 | 0.00/s | 0.00/s | 0.00/s |
invi | D | idle | 1 | 0 | 1 | 0.00/s | 0.00/s | 0.00/s |
invi.emps | D | idle | 1 | 0 | 1 | 0.00/s | 0.00/s | 0.00/s |
invi.idea | D | idle | 1 | 0 | 1 | 0.00/s | 0.00/s | 0.00/s |
invi.news | D | idle | 0 | 0 | 0 | 0.00/s | 0.00/s | 0.00/s |
The queue AmqpAdmin-Queue has been created.
4. Bind the exchange to the queue
Code:
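// Requires: import org.springframework.amqp.core.Binding;
@Autowired
AmqpAdmin amqpAdmin;

@Test
public void createBinding() {
    // destination, destination type, exchange, routing key, arguments (none here)
    Binding binding = new Binding("AmqpAdmin-Queue", Binding.DestinationType.QUEUE,
            "AmqpAdmin-DirectExchange", "AmqpAdmin-routingKey", null);
    amqpAdmin.declareBinding(binding);
}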
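Constructor parameters of Binding:
String destination: the destination
- the queue to bind to: AmqpAdmin-Queue, ......
DestinationType destinationType: the destination type (QUEUE or EXCHANGE)
- Binding.DestinationType.QUEUE
- Binding.DestinationType.EXCHANGE
String exchange: the name of the exchange
String routingKey: the routing key, a name of your choosing
Map<String, Object> arguments: binding arguments (header information); pass null if there are none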
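To make the role of these parameters concrete, here is a small in-memory model of a binding table for a direct exchange. It is only a sketch: `BindingTableSketch` and its methods are hypothetical names, the arguments map is omitted, and the matching rule assumes a direct exchange, where the routing key must match exactly.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory binding table for direct exchanges; not a Spring AMQP class. */
final class BindingTableSketch {

    /** One binding: the destination, the exchange, and the routing key that connects them. */
    private static final class Row {
        final String destination;
        final String exchange;
        final String routingKey;

        Row(String destination, String exchange, String routingKey) {
            this.destination = destination;
            this.exchange = exchange;
            this.routingKey = routingKey;
        }

        boolean is(String destination, String exchange, String routingKey) {
            return this.destination.equals(destination)
                    && this.exchange.equals(exchange)
                    && this.routingKey.equals(routingKey);
        }
    }

    private final List<Row> rows = new ArrayList<>();

    void bind(String destination, String exchange, String routingKey) {
        rows.add(new Row(destination, exchange, routingKey));
    }

    /** Returns true if a matching binding existed and was removed. */
    boolean unbind(String destination, String exchange, String routingKey) {
        return rows.removeIf(row -> row.is(destination, exchange, routingKey));
    }

    /** Destinations that a direct exchange would deliver to for the given routing key. */
    List<String> route(String exchange, String routingKey) {
        List<String> destinations = new ArrayList<>();
        for (Row row : rows) {
            if (row.exchange.equals(exchange) && row.routingKey.equals(routingKey)) {
                destinations.add(row.destination);
            }
        }
        return destinations;
    }
}
```

In this model, after `bind("AmqpAdmin-Queue", "AmqpAdmin-DirectExchange", "AmqpAdmin-routingKey")`, routing a message with the key AmqpAdmin-routingKey yields AmqpAdmin-Queue, while any other key yields no destination.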
Source code of the Binding class:
package org.springframework.amqp.core;

import java.util.Map;

public class Binding extends AbstractDeclarable {

    public enum DestinationType {
        QUEUE, EXCHANGE;
    }

    private final String destination;
    private final String exchange;
    private final String routingKey;
    private final Map<String, Object> arguments;
    private final DestinationType destinationType;

    public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,
            Map<String, Object> arguments) {
        this.destination = destination;
        this.destinationType = destinationType;
        this.exchange = exchange;
        this.routingKey = routingKey;
        this.arguments = arguments;
    }

    public String getDestination() {
        return this.destination;
    }

    public DestinationType getDestinationType() {
        return this.destinationType;
    }

    public String getExchange() {
        return this.exchange;
    }

    public String getRoutingKey() {
        return this.routingKey;
    }

    public Map<String, Object> getArguments() {
        return this.arguments;
    }

    public boolean isDestinationQueue() {
        return DestinationType.QUEUE.equals(this.destinationType);
    }

    @Override
    public String toString() {
        return "Binding [destination=" + this.destination + ", exchange=" + this.exchange + ", routingKey="
                + this.routingKey + "]";
    }
}
Exchange: AmqpAdmin-DirectExchange (default state, before the binding is declared)
... no bindings ...
Run the test method, then check the bindings of the exchange again:
To | Routing key | Arguments |
---|---|---|
AmqpAdmin-Queue | AmqpAdmin-routingKey |
At this point, AmqpAdmin-DirectExchange and AmqpAdmin-Queue are bound together.
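The introduction also mentions deleting components. AmqpAdmin offers removeBinding, deleteQueue, and deleteExchange for that purpose. The sketch below shows one possible cleanup order for the three components created in this article. It is written against a hypothetical, simplified interface, `CleanupOps`, whose removeBinding takes plain strings instead of a Binding object, and it uses a recording fake so it can run without a broker. Removing the binding first is a matter of clarity here, since the broker also drops bindings when their queue or exchange is deleted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified mirror of the delete side of AmqpAdmin; signatures are assumptions. */
interface CleanupOps {
    void removeBinding(String queue, String exchange, String routingKey);

    boolean deleteQueue(String queueName);

    boolean deleteExchange(String exchangeName);
}

final class CleanupSketch {

    private CleanupSketch() {
    }

    /** Removes the components in reverse order of creation: binding, queue, exchange. */
    static void tearDown(CleanupOps admin) {
        admin.removeBinding("AmqpAdmin-Queue", "AmqpAdmin-DirectExchange", "AmqpAdmin-routingKey");
        admin.deleteQueue("AmqpAdmin-Queue");
        admin.deleteExchange("AmqpAdmin-DirectExchange");
    }
}

/** Recording fake: stores each call as text instead of talking to a broker. */
final class RecordingCleanupOps implements CleanupOps {

    final List<String> calls = new ArrayList<>();

    @Override
    public void removeBinding(String queue, String exchange, String routingKey) {
        calls.add("removeBinding " + queue + " <- " + exchange + " [" + routingKey + "]");
    }

    @Override
    public boolean deleteQueue(String queueName) {
        calls.add("deleteQueue " + queueName);
        return true;
    }

    @Override
    public boolean deleteExchange(String exchangeName) {
        calls.add("deleteExchange " + exchangeName);
        return true;
    }
}
```

To try this against a real broker, the three calls in tearDown would be replaced by the corresponding AmqpAdmin methods, passing the same Binding object to removeBinding that was used with declareBinding.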