springbootSpring 学习分布式

Spring-boot 集成Redis应用(二) --分布式锁以

2018-09-19  本文已影响238人  NealLemon

Spring-boot 集成Redis应用(二) --分布式锁以及压测介绍

一.基础环境

二.基本介绍

相信各位小伙伴在学习Redis时,都了解到Redis不仅仅是一个内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。上一篇集成应用已经简单的介绍了 Redis作为消息队列配合基于Servlet 3的异步请求处理的简单示例。本篇将介绍Redis在单机部署的场景下的分布式锁。分布式锁的思想来源于Redis官网,下面给出中文翻译相当棒链接,方便大家了解分布式锁。《Redis官方文档》用Redis构建分布式锁。在这就不讲解中心思想了。那么我会以一个模拟秒杀系统的简单Demo来一步一步的展示redis分布式锁的应用。

三.无布式锁时的秒杀代码以及压测结果

组件依赖

由于是无redis锁的情况下的秒杀demo,则只需要引入spring-boot基础依赖即可

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>
代码

由于是一个简单的秒杀下单的demo代码,那么只需要两个接口,一个是下单接口(order),一个是查询订单接口(query),Controller层代码如下:

/**
 * @author Neal
 * 测试无分布式锁controller 层
 */
@RestController
@RequestMapping("/order")
public class NoDistributeController {

    //无分布式锁service
    @Autowired
    NoDistributeService redisDistributeService;

    /**
     * 查询剩余订单结果接口
     * @param pid  订单编号
     * @return
     */
    @GetMapping("/query/{pid}")
    public String query(@PathVariable String pid) {
        return redisDistributeService.queryMap(pid);
    }

    /**
     * 下单接口
     * @param pid  订单编号
     * @return
     */
    @GetMapping("/{pid}")
    public String order(@PathVariable String pid) {
        redisDistributeService.order(pid);
        return redisDistributeService.queryMap(pid);
    }
}

service层代码如下:

/**
 * @author Neal
 * 测试无分布式锁service 层
 */
@Service
public class NoDistributeService {

    //模拟商品信息表
    private static Map<String,Integer> products;

    //模拟库存表
    private static Map<String,Integer> stock;

    //模拟订单表
    private static Map<String,String> orders;

    static {
        products = new HashMap<>();
        stock = new HashMap<>();
        orders = new HashMap<>();
        //模拟订单表数据 订单编号 112233 库存 100000
        products.put("112233",100000);
        //模拟库存表数据 订单编号112233 库存100000
        stock.put("112233",100000);
    }

    /**
     * 模拟查询秒杀成功返回的信息
     * @param pid 商品编号
     * @return  返回拼接的秒杀商品结果字符串
     */
    public String queryMap(String pid) {
        return "秒杀商品限量:" +  products.get(pid) + "份,还剩:"+stock.get(pid) +"份,成功下单:"+orders.size() + "人";
    }

    /**
     * 下单方法
     * @param pid  商品编号
     */
    public void order(String pid) {
        //从库存表中获取库存余量
        int stockNum = stock.get(pid);
        //如果库存为0 则输出库存不足
        if(stockNum == 0) {
            System.out.println("商品库存不足");
        }else{ //如果有库存
            //往订单表中插入数据 生成UUID作为用户ID pid
            orders.put(UUID.randomUUID().toString(),pid);
            //线程休眠 模拟其他操作
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //减库存操作
            stock.put(pid,stockNum-1);
        }
    }
}

代码上有注释,我就不过多的啰嗦解释了,那么让我们来模拟秒杀环境,使用 ApacheBench 来模拟并发,来看看结果如何:

  1. 进入到 ApacheBench 的bin目录下,我的路径是 Apache24\bin。

  2. 输入命令以及参数:ab -n 1000 -c 100 http://127.0.0.1:8080/order/112233 该命令的含义是 向指定的URL发送 1000次请求 100 并发量。 结果如图

    无锁压测图1
无锁压测图2

虽然只耗时了2.596秒,但是处理的结果却不尽如人意。

  1. 然后使用查询接口查询一下下单和库存结果是否一致,请求查询接口:http://localhost:8080/order/query/112233。结果如图
    无锁查询结果

可以看出下单人数和库存余量明显不符,就这就是无锁时,在高并发环境中会引起的问题。

Redis分布式锁下的秒杀代码以及压测结果

前言

网上看了很多的例子,都是使用redis的SETNX命令来实现的,Redis 官网并不推荐使用SETNX命令,而是推荐使用SET,因为从2.6.12版本以后,Redis对SET命令增加了一系列的选项。

所以本例也是使用上述文章所推荐的加锁解锁方法。

加锁:使用命令 SET resource_name my_random_value NX PX 30000 这个命令的作用是在只有这个key不存在的时候才会设置这个key的值(NX选项的作用),超时时间设为30000毫秒(PX选项的作用) 这个key的值设为“my_random_value”。这个值必须在所有获取锁请求的客户端里保持唯一。

