关于java 锁的一些理解

2021-11-28  本文已影响0人  简单coder

因为本职工作是做 iOS 的,java 项目也只是以前兴趣使然做的,所以理解一直不是很深(毕竟我做的iOS项目本身体量不大),对并发加锁这块儿也是一知半解,昨天研究了挺多方案,感觉一下子明白了许多,所以记录一下自己的感想.也给每个前端(iOS,安卓,web 前端)都介绍一下后端的加锁机制.

既然是自己的感悟,我也不会讲解每个锁的原理,深入代码分析,因为我也不懂,半桶水讲原理,只会越讲越笨,我要讲的是,每种锁的场景,使用的注意,希望 iOSer 们能够对后端,对并发,对锁有个大致的框架理解,在看到某种业务的时候,心里能够有个小点子,唉,这个业务大概需要什么的加锁,怎么实现.

synchronized

java 有synchronized和 lock 两种实现锁的方式, synchronized我以前使用过,就是使用synchronized关键词给方法加锁,但是,这种锁一但锁住,那么其他的线程读写都得等待这个线程的释放锁,万一这个线程执行的是一些 iO 操作,或者是一些比较耗时的查询,结果就会很麻烦,别人看来,你的项目就是"卡卡的",所以基本不会有人使用synchronized这种方式.

lock

lock是一个接口,并且他的接口很简单.

// 获取锁  
void lock()   

// 如果当前线程未被中断,则获取锁,可以响应中断  
void lockInterruptibly()   

// 返回绑定到此 Lock 实例的新 Condition 实例  
Condition newCondition()   

// 仅在调用时锁为空闲状态才获取该锁,可以响应中断  
boolean tryLock()   

// 如果锁在给定的等待时间内空闲,并且当前线程未被中断,则获取锁  
boolean tryLock(long time, TimeUnit unit)   

// 释放锁  
void unlock()  

看不懂的地方没有关系,我会慢慢讲解,需要注意的是,lock 一但执行 lock()加锁后,执行过程中如果出现异常,是不会自动释放锁的,所以为了防止死锁(锁不释放,导致其他所有线程都在等待),锁一定一定要被释放掉,所以一版来说,一个加锁的代码模板是这样的.

Lock lock = ...;
lock.lock();
try{
    //处理任务
}catch(Exception ex){

}finally{
    lock.unlock();   //释放锁
}

finally 可以保证,不管你 try 或者 catch 中,抛出了任何的异常,或者是 return 了任何的返回结果,一定一定会进入 finally 代码块,这样我们就可以简单的实现了加解锁.

tryLock() & tryLock(long time, TimeUnit unit)

lock 还可以做一些其他的功能,比如先尝试获取锁
tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true;如果获取失败(即锁已被其他线程获取),则返回false,也就是说,这个方法无论如何都会立即返回(在拿不到锁时不会一直在那等待)。
  tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回false,同时可以响应中断。如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。
简单来说,这可以做一些业务的场景,比如 在通话场景中,A 与 C 通话,B想与 A 通话,发现 A 被锁住,这时候我们不需要进行等待,直接返回告诉 B 通话繁忙.当然,我们这是无条件的锁,所以D想跟 E 通话,也会因为 A 与 C 通话的锁一起被 return 掉,这是后话,我会在后续的细粒度锁里讲明.
所以,使用 tryLock()tryLock(long time, TimeUnit unit)的代码模板一版是这样的:

Lock lock = ...;
if(lock.tryLock()) {
     try{
         //处理任务
     }catch(Exception ex){

     }finally{
         lock.unlock();   //释放锁
     } 
}else {
    //如果不能获取锁,则直接做其他事情
}

lockInterruptibly()

这个我没使用过,不太清楚场景.

ReentrantLock

ReentrantLock,即 可重入锁。ReentrantLock是唯一实现了Lock接口的类,并且ReentrantLock提供了更多的方法,比如初始化的 公平锁和非公平锁(默认是非公平锁)

public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
可重入锁

如果锁具备可重入性,则称作为 可重入锁 。像 synchronized和ReentrantLock都是可重入锁,可重入性在我看来实际上表明了 锁的分配机制:基于线程的分配,而不是基于方法调用的分配。举个简单的例子,当一个线程执行到某个synchronized方法时,比如说method1,而在method1中会调用另外一个synchronized方法method2,此时线程不必重新去申请锁,而是可以直接执行方法method2。

公平锁

