SpringBoot集成RabbitMQ-动态注入Bean方式
2018-07-18 本文已影响62人
七岁能杀猪
实现Direct,Fanout,Topic三种队列,以及通过死信转发方式实现的延迟队列
一个让处女座程序员很难受的问题:
每次声明一个队列,都需要用@Bean注解在config类里面显式地往容器里面注入一个Queue Bean和一个Binding Bean,十几个队列下来,那场面简直不能忍.
怎么解决呢,思路:
通过遍历枚举的方式,统一往spring容器中注入bean.废话不多说,上代码
一 使用场景说明
1.Direct
根据routing key精确匹配到队列,每条消息只会被消费一次
2.Fanout
广播消息队列,同交换机内的所有消费者,都接收到消息
3.Topic
支持routing key模糊匹配,一条消息可匹配到多个队列.配合AnonymousQueue队列,可让集群内每个节点都消费到同一条业务消息.如:修改集群内所有应用内存中的配置
4.TTL
延迟队列,实现消息延迟指定时间消费
二 关键代码
- 配置类:
import com.google.common.collect.Maps;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Registers RabbitMQ bean definitions programmatically instead of declaring one
 * {@code @Bean} method per exchange, queue and binding.
 *
 * <p>During {@link #postProcessBeanDefinitionRegistry(BeanDefinitionRegistry)} it iterates over
 * {@link RabbitExchangeEnum} and {@link RabbitQueueEnum} and registers, for every constant, a
 * supplier-backed bean definition. For every {@link TopicConsumer} bean found in the application
 * context it additionally registers a {@link MessageListenerAdapter} and a
 * {@link SimpleMessageListenerContainer}.
 *
 * @author onlinever
 * @date 2018/09/06
 */
@Service
public class RabbitQueueBeanRegister implements BeanDefinitionRegistryPostProcessor, ApplicationContextAware {

    /** Suffix appended to a queue bean name to form its listener adapter bean name. */
    private static final String ADAPTER_SUFFIX = "Adapter";

    /** Suffix appended to a queue bean name to form its binding bean name. */
    private static final String BINDING_SUFFIX = "Binding";

    /** Suffix appended to a queue bean name to form its listener container bean name. */
    private static final String CONTAINER_SUFFIX = "Container";

    private ApplicationContext applicationContext;
    private BeanDefinitionRegistry beanDefinitionRegistry;
    private List<TopicConsumer> topicConsumers;

    /**
     * Registers all exchange, queue, binding and topic listener bean definitions.
     *
     * @param beanDefinitionRegistry the registry the definitions are added to
     * @throws BeansException declared by the overridden interface method
     */
    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
        this.beanDefinitionRegistry = beanDefinitionRegistry;
        // Declare exchanges.
        declareExchange();
        // Declare queues and their bindings.
        declareQueueAndBinding();
        // Unusual ordering: the topic consumers are looked up from the application context
        // while bean definitions are still being registered.
        if (haveTopicQueue()) {
            declareTopicMessageListenerAdapter();
            declareTopicMessageListenerContainer();
        }
    }

    /**
     * Collects all {@link TopicConsumer} beans from the application context into
     * {@link #topicConsumers}.
     *
     * @return {@code true} if at least one consumer was found; {@code false} if none was found
     *         or the lookup threw an exception
     */
    private boolean haveTopicQueue() {
        try {
            topicConsumers = new ArrayList<>(applicationContext.getBeansOfType(TopicConsumer.class).values());
            return !topicConsumers.isEmpty();
        } catch (Exception e) {
            // Best effort: a failed lookup only disables topic listener registration.
            // NOTE(review): only the message is printed to stdout; consider a real logger.
            System.out.println(e.getMessage());
            return false;
        }
    }

    /**
     * Registers one durable exchange bean per {@link RabbitExchangeEnum} constant. Fanout and
     * topic types get their matching exchange class; every other type gets a direct exchange.
     */
    private void declareExchange() {
        for (RabbitExchangeEnum rabbitExchangeEnum : RabbitExchangeEnum.values()) {
            switch (rabbitExchangeEnum.getRabbitExchangeTypeEnum()) {
                case FANOUT_QUEUE:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitExchangeEnum.getBeanName(),
                            BeanDefinitionBuilder.genericBeanDefinition(FanoutExchange.class, () -> (FanoutExchange) ExchangeBuilder
                                    .fanoutExchange(rabbitExchangeEnum.getExchangeName())
                                    .durable(true)
                                    .build()).getBeanDefinition());
                    break;
                case TOPIC_QUEUE:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitExchangeEnum.getBeanName(),
                            BeanDefinitionBuilder.genericBeanDefinition(TopicExchange.class, () -> (TopicExchange) ExchangeBuilder
                                    .topicExchange(rabbitExchangeEnum.getExchangeName())
                                    .durable(true)
                                    .build()).getBeanDefinition());
                    break;
                default:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitExchangeEnum.getBeanName(),
                            BeanDefinitionBuilder.genericBeanDefinition(DirectExchange.class, () -> (DirectExchange) ExchangeBuilder
                                    .directExchange(rabbitExchangeEnum.getExchangeName())
                                    .durable(true)
                                    .build()).getBeanDefinition());
                    break;
            }
        }
    }

    /**
     * Registers, for every {@link RabbitQueueEnum} constant, a queue bean (named after the
     * constant's bean name) and a binding bean (bean name + {@code "Binding"}) that ties the
     * queue to its exchange.
     */
    private void declareQueueAndBinding() {
        for (RabbitQueueEnum rabbitQueueEnum : RabbitQueueEnum.values()) {
            // Register the queue itself; it is only built when the bean is instantiated.
            beanDefinitionRegistry.registerBeanDefinition(rabbitQueueEnum.getBeanName(),
                    BeanDefinitionBuilder.genericBeanDefinition(Queue.class, () -> createQueue(rabbitQueueEnum)).getBeanDefinition());
            // Register the binding between the queue and its exchange.
            switch (rabbitQueueEnum.getExchangeEnum().getRabbitExchangeTypeEnum()) {
                case FANOUT_QUEUE:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitQueueEnum.getBeanName() + BINDING_SUFFIX,
                            BeanDefinitionBuilder.genericBeanDefinition(Binding.class, () -> BindingBuilder
                                    .bind(applicationContext.getBean(rabbitQueueEnum.getBeanName(), Queue.class))
                                    .to(applicationContext.getBean(rabbitQueueEnum.getExchangeEnum().getBeanName(), FanoutExchange.class))).getBeanDefinition());
                    break;
                case NORMAL_QUEUE:
                case TTL_QUEUE:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitQueueEnum.getBeanName() + BINDING_SUFFIX,
                            BeanDefinitionBuilder.genericBeanDefinition(Binding.class, () -> BindingBuilder
                                    .bind(applicationContext.getBean(rabbitQueueEnum.getBeanName(), Queue.class))
                                    .to(applicationContext.getBean(rabbitQueueEnum.getExchangeEnum().getBeanName(), DirectExchange.class))
                                    .with(rabbitQueueEnum.getRouteKey())).getBeanDefinition());
                    break;
                case TOPIC_QUEUE:
                    beanDefinitionRegistry.registerBeanDefinition(rabbitQueueEnum.getBeanName() + BINDING_SUFFIX,
                            BeanDefinitionBuilder.genericBeanDefinition(Binding.class, () -> BindingBuilder
                                    .bind(applicationContext.getBean(rabbitQueueEnum.getBeanName(), Queue.class))
                                    .to(applicationContext.getBean(rabbitQueueEnum.getExchangeEnum().getBeanName(), TopicExchange.class))
                                    .with(StringUtils.getTopicQueueRoute(rabbitQueueEnum.getRouteKey()))).getBeanDefinition());
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * Builds the queue instance for the given enum constant according to its exchange type.
     *
     * @param rabbitQueueEnum the queue description
     * @return the queue to expose as a bean
     * @throws IllegalStateException if a TTL queue has no dead-letter target queue configured
     */
    private Queue createQueue(RabbitQueueEnum rabbitQueueEnum) {
        switch (rabbitQueueEnum.getExchangeEnum().getRabbitExchangeTypeEnum()) {
            case TTL_QUEUE: {
                RabbitQueueEnum deadLetterTarget = rabbitQueueEnum.getRabbitQueueEnum();
                if (deadLetterTarget == null) {
                    // Fail with a readable message instead of a bare NullPointerException.
                    throw new IllegalStateException(
                            "TTL queue " + rabbitQueueEnum + " has no dead-letter target queue configured");
                }
                return QueueBuilder
                        .durable(rabbitQueueEnum.getRouteKey())
                        // Exchange that expired messages are forwarded to.
                        .withArgument("x-dead-letter-exchange", deadLetterTarget.getExchangeName())
                        // Routing key used when forwarding expired messages.
                        .withArgument("x-dead-letter-routing-key", deadLetterTarget.getRouteKey())
                        .build();
            }
            case TOPIC_QUEUE:
                return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy(
                        StringUtils.getTopicQueueNamePrefix(rabbitQueueEnum.getRouteKey())));
            default:
                return new Queue(rabbitQueueEnum.getRouteKey());
        }
    }

    /**
     * Registers one {@link MessageListenerAdapter} bean per topic consumer, named after the
     * consumer's queue bean name + {@code "Adapter"}.
     */
    private void declareTopicMessageListenerAdapter() {
        topicConsumers.forEach(topicConsumer -> beanDefinitionRegistry.registerBeanDefinition(
                topicConsumer.getQueueEnum().getBeanName() + ADAPTER_SUFFIX,
                BeanDefinitionBuilder.genericBeanDefinition(MessageListenerAdapter.class,
                        () -> new MessageListenerAdapter(topicConsumer)).getBeanDefinition()));
    }

    /**
     * Registers one {@link SimpleMessageListenerContainer} bean per topic consumer, named after
     * the consumer's queue bean name + {@code "Container"}.
     */
    private void declareTopicMessageListenerContainer() {
        topicConsumers.forEach(topicConsumer -> beanDefinitionRegistry.registerBeanDefinition(
                topicConsumer.getQueueEnum().getBeanName() + CONTAINER_SUFFIX,
                BeanDefinitionBuilder.genericBeanDefinition(SimpleMessageListenerContainer.class, () -> {
                    String queueBeanName = topicConsumer.getQueueEnum().getBeanName();
                    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
                    // Resolve the queue through the context rather than a side map filled by the
                    // queue supplier: this works regardless of which bean is instantiated first.
                    container.setQueues(applicationContext.getBean(queueBeanName, Queue.class));
                    container.setConnectionFactory(applicationContext.getBean("rabbitConnectionFactory", ConnectionFactory.class));
                    container.setMessageListener(applicationContext.getBean(queueBeanName + ADAPTER_SUFFIX));
                    container.setRabbitAdmin(applicationContext.getBean(RabbitAdmin.class));
                    return container;
                }).getBeanDefinition()));
    }

    /**
     * Intentionally empty: all work happens in
     * {@link #postProcessBeanDefinitionRegistry(BeanDefinitionRegistry)}.
     */
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
    }

    /**
     * Stores the application context used by the bean suppliers to resolve other beans.
     *
     * @param applicationContext the context this bean runs in
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
- 枚举类
2.1 交换机类型枚举
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.TopicExchange;
/**
 * Queue/exchange flavours supported by the registrar, each paired with the exchange class
 * used for it.
 *
 * @author onlinever
 * @date 2018/09/06
 */
