开发设计java解决方案

如何应对高并发:悲观锁,乐观锁,Redis

2019-05-27  本文已影响631人  Wayne_Dream

根据上一篇Demo测试情况反映,当有多个线程同时抢购时,会发生超发现象,所谓超发现象,就是原本设置库存为30000件,但是,当抢购完成后发现库存余量变成了负数,即发货量大于库存量的情况:

超发现象

造成这种现象的原因:当多个线程请求数据库查询库存余量时,显示有余量,但是当进行扣减库存时,库存已经用完了,但那个线程并不知道,依旧去扣减库存,造成库存为负数的情况,于是乎就出现了超发现象。

测试方法:根据书上是html中使用js,for循环异步请求,发现并不会造成超发现象,后改为在浏览器中同时开启多个窗口访问/test进行抢购,模拟多个用户抢购的场景,内存爆炸...

为了解决这种问题,下面将介绍三种解决方法:

1、悲观锁

发生超发现象的根本原因是共享数据被多个线程所修改,无法保证其执行顺序,如果一个数据库事务读取到一个产品后,就将数据直接锁定,不允许其他线程进行读写操作,直至当前数据库事务完成才释放这条数据的锁,就不会发生超发现象,但是执行效率性能将大大下降。

修改ProductMapper中的SQL语句:

@Mapper
public interface ProductMapper {

    @Select("SELECT id, product_name as productName, stock, price, version, note FROM t_product where id=#{id} for update")
    ProductPo getProduct(Long id);

    @Update("UPDATE t_product SET stock = stock - #{quantity} WHERE id = #{id}")
    int decreaseProduct(@Param("id") Long id, @Param("quantity") int quantity);
}

在select语句末尾添加了for update,这样,在数据库事务执行的过程中,就会锁定查询出来的数据,其他事务将不能再对其进行读写,单个请求直至数据库事务完成,才会释放这个,下图可见其stock为0,没有发生超发现象,但执行效率下降了,通过购买记录可以得知,相比之前没加锁慢了1/5。

库存

2、乐观锁

为了解决悲观锁带来的性能下降的问题,我们来讨论一下乐观锁的原理

乐观锁是一种不使用数据库锁和不阻塞线程并发的方案,下图是以本Demo为例的乐观锁流程:

流程图

这种方案就是多线程的概念CAS(Compare and Swap),然而这样的方案会引发一种ABA问题

T1时刻:线程1读取商品库存为A

T2时刻:线程2读取商品库存为A

T3时刻:线程2计算购买商品总价格

T4时刻:当前库存为A,与线程2保存的旧值一致,因此线程2可减库存(当前库存A--->B),此时线程1在当前库存为B的情况下计算剩余商品价格(单价*B)。

T5时刻:线程2取消购买,线程2回退(当前库存B--->A),线程1计算的剩余商品价格错误。

T6时刻:线程1比较旧值与当前数据库库存,发现都为A,返回之前计算好的(单价*B)结果,造成了错误。

从上面的分析中看到一个现象A--->B--->A的过程,就是所谓的ABA问题,解决此问题的方法为加入版本号的限制,只要在操作过程中修改共享值,无论业务正常,回退,还是异常,版本号只能递增,不能回退递减。每次通过比较数据的版本号来查看此数据是否被修改过。

@Mapper
public interface ProductMapper {

    @Select("SELECT id, product_name as productName, stock, price, version, note FROM t_product where id=#{id}")
    ProductPo getProduct(Long id);

    //********************change******************************
    @Update("UPDATE t_product SET stock = stock - #{quantity}, version = version + 1 WHERE id = #{id} and version = #{version}")
    int decreaseProduct(@Param("id") Long id, @Param("quantity") int quantity, @Param("version") int version);
}
     @Override
     // 启动Spring数据库事务机制
     @Transactional
     public boolean purchase(Long userId, Long productId, int quantity) {
         // 获取产品
         ProductPo product = productMapper.getProduct(productId);
         // 比较库存和购买数量
         if (product.getStock() < quantity) {
         // 库存不足
         return false;
         }
         //**************************change*******************************
         // 扣减库存,加入了version
         productMapper.decreaseProduct(productId, quantity, product.getVersion());
         //***************************************************************
         // 初始化购买记录
         PurchaseRecordPo pr = this.initPurchaseRecord(userId, product, quantity);
         // 插入购买记录
         purchaseRecordMapper.insertPurchaseRecord(pr);
         return true;
     }