公平锁即 尽量 以请求锁的顺序来获取锁。比如,同是有多个线程在等待一个锁,当这个锁被释放时,等待时间最久的线程(最先请求的线程)会获得该所,这种就是公平锁。而非公平锁则无法保证锁的获取是按照请求锁的顺序进行的,这样就可能导致某个或者一些线程永远获取不到锁。
我们一般情况下不去考虑公平锁情况(不知道12306抢票会不会为了大家,实现的是公平锁),既然实现了 lock 接口,那么其实他的使用基本就和上面的 lock 模板一模一样,这里不再过多说明,我要跟你们讲的是另外一个类,ReentrantReadWriteLock.

ReentrantReadWriteLock

业务场景是我们希望查询,读取是不会加锁的,abc 三个线程的读取不会收到影响,但是 d 线程执行写操作,那么 abc 线程就要一起处于等待状态,等到 d 线程释放后,才能继续读取.这样会大大增加读取的效率,这样的情况写,ReentrantReadWriteLock就被设计出来.
ReentrantReadWriteLock实现了ReadWriteLock接口,该接口只有两个方法:

public interface ReadWriteLock {
    /**
     * Returns the lock used for reading.
     *
     * @return the lock used for reading
     */
    Lock readLock();

    /**
     * Returns the lock used for writing.
     *
     * @return the lock used for writing
     */
    Lock writeLock();
}

提供一个写锁,提供一个读锁,我们也不去看他的代码,瞎猜一下就能大概知道,如果当前的写锁锁住的话,读锁就处于等待状态,如果写锁空闲的话,读锁就可以进行访问,反正就是这么个意思,所以他的代码模板应该是这样的

// 加读锁
        lock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + " be ready to read data!");
            Thread.sleep((long) (Math.random() * 1000));
            System.out.println(Thread.currentThread().getName() + " have read data :" + data);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 释放读锁
            lock.readLock().unlock();
        }
// 加写锁
        lock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + " be ready to write data!");
            Thread.sleep((long) (Math.random() * 1000));
            this.data = data;
            System.out.println(Thread.currentThread().getName() + " have write data: " + data);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 释放写锁
            lock.writeLock().unlock();
        }

上面是 java 的基础概念,但是这还远远不够,还记得我们之前提到的 abcd 打电话业务场景吗,这里重新介绍一下改场景.

在通话场景中,A 与 C 通话,B想与 A 通话,发现 A 被锁住,这时候我们不需要进行等待,直接返回告诉 B 通话繁忙.当然,我们这是无条件的锁,所以D想跟 E 通话,也会因为 A 与 C 通话的锁一起被 return 掉.

这时候,java 本身的代码已经不够我们需求的实现了,我们急需更加细粒度的锁,针对某些订单 id,商品 id 的订单,通话场景的 a,b的userId进行加锁,在这样的需求下,细粒度锁就被设计出来了.

SegmentLock分段锁

我们当然不能为每个用户的 id 都创建一个锁,那样会极其耗费资源,性能也会变得极差,但是我们也可以做出一些取舍,我们先预创建一批固定的锁,然后将每个用户分段划分到这些锁里,既然同一个用户一定要分配到同一个锁,那么我们可以尝试用这个 key 的 hash 值去做取锁操作.所以SegmentLock锁代码如下:

public class SegmentLock<T> {

    /**
     * 默认预先创建的锁数量.
     */
    private int DEFAULT_LOCK_COUNT = 20;

    private final ConcurrentHashMap<Integer, ReentrantLock> lockMap = new ConcurrentHashMap<>();

    public SegmentLock() {
        init(null, false);
    }

    public SegmentLock(Integer count, boolean isFair) {
        init(count, isFair);
    }

    private void init(Integer count, boolean isFair) {
        if (count != null && count != 0) {
            this.DEFAULT_LOCK_COUNT = count;
        }
        // 预先初始化指定数量的锁
        for (int i = 0; i < this.DEFAULT_LOCK_COUNT; i++) {
            this.lockMap.put(i, new ReentrantLock(isFair));
        }
    }

    public ReentrantLock get(T key) {
        return this.lockMap.get((key.hashCode() >>> 1) % DEFAULT_LOCK_COUNT);
    }

    public void lock(T key) {
        ReentrantLock lock = this.get(key);
        lock.lock();
    }

    public void unlock(T key) {
        ReentrantLock lock = this.get(key);
        lock.unlock();
    }
}

