GO在秒杀场景下的并发问题(库存超卖)

2020-03-16  本文已影响0人  Bill_Li_GB

先来一下小概念

CAP定理

任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项

《在多线程的软件世界里,对共享资源的争抢过程(Data Race)就是并发》

《而对共享资源数据进行访问保护的最直接办法就是引入锁!》

那么库存超卖是如何产生

比如下单时由于以下的情况导致了实际库存只有1000件商品,却出现了两笔999件商品的订单。1000件卖出了1998件商品,那么多出来的998件商品怎么办呢,没货可发了。此时多出来的就是超卖现象。所以看看以下两种情况。


超卖现象

以上一种情况库存出现负数,一个出现实际售出数量已经大于现有库存了。如果并发足够大,100个人都购买10份,并发发生。那么库存只有15的,就会卖出1000份。应该能说明问题了。

如何解决

一、分布式锁

分布式锁应该是怎么样的

基本原理:用一个状态值表示锁,对锁的占用和释放通过状态值来标识。

线程锁:主要用来给方法、代码块加锁。当某个方法或代码使用锁,在同一时刻仅有一个线程执行该方法或该代码段。线程锁只在同一JVM中有效果,因为线程锁的实现在根本上是依靠线程之间共享内存实现的,比如synchronized是共享对象头,显示锁Lock是共享某个变量(state)。
进程锁:为了控制同一操作系统中多个进程访问某个共享资源,因为进程具有独立性,各个进程无法访问其他进程的资源,因此无法通过synchronized等线程锁实现进程锁。
分布式锁:当多个进程不在同一个系统中,用分布式锁控制多个进程对资源的访问。

分布式锁设计考虑维度

单点问题:单点问题又称为单点故障,(HA/keepalived)主备模式\主从模式
失效时间:一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁
非阻塞的:没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
非重入的:同一个线程在没有释放锁之前无法再次获得该锁。(主机信息和线程信息)

分布式式锁的实现方式

操作简单、容易理解。但是存在单点问题、数据库性能开销较大、不可重入。

问题

1、锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
2、锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
3、锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
4、这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。

问题

1、锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在缓存中,其他线程无法再获得到锁。
2、锁只能是非阻塞的,无论成功还是失败都直接返回。
3、锁是非重入的,一个线程获得锁之后,在释放锁之前,无法再次获得该锁,因为使用到的key在缓存中已经存在。无法再执行put操作。

/**
 * 基于Redis实现分布式锁实现
 * 
 * 1、 采用Redis集群模式,避免单点故障问题。
 * 单例模式采用双重锁机制保障Reids集群实例线程安全,避免并发量大的时候消耗内存和CPU资源。
 * vehicle-common. JedisClusterFactory只支持Redis集群模式。
 * 2、 锁失效机制,尝试获取锁机制,死锁防止机制,避免锁操作失败影响其他线程。
 * 3、 线程获取锁排队机制,锁获取获取数限制机制,尝试获得锁机制,基于等待阻塞。
 * 4、 全局唯一事件码,标识同一个线程,解决锁可重入。
 * 获取唯一的事件编码规则:8位长度的随机字符串+YYYYMMDDHHSSmm+并发计数器。
 * 持有锁,且可重新获得锁。
 *
 */

    /**
     * 在时间范围内尝试获取锁,超时则放弃返回失败。
     * 
     * @param timeout
     * @param unit
     * @return
     */
    @Override
    public boolean tryLock(long timeout, TimeUnit unit) {
        boolean holdLock = true;
        //获取redis集群连接
        jedisCluster = coordinator.getJedisCluster();
        try {
            Long waitCount = jedisCluster.lpush(lockName, eventId);
            jedisCluster.expire(lockName, maxLiveSeconds);

            if (waitCount == 1) {
                return holdLock;
            }

            if (waitCount > maxWaitLimit) {
                holdLock = false;
                throw new LockException(LOCK_ERROR_CODE,
                        String.format("RedisDistributeLock Lock[%s] Too many wait", lockName));
            }

            if (timeout == 0) {
                holdLock = false;
                return holdLock;
            }

            holdLock = coordinator.await(lockName, eventId, unit.toMillis(timeout));

            if (!holdLock) {
                throw new LockException(LOCK_ERROR_CODE,
                        String.format("RedisDistributeLock Lock[%s] Timeout", lockName));
            }
            return holdLock;
        } finally {
            if (!holdLock) {
                jedisCluster.lrem(lockName, 1, eventId);
            }
        }
    }

    /**
     * 释放锁
     */
    @Override
    public void unlock() {
        if (Objects.isNull(jedisCluster)) {
            throw new LockException(LOCK_ERROR_CODE, "cann't unlock,because Lock not found ,redis conn is null");
        }
        try {
            Long lrem = jedisCluster.lrem(lockName, 1, eventId);
            LOGGER.debug("unlock lockName:{},eventId:{},lrem:{}", lockName, eventId, lrem);
            coordinator.notifyNext(jedisCluster, lockName);
        } finally {

        }
    }