解锁: 使用LUA脚本语言

if redis.call("get",KEYS[1]) == ARGV[1] then
        return redis.call("del",KEYS[1])
    else
        return 0
    end

这段脚本的意思是:删除这个key当且仅当这个key存在而且值是我期望的那个值。

LUA脚本的原子性 原文链接

Redis 使用单个 Lua 解释器去运行所有脚本,并且, Redis 也保证脚本会以原子性(atomic)的方式执行:当某个脚本正在运行的时候,不会有其他脚本或 Redis 命令被执行。这和使用 MULTI / EXEC 包围的事务很类似。在其他别的客户端看来,脚本的效果(effect)要么是不可见的(not visible),要么就是已完成的(already completed)。

另一方面,这也意味着,执行一个运行缓慢的脚本并不是一个好主意。写一个跑得很快很顺溜的脚本并不难,因为脚本的运行开销(overhead)非常少,但是当你不得不使用一些跑得比较慢的脚本时,请小心,因为当这些蜗牛脚本在慢吞吞地运行的时候,其他客户端会因为服务器正忙而无法执行命令。

组件依赖

相关jedis依赖

<!--Jedis 相关依赖-->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-redis</artifactId>
   <exclusions>
      <exclusion>
         <groupId>io.lettuce</groupId>
         <artifactId>lettuce-core</artifactId>
      </exclusion>
   </exclusions>
</dependency>
<dependency>
   <groupId>redis.clients</groupId>
   <artifactId>jedis</artifactId>
</dependency>
代码

1.配置jedis。网上有很多配置jedis的例子,我就给一个简单的配置实现。我使用的是自定义的配置,首先在 application.properties中添加Jedis配置参数

#jedis相关配置
#Redis IP
jedis.host=192.168.56.101
#Redis 端口
jedis.port=6379
#Redis 密码
jedis.password=123456
jedis.timeout=3
jedis.poolMaxTotal=10
jedis.poolMaxIdle=10
jedis.poolMaxWait=3

2.声明自定义配置Bean,使用springboot 注解ConfigurationProperties来加载application.properties中的配置参数。

/**
 * @author Neal
 * 自定义Jedis配置bean
 */
@Component
@ConfigurationProperties(prefix = "jedis")
public class MyJedisBean {

    private String host;

    private int port;

    private String password;

    private int timeout;

    private int poolMaxTotal;

    private int poolMaxIdle;

    private int poolMaxWait;

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getTimeout() {
        return timeout;
    }

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    public int getPoolMaxTotal() {
        return poolMaxTotal;
    }

    public void setPoolMaxTotal(int poolMaxTotal) {
        this.poolMaxTotal = poolMaxTotal;
    }

    public int getPoolMaxIdle() {
        return poolMaxIdle;
    }

    public void setPoolMaxIdle(int poolMaxIdle) {
        this.poolMaxIdle = poolMaxIdle;
    }

    public int getPoolMaxWait() {
        return poolMaxWait;
    }

    public void setPoolMaxWait(int poolMaxWait) {
        this.poolMaxWait = poolMaxWait;
    }
}

3.生成JedisPool组件bean

这里就是把JedisPool的相关配置参数配置到JedisPoolConfig并且利用JedisPool的构造方法来声明对象。

/**
 * @author Neal
 * 初始化jedis 连接池
 */
@Component
public class MyJedisConfig {

    /**
     * 自定义jedis配置bean
     */
    @Autowired
    private MyJedisBean myJedisBean;

    @Bean
    public JedisPool jedisPoolFactory() {

        //声明jedispool 配置类
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxIdle(myJedisBean.getPoolMaxIdle());
        jedisPoolConfig.setMaxTotal(myJedisBean.getPoolMaxTotal());
        jedisPoolConfig.setMaxWaitMillis(myJedisBean.getPoolMaxWait() *1000);
        /**
         * 利用Jedis的构造方法 生成 jedispool
         */
        JedisPool jedisPool = new JedisPool(jedisPoolConfig,myJedisBean.getHost(),myJedisBean.getPort(),myJedisBean.getTimeout()*1000,myJedisBean.getPassword(),0);
        return jedisPool;
    }

}

4.实现Redis分布式锁方法

该类中只有加锁(redisLock)和解锁(redisUnlock)两个方法。 声明的静态变量 都是根据Redis原生命令 SET resource_name my_random_value NX PX 30000 声明的命令字符串。在加锁和解锁时 resource_name 对应的是 商品ID, my_random_value 对应的是我们用UUID 生成的模拟用户ID。

/**
 * @author Neal
 * 分布式锁
 */
@Component
public class MyRedisLock {
    //Only set the key if it does not already exist.
    private static final String IF_NOT_EXIST = "NX";
    // Set the specified expire time, in milliseconds.
    private static final String SET_EXPIRE_TIME = "PX";
    //超时时间为 500毫秒
    private static final int EXPIRE_TIME = 500;
    //加锁成功后返回的标识
    private static final String ON_LOCK = "OK";
   //LUA 解锁脚本
    private static final String LUA_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    /**
     * 获取配置好的jedis pool 组件
     */
    @Autowired
    private JedisPool jedisPoolFactory;

