工作cap运维

RabbitMQ

2019-06-26  本文已影响0人  冬狮郎
目录:
1、什么是MQ  
2、MQ是干什么用的?
3、MQ衡量标准
4、主流竞品分析
    4.1、ActiveMQ
    4.2、Kafka
    4.3、RocketMQ
5、技术背景知识介绍
    5.1、AMQP高级消息队列协议
    5.2、Erlang语言
6、RabbitMQ
    6.1、RabbitMQ的优势:
    6.2、RabbitMQ的整体架构
    6.3、RabbitMQ的消息流转
    6.4、RabbitMQ各组件功能
    6.5、RabbitMQ的多种Exchange类型
          6.5.1、direct
          6.5.2、fanout
          6.5.3、topic
    6.6、TTL
    6.7、死信队列DLX
    6.8、消费端ACK与NACK
    6.9、生产者Confirm机制
    6.10、Return消息机制
    6.11、消费端自定义监听(推模式和拉模式pull/push)
    6.12、如何保证幂等性
    6.13、如何保证可靠性?
    6.14、消费端如何限流
    6.15、Channel模式和Connection模式
    6.16、消费端的Concurrency和Prefetch模式
    6.17、RabbitMQ集群

什么是MQ?

消息总线(Message Queue),是一种跨进程、异步的通信机制,用于上下游传递消息。由消息系统来确保消息的可靠传递。

MQ是干什么用的?

应用解耦、异步、流量削锋、数据分发、错峰流控、日志收集等等...

MQ衡量标准

服务性能、数据存储、集群架构

主流竞品分析

当前市面上mq的产品很多,比如RabbitMQ、Kafka、ActiveMQ、ZeroMQ和阿里巴巴捐献给Apache的RocketMQ。甚至连redis这种NoSQL都支持MQ的功能。

ActiveMQ

ActiveMQ是apache出品,最流行的,能力强劲的开源消息总线,并且它一个完全支持JMS规范的消息中间件。其丰富的API、多种集群构建模式使得它成为业界老牌消息中间件,在中小型企业中应用广泛。

但是其性能稍差,在面对高并发的情况下,会出现消息阻塞、堆积、延迟等问题。

默认采用了基于内存的kahaDB进行存储,如果需要保证消息的可靠性,也可以选择关系行数据库进行存储。

集群架构模式如下:


ActiveMQ集群.png

Master-Slave模式:通过zookeeper对主从进行管理,正常情况下,从节点不会提供服务。当主节点出现问题后,zookeeper会高效的将主节点下掉,从节点来提供服务。

NetWork模式:两套主从Master-Slave节点。由网络联通,将其变为分布式的集群架构。

Kafka

Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点就是基于Pull的模式来处理消息消费追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。能够支持廉价的服务器上以每秒100k条数据的吞吐量。(有ack机制,可以保证不丢失,不能保证不重复。)

高效的读写基于操作系统低层的Page Cache。仅仅使用内存管理,不存在内存和磁盘之间的IO操作。

集群架构模式如下:

Kafka集群.png

通过replicate进行节点间数据的复制,尽量保证数据的可靠性。

RocketMQ

RocketMQ是阿里开源的消息中间件,目前也已经孵化为Apache顶级项目,它是纯Java开发,具有高吞吐量、高可靠性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,它对消息的可靠传输以及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binlog分发等场景。

在2.0版本,RocketMQ集群也是通过Zookeeper进行管理。在3.0之后,放弃Zookeeper,使用NameServer进行集群的管理和协调。

能够保障消息的顺序消费,提供了丰富的消息拉取等处理模式,消费者可以高效进行水平扩展,能够承载上亿级别数据量级。

可以支持多种集群架构模式:Master-Slave模式、双Master-Slave模式、多主多从模式等等。

支持多种刷盘策略:同步双写、异步复制。借助了零拷贝等技术。

集群架构模式如下:

RocketMQ集群.png 收费版本集群

技术背景知识介绍

AMQP高级消息队列协议

