spring-boot-starter-amqp 那些文档里没告
2018-09-13 本文已影响46人
黄大海
spring-boot 非常简便,但是文档不全、背后隐藏了大量配置细节、门槛比较高。这就需要阅读源码才能随心所欲得使用。下面配合源码来解释下官方文档中配置背后的原理。
启动配置路径:一个入口拉出一串螃蟹。配置都在这几个类里,之后还会提到。
- RabbitAutoConfiguration
- -> RabbitAnnotationDrivenConfiguration
- -> EnableRabbitConfiguration
- -> @EnableRabbit
- -> RabbitBootstrapConfiguration
首先是AMQP三大件的申明,这段代码有啥用?
@SpringBootApplication
public class Application {
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(topicExchangeName);
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
}
...
}
- RabbitAdmin会读取所有的Queue/Exchange/Binding类型的Bean(包括它们的集合),并在启动时向Rabbit Server注册。
- RabbitAdmin#afterPropertiesSet
@Override
public void afterPropertiesSet() {
...
this.connectionFactory.addConnectionListener(new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
initialize();
}
})
...
}
- RabbitAdmin#initialize
public void initialize() {
...
this.rabbitTemplate.execute(new ChannelCallback<Object>() {
@Override
public Object doInRabbit(Channel channel) throws Exception {
declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
declareQueues(channel, queues.toArray(new Queue[queues.size()]));
declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
return null;
}
});
...
}
- 下一段是注册一个客户端接收端点, 适配任意带有处理逻辑的对象
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
-
SimpleMessageListenerContainer 是一个实现了org.springframework.context.Lifecycle 的自启动类,是客户端接收消息的入口。
- AbstractMessageListenerContainer#start
- -> SimpleMessageListenerContainer#doStart
- -> AsyncMessageProcessingConsumer#run
- -> SimpleMessageListenerContainer#receiveAndExecute
- -> SimpleMessageListenerContainer#doReceiveAndExecute
- ...
- -> AbstractMessageListenerContainer#doInvokeListener
- -> AbstractAdaptableMessageListener#onMessage
... - -> DelegatingInvocableHandler#invoke
- -> InvocableHandlerMethod#invoke
-
InvocableHandlerMethod就是代理了刚才提供的业务逻辑的类了,我们之后再介绍。
-
Spring提供的客户端配置并不简便,一般我们用Rabbit的annotation来配置客户端代码,像这样:
@Component
@RabbitListener(queues = "${queue.name}")
public class Receiver {
@RabbitHandler
public void process(@Payload String body, @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey) {
}
}
- 处理这些Annotation的类位于
-
RabbitBootstrapConfiguration#rabbitListenerAnnotationProcessor
-
-> RabbitListenerAnnotationBeanPostProcessor#postProcessAfterInitialization
-
过程与例子里类似,也是构造幷启动了一个SimpleMessageListenerContainer
-
- 这里有几个大的问题, RabbitHandler到底如何定义,可以接受哪些参数,返回值如何处理呢?
- 首先一个RabbitListener内是可以定义多个RabbitHandler的,调用者通过payload类型来选择最合适的handler来处理
@Component
@RabbitListener(queues = "${queue.name}")
public class Receiver {
@RabbitHandler
public void process(@Payload String stringBody, @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey) {
}
@RabbitHandler
public void process(@Payload Integer intBody) {
}
@RabbitHandler
public void process(@Payload Long longBody) {
}
}
-
这个逻辑流程如下
- DelegatingInvocableHandler#invoke
- DelegatingInvocableHandler#getHandlerForPayload
- DelegatingInvocableHandler#findHandlerForPayload
- DelegatingInvocableHandler #matchHandlerMethod
-
方法可以接受哪些参数?这个是由org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver的实现类处理的
-
argumentResolver.jpeg
-
由实现类的名字可以看出,参数可以接受Header,Message, Payload, Principal等类型
-
实现逻辑流程:
- DelegatingInvocableHandler#invoke
- InvocableHandlerMethod#invoke
- InvocableHandlerMethod#getMethodArgumentValues
- HandlerMethodArgumentResolverComposite#resolveArgument
- HandlerMethodArgumentResolver#resolveArgument
-
哪些Header可以用?参考org.springframework.amqp.support.AmqpHeaders
-
非空的返回值会从客户端发起一个新的消息,可以配合@Sendto或者AmqpHeaders.REPLY_TO使用,这里不再展开。参考AbstractAdaptableMessageListener#handleResult