分布式@架构师编程技巧分布式服务

Redis实现高并发扣减库存,秒杀功能(可线上使用)

2023-02-18  本文已影响0人  运气爆棚lsw

常见秒杀方案设计:
1.数据库行锁
2.分布式锁+分段锁提升效率
3.Redis单线程机制,将库存放在Redis里面使用
set count 1000
decrby count 1 扣减库存,返回正数就可扣减库存
4.Redis+Lua脚本,查询库存和扣减库存放到Lua脚本里面去执行
这是一个原子操作,解决高并发下线程安全问题
总结:简单利用redis的LUA脚本功能,一次性操作,实现原子性

Redis+Lua实现高并发秒杀功能

1、导入相关依赖
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>io.lettuce</groupId>
                    <artifactId>lettuce-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>

       <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.13.4</version>
        </dependency>

2、RedisConfig Bean初始化配置

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.redisson.config.SingleServerConfig;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private String port;

    @Value("${spring.redis.password}")
    private String password;

    @Bean(destroyMethod = "shutdown")
    public RedissonClient redissonClient() {
        final Config config = new Config();

        SingleServerConfig singleServerConfig = config.useSingleServer()
                .setAddress("redis://" + host + ":" + port);
        if (StringUtils.isNotBlank(password)) {
            singleServerConfig.setPassword(password);
        }
        System.out.println("------------- redisson -----------------------");
        System.out.println(config.getTransportMode());
        return Redisson.create(config);
    }


    /**
     * 重写Redis序列化方式,使用Json方式:
     * 数据存储到Redis的时候,我们的键(key)和值(value)都是通过Spring提供的Serializer序列化到数据库的
     * RedisTemplate默认使用的是JdkSerializationRedisSerializer
     * StringRedisTemplate默认使用的是StringRedisSerializer
     * <p>
     * Spring Data JPA为我们提供了下面的Serializer:
     * GenericToStringSerializer、Jackson2JsonRedisSerializer
     * JacksonJsonRedisSerializer、JdkSerializationRedisSerializer、OxmSerializer、StringRedisSerializer。
     * 在此我们将自己配置RedisTemplate并定义Serializer
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);

        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer =
        new Jackson2JsonRedisSerializer<Object>(Object.class);

        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        jackson2JsonRedisSerializer.setObjectMapper(om);

        // 设置值(value)的序列化采用Jackson2JsonRedisSerializer。
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        // 设置键(key)的序列化采用StringRedisSerializer。
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());

        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

}
3、Redis+Lua脚本实现秒杀扣减库存
public interface IStockCallback {

    /**
     * 扣减Redis库存
     *
     * @param batchNo 商品唯一编号
     * @param expire  过期时间
     * @param num     扣减库存数量
     * @return 剩余库存数量
     */
    long getStock(String batchNo, long expire, int num);

    /**
     * 初始化库存
     *
     * @param commodityId 业务
     * @return 库存
     */
    int initStock(long commodityId);
}

下面是下单扣减库存业务的代码块,在扣库存的时候,不能超发,也不能扣到负数,
然后再同步到MYSQL里,初始化库存数量,这个可以从DB里取实际的量,
LUA脚本保证原子性,查询剩余库存和扣减逻辑是一个原子性操作

@Slf4j
@Service
public class RedisStockService implements IStockCallback {

    /**
     * 库存还未初始化
     */
    public static final long UNINITIALIZED_STOCK = -3L;
    /**
     * 判断商品是否存在KEY标识
     */
    public static final long EXIST_FLAG = -2L;
    /**
     * 配置库存Redis缓存Key前缀
     */
    public static final String REDIS_KEY = "REDIS_KEY:STOCK:";
    /**
     * 执行扣库存的Lua脚本
     */
    public static final String STOCK_LUA;
    /**
     * Redisson 客户端
     */
    @Resource
    private RedissonClient redissonClient;
    /**
     * Redis 客户端
     */
    @Resource
    private RedisTemplate<String, Integer> redisTemplate;

    static {
        /*
         * @desc 扣减库存Lua脚本
         * 库存(stock)-1:表示不限库存
         * 库存(stock) 0:表示没有库存
         * 库存(stock)大于0:表示剩余库存
         *
         * @params 库存key
         * @return
         *      -3:库存未初始化
         *      -2:库存不足
         *      -1:不限库存
         *      大于等于0: 剩余库存(扣减之后剩余的库存), 直接返回-1
         */
        final StringBuilder strBuilder = new StringBuilder();
        strBuilder.append("if (redis.call('exists', KEYS[1]) == 1) then");
        strBuilder.append("    local stock = tonumber(redis.call('get', KEYS[1]));");
        strBuilder.append("    local num = tonumber(ARGV[1]);");
        strBuilder.append("    if (stock == -1) then");
        strBuilder.append("        return -1;");
        strBuilder.append("    end;");
        strBuilder.append("    if (stock >= num) then");
        strBuilder.append("        return redis.call('incrby', KEYS[1], 0 - num);");
        strBuilder.append("    end;");
        strBuilder.append("    return -2;");
        strBuilder.append("end;");
        strBuilder.append("return -3;");
        STOCK_LUA = strBuilder.toString();
    }