public enum RabbitExchangeTypeEnum {
    /**
     * Delay queue implemented via dead-letter forwarding.
     */
    TTL_QUEUE(1, DirectExchange.class),
    /**
     * Regular queue.
     */
    NORMAL_QUEUE(2, DirectExchange.class),
    /**
     * Broadcast (fanout) queue.
     */
    FANOUT_QUEUE(3, FanoutExchange.class),
    /**
     * Topic queue.
     */
    TOPIC_QUEUE(4, TopicExchange.class);

    /**
     * Numeric index of this type.
     */
    private final int index;
    /**
     * Exchange class associated with this type. Kept as a raw {@code Class} so the getter's
     * signature stays unchanged for existing callers.
     */
    private final Class exchangeClazz;

    RabbitExchangeTypeEnum(int index, Class exchangeClazz) {
        this.index = index;
        this.exchangeClazz = exchangeClazz;
    }

    /**
     * @return the numeric index of this type
     */
    public int getIndex() {
        return index;
    }

    /**
     * @return the exchange class associated with this type
     */
    public Class getExchangeClazz() {
        return exchangeClazz;
    }
}
2.2 交换机枚举
/**
 * Exchanges declared by the application.
 *
 * <p>Exchange names follow the pattern {@code exchange.{0}.{1}} where {@code {0}} is the
 * exchange type (direct, topic, fanout, headers) and {@code {1}} is the application name.
 * By default each application declares one exchange per type.
 *
 * @author onlinever
 * @date 2018/09/06
 */