    /***
     * redis 加锁方法
     * @param id  商品ID
     * @param uuid  模拟用户ID
     * @return 返回 true 加锁成功 false 解锁成功
     */
    public boolean redisLock(String id,String uuid) {
        //从 jedis 连接池中 获取jedis
        Jedis jedis = jedisPoolFactory.getResource();
        boolean locked = false;
        try{
            //使用Jedis 加锁
            locked = ON_LOCK.equals(jedis.set(id,uuid,IF_NOT_EXIST,SET_EXPIRE_TIME,EXPIRE_TIME));
        }finally {
            //将连接放回连接池
            jedis.close();
        }
        return locked;
    }

    /***
     * redis 解锁方法
     * @param id  商品ID
     * @param uuid  模拟用户ID
     * @return  由于是使用LUA脚本,则会保证原子性的特质
     */
    public void redisUnlock(String id,String uuid) {
        //从 jedis 连接池中 获取jedis
        Jedis jedis = jedisPoolFactory.getResource();
        try{
            //使用Jedis 的 eval解锁
            Object result = jedis.eval(LUA_SCRIPT, Collections.singletonList(id),Collections.singletonList(uuid));
            if(1L == (Long)result) {
                System.out.println("客户ID为:《" + uuid + "》   解锁成功!");
            }
        }finally {
            jedis.close();
        }
    }
}

核心的加锁代码已经介绍完了,下面就是关于 在秒杀service层加锁与解锁相关的代码了。

5.Controller层

controller层与之前的无锁controller没有变化,还是一样的代码。

/**
 * @author Neal
 * 测试分布式锁controller 层
 */
@RestController
@RequestMapping("/distribute")
public class RedisDistributeController {

    @Autowired
    private RedisDistributeService redisDistributeService;

    @Autowired
    private JedisPool jedisPoolFactory;

    @GetMapping("/query/{pid}")
    public String query(@PathVariable String pid) {
        return redisDistributeService.queryMap(pid);
    }

    @GetMapping("/{pid}")
    public String order(@PathVariable String pid) {
        redisDistributeService.order(pid, UUID.randomUUID().toString());
        return redisDistributeService.queryMap(pid);
    }
}

6.service层

在service层中的下单方法(order)中加入了 加锁与解锁的操作。

/**
 * @author Neal
 * 测试分布式锁service 层
 */
@Service
public class RedisDistributeService {


    //模拟商品信息表
    private static Map<String,Integer> products;

    //模拟库存表
    private static Map<String,Integer> stock;

    //模拟订单表
    private static Map<String,String> orders;

    //redis 锁组件
    @Autowired
    MyRedisLock myRedisLock;

    static {
        products = new HashMap<>();
        stock = new HashMap<>();
        orders = new HashMap<>();
        products.put("112233",100000);
        stock.put("112233",100000);
    }

    /**
     * 模拟查询秒杀成功返回的信息
     * @param pid 商品名称
     * @return
     */
    public String queryMap(String pid) {
        return "秒杀商品限量:" +  products.get(pid) + "份,还剩:"+stock.get(pid) +"份,成功下单:"+orders.size() + "人";
    }

    /**
     * 下单方法
     * @param pid  商品名称
     */
    public void order(String pid,String uuid) {

        //redis 加锁
        if(!myRedisLock.redisLock(pid,uuid)) {  //如果没获得锁则直接返回,不执行下面的代码
            System.out.println("客户ID为:《"+ uuid +"》未获得锁");
            return;
        }

        System.out.println("客户ID为:《"+ uuid +"》获得锁");
        //从库存表中获取库存余量
        int stockNum = stock.get(pid);
        if(stockNum == 0) {
            System.out.println("商品库存不足");
        }else{
            //往订单表中插入数据
            orders.put(uuid,pid);
            //线程休眠 模拟其他操作
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //减库存操作
            stock.put(pid,stockNum-1);
        }

        //redis 解锁
        myRedisLock.redisUnlock(pid,uuid);
    }
}

<u>代码已经介绍完了那么下面开始压测,并看看结果是否跟没加锁的代码有区别</u>

7.压测

压测方式跟上面讲述的一样,我就不在啰嗦了,直接上结果图。


有锁压测图1 有锁压测图2 结果

由图中可以看出,同样的压测命令,秒杀的结果却只有13个秒杀成功,但是库存余量与秒杀的数量是对应的上的,不会出现库存与秒杀数量不一致问题。

结论:

在写Redis锁的时候看了很多前辈的博文以及教学视频,发现之前用的都是基于SETNX,解锁也是各有千秋,最后还是参考了 Redis的官方实践。
DEMO代码地址
Redis命令大全

上一篇下一篇

猜你喜欢

热点阅读