RabbitMQ 知识体系结构_0neBean_NOTE

2020-09-26  本文已影响0人  0neBean

本文全部手敲-禁止转载内容

1. 体系结构与工作流程

1.1 概念及工作流程

1.1.1 概念

1.1.2 RabbitMQ 模型架构

RabbitMQ模型架构.png

1.1.3 消息运转流程

RabbitMQ消息队列运转过程.png

1.2 Broker内组件介绍

1.2.1 队列 (Queue)


1.2.2 交换机 (Exchange)

RabbitMQ交换机.png

我们可以暂时理解成生产者将消息投递进了队列,但实际上,这并没有发生,生产者的消息中间还经手了邮递员 (交换机) 的投递,交换机根据你的地址 路由键(RoutingKey) ,并匹配上队列的地址 绑定键(BindingKey) ,之后再进行投递

当然也有不按照地址投递,做专项挂号信投递的邮递员 (交换机) ,直接按照他自己的规则来投递消息

![RabbitMQ路由绑定键.png](https://img.haomeiwen.com/i2862577/7fd39068a996d90e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

1.2.3 路由键 (RoutingKey)

生产者将消息发给交换机的时候,一般会指定一个 RoutingKey ,用来指定这个消息的路由规则,而这个 RoutingKey 需要和交换器的类型和 绑定键(Binding) 联合使用才能生效

1.2.4 绑定 (Binding)

RabbitMQ 中用来绑定队列和消费者的概念,在绑定的时候一般会指定一个 绑定键(BindingKey)

RabbitMQ路由绑定键.png

1.3 RabbitMQ的管理

1.3.1 管理的方式

image-20200403234155814.png image-20200403234231772.png

1.3.2 管理的维度


2. 消息可靠性保障

2.1 与Broker建立连接

客户端通过ConnectionFactory类配置好账号地址等访问信息后,获取Connection,并由Connection来开启channel(信道),后面所有的消息发送消费都是有channel具体来执行的

<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n140" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@ipAddress:portNumber/vhost");
Connection conn = factory.newConnection();
//Connection接口被用来创建一个Channel:
Channel channel = conn.createChannel();
//在创建之后,Channel可以用来发送或者接收消息了。</pre>

2.2 消息的发送和消费

2.2.1 消息的发送

AMQP发送消息.png
public interface Channel extends ShutdownNotifier {
    ...
public void basicPublish(String exchange, 
                         String routingKey, 
                         boolean mandatory, 
                         BasicProperties props, byte[] body) throws IOException
        ....
}

2.2.2 消息的消费

2.2.2.1 推模式消费

AMQP推模式消费消息.png
public interface Channel extends ShutdownNotifier {
    ...
public void basicPublish(String exchange, 
                         String routingKey, 
                         boolean mandatory, 
                         BasicProperties props, byte[] body) throws IOException
        ....
}

2.2.2.2 拉模式消费

AMQP拉模式消费消息.png
  public interface Channel extends ShutdownNotifier {
      ....
    public GetResponse basicGet(String queue, boolean autoAck) throws IOException;
      ....
}

2.3 生产者保障消息到达Broker

2.3.1 消息发送失败处理机制

上一节的内容介绍了,如果消息在Broker中没有投递目标,如何退回或者进入备份交换机,但是如果消息压根没有进入在Broker,消息在发送到Broker的过程中就丢失了,如何处理这种情况?

RabbitMQ提供了两种解决方案


2.3.1.1 事务机制

事务机制可以在生产者将消息提交到RabbitMQ失败的时候,抛出异常,可以让生产者处理发送失败的消息,而不至于直接丢了消息


AMQP事务流转.png AMQP事务回滚流转.png
    .....
    
        try {
            channel.txSelect(); // 声明事务
            // 发送消息
            channel.basicPublish("", _queueName, 
                                 MessageProperties.PERSISTENT_TEXT_PLAIN,
                                 message.getBytes("UTF-8"));
            channel.txCommit(); // 提交事务
        } catch (Exception e) {
            channel.txRollback();
            //在这里处理发送失败的消息(记录发送失败的消息或重新发送)
        } finally {
            channel.close();
            conn.close();
        }
    .....

