Spring-boot 集成Redis应用(二) --分布式锁以
Spring-boot 集成Redis应用(二) --分布式锁以及压测介绍
一.基础环境
-
jdk 1.8
-
maven 3.5.3
-
spring-boot 2.0.4
-
redis 4.0.11
-
ApacheBench 2.3
以上工具需要提前安装以及熟悉基本操作,在本文中不会讲解如何安装
二.基本介绍
相信各位小伙伴在学习Redis时,都了解到Redis不仅仅是一个内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。上一篇集成应用已经简单的介绍了 Redis作为消息队列配合基于Servlet 3的异步请求处理的简单示例。本篇将介绍Redis在单机部署的场景下的分布式锁。分布式锁的思想来源于Redis官网,下面给出中文翻译相当棒链接,方便大家了解分布式锁。《Redis官方文档》用Redis构建分布式锁。在这就不讲解中心思想了。那么我会以一个模拟秒杀系统的简单Demo来一步一步的展示redis分布式锁的应用。
三.无布式锁时的秒杀代码以及压测结果
组件依赖
由于是无redis锁的情况下的秒杀demo,则只需要引入spring-boot基础依赖即可
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
代码
由于是一个简单的秒杀下单的demo代码,那么只需要两个接口,一个是下单接口(order),一个是查询订单接口(query),Controller层代码如下:
/**
* @author Neal
* 测试无分布式锁controller 层
*/
@RestController
@RequestMapping("/order")
public class NoDistributeController {
//无分布式锁service
@Autowired
NoDistributeService redisDistributeService;
/**
* 查询剩余订单结果接口
* @param pid 订单编号
* @return
*/
@GetMapping("/query/{pid}")
public String query(@PathVariable String pid) {
return redisDistributeService.queryMap(pid);
}
/**
* 下单接口
* @param pid 订单编号
* @return
*/
@GetMapping("/{pid}")
public String order(@PathVariable String pid) {
redisDistributeService.order(pid);
return redisDistributeService.queryMap(pid);
}
}
service层代码如下:
/**
* @author Neal
* 测试无分布式锁service 层
*/
@Service
public class NoDistributeService {
//模拟商品信息表
private static Map<String,Integer> products;
//模拟库存表
private static Map<String,Integer> stock;
//模拟订单表
private static Map<String,String> orders;
static {
products = new HashMap<>();
stock = new HashMap<>();
orders = new HashMap<>();
//模拟订单表数据 订单编号 112233 库存 100000
products.put("112233",100000);
//模拟库存表数据 订单编号112233 库存100000
stock.put("112233",100000);
}
/**
* 模拟查询秒杀成功返回的信息
* @param pid 商品编号
* @return 返回拼接的秒杀商品结果字符串
*/
public String queryMap(String pid) {
return "秒杀商品限量:" + products.get(pid) + "份,还剩:"+stock.get(pid) +"份,成功下单:"+orders.size() + "人";
}
/**
* 下单方法
* @param pid 商品编号
*/
public void order(String pid) {
//从库存表中获取库存余量
int stockNum = stock.get(pid);
//如果库存为0 则输出库存不足
if(stockNum == 0) {
System.out.println("商品库存不足");
}else{ //如果有库存
//往订单表中插入数据 生成UUID作为用户ID pid
orders.put(UUID.randomUUID().toString(),pid);
//线程休眠 模拟其他操作
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//减库存操作
stock.put(pid,stockNum-1);
}
}
}
代码上有注释,我就不过多的啰嗦解释了,那么让我们来模拟秒杀环境,使用 ApacheBench 来模拟并发,来看看结果如何:
-
进入到 ApacheBench 的bin目录下,我的路径是 Apache24\bin。
-
输入命令以及参数:ab -n 1000 -c 100 http://127.0.0.1:8080/order/112233 该命令的含义是 向指定的URL发送 1000次请求 100 并发量。 结果如图
无锁压测图1
虽然只耗时了2.596秒,但是处理的结果却不尽如人意。
- 然后使用查询接口查询一下下单和库存结果是否一致,请求查询接口:http://localhost:8080/order/query/112233。结果如图
无锁查询结果
可以看出下单人数和库存余量明显不符,就这就是无锁时,在高并发环境中会引起的问题。
Redis分布式锁下的秒杀代码以及压测结果
前言
网上看了很多的例子,都是使用redis的SETNX命令来实现的,Redis 官网并不推荐使用SETNX命令,而是推荐使用SET,因为从2.6.12版本以后,Redis对SET命令增加了一系列的选项。
-
EX
seconds – Set the specified expire time, in seconds. -
PX
milliseconds – Set the specified expire time, in milliseconds. -
NX
– Only set the key if it does not already exist. -
XX
– Only set the key if it already exist. -
EX
seconds – 设置键key的过期时间,单位时秒 -
PX
milliseconds – 设置键key的过期时间,单位时毫秒 -
NX
– 只有键key不存在的时候才会设置key的值 -
XX
– 只有键key存在的时候才会设置key的值注意: 由于
SET
命令加上选项已经可以完全取代SETNX, SETEX, PSETEX的功能,所以在将来的版本中,redis可能会不推荐使用并且最终抛弃这几个命令。 原文地址
所以本例也是使用上述文章所推荐的加锁解锁方法。
加锁:使用命令 SET resource_name my_random_value NX PX 30000 这个命令的作用是在只有这个key不存在的时候才会设置这个key的值(NX选项的作用),超时时间设为30000毫秒(PX选项的作用) 这个key的值设为“my_random_value”。这个值必须在所有获取锁请求的客户端里保持唯一。
解锁: 使用LUA脚本语言
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
这段脚本的意思是:删除这个key当且仅当这个key存在而且值是我期望的那个值。
LUA脚本的原子性 原文链接
Redis 使用单个 Lua 解释器去运行所有脚本,并且, Redis 也保证脚本会以原子性(atomic)的方式执行:当某个脚本正在运行的时候,不会有其他脚本或 Redis 命令被执行。这和使用 MULTI / EXEC 包围的事务很类似。在其他别的客户端看来,脚本的效果(effect)要么是不可见的(not visible),要么就是已完成的(already completed)。
另一方面,这也意味着,执行一个运行缓慢的脚本并不是一个好主意。写一个跑得很快很顺溜的脚本并不难,因为脚本的运行开销(overhead)非常少,但是当你不得不使用一些跑得比较慢的脚本时,请小心,因为当这些蜗牛脚本在慢吞吞地运行的时候,其他客户端会因为服务器正忙而无法执行命令。
组件依赖
相关jedis依赖
<!--Jedis 相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
代码
1.配置jedis。网上有很多配置jedis的例子,我就给一个简单的配置实现。我使用的是自定义的配置,首先在 application.properties中添加Jedis配置参数
#jedis相关配置
#Redis IP
jedis.host=192.168.56.101
#Redis 端口
jedis.port=6379
#Redis 密码
jedis.password=123456
jedis.timeout=3
jedis.poolMaxTotal=10
jedis.poolMaxIdle=10
jedis.poolMaxWait=3
2.声明自定义配置Bean,使用springboot 注解ConfigurationProperties来加载application.properties中的配置参数。
/**
* @author Neal
* 自定义Jedis配置bean
*/
@Component
@ConfigurationProperties(prefix = "jedis")
public class MyJedisBean {
private String host;
private int port;
private String password;
private int timeout;
private int poolMaxTotal;
private int poolMaxIdle;
private int poolMaxWait;
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public int getTimeout() {
return timeout;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
public int getPoolMaxTotal() {
return poolMaxTotal;
}
public void setPoolMaxTotal(int poolMaxTotal) {
this.poolMaxTotal = poolMaxTotal;
}
public int getPoolMaxIdle() {
return poolMaxIdle;
}
public void setPoolMaxIdle(int poolMaxIdle) {
this.poolMaxIdle = poolMaxIdle;
}
public int getPoolMaxWait() {
return poolMaxWait;
}
public void setPoolMaxWait(int poolMaxWait) {
this.poolMaxWait = poolMaxWait;
}
}
3.生成JedisPool组件bean
这里就是把JedisPool的相关配置参数配置到JedisPoolConfig并且利用JedisPool的构造方法来声明对象。
/**
* @author Neal
* 初始化jedis 连接池
*/
@Component
public class MyJedisConfig {
/**
* 自定义jedis配置bean
*/
@Autowired
private MyJedisBean myJedisBean;
@Bean
public JedisPool jedisPoolFactory() {
//声明jedispool 配置类
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxIdle(myJedisBean.getPoolMaxIdle());
jedisPoolConfig.setMaxTotal(myJedisBean.getPoolMaxTotal());
jedisPoolConfig.setMaxWaitMillis(myJedisBean.getPoolMaxWait() *1000);
/**
* 利用Jedis的构造方法 生成 jedispool
*/
JedisPool jedisPool = new JedisPool(jedisPoolConfig,myJedisBean.getHost(),myJedisBean.getPort(),myJedisBean.getTimeout()*1000,myJedisBean.getPassword(),0);
return jedisPool;
}
}
4.实现Redis分布式锁方法
该类中只有加锁(redisLock)和解锁(redisUnlock)两个方法。 声明的静态变量 都是根据Redis原生命令 SET resource_name my_random_value NX PX 30000 声明的命令字符串。在加锁和解锁时 resource_name 对应的是 商品ID, my_random_value 对应的是我们用UUID 生成的模拟用户ID。
/**
* @author Neal
* 分布式锁
*/
@Component
public class MyRedisLock {
//Only set the key if it does not already exist.
private static final String IF_NOT_EXIST = "NX";
// Set the specified expire time, in milliseconds.
private static final String SET_EXPIRE_TIME = "PX";
//超时时间为 500毫秒
private static final int EXPIRE_TIME = 500;
//加锁成功后返回的标识
private static final String ON_LOCK = "OK";
//LUA 解锁脚本
private static final String LUA_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
/**
* 获取配置好的jedis pool 组件
*/
@Autowired
private JedisPool jedisPoolFactory;
/***
* redis 加锁方法
* @param id 商品ID
* @param uuid 模拟用户ID
* @return 返回 true 加锁成功 false 解锁成功
*/
public boolean redisLock(String id,String uuid) {
//从 jedis 连接池中 获取jedis
Jedis jedis = jedisPoolFactory.getResource();
boolean locked = false;
try{
//使用Jedis 加锁
locked = ON_LOCK.equals(jedis.set(id,uuid,IF_NOT_EXIST,SET_EXPIRE_TIME,EXPIRE_TIME));
}finally {
//将连接放回连接池
jedis.close();
}
return locked;
}
/***
* redis 解锁方法
* @param id 商品ID
* @param uuid 模拟用户ID
* @return 由于是使用LUA脚本,则会保证原子性的特质
*/
public void redisUnlock(String id,String uuid) {
//从 jedis 连接池中 获取jedis
Jedis jedis = jedisPoolFactory.getResource();
try{
//使用Jedis 的 eval解锁
Object result = jedis.eval(LUA_SCRIPT, Collections.singletonList(id),Collections.singletonList(uuid));
if(1L == (Long)result) {
System.out.println("客户ID为:《" + uuid + "》 解锁成功!");
}
}finally {
jedis.close();
}
}
}
核心的加锁代码已经介绍完了,下面就是关于 在秒杀service层加锁与解锁相关的代码了。
5.Controller层
controller层与之前的无锁controller没有变化,还是一样的代码。
/**
* @author Neal
* 测试分布式锁controller 层
*/
@RestController
@RequestMapping("/distribute")
public class RedisDistributeController {
@Autowired
private RedisDistributeService redisDistributeService;
@Autowired
private JedisPool jedisPoolFactory;
@GetMapping("/query/{pid}")
public String query(@PathVariable String pid) {
return redisDistributeService.queryMap(pid);
}
@GetMapping("/{pid}")
public String order(@PathVariable String pid) {
redisDistributeService.order(pid, UUID.randomUUID().toString());
return redisDistributeService.queryMap(pid);
}
}
6.service层
在service层中的下单方法(order)中加入了 加锁与解锁的操作。
/**
* @author Neal
* 测试分布式锁service 层
*/
@Service
public class RedisDistributeService {
//模拟商品信息表
private static Map<String,Integer> products;
//模拟库存表
private static Map<String,Integer> stock;
//模拟订单表
private static Map<String,String> orders;
//redis 锁组件
@Autowired
MyRedisLock myRedisLock;
static {
products = new HashMap<>();
stock = new HashMap<>();
orders = new HashMap<>();
products.put("112233",100000);
stock.put("112233",100000);
}
/**
* 模拟查询秒杀成功返回的信息
* @param pid 商品名称
* @return
*/
public String queryMap(String pid) {
return "秒杀商品限量:" + products.get(pid) + "份,还剩:"+stock.get(pid) +"份,成功下单:"+orders.size() + "人";
}
/**
* 下单方法
* @param pid 商品名称
*/
public void order(String pid,String uuid) {
//redis 加锁
if(!myRedisLock.redisLock(pid,uuid)) { //如果没获得锁则直接返回,不执行下面的代码
System.out.println("客户ID为:《"+ uuid +"》未获得锁");
return;
}
System.out.println("客户ID为:《"+ uuid +"》获得锁");
//从库存表中获取库存余量
int stockNum = stock.get(pid);
if(stockNum == 0) {
System.out.println("商品库存不足");
}else{
//往订单表中插入数据
orders.put(uuid,pid);
//线程休眠 模拟其他操作
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//减库存操作
stock.put(pid,stockNum-1);
}
//redis 解锁
myRedisLock.redisUnlock(pid,uuid);
}
}
<u>代码已经介绍完了那么下面开始压测,并看看结果是否跟没加锁的代码有区别</u>
7.压测
压测方式跟上面讲述的一样,我就不在啰嗦了,直接上结果图。
有锁压测图1 有锁压测图2 结果
由图中可以看出,同样的压测命令,秒杀的结果却只有13个秒杀成功,但是库存余量与秒杀的数量是对应的上的,不会出现库存与秒杀数量不一致问题。
结论:
在写Redis锁的时候看了很多前辈的博文以及教学视频,发现之前用的都是基于SETNX,解锁也是各有千秋,最后还是参考了 Redis的官方实践。
DEMO代码地址
Redis命令大全