AMQP(Advanced Message Queuing Protocol)高级消息队列协议:高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言灯条件的限制。

AMQP中消息的路由过程和JMS存在一些差别。AMQP中增加了Exchange和Binging的角色。生产者把消息发布到Exchange上,消息最终到达队列并被消费者接收,而Binding决定交换器的消息应该发送到哪个队列。

AMQP消息路由过程

Erlang语言

Erlang语言最初用于交换机领域的架构模式,这样使得RabbitMQ在Broker之间进行数据交互的性能非常优秀(Erlang有着和原生Socket一样的延迟)。

RabbitMQ

RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在不同的应用之间共享数据(跨平台跨语言)。RabbitMQ是使用Erlang语言编写,并且基于AMQP协议实现。

RabbitMQ的优势:

RabbitMQ的整体架构

整体架构.png

RabbitMQ的消息流转

消息流转

RabbitMQ各组件功能

MacDown Screenshot

RabbitMQ的多种Exchange类型

Exchange分发消息时,根据类型的不同分发策略有区别。目前共四种类型:direct、fanout、topic、headers(headers匹配AMQP消息的header而不是路由键(Routing-key),此外headers交换器和direct交换器完全一致,但是性能差了很多,目前几乎用不到了。所以直接看另外三种类型。)。

direct

MacDown Screenshot

消息中的路由键(routing key)如果和Binding中的binding key一致,交换器就将消息发到对应的队列中。路由键与队列名完全匹配。

fanout

MacDown Screenshot

每个发到fanout类型交换器的消息都会分到所有绑定的队列上去。fanout交换器不处理该路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout类型转发消息是最快的。

topic

MacDown Screenshot

topic交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键(routing-key)和绑定键(bingding-key)的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:"#"和"*"。#匹配0个或多个单词,匹配不多不少一个单词。

TTL

TTL(Time To Live):生存时间。RabbitMQ支持消息的过期时间,一共两种。

死信队列DLX

死信队列(DLX Dead-Letter-Exchange):利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。

当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。

可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0之前支持的immediate参数的功能。

消息变成死信的几种情况:

死信队列设置:需要设置死信队列的exchange和queue,然后通过routing key进行绑定。只不过我们需要在队列加上一个参数即可