public enum RabbitExchangeEnum {
    /**
     * Direct exchange.
     */
    DIRECT_EXCHANGE("directExchange", "exchange.direct.gateway", RabbitExchangeTypeEnum.NORMAL_QUEUE),
    /**
     * Fanout exchange.
     */
    FANOUT_EXCHANGE("fanoutExchange", "exchange.fanout.gateway", RabbitExchangeTypeEnum.FANOUT_QUEUE),
    /**
     * Topic exchange.
     */
    TOPIC_EXCHANGE("topicExchange", "exchange.topic.gateway", RabbitExchangeTypeEnum.TOPIC_QUEUE);

    /**
     * Spring bean name of the exchange.
     */
    private final String beanName;
    /**
     * Name of the exchange.
     */
    private final String exchangeName;
    /**
     * Type of the exchange.
     */
    private final RabbitExchangeTypeEnum rabbitExchangeTypeEnum;

    RabbitExchangeEnum(String beanName, String exchangeName, RabbitExchangeTypeEnum rabbitExchangeTypeEnum) {
        this.beanName = beanName;
        this.exchangeName = exchangeName;
        this.rabbitExchangeTypeEnum = rabbitExchangeTypeEnum;
    }

    /**
     * @return the name of the exchange
     */
    public String getExchangeName() {
        return exchangeName;
    }