2.3.1.2 发送确认机制

发送确认机制和事务机制相似,但更为轻量,因为确认消息是异步返回的,第一条消息在等确认消息时,第二条消息可以继续发送,效率更高

// 开启发送方确认模式
channel.confirmSelect();
String message = "test";
channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
if (channel.waitForConfirms()) {
    System.out.println("消息发送成功" );
}

2.3.1.3 发送确认和事务的弊端

//异步确认demo演示
public void AsyncSendConfirmDemo() {
    //开启消息确认
    channel.confirmSelect();
    //消息发送后,无论成功失败,Broker 一定会返回结果
    channel.addConfirmListener(new ConfirmListener() {
        public void handleAck(long deliveryTag, boolean multiple)
            throws IOException {
            //删除本地该条发送记录
            if (multiple) {
                    confirmSet.headSet(deliveryTag + 1).clear();
            } else {
                confirmSet.remove(deliveryTag);
            }
        }

        public void handleNack(long deliveryTag boolean multiple) throws IOException {
            //删除本地该条发送记录
            if (multiple) {
                confirmSet.headSet(deliveryTag + 1).clear();
            } else {
                confirmSet.remove(deliveryTag);
            }
            //注意这里处理消总重发的场景
            // 譬如:重发三次后,记录失败消息,停止重发
        }
    });

    //while(true)模拟一直发送消息的场景
    while (true) {
        //从信道获取发送消息的序号
        long nextSeqNo = channel.getNextPublishSeqNo();
        channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey,
                             MessageProperties.PERSISTENT_TEXT_PLAIN, 
                             ConfirmConfig.msg_10B.getBytes());
        //将消息序号放入集合中保存
        confirmSet.add(nextSeqNo);
    }

}
微信截图_20200330213819.png

2.3.2 消息投递失败处理机制

2.3.2.1 消息的退回

basicPublish api中的参数 mandatory 设置为 true 时,交换机若无法根据自己类型和路由键找到相应的队列时,

消息就会被退回到生产者设置的回调里

这一过程并非RabbitMQ回调了生产者,而是AMQP协议,也就是发送消息的一个部分

发送消息回退.png
Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.addReturnListener(new ReturnListener() {
            
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange,
                                     String routingKey, AMQP.BasicProperties properties,
                                     byte[] body) throws IOException {
                
                log.info("replyText:"+replyText + "exchange:"+exchange +
                "routingKey:"+routingKey + "message:"+new String(body));
                
            }
            
        });

2.3.2.2 备份交换机

消息退回需要在代码中添加退回消息监听的逻辑,如果不想侵入代码,可以使用备份交换机(alternate-exchange),将没有投递对象的消息,转发到备份交换机中

备份交换器-1586066161589.png
//创建主交换机,并指定myAe交换机为其备份交换机
Map<string, object> args = new HashMap<>();
args.Add("alternate-exchage", "myAe");
channel.ExchangeDeclare(EXCHANGE_NAME, "topic", true, false, args);
//创建主队列和主交换机绑定
channel.QueueDeclare(QUEUE_NAME, true, false, false, null);
channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, BINDING_KEY, null);
//创建备份交换机和备份队列
channel.ExchangeDeclare("myAe", "fanout", true, false, null);
channel.QueueDeclare("unroutedQueue", true, false, false, null);
channel.QueueBind("unroutedQueue", "myAe", null);

2.4 消费者保障消息被正确消费

2.4.1 消费者消费确认-ACK

收到消费者客户端确认再删除当前消息

2.4.1.1 确认消息(acknowledgement)

 public interface Channel extends ShutdownNotifier {
      ....
    //确认消费成功
public void basicAck(long deliveryTag, boolean multiple) throws IOException;
      ....
}

