RabbitMQ指南(三)

2020-11-18  本文已影响0人  纳样流年

一、存储机制

无论是持久化的消息,还是非持久化的消息都是可以被写入到磁盘中的。持久化的消息在达到队列的时候被写入磁盘,并且如果可以,持久化的消息也会在内存中保存一份。非持久化的消息一般只是保存在内存中,在内存吃紧的时候会被换入到磁盘中。这两类消息的落盘处理都在RabbitMQ的“持久层”中完成。

持久层是一个逻辑概念,实际上包括了两个部分

消息可以直接存储在rabbit_queue_indxe中,也可以存储在rabbit_msg_store中(具体存储的位置要取决于消息的大小,较小的存在rabbit_queue_index中,较大的存储在rabbit_msg_store,默认阈值为4096B,可以通过queue_index_embed_msgs_below参数可以改变大小)。

RABBITMQ_HOME/var/lib/mnesia/rabbit@$HOSTNAME目录下包含queues,msg_store_persisent,msg_store_transient三个文件夹,分别作用为

RabbitMQ会在ETS(Erlang Term Storage)表中记录消息在文件中的位置映射(Index)和文件的相关信息。在消息删除的时候只是在ETS表删除指定消息的相关信息,仅仅是标记为垃圾数据而已,其后满足一定阈值之后才会触发垃圾回收完成物理删除。

二、队列结构

RabbitMQ消息投递时,如果目的队列是空的,而且有消费者订阅了这个队列,那么该消息会直接发送给消费者,不会经过队列这一步。当消息无法直接投递给消费者时(比如消费者繁忙),需要暂时将消息存储队列,以便后续投递。消息存储队列后,不是固定不变的,它会随着系统的负载在队列中不断的流动,状态不断的变化。RabbitMQ中的队列消息可能会处于一下4中状态。

对于持久化的消息,一定会进入gamma状态,并且开启了publisher confirm机制的时候,只有到了gamma状态才会确认消息已经被接收。

惰性队列

从RabbitMQ 3.6版本开始引入了惰性队列的概念。惰性队列尽可能地将消息存入磁盘中,并在消费者消费到相应的消息的时候才会被加载到内存中。它的一个重要的设计目标——支持更长的队列。

默认情况下,当生产者将消息发送到RabbitMQ的时候,队列中的消息会尽可能地存在内存中,这样可以更快的将消息发送给消费者。即便是持久化消息也会在内存中保持一个副本。当Rabbitmq需要释放内存的时候,会将内存中的消息换页到磁盘中(这个过程耗时而且阻塞队列,无法接收新的消息)。

惰性队列会将消息直接存入文件系统中,而不管是持久化或是非持久化的消息,这样减少了内存的使用,但是却增加了I/O的使用。但是如果消息是持久化的,那么I/O操作是不可避免的,因此惰性队列与持久化消息是最佳搭档

注意:惰性队列如果存储非持久化的消息,内存的使用率会很稳定,但是重启后消息依旧会丢失。

惰性队列会将所有消息都落盘,而默认队列只会落盘持久化的消息。

惰性队列与普通队列性能对比

惰性队列与普通队列相比,只有很小的内存开销。例如发送1千万条大小为1kb的消息,普通队列会占用1.2GB内存,而惰性队列只需要占用1.5MB队列。
对于普通队列,如果发送1千万条消息,需要耗时800秒,平均1300条/秒。但是惰性队列耗时仅为421秒,平均2400条/秒。性能偏差的原因是普通队列会由于内存不足而不得不将消息换页到磁盘中

三、 内存及磁盘警告与流控

内存占用超过配置的阈值磁盘剩余空间低于配置的阈值时,RabbitMQ都会暂时阻塞集群中所有生产者客户端的连接(Connection)并停止接收客户端发来的消息,以免服务崩溃。与此同时客户端与服务端的心跳检测也会失效,此时Connection的状态可能有两种情况:

在集群中,如果一个Broker节点的内存或磁盘受到限制,那么整个集群的生产者都会被阻塞。