产品表

发现stock并没有降为0,原因是加入了版本号的判断,所以大量的请求得到了失败的结果,而且失败率有点高。要解决这个方法,就设定为如果失败,就重试,直至成功,但是这样又会造成大量SQL执行,影响性能,所以一般可以使用限制时间或者重入次数的方法来克服。

时间戳限制重入的乐观锁:

将一个请求限制在100ms的生存期,如果在100ms内发生版本号冲突而导致不能更新的,则会重新尝试请求,否则请求失败。

修改service下PurchaseserviceImpl的purchase方法

 @Override
     // 启动Spring数据库事务机制
     @Transactional
     public boolean purchase(Long userId, Long productId, int quantity) {

         long start = System.currentTimeMillis();
         while (true){
             long end = System.currentTimeMillis();
             if (end - start >100){
                 return false;
             }
             // 获取产品
             ProductPo product = productMapper.getProduct(productId);
             // 比较库存和购买数量
             if (product.getStock() < quantity) {
                 // 库存不足
                 return false;
             }
             // 扣减库存
             int result = productMapper.decreaseProduct(productId, quantity, product.getVersion());

             // 如果数据更新失败,说明数据在多线程中被其他线程修改
             // 导致失败,着通过循环重入尝试购买商品
             if (result == 0){
                 continue;
             }
             // 初始化购买记录
             PurchaseRecordPo pr = this.initPurchaseRecord(userId, product, quantity);
             // 插入购买记录
             purchaseRecordMapper.insertPurchaseRecord(pr);
             return true;
         }
     }

这种方法在测试中效果并不是很好,执行速度很慢,冲突现象并没有减少,反而增多,可能是我测试方法并不好,只开了三个网页来模拟并发,不太懂JS,Demo用的JS是发送异步请求的,但用单窗口测试了好多次都没出现超发现象,只能人肉模拟并发。

限定次数重入的乐观锁:

     @Override
     // 启动Spring数据库事务机制
     @Transactional
     public boolean purchase(Long userId, Long productId, int quantity) {

         long start = System.currentTimeMillis();
         for (int i=0; i<3; i++){
             // 获取产品
             ProductPo product = productMapper.getProduct(productId);
             // 比较库存和购买数量
             if (product.getStock() < quantity) {
                 // 库存不足
                 return false;
             }
             // 扣减库存
             int result = productMapper.decreaseProduct(productId, quantity, product.getVersion());

             // 如果数据更新失败,说明数据在多线程中被其他线程修改
             // 导致失败,着通过循环重入尝试购买商品
             if (result == 0){
                 continue;
             }
             // 初始化购买记录
             PurchaseRecordPo pr = this.initPurchaseRecord(userId, product, quantity);
             // 插入购买记录
             purchaseRecordMapper.insertPurchaseRecord(pr);
             return true;
         }
         return false;
     }

这种方式比上一种限定时间好,速度和单纯使用乐观锁差不多,并且消除了冲突。

3、Redis处理高并发

在高并发环境中,直接操作数据库的方式过于缓慢,因为数据库是一个写入磁盘的过程,这个速度没有写入内存的Redis快,Redis的机制也能够帮助我们克服超发现象,但是,因为其命令方式运算能力比较薄弱,所以往往采用Redis Lua去代替它原有的命令方式。Redis Lua在Redis的执行中是局内原子性的,但他被执行时不会被其他客户端发送过来的命令打断,通过这样一种机制,可以在需要高并发的环境下考虑使用Redis去代替数据库作为响应用户的数据载体。但是Redis存储具有不稳定性,所以还需要有一定的机制将Redis存储的数据刷入数据库。

下面先来配置一下Redis:

application.properties

#配置redis
spring.redis.jedis.pool.min-idle=5
spring.redis.jedis.pool.max-active=10
spring.redis.jedis.pool.max-idle=10
spring.redis.jedis.pool.max-wait=2000
spring.redis.port=6379
spring.redis.host=127.0.0.1
#我的Redis没有密码
#spring.redis.password=123456
spring.redis.timeout=1000

pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.1.4.RELEASE</version>
    <exclusions>
        <!--不依赖Redis的异步客户端lettuce -->
        <exclusion>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!--引入Redis的客户端驱动jedis -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>

