我爱编程RabbitMQ实战Java

RabbitMQ实战(四) - RabbitMQ & S

2019-07-04  本文已影响28人  JavaEdge

0 相关源码

1 你将学到

2 SpringAMQP用户管理组件 - RabbitAdmin

RabbitAdmin 类可以很好的操作 rabbitMQ,在 Spring 中直接进行注入即可

autoStartup 必须设置为 true,否则 Spring 容器不会加载它.

2.1 源码分析

RabbitAdmin 的底层实现

AmqpAdmin

为AMQP指定一组基本的便携式AMQP管理操作

image

ApplicationEventPublisherAware

实现该接口的类,通过函数setApplicationEventPublisher()获得它执行所在的ApplicationEventPublisher。

在这里插入图片描述

ApplicationContextAware

实现该接口的类,通过函数setApplicationContext()获得它执行所在的ApplicationContext。一般用来初始化object

image

InitializingBean

image

若class中实现该接口,在Spring Container中的bean生成之后,自动调用函数afterPropertiesSet()。

因其实现了InitializingBean接口,其中只有一个方法,且在Bean加载后就执行

该功能可以被用来检查是否所有的mandatory properties都设置好

RabbitAdmin借助于 ApplicationContextAware 和 InitializingBean来获取我们在配置类中声明的exchange, queue, binding beans等信息并调用channel的相应方法来声明。

下面是RabbitAdmin中afterPropertiesSet()函数的代码片段。这里在创建connection的时候调用函数initialize()。

于是以此为突破口进行源码分析

最后分别调用函数declareExchanges(),declareQueues(),declareBindings()来声明RabbitMQ Entity

回顾一下消费者配置

1. 设置交换机类型

2. 将队列绑定到交换机

交换机类型:

    FanoutExchange 类型: 将消息分发到所有的绑定队列,无 routingkey 的概念

    HeadersExchange 类型:通过添加属性 key-value 匹配

    DirectExchange :按照 routingkey 分发到指定队列

    TopicExchange : 多关键字匹配
image

SimpleMessageListenerContainer 可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等。很多基于 RabbitMQ 的自制定化后端管控台在进行设置的时候,也是根据这一去实现的


image
image
image
image
image

5 SpringAMQP消息适配器-MessageListenerAdapter消息监听适配器,通过反射将消息处理委托给目标监听器的处理方法,并进行灵活的消息类型转换.
允许监听器方法对消息内容类型进行操作,完全独立于RabbitMQ API

默认情况下,传入Rabbit消息的内容在被传递到目标监听器方法之前被提取,以使目标方法对消息内容类型进行操作以String或者byte类型进行操作,而不是原始Message类型。 (消息转换器)

消息类型转换委托给MessageConverter接口的实现类。 默认情况下,将使用SimpleMessageConverter。 (如果您不希望进行这样的自动消息转换,

那么请自己通过#setMessageConverter MessageConverter设置为null)

如果目标监听器方法返回一个非空对象(通常是消息内容类型,例如String或byte数组),它将被包装在一个Rabbit Message 中,并发送使用来自Rabbit ReplyTo属性或通过#setResponseRoutingKey(String)指定的routingKey的routingKey来传送消息。(使用rabbitmq 来实现异步rpc功能时候会使用到这个属性)。

注意:发送响应消息仅在使用ChannelAwareMessageListener入口点(通常通过Spring消息监听器容器)时可用。 用作MessageListener不支持生成响应消息。

源码分析

image

继承自AbstractAdaptableMessageListener类,实现了MessageListenerChannelAwareMessageListener接口

MessageListenerChannelAwareMessageListener接口的onMessage方法就是具体容器监听队列处理队列消息的方法

image

实操

从源码分析小节中的成员变量,我们可以看出使用MessageListenerAdapter处理器进行消息队列监听处理

也可以通过setQueueOrTagToMethodName方法为不同的队列设置不同的消息处理方法。

MessageListenerAdapteronMessage方法

在生产端还可以配置其他属性,比如发送重试、超时时间、次数、间隔等Pro

Barista接口: Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称,通道名称是作为配置用,通道类型则决定了app会使用这一 通道进行发送消息还是从中接收消息

8.4 扩展 - 注解

使用Spring Cloud Stream非常简单,只需要使用好这3个注解即可,在实现高性能消息的生产和消费的场景非常适合,但是使用SpringCloudStream框架有一个非常大的问题就是不能实现可靠性的投递,也就是没法保证消息的100%可靠性,会存在少量消息丢失的问题

这个原因是因为SpringCloudStream框架为了和Kafka兼顾所以在实际工作中使用它的目的就是针对高性能的消息通信的!这点就是在当前版本Spring Cloud Stream的定位

8.5 实操

Pro

将消息发布到指定目的地是由发布订阅消息模式传递。发布者将消息分类为主题,每个主题由名称标识。订阅方对一个或多个主题表示兴趣。中间件过滤消息,将感兴趣的主题传递给订阅服务器。订阅方可以分组,消费者组是由组ID标识的一组订户或消费者,其中从主题或主题的分区中的消息以负载均衡的方式递送。

Con

9 总结

本文我们学习了Spring AMQP的相关知识,通过实战对RabbitMQ集成Spring有了直观的认识,这样为

我们后续的学习、工作使用都打下了坚实的基础,最后我们整合了SpringBoot与Spring Cloud Stream,更方便更高效的集成到我们的应用服务中去!

参考

SpringAMQP 用户管理组件 RabbitAdmin 以及声明式配置

Spring Boot - RabbitMQ源码分析

SpringAMQP 之 RabbitTemplate

SpringAMQP 消息容器 - SimpleMessageListenerContainer

MessageListenerAdapter详解

SpringAMQP 消息转换器 - MessageConverter

RabbitMQ 与 SpringBoot2.X 整合

Spring Cloud Stream

更多硬核干货请关注JavaEdge公众号

上一篇下一篇

猜你喜欢

热点阅读