内存警告

默认情况下vm_memory_high_watermark=0.4,表示内存阈值为40%,超过阈值就会发生内存警告并阻塞所有的生产者连接。如果该值设置为0,那么生产者都会被停止发送消息。推荐取值[0.4-0.66],不建议超过0.7。
vm_memory_high_watermark也支持以绝对值的方式设置:

{ vm_memory_high_watermark, { absolute, 1024MiB } }

在某个Broker节点达到内存阈值并阻塞生产者之前,它会尝试将队列中消息换页到磁盘以释放内存空间。

磁盘警告

当磁盘剩余空间低于阈值的时候,RabbitMQ会阻塞生产者,默认阈值为50MB。

当磁盘剩余空间低于阈值的时候,Rabbitmq会停止内存中的消息的换页动作,因此一个谨慎的做法是将阈值设置为与操作系统所显示的内存大小一致。

RabbitMQ会定期检查一次剩余空间,检测的频率与上一次执行检测到的磁盘剩余空间有关。正常10s/次,随着剩余空间与阈值越近,频率越高,最高可以到1s/10次。
可以通过disk_free_limit来设置阈值:

{ disk_free_limit, "1GB"} //绝对值
{ disk_free_limit,{ mem_relative,1.0 }} //与内存相关 建议取值[1.0,2.0]

流控

当RabbitMQ的内存磁盘达到阈值后,生产者被阻塞,直到对应项了恢复正常。除了这两个阈值之外,RabbitMQ还引入了流控机制。
流控机制用来避免消息的发送频率过快而导致服务器难以支持的情况(用于限制生产者的投递)。

内存和磁盘警告是相当于全局的控制,一旦触发会阻塞集群中所有的Connection。而流控是针对单个Connection的。

当Connection处于flow状态时,意味着Connection的状态每秒在blockedunblocked中来回切换,这样可以将消息发送速率控制在一个服务器能支持的范围之内。处于flow状态的Connection和running状态并没有什么不同,只是表示相应的发送速率被降低,限流了而已。对于客户端而言,它看到的是服务器的带宽变小了。

四、镜像队列

如果RabbitMQ集群是由多个Broker节点组成的,那么从服务的整体可用性来说是具有一定的弹性的。但是尽管交换机和绑定关系能够在单点故障问题上幸免遇难,但是队列和其上存储的消息却不行,队列进程及存储的消息仅仅维持在单个节点上,所以一个节点失效,其上的队列就不再可用。

引入镜像队列机制,可以将队列镜像到其他节点上,如果集群中的一个节点丢失了,队列能自动切换到另一个节点的镜像队列上,以此保证可用性。

通常一个镜像队列都包含一个主节点(master)和若干从节点(slave),如下图所示:

主从结构

注意:镜像队列支持生产者的确认与事务机制。只有当全部的节点确认了消息才算消息被确认了

镜像队列的宕机处理

slave挂掉之后,除了与此slave相连接的客户端全部断开之外,没有其他的影响。

master挂掉之后,会有以下的连锁反应:

  1. master连接的客户端全部断开。
  2. 选举最老slave为新的master
  3. 新的master重新入队所有uack的消息,因此可能导致消息被重复消费。
  4. 如果客户端连接着slave,并且Basic.Consume消费时制定了x-cancel-on-ha-failover参数,那么会收到master宕机的通知。

镜像队列的数据同步

新节点加入到已经存在的镜像队列时,默认情况下ha-sync-mod=manual,镜像队列中的消息不会主动同步到新的slave中,除非显示的调用同步命令。当调用同步命令后,整个镜像队列会处于阻塞状态,无法进行其他操作,直到同步完成。
镜像队列中最后一个停止的节点会是master,启动顺序也必须是master先启动。如果slave先启动,它会等30秒,等待master启动,然后加入集群中。如果30秒内master没有启动,那么slave也会停止。当所有节点同时离线时候(断电等情况导致),每个节点都认为自己不是master节点,此时可以在30秒内启动全部节点,完成镜像队列的恢复。