2.4.1.2 拒绝消息(reject)

 public interface Channel extends ShutdownNotifier {
      ....
//拒绝消息   
public void basicReject(long deliveryTag, boolean requeue) throws IOException;
//批量拒绝消息
public void basicNack(long deliveryTag,
                          boolean multiple, boolean requeue) throws IOException;
      ....
}
//重新加入队列,并分配给和之前不同的消费者
public RecoverOk basicRecover() throws IOException;

2.4.1.3 分布式ID与重复消息

雪花算法:

image-20200405105410946.png

3. 高可用

3.1 持久化

持久化是指,将发送到Broker中的消息,保存到磁盘上,即使Broker关机或宕机,重启后,消息数据依然存在,,RabbitMQ持久化默认是关闭的,需要通过设置开启

3.1.1 持久化的实现

image-20200331154524124.png image-20200331154451459.png
//创建消息,将消息内容转为json 设置成消息体
Message message = MessageBuilder.withBody(JsonUtil.objToStr(obj).getBytes()).build();
// 设置消息持久化  
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
发送该消息
message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);
rabbitTemplate.convertAndSend(exchageName, routingKey, message);

3.1.2 持久化的数据安全

3.2 集群

3.2.1 什么是集群

集群就是多个Broker组成的一个应用架构,但是队列和消息只存放在一台机器上

RabbitMQ Cluster 原理.png RabbitMQ Cluster 集群.png

3.2.2 镜像队列

镜像队列会将集群中的队列宿主上队列镜像到其他节点上,当队列宿主故障时,镜像队列节点会重新选举出新的队列宿主机器,来确保集群队列的数据安全


3.2.4 集群的负载均衡

集群每个节点都可以连接客户端,但是访问的请求并不是平分到每一个节点上,所以需要单独的负载均衡来将流量平分到每一台节点上

RabbitMQ Cluster 负载均衡.png

4. 常用特性

4.1 死信队列

死信交换机(DLX Dead-Letter-Exchange),本质是普通的交换机,当消息在一个队列中变成死信后,将会被转发到另一个交换机,这个交换机就是死信交换机,与死信交换机绑定的队列,就是死信队列,当消息变成死信需要满足以下任意条件:

channel.exchangeDeclare(exchangeName, "topic", true, false, null);
//指定死信发送的Exchange
Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", "dlx.exchange");
//这个agruments属性,要设置到声明队列上
channel.queueDeclare(queueName, true, false, false, agruments);
channel.queueBind(queueName, exchangeName, routingKey);

//要进行死信队列的声明
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");

4.2 延时消息

//定义队列时,设置队列失效时间
//设置死信队列见上一小节
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);

4.3 RPC实现

在发送消息时,指定消息的唯一ID和回调队列,消费者消费后,会向回调队列发送包含该ID的消费回复


//创建一个回调队列
String callbackQueueName = channel.queueDeclare.getQueue(); 
//为消息创建全局唯一ID
String corrID = UUID.randomUUID.toString();
//在基础属性中指定回调队列名称,和唯一ID
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName)
    .correlationID(corrID).build(); 
channel.basicPublish ("","rpc_queue" ,props,message.getBytes());

5. 生产中常见问题解决思路

5.1 消息积压

//声明队列时指定队列优先级
Map<String,Object> args = new HAshMap<String,Object>();
args.put("x-max-priority",10);
channel.queue.Declare("queue.priority",true,false,false,args);

////发送消息时,指定消息优先级
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties-Builder();
builder.priority(5);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish ("exchange一priority","rk一priority",
                                            properties,("messages").getBytes());
//持续增长的消息积压会耗尽硬件资源,触发内存磁盘预警和流控,拒绝掉一些请求,优先保障RabbitMQ存活下来,隔绝系统之间层与层的压力,熔断流量,保证系统整体可用

5.2 QPS低

5.3 实现顺序消息

上一篇下一篇

猜你喜欢

热点阅读