我的微服务设计方案Redis

synchronized锁和redis分布式锁的使用

2019-11-06  本文已影响0人  扮鬼之梦

准备工作

1.商品库存都以50为例存在redis中

image

2.商品购买接口

Controller

@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @GetMapping("/buy")
    public String buy(){
        //查询当前商品库存
        Integer store = (Integer) redisTemplate.opsForValue().get("store");

        if(store > 0){
            //库存不为0则库存减一
            store =  store - 1;
            redisTemplate.opsForValue().set("store",store);
            System.out.println("购买成功,购买后库存为:"+ store);
        }else{
            System.out.println("购买失败");
        }
        return "ok";
    }
}

3.使用jMeter进行压测

设置访问地址:http://localhost/lock-service/test/buy

image
设置并发数,这里是0s内发送200个请求。
image

测试

1.未加锁

image
image

结果:原库存为50,但成功卖出了200件商品,卖出200件商品后库存变为20,商品超卖了。

2.加synchronized锁

重点为锁对象的选取,这里使用商品id字符串,对应的常量池中的引用做为锁对象。

学习synchronized的时候一般都是用this做为锁对象的,这里如果使用this,购买不同商品时,也会争夺同一把锁,效率较低。

@GetMapping("/buy")
public String buy(){
    //使用id的字符串做为锁对象,intern方法 (返回常量池中该字符串的引用)
    String poductId = "100";
    String lock = poductId.intern();
    synchronized (lock.getClass()){
        //查询当前商品库存
        Integer store = (Integer) redisTemplate.opsForValue().get("store");
        if(store > 0){
            //库存不为0则库存减一
            store =  store - 1;
            redisTemplate.opsForValue().set("store",store);
            System.out.println("购买成功,购买后库存为:"+ store);
        }else{
            System.out.println("购买失败");
        }
    }
    return "ok";
}
image
image

结果:原库存为50,成功卖出了50件商品,卖出50件商品后库存变为0,加锁成功

3.加synchronized锁,并启动多个商品服务

image

我这里启动两个商品购买服务,锁还是使用synchronized锁,使用SpringClouldGateway负载均衡调用商品服务。

商品服务实例1


image

商品服务实例2


image

结果:通过实例1和实例2的结果可以看出,至少36、35、22、18存在着重复卖出,商品超卖了。因为两个商品服务是启动在两个jvm中,synchronized无法实现跨虚拟机加锁,所以分布式系统中不能使用synchronized锁。

4.加Redis分布式锁,并启动多个商品服务

官方文档

a.引入依赖,SpringBoot版本自选

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-redis</artifactId>
    <version>5.2.0.RELEASE</version>
</dependency>

b.yml配置

spring: 
  redis:
    database: 0 # Redis数据库索引(默认为0)
    host: 127.0.0.1
    port: 6379
    jedis:
      pool:
        max-active: 30 # 连接池最大连接数(使用负值表示没有限制)
        max-idle: 60 # 连接池中的最大空闲连接
        min-idle: 20 # 连接池中的最小空闲连接

c.注入Bean对象

@Bean
RedisLockRegistry redisLockRegistry(RedisConnectionFactory connectionFactory) {
    return new RedisLockRegistry(connectionFactory, "redisLock", 5000L);
}

注 RedisLockRegistry 提供两个构造方法, 上述示例, 最后一个参数为 默认过期时间

To avoid “hung” locks (when a server fails), the locks in this registry are expired after a default 60 seconds, but you can configure this value on the registry. Locks are normally held for a much smaller time.

d.使用Redis锁

@Autowired
private RedisLockRegistry redisLockRegistry;

@GetMapping("/buy")
public String buy(){
    String poductId = "100";
    Lock lock = redisLockRegistry.obtain("buyRedis:" + poductId);
    try{
        lock.lock();
        //查询当前商品库存
        Integer store = (Integer) redisTemplate.opsForValue().get("store");

        if(store > 0){
            //库存不为0则库存减一
            store =  store - 1;
            redisTemplate.opsForValue().set("store",store);
            System.out.println("购买成功,购买后库存为:"+ store);
        }else{
            System.out.println("购买失败");
        }
    }finally {
        lock.unlock();
    }

    return "ok";
}

e.启动两个商品服务进行测试

因为我设置50个商品时,总是在某一个实例里卖完了所有商品(执行太快了),所以我把商品数量设成了200.
实例1


image

实例2


image
image.png

结果:原库存为200,成功卖出了200件商品,卖出200件商品后库存变为0,加分布式锁成功。

自己实现一个redis锁

1.代码

@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @GetMapping("/buy")
    public String buy(){
        String lockKey = "productKey_100";

    //这个锁是阻塞型的锁且会自动增加锁的生存时间
        lock(lockKey);

        try {
        //设置购买操作的时间大于锁的默认生存时间9s,测试锁的续命效果
            Thread.sleep(12000L);
            buyProduct();
        }catch (Exception e){
            System.out.println("休眠错误");
        }finally {
            unlock(lockKey);
        }

        return "ok";
    }

    /**
     * 购买商品
     */
    private void buyProduct(){
        //查询当前商品库存
        Integer store = (Integer) redisTemplate.opsForValue().get("store");

        if(store > 0){
            //库存不为0则库存减一
            store =  store - 1;
            redisTemplate.opsForValue().set("store",store);
            System.out.println("购买成功,购买后库存为:"+ store);
        }else{
            System.out.println("购买失败");
        }
    }

    /**
     * 加锁
     */
    private void lock(String lockKey){
        String uuid = UUID.randomUUID().toString();
        while(true){
            if(redisTemplate.opsForValue().setIfAbsent(lockKey,uuid , 9, TimeUnit.SECONDS)){
                Thread thread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        Timer timer = new Timer();

                        TimerTask timerTask = new TimerTask() {
                            @Override
                            public void run() {
                //锁存在则将生存时间重置为9s
                                String o =(String) redisTemplate.opsForValue().get(lockKey);
                                if(uuid.equals(o)){
                                    redisTemplate.expire(lockKey,9,TimeUnit.SECONDS);
                                }else{
                                    timer.cancel();
                                }
                            }
                        };
            //定时器启动3s后执行第一次,之后每隔3s执行一次
                        timer.schedule(timerTask,3000L,3000L);
                    }
                });

                thread.run();
                break;
            }
        }
    }

    /**
     * 解锁锁
     */
    private void unlock(String lockKey){
        redisTemplate.delete(lockKey);
    }
}

2.效果

设置30个库存,并发发送30个购买请求,执行了12*30秒,得到结果如下


image
image
image

30个商品都成功卖出,卖出后库存为0,加锁成功,锁的阻塞效果和锁的续命效果也成功了。

容易踩的坑

1.事务中加锁,会导致锁失效

原因:由于事务是加锁前开启的,锁是事务未提交前释放的,此时其他线程拿到锁之后进行锁住的代码块,读取的库存数据不是最新的。

解决方法:我们可以在@Transactional注释的方法之前就加上锁,在还没有开事务之前就加锁,那么就可以保证线程的安全性,从而不会出现脏读和数据不一致性等情况。

2.锁对象的选取

锁对象的选取要和实际业务关联,例如商品购买,就要和商品关联。一律使用当前类(this)做锁对象效率不高。

同时对于同一商品的锁对象要是相同对象(同一引用的对象)。

3.单实例服务可以使用非分布式的锁,多实例服务只能使用分布式锁

分布式锁的实现方案有很多

上一篇下一篇

猜你喜欢

热点阅读