PurchaseServiceImpl.java,使用Redis Lua响应请求

@Service
public class PurchaseServiceImpl implements PurchaseService {


    @Autowired
    private ProductMapper productMapper = null;

    @Autowired
    private PurchaseRecordMapper purchaseRecordMapper = null;
   
    private PurchaseRecordPo initPurchaseRecord(Long userId, ProductPo product, int quantity) {
        PurchaseRecordPo pr = new PurchaseRecordPo();
        pr.setNote("购买日志,时间:" + System.currentTimeMillis());
        pr.setPrice(product.getPrice());
        pr.setProductId(product.getId());
        pr.setQuantity(quantity);
        double sum = product.getPrice() * quantity;
        pr.setSum(sum);
        pr.setUserId(userId);
        return pr;
    }

    @Autowired
    StringRedisTemplate stringRedisTemplate = null;
    String purchaseScript =
            // 先将产品编号保存到集合中
            " redis.call('sadd', KEYS[1], ARGV[2]) \n"
                    // 购买列表
                    + "local productPurchaseList = KEYS[2]..ARGV[2] \n"
                    // 用户编号
                    + "local userId = ARGV[1] \n"
                    // 产品key
                    + "local product = 'product_'..ARGV[2] \n"
                    // 购买数量
                    + "local quantity = tonumber(ARGV[3]) \n"
                    // 当前库存
                    + "local stock = tonumber(redis.call('hget', product, 'stock')) \n"
                    // 价格
                    + "local price = tonumber(redis.call('hget', product, 'price')) \n"
                    // 购买时间
                    + "local purchase_date = ARGV[4] \n"
                    // 库存不足,返回0
                    + "if stock < quantity then return 0 end \n"
                    // 减库存
                    + "stock = stock - quantity \n"
                    + "redis.call('hset', product, 'stock', tostring(stock)) \n"
                    // 计算价格
                    + "local sum = price * quantity \n"
                    // 合并购买记录数据
                    + "local purchaseRecord = userId..','..quantity..','"
                    + "..sum..','..price..','..purchase_date \n"
                    // 保存到将购买记录保存到list里
                    + "redis.call('rpush', productPurchaseList, purchaseRecord) \n"
                    // 返回成功
                    + "return 1 \n";
    // Redis购买记录集合前缀
    private static final String PURCHASE_PRODUCT_LIST = "purchase_list_";
    // 抢购商品集合
    private static final String PRODUCT_SCHEDULE_SET = "product_schedule_set";
    // 32位SHA1编码,第一次执行的时候先让Redis进行缓存脚本返回
    private String sha1 = null;

    @Override
    public boolean purchaseRedis(Long userId, Long productId, int quantity) {
        // 购买时间
        Long purchaseDate = System.currentTimeMillis();
        Jedis jedis = null;
        try {
            // 获取原始连接
            jedis = (Jedis) stringRedisTemplate
                    .getConnectionFactory().getConnection().getNativeConnection();
            // 如果没有加载过,则先将脚本加载到Redis服务器,让其返回sha1
            if (sha1 == null) {
                sha1 = jedis.scriptLoad(purchaseScript);
            }
            // 执行脚本,返回结果
            Object res = jedis.evalsha(sha1, 2, PRODUCT_SCHEDULE_SET,
                    PURCHASE_PRODUCT_LIST, userId + "", productId + "",
                    quantity + "", purchaseDate + "");
            Long result = (Long) res;
            return result == 1;
        } finally {
            // 关闭jedis连接
            if (jedis != null && jedis.isConnected()) {
                jedis.close();
            }
        }
    }

    @Override
    // 当运行方法启用新的独立事务运行
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    // 保存购买记录,持久化到数据库
    public boolean dealRedisPurchase(List<PurchaseRecordPo> prpList) {
        for (PurchaseRecordPo prp : prpList) {
            purchaseRecordMapper.insertPurchaseRecord(prp);
            productMapper.decreaseProduct(prp.getProductId(), prp.getQuantity());
        }
        return true;
    }
}

使用定时机制,定时将数据持久化到数据库:

首先设置启动文件:

@SpringBootApplication(scanBasePackages = "com.wayne.springboot")
@MapperScan(annotationClass = Mapper.class, basePackages = "com.wayne.springboot")
// 启动springboot的定时机制,为此需要一个定时的方法来提供服务
// 把Redis的数据导入到数据库
@EnableScheduling
public class SpringBootShoppingApplication{