Map<String, Object> arguments = Maps.newHashMapWithExpectedSize(3);
arguments.put("x-message-ttl", dlx-ttl);
arguments.put("x-dead-letter-exchange","exchange-name");
arguments.put("x-dead-letter-routing-key", "routing-key");
Queue ret = QueueBuilder.durable("queue-name".withArguments(arguments).build();

只需要通过监听该死信队列即可处理死信消息。还可以通过死信队列完成延时队列。

消费端ACK与NACK

消费端进行消费的时候,如果由于业务异常可以进行日志的记录,然后进行补偿。由于服务器宕机等严重问题,我们需要手动进行ACK保障消费端消费成功。

消费端重回队列是为了对没有成功处理消息,把消息重新返回到Broker。一般来说,实际应用中都会关闭重回队列,也就是设置为false。

// deliveryTag:消息在mq中的唯一标识
// multiple:是否批量(和qos设置类似的参数)
// requeue:是否需要重回队列。或者丢弃或者重回队首再次消费。
public void basicNack(long deliveryTag, boolean multiple, boolean requeue) 

生产者Confirm机制

如何实现Confirm确认消息?

确认机制流程

1、在channel上开启确认模式:channel.confirmSelect()
2、在channel上开启监听:addConfirmListener,监听成功和失败的处理结果,根据具体的结果对消息进行重新发送或记录日志处理等后续操作。

Return消息机制

Return Listener用于处理一些不可路由的消息

我们的消息生产者,通过指定一个Exchange和Routing,把消息送达到某一个队列中去,然后我们的消费者监听队列进行消息的消费处理操作。

但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候我们需要监听这种不可达消息,就需要使用到Returrn Listener。

基础API中有个关键的配置项Mandatory:如果为true,监听器会收到路由不可达的消息,然后进行处理。如果为false,broker端会自动删除该消息。

通过chennel.addReturnListener(ReturnListener rl)传入已经重写过handleReturn方法的ReturnListener。

消费端自定义监听(推模式和拉模式pull/push)

Basic.Consume将信道(Channel)置为接收模式,直到取消队列的订阅为止。
在接受模式期间,RabbitMQ会不断的推送消息给消费者。
当然推送消息的个数还是受Basic.Qos的限制。
如果只想从队列获得单条消息而不是持续订阅,建议还是使用Basic.Get进行消费。
但是不能将Basic.Get放在一个循环里来代替Basic.Consume,这样会严重影响RabbitMQ的性能。
如果要实现高吞吐量,消费者理应使用Basic.Consume方法。
两种模型对比

如何保证幂等性

消费端实现幂等性,就意味着我们的消息永远不会消费多次,即使我们收到了多条一样的信息。

select count(1) from table where id = id+指纹码
优点:实现简单
缺点:高并发下有数据库写入的性能瓶颈
解决:跟进ID进行分库分表进行算法路由
问题1:是否需要落库。如果落库,如何保证数据的一致性和原子性?
问题2:如果不进行落库,缓存种的数据如果设置定时同步的策略?

如何保证可靠性?

什么是生产端的可靠性投递?

解决方案

缺点:对数据库有多次操作。不适用于高并发业务。
拆出一个回调服务。将落库、检查等操作安排至回调服务上。
1:发送者发送信息至MQ,消费者为下游业务方。
      1.1:成功后,作为发送者发送信息至MQ,消费者为回调服务。
              1.1.1 回调服务接受数据后,落库。
       1.2:失败,等待发送者的延时投递信息。
2、发送者发送延迟投递信息至MQ,消费者为回调服务。
      1.1:查库,确认下游服务方消费已成功。
      1.2:查库,确认下游服务方消费失败,通过rpc调用发送者的接口重新发送。

消息发送者发送的两条信息是同时发送的。

减少了对库的操作,同时解耦,保证了性能,不能百分百保证可靠性

消费端如何限流

当海量消息瞬间推送过来,单个客户端无法同时处理那么多数据,严重会导致系统宕机。这时,需要削峰。

RabbitMQ提供了一种qos(服务质量保证)功能。即在非自动确认消息的前提下(非ACK),如果一定数目的消息(通过基于consume或者channel设置qos的值)未被确认前,不进行消费新的消息。

// prefetchSize:消息体大小限制;0为不限制
// prefetchCount:RabbitMQ同时给一个消费者推送的消息个数。即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack。默认是1.
// global:限流策略的应用级别。consumer[false]、channel[true]。
void BasicQos(unit prefetchSize, unshort prefetchCount, bool global);
channel.basicQos(...);

Channel模式和Connection模式

参考:https://www.jianshu.com/p/2c2a7cfdd38a

Connection和Channel是spring-amqp中的概念,并非rabbitmq中的概念,官方文档对Connection和Channel有这样的描述:

Sharing of the connection 
is possible since the "unit of work" for messaging with AMQP 
is actually a "channel"
 (in some ways, 
this is similar to the relationship 
between a Connection and a Session in JMS).

CHANNEL模式:程序运行期间ConnectionFactory会维护着一个Connection,所有的操作都会使用这个Connection,但一个Connection中可以有多个Channel,操作rabbitmq之前都必须先获取到一个Channel,否则就会阻塞(可以通过setChannelCheckoutTimeout()设置等待时间),这些Channel会被缓存(缓存的数量可以通过setChannelCacheSize()设置);

CONNECTION模式:这个模式下允许创建多个Connection,会缓存一定数量的Connection,每个Connection中同样会缓存一些Channel,除了可以有多个Connection,其它都跟CHANNEL模式一样。

关于CONNECTION模式中,可以存在多个Connection的使用场景,官方文档的描述:

The use of separate connections 
might be useful in some environments, 
such as consuming from an HA cluster,
in conjunction with a load balancer, 
to connect to different cluster members.

setChannelCacheSize:设置每个Connection中(注意是每个Connection)可以缓存的Channel数量,注意只是缓存的Channel数量,不是Channel的数量上限,操作rabbitmq之前(send/receive message等)要先获取到一个Channel,获取Channel时会先从缓存中找闲置的Channel,如果没有则创建新的Channel,当Channel数量大于缓存数量时,多出来没法放进缓存的会被关闭。

注意,改变这个值不会影响已经存在的Connection,只影响之后创建的Connection。
有时会出现connection closed错误。rabbitTemplate作者对于这种问题的解决方案,他给的方案很简单,单纯的增加connection数:

connectionFactory.setChannelCacheSize(100);

setChannelCheckoutTimeout:当这个值大于0时,channelCacheSize不仅是缓存数量,同时也会变成数量上限,从缓存获取不到可用的Channel时,不会创建新的Channel,会等待这个值设置的毫秒数,到时间仍然获取不到可用的Channel会抛出AmqpTimeoutException异常。

同时,在CONNECTION模式,这个值也会影响获取Connection的等待时间,超时获取不到Connection也会抛出AmqpTimeoutException异常。

消费端的Concurrency和Prefetch模式

暂未整理,参考https://www.jianshu.com/p/04a1d36f52ba

只有Prefetch模式才可以设置qoshttps://www.jianshu.com/p/4d043d3045ca

RabbitMQ集群

参考:https://www.jianshu.com/p/b7cc32b94d2a

RabbitMQ最优秀的功能之一就是内建集群,这个功能涉及的目的是允许消费者和生产者在节点崩溃的情况下继续运行,以及通过添加更多的节点来线性扩展消息通信吞吐量。RabbitMQ内部利用Erlang提供的分布式通信框架OTP来满足上述需求,使客户端在失去一个RabbitMQ节点连接的情况下,还是能够重新连接到集群中的其他节点继续胜场、消费信息。

RabbitMQ会始终记录以下四中类型的内部元数据:

在单一节点中,RabbitMQ会将所有这些信息存储在内存中,同时将标记为可持久化的队列、交换器、 绑定存储在硬盘上。存到硬盘上可以确保队列和交换器在节点重启后能够重建。而在集群模式下,同样也提供了两种选择:存到硬盘上(独立节点的默认配置),存在内存中。

如果在集群中创建队列,集群只会在单个节点而不是所有节点上创建完整的队列信息(元数据、状态、内容)。结果是只有队列的所有者节点知道有关队列的所有信息,因此当集群节点崩溃时,该节点的队列和绑定就消失了,并且任何匹配该队列的绑定的新消息也丢失了。还好RabbitMQ 2.6.0之后提供了镜像队列以避免集群节点故障导致的队列内容不可用。

RabbitMQ 集群中可以共享 user、vhost、exchange等,所有的数据和状态都是必须在所有节点上复制的,例外就是上面所说的消息队列。RabbitMQ 节点可以动态的加入到集群中。

当在集群中声明队列、交换器、绑定的时候,这些操作会直到所有集群节点都成功提交元数据变更后才返回。集群中有内存节点和磁盘节点两种类型,内存节点虽然不写入磁盘,但是它的执行比磁盘节点要好。内存节点可以提供出色的性能,磁盘节点能保障配置信息在节点重启后仍然可用,那集群中如何平衡这两者呢?

RabbitMQ 只要求集群中至少有一个磁盘节点,所有其他节点可以是内存节点,当节点加入或离开集群时,它们必须要将该变更通知到至少一个磁盘节点。如果只有一个磁盘节点,刚好又是该节点崩溃了,那么集群可以继续路由消息,但不能创建队列、创建交换器、创建绑定、添加用户、更改权限、添加或删除集群节点。换句话说集群中的唯一磁盘节点崩溃的话,集群仍然可以运行,但直到该节点恢复,否则无法更改任何东西。

上一篇下一篇

猜你喜欢

热点阅读