为什么要无符号位移呢,hashCode 值有可能是得到一个负数,取余之后还是一个负数,用一个负数索引去 Map 中取值得到就是 null,会导致后面在使用时产生 NPE(abs再取余都行)
使用的代码模板基本与之前一致.

ReentrantLock lock = segmentLock.get(userId);
lock.lock();
try{
    //处理任务
}catch(Exception ex){

}finally{
    lock.unlock();   //释放锁
}

这样一来,就基本解决了abcd 分别加锁的场景.但是,我们仔细思考下,我们总共也就DEFAULT_LOCK_COUNT,如果在用户数量不多的情况下,还是够用的,但是,写代码总是需要有前瞻性的,万一你们公司发展起来了呢,日活用户有个50w,万一一个音视频通话业务有个200个人的并发,那么总会有a 跟 z 用户使用了同一把锁,那样又是白白的等待. 所以我们需要更加细粒度的锁,在这样的条件下,haskLock 被设计出来了.

哈希锁HashLock

假设我们确确实实要为每个 key(可以是商品 id,可以是用户 id)创建一个单独的锁,这样就可以解决刚刚所述的问题,但是又会产生锁创建过多的问题,我们需要一个机制,去及时地释放掉那些不使用的锁的内存.
设计代码如下.

public class HashLock<T> {
    private boolean fair = false;
    private final SegmentLock<T> segmentLock = new SegmentLock<>();
    private final ConcurrentHashMap<T, ReentrantLockCount> lockMap = new ConcurrentHashMap<>();

    public HashLock() {

    }

    public HashLock(boolean fair) {
        this.fair = fair;
    }

    public void lock(T key) {
        ReentrantLockCount lock;
        // 通过分段锁来保证获取锁时的线程安全
        this.segmentLock.lock(key);
        try {
            lock = this.lockMap.get(key);
            if (lock == null) {
                lock = new ReentrantLockCount(this.fair);
                this.lockMap.put(key, lock);
            } else {
                // map 中已经存在说明锁已经创建,直接数量加一
                lock.incrementAndGet();
            }
        } finally {
            this.segmentLock.unlock(key);
        }
        lock.lock();
    }

    public void unlock(T key) {
        ReentrantLockCount reentrantLockCount = this.lockMap.get(key);
        // 判断加锁的次数等于一的话可以将 map 中的锁移除
        if (reentrantLockCount.getCount() == 1) {
            this.segmentLock.lock(key);
            try {
                if (reentrantLockCount.getCount() == 1) {
                    this.lockMap.remove(key);
                }
            } finally {
                this.segmentLock.unlock(key);
            }
        }
        reentrantLockCount.unlock();
    }

    static class ReentrantLockCount {
        private ReentrantLock reentrantLock;
        // 记录加锁的次数
        private AtomicInteger count = new AtomicInteger(1);

        public ReentrantLockCount(boolean fair) {
            this.reentrantLock = new ReentrantLock(fair);
        }

        public void lock() {
            this.reentrantLock.lock();
        }

        public void unlock() {
            this.count.decrementAndGet();
            this.reentrantLock.unlock();
        }

        public int incrementAndGet() {
            return this.count.incrementAndGet();
        }

        public int getCount() {
            return this.count.get();
        }
    }
}

这个类的设计是,我们维护一个 hashMap,用 key作为 map 的 key,如果 key 中取不到这个锁,我们就new 一个放进去,并且用这个锁使用,如果2个线程访问,那么第二个访问到的会在维护的 count+1,在释放锁的时候,
如果 count==1,从这个 hashmap 中移除该锁,以此达到释放锁的目的.当然,获取锁的这个过程是会被并发影响的,为了保证获取锁的线程安全,我们再用一个分段锁对获取锁的过程加锁,释放锁同理.

哈西锁的设计需要考虑到及时的移除hashMap 中不使用的锁,这个过程有可能出问题,所以代码设计需要十分眼睛,包括 count 也需要设计成AtomicInteger.

优点:很好地解决了不同用户共用锁的问题
缺点:需要通过分段锁来维护锁的获取和移除,同时还要维护加锁的次数,分段锁这里锁的数量会成为性能的瓶颈,而且稍有不慎锁没释放成功可能会产生内存泄漏的问题。

哈西锁虽然能处理掉我们大部分的需求,但是,设计过程复杂,容易内存泄露,分段锁进行取锁解锁保护也是一个性能的瓶颈,我们需要一个更好的及时释放锁的机制,这样的条件下,弱引用锁被设计出来了

