阿暴的全栈之路

RabbitMQ概念和机制

2021-10-10  本文已影响0人  阿暴阿暴

RabbitMQ概念和机制

RabbitMQ是一个开源的消息队列系统,是AMQP(高级消息队列协议)标准的实现,由Erlang开发

概念

Broker

RabbitMQ服务节点

进程模型

Client

生产者、消费者

Connection

Broker和Client通过Connection连接(TCP),NIO,共享connection,channel

连接缓存模式

配置生产和消费使用不同的Connection

 rabbitTemplate.setUsePublisherConnection(true);

Queue

queue通过channel收发Message

常用参数

Message

持久化

待补充🤙

存储

所有队列中的消息都以append的方式写到一个文件中,当这个文件的大小超过指定的限制大小后,关闭这个文件再创建一个新的文件供消息的写入。文件名(*.rdq)从0开始然后依次累加。当某个消息被删除时,并不立即从文件中删除相关信息,而是做一些记录,当垃圾数据达到一定比例时,启动垃圾回收处理,将逻辑相邻的文件中的数据合并到一个文件中。

读写和删除

待补充🤙

垃圾回收

由于执行消息删除操作时,并不立即对在文件中对消息进行删除,也就是说消息依然在文件中,仅仅是垃圾数据而已。当垃圾数据超过一定比例后(默认比例为50%),并且至少有三个及以上的文件时,rabbitmq触发垃圾回收。垃圾回收会先找到符合要求的两个文件(根据#file_summary{}中left,right找逻辑上相邻的两个文件,并且两个文件的有效数据可在一个文件中存储),然后锁定这两个文件,并先对左边文件的有效数据进行整理,再将右边文件的有效数据写入到左边文件,同时更新消息的相关信息(存储的文件,文件中的偏移量),文件的相关信息(文件的有效数据,左边文件,右边文件),最后将右边的文件删除

消息生命周期

Publish,Publish Confirm,Ready,UnAck,Ack,删除

Exchange

路由,连接Message和Queue(通过routing key)

模式:direct直连;fanout广播;topic组播

VisualHost

Exchange,Queues逻辑分区

机制

Message消费过程

Cilent发送消息
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,JSON.toJSONBytes(new Object()));
    
rabbitTemplate.convertAndSend(queueName, content);
创建Consumer

Client根据consumer数量(基于concurrency配置(默认1))创建多线程(使用线程池管理),一个consumer

一个线程,每个线程轮询处理本地队列中的消息,被consumer持有的线程不会释放,循环处理本地阻塞队列

线程池优化

默认使用new SimpleAsyncTaskExecutor()

可用线程数需大于Consumer所需线程,否则会阻塞等待,并报Warning信息

线程池不可配置阻塞队列,否则当核心线程数满了之后,阻塞队列会导致消费者不能启动

总结:不推荐使用自定义配置的线程池,若使用,每次增加队列时均需要注意配置好线程数。

使用自定义的线程池

@Slf4j
@Configuration
public class RabbitConfig {

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(2);
        factory.setPrefetchCount(1);
        factory.setDefaultRequeueRejected(true);
        factory.setTaskExecutor(taskExecutor());
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }


    @Bean("correctTaskExecutor")
    @Primary
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new MyThreadPoolTaskExecutor();
        // 设置核心线程数
        executor.setCorePoolSize(100);
        // 设置最大线程数
        executor.setMaxPoolSize(100);
        // 设置队列容量
        executor.setQueueCapacity(0);
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(300);
        // 设置默认线程名称
        executor.setThreadNamePrefix("thread-xx-");
        // 设置拒绝策略rejection-policy:当pool已经达到max size的时候,丢弃
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }

}
Consumer扩容过程

待补充🤙

Consumer获取消息
Consumer消费消息
消费过程中Broker上Message状态变化

异常情况:Connection断开后或(消息拒绝)unack消息会置为Ready,等待重新调度

消息丢失问题

生产者丢失

解决方案

Broker丢失

解决方案

消费者丢失

解决方案

重复消息

消费端幂等

Message增加唯一key_id

顺序消息

单Consumer,单线程消费

消息堆积

Broker流控

SpringBoot、SpringCloud中使用

<dependency> 
   <groupId>org.springframework.boot</groupId>  
   <artifactId>spring-boot-starter-amqp</artifactId> 
</dependency>
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirms: true  # 确保消息不丢失
    publisher-returns: true # 确保消息不丢失
    listener:
      simple:
        concurrency: 1
        max-concurrency: 3
        # 消费者预取1条数据到内存,默认为250条
        prefetch: 1
        # 确定机制
        acknowledge-mode: manual
@Configuration
public class RabbitConfig {

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(10);
        factory.setPrefetchCount(1);
        factory.setDefaultRequeueRejected(true);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
}
开启confirm模式
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        //设置virtualHost。
        connectionFactory.setVirtualHost("/");
        //消息的确认机制(confirm);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        //setCacheMode:设置缓存模式,共有两种,CHANNEL和CONNECTION模式。
        //1、CONNECTION模式,这个模式下允许创建多个Connection,会缓存一定数量的Connection,每个Connection中同样会缓存一些Channel,
        // 除了可以有多个Connection,其它都跟CHANNEL模式一样。
        //2、CHANNEL模式,程序运行期间ConnectionFactory会维护着一个Connection,
        // 所有的操作都会使用这个Connection,但一个Connection中可以有多个Channel,
        // 操作rabbitmq之前都必须先获取到一个Channel,
        // 否则就会阻塞(可以通过setChannelCheckoutTimeout()设置等待时间),
        // 这些Channel会被缓存(缓存的数量可以通过setChannelCacheSize()设置);
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);   //设置CONNECTION模式,可创建多个Connection连接
        //设置每个Connection中缓存Channel的数量,不是最大的。操作rabbitmq之前(send/receive message等)
        // 要先获取到一个Channel.获取Channel时会先从缓存中找闲置的Channel,如果没有则创建新的Channel,
        // 当Channel数量大于缓存数量时,多出来没法放进缓存的会被关闭。
        connectionFactory.setChannelCacheSize(10);
        //单位:毫秒;配合channelCacheSize不仅是缓存数量,而且是最大的数量。
        // 从缓存获取不到可用的Channel时,不会创建新的Channel,会等待这个值设置的毫秒数
        //同时,在CONNECTION模式,这个值也会影响获取Connection的等待时间,
        // 超时获取不到Connection也会抛出AmqpTimeoutException异常。
        connectionFactory.setChannelCheckoutTimeout(600);

        //仅在CONNECTION模式使用,设置Connection的缓存数量。
        connectionFactory.setConnectionCacheSize(3);
        //setConnectionLimit:仅在CONNECTION模式使用,设置Connection的数量上限。
        connectionFactory.setConnectionLimit(10);
        return connectionFactory;
    }
  @Autowired
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        //客户端开启confirm模式
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
            }
        });
        return rabbitTemplate;
    }

死信队列

如何死信

性能测试

待补充🤙

参考

上一篇 下一篇

猜你喜欢

热点阅读