/**
     * zookeeper节点的监视器
     */
    public void process(WatchedEvent event) {
        if (this.latch != null) {
            if (event.getType() == EventType.NodeDeleted) {
                this.latch.countDown();
            }
        }
    }

    public void lock() {
        try {
            if (this.tryLock()) {
                return;
            } else {
                waitForLock(waitNode, sessionTimeout);
            }
        } catch (KeeperException e) {
            throw new LockException(LOCK_ERR_EXCE, e);
        } catch (InterruptedException e) {
            throw new LockException(LOCK_ERR_EXCE, e);
        }
    }

    public boolean tryLock() {
        try {
            // 创建临时子节点
            myZnode = zk.create(ROOT_PATH + "/" + lockName + LOCK_KEY_SUFFIX, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            // 取出所有子节点
            List<String> subNodes = zk.getChildren(ROOT_PATH, false);
            if (subNodes.size() == 1) {
                System.out.println("get lock");
                return true;
            }
            // 取出所有lockName的锁
            List<String> lockObjNodes = new ArrayList<String>();
            for (String node : subNodes) {
                if (node.split(LOCK_KEY_SUFFIX)[0].equals(lockName)) {
                    lockObjNodes.add(node);
                }
            }
            Collections.sort(lockObjNodes);
            // 如果是最小的节点,则表示取得锁
            if (myZnode.equals(ROOT_PATH + "/" + lockObjNodes.get(0))) {
                return true;
            }
            // 如果不是最小的节点,找到比自己小1的节点
            String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
            waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
        } catch (KeeperException e) {
            throw new LockException(LOCK_ERR_EXCE, e);
        } catch (InterruptedException e) {
            throw new LockException(LOCK_ERR_EXCE, e);
        }
        return false;
    }

    public boolean tryLock(long time, TimeUnit unit) {
        try {
            if (this.tryLock()) {
                return true;
            }
            return waitForLock(waitNode, time);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    private boolean waitForLock(String waitNode, long waitTime) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(ROOT_PATH + "/" + waitNode, true);
        // 判断比自己小一个数的节点是否存在,如果不存在则无需等待锁
        if (stat != null) {
            this.latch = new CountDownLatch(1);
            this.latch.await(waitTime, TimeUnit.MILLISECONDS);
            this.latch = null;
        }
        return true;
    }

    public void unlock() {
        try {
            zk.delete(myZnode, -1);
            myZnode = null;
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

通过有序临时节点实现锁机制,自己对应的节点需要最小,则被认为是获得了锁。优点:集群可以透明解决单点问题,避免锁不被释放问题,同时锁可以重入。缺点:性能不如缓存方式,吞吐量会随着zk集群规模变大而下降。

三种分布式实现方式对比

数据库分布式锁实现缺点:
1.db操作性能较差,并且有锁表的风险
2.非阻塞操作失败后,需要轮询,占用cpu资源;
3.长时间不commit或者长时间轮询,可能会占用较多连接资源

Redis(缓存)分布式锁实现缺点:
1.锁删除失败 过期时间不好控制
2.非阻塞,操作失败后,需要轮询,占用cpu资源;

ZK分布式锁实现缺点:
性能不如redis实现,主要原因是写操作(获取锁释放锁)都需要在Leader上执行,然后同步到follower。

《分布式锁这个东西保证了数据的准确性,但是他天然并发能力有点弱,在高并发场景下如何优化分布式锁的并发性能》

二、分布式锁的优化,或者解决超卖的其他方案

分析分布式锁在高并发场景下的问题

从上面的流程可以知道通过分布式锁解决超卖问题是存在很大缺陷的,同一个商品多用户同时下单的时候,会基于分布式锁串行化处理,导致没法同时处理同一个商品的大量下单请求。

所以分布式锁适合普通电商,低并发、秒杀这种场景就显得不太适合了。

如何对分布式锁进行高并发优化

锁数据分段方式:思路是把数据分成若干段,每个段是一个单独的锁,所以多个线程并发修改数据的时候,可以并发的修改不同段的数据。
如果分成20段,那么1秒就可以处理 20 * 50 = 1000个请求了。

三、队列串行化

以上是一些优化高并发场景下解决超卖的一些思路。

四、其他参考

《如何解决秒杀的性能问题和超卖的讨论》
以下链接很有参考价值,可以仔细看看
https://www.jianshu.com/p/ed4f75ef2213

上一篇下一篇

猜你喜欢

热点阅读