弱引用锁

利用弱引用的特性,这样就能够拿掉分段锁,把锁对象的资源回收交给 Java 虚拟机,然后对于已经被回收的锁进行移除,能有效避免不小心发生内存泄漏的问题。

利用弱引用的特性,这样就能够拿掉分段锁,把锁对象的资源回收交给 Java 虚拟机,然后对于已经被回收的锁进行移除,能有效避免不小心发生内存泄漏的问题。
有点类似 iOS 的retainCount(在 clear 的时候,如果retainCount==1,仅map 持有就移除).
代码实现:

public class WeakHashLock<T> {

    /**
     * map 中锁数量阈值.
     */
    private static final int LOCK_SIZE_THRESHOLD = 1000;
    private ReferenceQueue<ReentrantLock> queue = new ReferenceQueue<>();
    private ConcurrentHashMap<T, WeakRefLock<T, ReentrantLock>> lockMap = new ConcurrentHashMap<>();

    public ReentrantLock get(T key) {
        // 可以设置一个阈值,当锁的数量超过这个阈值时移除一部分被回收的锁
        if (this.lockMap.size() > LOCK_SIZE_THRESHOLD) {
            clearEmptyRef();
        }

        WeakRefLock<T, ReentrantLock> weakRefLock = this.lockMap.get(key);
        ReentrantLock lock = weakRefLock == null ? null : weakRefLock.get();
        while (lock == null) {
            lockMap.putIfAbsent(key, new WeakRefLock<>(new ReentrantLock(), this.queue, key));
            // 再次从 Map 中获取,保证同一用户获取的锁是一致的
            weakRefLock = lockMap.get(key);
            lock = weakRefLock == null ? null : weakRefLock.get();
            if (lock != null) {
                return lock;
            }
            // 这里注意如果堆资源过于紧张可能会返回空的情况,需要移除一部分被回收的锁
            clearEmptyRef();
        }
        return lock;
    }

    @SuppressWarnings("unchecked")
    public void clearEmptyRef() {
        Reference<? extends ReentrantLock> ref;
        while ((ref = this.queue.poll()) != null) {
            WeakRefLock<T, ? extends ReentrantLock> weakRefLock = (WeakRefLock<T, ? extends ReentrantLock>) ref;
            this.lockMap.remove(weakRefLock.key);
        }
    }


    static final class WeakRefLock<T, K> extends WeakReference<K> {

        private final T key;

        public WeakRefLock(K referent, ReferenceQueue<? super K> queue, T key) {
            super(referent, queue);
            this.key = key;
        }
    }
}

弱引用锁的读锁,使用了putIfAbsent

1.使用 put 方法添加键值对,如果 map 集合中没有该 key 对应的值,则直接添加,并返回 value;如果已经存在对应的值,则会覆盖旧值,value 为新的值,返回值为 value。
2.使用 putIfAbsent 方法添加键值对,如果 map 集合中没有该 key 对应的值,则直接添加,并返回 null,如果已经存在对应的值,则依旧为原来的值,返回值为 value(旧的值)。

利用了弱引用的特性,拿掉了锁的获取和创建时维护加锁次数的判断过程,在获取锁时直接从 Map 中获取,如果拿到为空则创建,同时这里要解释一下代码里面清理被回收的锁的过程。第一处在 Map 中的锁数量超过设定的阈值后将已经被回收的锁进行移除,主要是为了不让 Map 中存放过多的已经被回收的锁占用资源,第二处移除主要是以防资源过于紧张的情况,刚刚创建的弱引用锁立即就被回收了,这时急需移除一部分已经被回收的锁。当然如果资源真的都已经紧张到这个程度了的话,也应该考虑考虑提高一下机器的配置了。

总结

希望大家看完文章后,能够在心里对锁的使用有一个初步的印象,脑子里有一个树状脑图,知道什么时候需要什么锁.如果有点忘记,看看下面我提供的大纲,脑子里再回想一下.

喝水不忘挖井人,致谢:

java 锁基础
java 引用类型
细粒度锁

ps: 下一篇不定时预告,可能是 java 我自己写的一些总结,也可能是flutter的总结,最近一段时间完完全全没更新 blog,主要是闲暇时间全部精力放在 flutter 的学习上,偶尔放在 java 上,flutter 也算初步有所得,到时候看能不能写个长篇出来.

上一篇 下一篇

猜你喜欢

热点阅读