    public static void main(String[] args) {
        SpringApplication.run(SpringBootShoppingApplication.class, args);
    }
}

一个定时的方法来提供服务把Redis的数据导入到数据库:

import com.wayne.springboot.pojo.PurchaseRecordPo;
import com.wayne.springboot.service.PurchaseService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundListOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

@Service
public class TaskServiceImpl implements TaskService {
    @Autowired
    private StringRedisTemplate stringRedisTemplate = null;
    @Autowired
    private PurchaseService purchaseService = null;

    private static final String PRODUCT_SCHEDULE_SET = "product_schedule_set";
    private static final String PURCHASE_PRODUCT_LIST = "purchase_list_";
    // 每次取出1000条,避免一次取出消耗太多内存
    private static final int ONE_TIME_SIZE = 1000;

    @Override
    // 每天半夜1点钟开始执行任务
//    @Scheduled(cron = "0 0 1 * * ?")
    // 下面是用于测试的配置,每分钟执行一次任务
    @Scheduled(fixedRate = 1000 * 30)
    public void purchaseTask() {
        System.out.println("定时任务开始......");
        Set<String> productIdList
                = stringRedisTemplate.opsForSet().members(PRODUCT_SCHEDULE_SET);
        List<PurchaseRecordPo> prpList =new ArrayList<>();
        for (String productIdStr : productIdList) {
            Long productId = Long.parseLong(productIdStr);
            String purchaseKey = PURCHASE_PRODUCT_LIST + productId;
            BoundListOperations<String, String> ops
                    = stringRedisTemplate.boundListOps(purchaseKey);
            // 计算记录数
            long size = stringRedisTemplate.opsForList().size(purchaseKey);
            Long times = size % ONE_TIME_SIZE == 0 ?
                    size / ONE_TIME_SIZE : size / ONE_TIME_SIZE + 1;
            for (int i = 0; i < times; i++) {
                // 获取至多TIME_SIZE个抢红包信息
                List<String> prList = null;
                if (i == 0) {
                    prList  = ops.range(i * ONE_TIME_SIZE,
                            (i + 1) * ONE_TIME_SIZE);
                } else {
                    prList = ops.range(i * ONE_TIME_SIZE + 1,
                            (i + 1) * ONE_TIME_SIZE);
                }
                for (String prStr : prList) {
                    PurchaseRecordPo prp
                            = this.createPurchaseRecord(productId, prStr);
                    prpList.add(prp);
                }
                try {
                    // 采用该方法采用新建事务的方式,这样不会导致全局事务回滚
                    purchaseService.dealRedisPurchase(prpList);
                } catch(Exception ex) {
                    ex.printStackTrace();
                }
                // 清除列表为空,等待重新写入数据
                prpList.clear();
            }
            // 删除购买列表

            stringRedisTemplate.delete(purchaseKey);
            // 从商品集合中删除商品
            stringRedisTemplate.opsForSet()
                    .remove(PRODUCT_SCHEDULE_SET, productIdStr);
        }
        System.out.println("定时任务结束......");
    }

    private PurchaseRecordPo createPurchaseRecord(
        Long productId, String prStr) {
        String[] arr = prStr.split(",");
        Long userId = Long.parseLong(arr[0]);
        int quantity = Integer.parseInt(arr[1]);
        double sum = Double.valueOf(arr[2]);
        double price = Double.valueOf(arr[3]);
        Long time = Long.parseLong(arr[4]);
        Timestamp purchaseTime = new Timestamp(time);
        PurchaseRecordPo pr = new PurchaseRecordPo();
        pr.setProductId(productId);
        pr.setPurchaseTime(purchaseTime);
        pr.setPrice(price);
        pr.setQuantity(quantity);
        pr.setSum(sum);
        pr.setUserId(userId);
        pr.setNote("购买日志,时间:" + purchaseTime.getTime());
        return pr;
    }
}

到这里基本完成,启动项目前先启动redis服务器,并初始化Redis:

hmset product_1 id 1 stock 10000 price 2.00

然后启动并访问浏览器localhost:8080/test,因为设定的间隔为30s,所以等30s去查看数据库。性能相比之前要快上数倍。

产品表

源码存放在github-spring-boot-shopping

上一篇下一篇

猜你喜欢

热点阅读