    /**
     * 执行扣减库存业务
     *
     * @param batchNo 库存唯一标识
     * @param expire  库存过期时间
     * @param num     扣减库存的数量
     * @return 返回扣减库存后剩余库存数量
     */
    @Override
    public long getStock(String batchNo, long expire, int num) {
        // 商品库存唯一标识
        final String key = REDIS_KEY + batchNo;

        /*
         * 从redis中获取key对应的过期时间;
         * 1、如果该值有过期时间,就返回相应的过期时间;
         * 2、如果该值没有设置过期时间,就返回-1;
         * 3、如果没有该值,就返回-2;
         *
         * 注意:这里为了方便模拟,实际线上。通过缓存预热的方式通过DB查询实际的库存数据
         * 添加到Redis中
         */
        Long expire1 = redisTemplate.opsForValue().getOperations().getExpire(key);
        if (Objects.equals(EXIST_FLAG, expire1)) {
            redisTemplate.opsForValue().set(key, 100, expire, TimeUnit.SECONDS);
            System.out.println("Redis无初始库存,设置库存数据 = " + expire1);
        }

        // 初始化商品库存
        Integer stock = redisTemplate.opsForValue().get(key);

        // 设置分布式锁
        final RLock rLock = redissonClient.getLock(REDIS_KEY + ":LOCK");
        try {
            if (rLock.tryLock(1, TimeUnit.SECONDS)) {
                stock = redisTemplate.opsForValue().get(key);
                log.info("--- 当前Key:[{}]加锁成功,当前最新库存:{}---", key, stock);
                // 调一次扣库存的操作
                Long stock1 = stock(key, num);
                System.out.println("stock1 = " + stock1);

                stock = redisTemplate.opsForValue().get(key);
                int batchNoLock = Objects.requireNonNull(stock);
                log.info("--- 当前剩余库存:{}", batchNoLock);
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        } finally {
            if (rLock != null && rLock.isHeldByCurrentThread()) {
                rLock.unlock();
            }
        }
        return stock;
    }

    /**
     * 扣库存这步特别注意,分布式连接有问题,需要依赖包里,去掉lettuce组件
     * 初始化库存数量,这个可以从DB里取实际的量
     *
     * @param key 库存key
     * @param num 扣减库存数量
     * @return 扣减之后剩余的库存【-3:库存未初始化; -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存】
     */
    private Long stock(String key, int num) {
        // 脚本里的KEYS参数
        List<String> keys = new ArrayList<>();
        keys.add(key);

        // 脚本里的ARGV参数
        List<String> argvList = new ArrayList<>();
        argvList.add(Integer.toString(num));

        // 执行扣减库存LUA脚本
        return redisTemplate.execute((RedisCallback<Long>) connection -> {
            Object nativeConnection = connection.getNativeConnection();
            // 集群模式
            if (nativeConnection instanceof JedisCluster) {
                return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, argvList);
            }
            // 单机模式
            else if (nativeConnection instanceof Jedis) {
                return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, argvList);
            }
            return UNINITIALIZED_STOCK;
        });
    }

    /**
     * 获取初始的库存
     * 初始化库存数量,这个可以从DB里取实际的量
     *
     * @param commodityId 业务ID
     * @return 初始库存
     */
    @Override
    public int initStock(long commodityId) {
        // TODO 这里做一些初始化库存的操作
        return 30;
    }
}
3、调用接口并发Controller,测试分布式库存扣减
   @Resource
    private RedisStockService redisStockService;

    /**
     * 下单扣减库存
     *
     * @param stockDTO 下单请求参数
     * @return 扣减库存结果
     */
    @PostMapping("/buyProductCreateOrder")
    public Object buyProductCreateOrder(@RequestBody StockDTO stockDTO) {
        try {
            return redisStockService.getStock(stockDTO.getBatchNo(), stockDTO.getExpire(), stockDTO.getNum());
        } catch (Exception e) {
            return e.getMessage();
        }
    }
上一篇下一篇

猜你喜欢

热点阅读