    /**
     * @return the Spring bean name of the exchange
     */
    public String getBeanName() {
        return beanName;
    }

    /**
     * @return the type of the exchange
     */
    public RabbitExchangeTypeEnum getRabbitExchangeTypeEnum() {
        return rabbitExchangeTypeEnum;
    }
}
2.3 队列枚举
/**
 * Queues declared by the application. The enum currently declares no constants; each constant
 * added here supplies a bean name, a route key, its exchange and, optionally, the queue that
 * dead-lettered messages are forwarded to.
 *
 * @author onlinever
 * @date 2018/09/06
 */
public enum RabbitQueueEnum {
    ;

    /**
     * Spring bean name of the queue.
     */
    private final String beanName;
    /**
     * Route key of the queue.
     */
    private final String routeKey;
    /**
     * Exchange the queue belongs to.
     */
    private final RabbitExchangeEnum exchangeEnum;
    /**
     * Queue that dead-lettered messages are forwarded to; may be {@code null}.
     */
    private final RabbitQueueEnum rabbitQueueEnum;

    RabbitQueueEnum(String beanName, String routeKey, RabbitExchangeEnum exchangeEnum, RabbitQueueEnum rabbitQueueEnum) {
        this.beanName = beanName;
        this.routeKey = routeKey;
        this.exchangeEnum = exchangeEnum;
        this.rabbitQueueEnum = rabbitQueueEnum;
    }

    /**
     * @return the route key of the queue
     */
    public String getRouteKey() {
        return routeKey;
    }

    /**
     * @return the exchange the queue belongs to
     */
    public RabbitExchangeEnum getExchangeEnum() {
        return exchangeEnum;
    }

    /**
     * @return the name of the exchange the queue belongs to; dereferences the exchange, so it
     *         fails with a {@link NullPointerException} if the constant was given no exchange
     */
    public String getExchangeName() {
        return exchangeEnum.getExchangeName();
    }

    /**
     * @return the Spring bean name of the queue
     */
    public String getBeanName() {
        return beanName;
    }

    /**
     * @return the queue that dead-lettered messages are forwarded to, or {@code null} if none
     */
    public RabbitQueueEnum getRabbitQueueEnum() {
        return rabbitQueueEnum;
    }
}
- Topic消费者接口
/**
* Consumer of a topic queue.
*
* @author onlinever
* @date 2018/8/17
*/
public interface TopicConsumer {
/**
* Identifies the queue this consumer reads from.
*
* @return the queue being consumed
*/
RabbitQueueEnum getQueueEnum();
/**
* Handles one consumed message; the concrete consumer supplies the implementation.
*
* @param message the message
*/
void handleMessage(String message);
}
- 其他消费者使用@RabbitListener方式