用rabbitMq解决web高并发的学习笔记

2018-09-30  本文已影响0人  DONG999
  1. 引入pom.xml
<!-- https://docs.spring.io/spring-amqp/docs/2.0.2.RELEASE/reference/html/
        https://docs.spring.io/spring-boot/docs/current/reference/html/boot-features-messaging.html#boot-features-amqp
         -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
  1. application.yml 配置部分
    #rabbitmq
    rabbitmq:
      host: 10.0.0.2
      port: 5672
      username: springboot
      password: password
      publisher-confirms: true
      publisher-returns: true
      template:
         mandatory: true
      #https://github.com/spring-projects/spring-boot/blob/v2.0.5.RELEASE/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java
      listener:
         concurrency: 2
         #最小消息监听线程数
         max-concurrency: 2
         #最大消息监听线程数
mybatis:
  mapper-locations: classpath:mapping/*.xml
···
  1. 创建配置文件
package com.redis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
//错误的 import com.rabbitmq.client.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

/**
 * 
 * Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
 * Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 Queue:消息的载体,每个消息都会被投到一个或多个队列。
 * 
 * Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
 * 
 * Routing Key:路由关键字,
 * 
 * exchange根据这个关键字进行消息投递。 vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
 * 
 * Producer:消息生产者,就是投递消息的程序. Consumer:消息消费者,就是接受消息的程序.
 * Channel:消息通道,在客户端的每个连接里,可建立多个channel.
 *
 * 
 */

@Configuration
public class RabbitConfig {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    public static final String EXCHANGE_A = "my-mq-exchange_A";
    
    public static final String QUEUE_A = "QUEUE_A";
    
    public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";


    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true);
        
        
        return connectionFactory;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    // 必须是prototype类型
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

    /**
     * * 针对消费者配置 * 
     * 1. 设置交换机类型 * 
     * 2. 将队列绑定到交换机 
     * 
     * FanoutExchange:
     * 将消息分发到所有的绑定队列,无routingkey的概念 
     * 
     * HeadersExchange :通过添加属性key-value匹配
     * DirectExchange:按照routingkey分发到指定队列 
     * TopicExchange:多关键字匹配
     * 
     * 如果需要使用的其他的交换器类型,spring中都已提供实现,所有的交换器均实现org.springframework.amqp.core.AbstractExchange接口。
常用交换器类型如下:

        Direct(DirectExchange):direct 类型的行为是"先匹配, 再投送". 
        即在绑定时设定一个 routing_key, 消息的routing_key完全匹配时, 才会被交换器投送到绑定的队列中去。
        
        Topic(TopicExchange):按规则转发消息(最灵活)。
        
        Headers(HeadersExchange):设置header attribute参数类型的交换机。
        
        Fanout(FanoutExchange):转发消息到所有绑定队列。
 
     * 
     */
    
    


    

    
    
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(EXCHANGE_A);
    }

    /**
     * 获取队列A
     * 
     * @return
     */
    @Bean
    public Queue queueA() {
        return new Queue(QUEUE_A, true); // 队列持久
    }



    /**
     * 一个交换机可以绑定多个消息队列,也就是消息通过一个交换机,可以分发到不同的队列当中去。
     * 
     * @return
     */
    @Bean
    public Binding binding() {

        return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
    }

    

}
  1. Producer
package com.redis;

import java.util.UUID;
import org.springframework.amqp.core.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;

@Component
public class MsgProducer implements RabbitTemplate.ConfirmCallback , ReturnCallback{
    
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    
    // 由于rabbitTemplate的scope属性设置为ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自动注入
    private RabbitTemplate rabbitTemplate;

    /** * 构造方法注入rabbitTemplate */
    @Autowired
    public MsgProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
        // rabbitTemplate如果为单例的话,那回调就是最后设置的内容
        