五、负载均衡

面对大量的业务访问,高并发请求时,可以才需集群的策略来对负载能力做出进一步的提升,但是这里还会存在一个负载均衡不均匀的问题。例如集群有A,B,C三个节点,那么所有的客户端都与其中单个节点建立TCP连接,那么该节点的的网络负载必然会大大的增加,而其他节点又会造成资源闲置浪费的问题,因此负载均衡显得尤为必要。
对于RabbitMQ而言,客户端与集群建立的TCP连接不是与集群中所有的节点建立连接,而是挑选其中一个节点建立连接。因此引入负载均衡后,客户端与集群节点的关系应如下图所示:


RabbitMQ 负载均衡

负载均衡通常分软件负载均衡硬件负载均衡。与软件负载均衡相比,硬件负载均衡具有更好的负载均衡效果,但是成本比较高,适合流量高的大型网站系统。因此该小结主要列举一些适用于RabbitMQ集群使用的软件负载均衡技术。目前主流的方式有如下几种:

客户端负载均衡

在客户端连接时使用负载均衡算法来实现负载均衡。主流的算法有如下几种

轮询法

将请求按照顺序轮流的分配到服务器上,该算法不关系服务器的实际连接数和系统的负载状态。伪代码如下(C#):

class RoundRobin
{
    public List<string> ServerAddresses
    {
        get;
        set;
    }

    private object _syncRoot = new object();

    private int _position = 0;

    public string GetServerAddress()
    {
        if (ServerAddresses?.Count == 0)
        {
            throw new ArgumentNullException(nameof(ServerAddresses));
        }

        lock (_syncRoot)
        {
            string address;
            if (_position >= ServerAddresses.Count)
            {
                _position = 0;
            }
            address = ServerAddresses[_position];
            _position++;
            return address;
        }
    }
}
加权轮询法

相较于普通的轮询法,该算法可以根据每台服务器的配置和抗压能力的不同来赋予不同的权重。伪代码如下(C#):

class WeightRoundRobin
{
    private object _syncRoot = new object();

    private int _position = 0;

    public List<string> ServerAddresses
    {
        get;
        private set;
    }

    public Dictionary<string, int> ServerAddressesConfig
    {
        get;
        private set;
    }

    public WeightRoundRobin(Dictionary<string, int> addressesConfig)
    {
        ServerAddressesConfig = addressesConfig;
        ServerAddresses = new List<string>();
        foreach (var kv in addressesConfig)
        {
            for (int i = 0; i < kv.Value; i++)
            {
                ServerAddresses.Add(kv.Key);
            }
        }
    }

    public string GetServerAddress()
    {
        if (ServerAddresses?.Count == 0)
        {
            throw new ArgumentNullException(nameof(ServerAddresses));
        }

        lock (_syncRoot)
        {
            string address;
            if (_position >= ServerAddresses.Count)
            {
                _position = 0;
            }
            address = ServerAddresses[_position];
            _position++;
            return address;
        }
    }
}
随机法
class DefaultRandom
{
    private Random _random;

    public List<string> ServerAddresses
    {
        get;
        private set;
    }

    public DefaultRandom(HashSet<string> addresses)
    {
        _random = new Random();
        ServerAddresses = new List<string>();
        foreach (var val in addresses)
        {
            ServerAddresses.Add(val);
        }
    }

    public string GetServerAddress()
    {
        if (ServerAddresses?.Count == 0)
        {
            throw new ArgumentNullException(nameof(ServerAddresses));
        }

        int index = _random.Next(0, ServerAddresses.Count);
        return ServerAddresses[index];
    }
}
加权随机法
class WeightRoundRobin
{
    private object _syncRoot = new object();

    private int _position = 0;

    public List<string> ServerAddresses
    {
        get;
        private set;
    }

    public Dictionary<string, int> ServerAddressesConfig
    {
        get;
        private set;
    }

    public WeightRoundRobin(Dictionary<string, int> addressesConfig)
    {
        ServerAddressesConfig = addressesConfig;
        ServerAddresses = new List<string>();
        foreach (var kv in addressesConfig)
        {
            for (int i = 0; i < kv.Value; i++)
            {
                ServerAddresses.Add(kv.Key);
            }
        }
    }

    public string GetServerAddress(string ClientNO)
    {
        if (ServerAddresses?.Count == 0)
        {
            throw new ArgumentNullException(nameof(ServerAddresses));
        }

        Monitor.Enter(_syncRoot);
        try
        {
            if (_position >= ServerAddresses.Count)
            {
                _position = 0;
            }
            string address = ServerAddresses[_position];
            _position++;
            return address;
        }
        finally
        {
            Monitor.Exit(_syncRoot);
        }
    }
}
源地址哈希法

该算法根据获取客户端的IP地址,通过哈希函数计算得到一个数值,用该数值对服务器列表的大小进行取摸运算,得到的结果便是客户端要访问服务器的序号。使用该算法可以达到同一个IP地址的客户端,在服务器列表不变的前提下,每次请求都会映射到同一台服务器进行访问,伪代码如下(C#):

class DefaultHash
{
    public List<string> ServerAddresses
    {
        get;
        private set;
    }

    public DefaultHash(HashSet<string> addresses)
    {
        ServerAddresses = new List<string>();
        foreach (var val in addresses)
        {
            ServerAddresses.Add(val);
        }
    }

    public string GetServerAddress(string ClientNO)
    {
        if (ServerAddresses?.Count == 0)
        {
            throw new ArgumentNullException(nameof(ServerAddresses));
        }

        if (string.IsNullOrWhiteSpace(ClientNO))
        {
            throw new ArgumentNullException(nameof(ClientNO));
        }

        int hashCode = ClientNO.GetHashCode() & int.MaxValue;
        return ServerAddresses[hashCode % ServerAddresses.Count];
    }
}
最小连接数法

该算法比较灵活和智能,由于后端服务器的配置不同,处理的请求也有差异,因此请求的处理有快有慢。该算法根据服务器当前的连接情况,动态的选取当前挤压连接数最少的一台服务器来处理当前的请求,尽可能地提高后端服务的利用效率。伪代码如下(C#):

class ServerNode
{
    public ServerNode(string name)
    {
        if (string.IsNullOrWhiteSpace(name))
        {
            throw new ArgumentNullException(nameof(name));
        }
        Name = name;
    }

    public ServerNode(string name, int connections) : this(name)
    {
        _connections = connections;
    }

    public string Name
    {
        get;
        set;
    }

    private int _connections;
    public int Connections
    {
        get
        {
            return _connections;
        }
    }

    public int Increment()
    {
        return Interlocked.Increment(ref _connections);
    }

    public int Decrement()
    {
        return Interlocked.Decrement(ref _connections);
    }
}

class LeastConnection
{
    private LinkedList<ServerNode> _connectionTable;

    public LeastConnection(HashSet<string> addresses)
    {
        _connectionTable = new LinkedList<ServerNode>();

        Random random = new Random(); // Mock initial connections data

        foreach (var val in addresses)
        {
            _connectionTable.AddLast(new ServerNode(val, random.Next(0, 10)));
        }
    }

    public string GetServerAddress()
    {
        if (_connectionTable?.Count == 0)
        {
            throw new ArgumentNullException(nameof(_connectionTable));
        }

        var em = _connectionTable.GetEnumerator();
        ServerNode leastNode = null;
        while (em.MoveNext())
        {
            #if DEBUG
            Console.WriteLine($"{em.Current.Name} - {em.Current.Connections}");
            #endif
            if (leastNode == null)
            {
                leastNode = em.Current;
            }
            else if (em.Current.Connections < leastNode.Connections)
            {
                leastNode = em.Current;
            }
        }
        leastNode.Increment();
        return leastNode.Name;
    }
}

HAProxy负载均衡

HAProxy提供高可用性、负载均衡及基于TCP和HTTP应用的代理,支持虚拟主机而且免费。

上一篇 下一篇

猜你喜欢

热点阅读