        /**
         * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调。
         * ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。
         */
 
        
        rabbitTemplate.setReturnCallback(this);
    }
    
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println(message.getMessageProperties().getCorrelationIdString() + " 发送失败");
        
        System.out.println("消息主体 message : "+message);
        System.out.println("消息主体 message : "+replyCode);
        System.out.println("描述:"+replyText);
        System.out.println("消息使用的交换器 exchange : "+exchange);
        System.out.println("消息使用的路由键 routing : "+routingKey);
 
        
    }
    
     /**
      * rabbitTemplate.send(message);   //发消息,参数类型为org.springframework.amqp.core.Message
        rabbitTemplate.convertAndSend(object); //转换并发送消息。 将参数对象转换为org.springframework.amqp.core.Message后发送
        rabbitTemplate.convertSendAndReceive(message) //转换并发送消息,且等待消息者返回响应消息。
     
      * @param content
      */

    public void sendMsg(String content) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        
        System.out.println("开始发送消息c : " + content.toLowerCase() + " ,correlationId= " + correlationId);
        String response = rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId).toString();
        System.out.println("结束发送消息c : " + content.toLowerCase());
        System.out.println("消费者响应c : " + response + " 消息处理完成");
 
        //logger.info(" 发送消息TO A:" + content);
        // 把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列A
        //rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId);
    }

    /** * 回调 */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        logger.info(" 回调id:" + correlationData);
        if (ack) {
            logger.info("消息成功消费");
        } else {
            logger.info("消息消费失败:" + cause);
        }
        
        
    
    }

}

  1. Receiver
package com.redis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = RabbitConfig.QUEUE_A)
public class MsgReceiver {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @RabbitHandler
    public void process(String content) {
        System.out.println("接收处理队列A当中的消息: " + content);
    }
}


  1. unit test
 

import java.util.Date;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.redis.MsgProducer;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMqTest {

    @Autowired
    private MsgProducer sender;
    
    @Test
    public void sendTest() throws Exception {
        //while(true){
            String msg = new Date().toString();
            sender.sendMsg(msg);
            Thread.sleep(6000);
        //}
    } 

}

  1. 可以测试通过
image.png
  1. 高并发的访问量, 除了使用nginx/haproxy本身实现外, 尝试了google guava 简单好用, 来控制前端用户请求量, 范例
package com.test.ratelimit;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.util.concurrent.RateLimiter;
 
 


public class ComplexDemo {
    
    private static RateLimiter rateLimiter = RateLimiter.create(10);
    
    private static AtomicInteger suc = new AtomicInteger(0), fail = new AtomicInteger(0);

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        
 
        List<Runnable> tasks = new ArrayList<Runnable>();  
        for (int i = 0; i < 100; i++) {  
            tasks.add(new UserRequest(i));  
        }  
        ExecutorService threadPool = Executors.newCachedThreadPool();  
        for (Runnable runnable : tasks) {
            threadPool.execute(runnable);  
        } 
        
        
    }
    
    
    private static boolean startGo(int i) {
         //基于令牌桶算法的限流实现类
        
        /**
         * 一秒出10个令牌,0.1秒出一个,100个请求进来,假如100个是同时到达, 那么最终只能成交10个,90个都会因为超时而失败。
         *       
         */
        
        /** 
         * tryAcquire(long timeout, TimeUnit unit) 
         * 从RateLimiter 获取许可如果该许可可以在不超过timeout的时间内获取得到的话, 
         * 或者如果无法在timeout 过期之前获取得到许可的话,那么立即返回false(无需等待) 
         */  
        
        //判断能否在1秒内得到令牌,如果不能则立即返回false,不会阻塞程序  
        if (!rateLimiter.tryAcquire(1000, TimeUnit.MILLISECONDS)) {  
            System.out.println("暂时无法获取令牌, 排队失败" + i);  
            fail.getAndIncrement();
            System.out.println("SUC/FAIL=" + suc.get() + "/" + fail.get());  
            return false;
        }  
        if (update() > 0) {  
            System.out.println("成功" + i);  
            suc.getAndIncrement();
            System.out.println("FAIL/SUC=" + fail.get() + "/" + suc.get());  
            return true;
        }  
        
        System.out.println("数据不足,失败");  
        return false;
    }
    
    
    private static int update() {
        return 1;
    }
    
     private static class UserRequest implements Runnable {  
            private int id;  
      
            public UserRequest(int id) {  
                this.id = id;  
            }  
      
            public void run() {  
                startGo(id) ;
            }  
        }  

}

测试结果: 
...
成功8
FAIL/SUC=89/10
成功7
FAIL/SUC=89/11

总结, rabbit mq, 这里只用Direct。 Topic匹配灵活, 可以用到其他场景。

REF: http://www.rabbitmq.com/install-rpm.html

上一篇 下一篇

猜你喜欢

热点阅读