Android开发经验谈Android开发

Java 并发"锁"的本质(一步步实现锁)

2020-12-21  本文已影响0人  小鱼人爱编程

前言

线程并发系列文章:

Java 线程基础
Java “优雅”地中断线程
Java 线程状态
真正理解Java Volatile的妙用
Java ThreadLocal你之前了解的可能有误
Java Unsafe/CAS/LockSupport 应用与原理
Java 并发"锁"的本质(一步步实现锁)

在上篇分析了CAS、线程挂起/唤醒相关知识后,常规的做法本篇就需要分析Synchronized与AQS的源码了。不过此次并不打算这样做,这么做就会陷入源码的枯燥讲解中,不了解前因后果,转折过于生硬。因此本篇先从实际需求一步步推导为什么需要"锁"?如何自己实现"锁"等步骤,最后才自然过渡到系统提供了哪些"锁"及其原理与应用。
通过本篇文章,你将了解到:

1、互斥访问变量
2、CAS访问变量
3、互斥访问临界区
4、线程挂起/唤醒策略
5、线程同步策略
6、总结

1、互斥访问变量

先看一段代码:

    static int a = 0;
    private static void inc() {
        a++;
    }

如上线程1、线程2同时访问这段代码,对共享变量a进行自增操作,我们知道a的结果是不可控的。


image.png

要想结果可控,那么要求线程不能同时对a进行操作,也就是需要对a进行互斥访问。

2、CAS访问变量

通过上篇文章分析可知,CAS可以实现互斥地访问共享变量,于是代码改为如下:

    static AtomicInteger a = new AtomicInteger(0);
    private static void inc() {
        a.incrementAndGet();
    }

AtomicInteger 里面包装了变量a,其底层通过CAS互斥访问变量a,因此实现了多线程互斥访问变量。
CAS 细节请移步:Java Unsafe/CAS/LockSupport 应用与原理

3、互斥访问临界区

以上分析的是多线程访问单个变量的场景,考虑另一种情况:当需要互斥访问的变量不仅是a、还有b、c等变量,你可能会这么做:

    static AtomicInteger a = new AtomicInteger(0);
    static AtomicInteger b = new AtomicInteger(0);
    static AtomicInteger c = new AtomicInteger(0);
    private static void inc() {
        a.incrementAndGet();
        b.incrementAndGet();
        c.incrementAndGet();
    }

有多少共享变量就需要多少个AtomicInteger 封装。现在还只是有3个共享变量而已,若是有更多的呢?这么做显然无法满足更进一步的需求。
既然a、b、c都需要互斥访问,那么能否在入口处统一做互斥处理就好了呢?


image.png

进入临界区

对多个共享变量的操作放入到临界区,那么问题来了如何实现临界区的互斥访问呢?我们依旧想到了CAS。

设置共享变量x,初始值为0,每个想要进入临界区的线程都先要访问x,将x修改为1,若是成功,则能够进入临界区,否则一直死循环不断尝试修改。

用代码来实现如下:

    static AtomicInteger x = new AtomicInteger(0);
    static int a = 0;
    static int b = 0;
    static int c = 0;
    private static void inc() {
        //尝试将x从0修改为1,若是失败则一直重试
        while(!x.compareAndSet(0, 1));
        //走到这里说明已经修改成功
        {
            //临界区
            a++;
            b++;
            c++;   
        }
    }

当多个线程同时进入inc()方法后,先尝试修改x的值,若是成功则退出循环,否则一致尝试。当其中一个线程成功将x从0修改为1后,其它线程继续尝试此操作将会失败。而只有成功修改了x值的线程才能进入临界区,因此对于临界区的互斥访问已经实现了。

退出临界区

进入临界区的线程总有退出来的时候,退出时需要将x修改回来,以便其它线程能够进入临界区,因此再增加释放x的操作:

    static AtomicInteger x = new AtomicInteger(0);

    static int a = 0;
    static int b = 0;
    static int c = 0;
    private static void inc() {
        //尝试将x从0修改为1,若是失败则一直重试
        while(!x.compareAndSet(0, 1));
        //走到这里说明已经修改成功
        {
            //临界区
            a++;
            b++;
            c++;
        }
        //此处不需要一直尝试,因为同一时刻始终只有一个线程访问
        x.compareAndSet(1, 0);
    }

由以上可知,进入临界区前先用CAS修改x的值,修改成功后进入临界区。当退出临界区后再用CAS修改x变回原来的值,这就实现了互斥访问临界区的过程。
想要对任何临界区进行访问,都可以使用这种方法,想一想这是不是相当于进临界区前先拿到"锁",其它没拿到"锁"的线程一直尝试拿"锁",当拥有锁的线程退出临界区后释放"锁",其它就可以拿到"锁"了。


image.png

将"锁"的代码抽象出来,访问临界区如下:

    static int a = 0;
    static int b = 0;
    static int c = 0;
    //构造Lock对象
    static Lock lock = new MyLock();
    private static void inc() {
        //获取锁
        lock.lock();
        {
            //临界区
            a++;
            b++;
            c++;
        }
        //释放锁
        lock.unlock();
    }
    
    //抽象锁结构
    interface Lock {
        void lock();
        void unlock();
    }
    
    static class MyLock implements Lock{
        AtomicInteger x = new AtomicInteger(0);
        
        @Override
        public void lock() {
            while(!x.compareAndSet(0, 1));
        }
        
        @Override
        public void unlock() {
            x.compareAndSet(1, 0);
        }
        
    }

因此互斥访问临界区的步骤:

1、获取锁
2、进入临界区
3、退出临界区
4、释放锁

4、线程挂起/唤醒策略

上面以两个线程访问临界区为例,当线程1成功获取锁进入临界区后,线程2拿不到锁但是会一直尝试。试想一下:

  • 不只是两个线程竞争锁,而是很多线程同时竞争锁。
  • 临界区执行时间很长,锁很难被释放出来。

那么没获取到锁的线程一直无限循环去尝试获取,如此一来很浪费CPU,能不能让没获取到锁的线程先挂起,当释放锁的时候再把它唤醒呢?

线程挂起

竞争锁失败的线程先将自己挂起,而其它释放锁的线程如何找到之前被挂起的线程呢?将挂起的线程放入队列里,当另一个线程释放锁后从队列里取出当初被挂起的线程并唤醒它,被唤醒的线程继续去竞争锁。


image.png

流程清晰了,看看如何用代码实现:

    static class MyLock implements Lock{
        AtomicInteger x = new AtomicInteger(0);
        
        //阻塞队列
        LinkedBlockingQueue<Thread> blockList = new LinkedBlockingQueue<>();

        @Override
        public void lock() {
            //while 循环是为了线程被唤醒后继续竞争锁
            while (true) {
                if (x.get() > 0) {
                    //说明已经有线程正在持有锁
                    //此处暂时不考虑重入
                } else {
                    //无锁状态
                    if (x.compareAndSet(0, 1)) {
                        //成功获取锁
                        return;
                    } else {
                        //获取锁失败
                    }
                }
                //走到此,说明没获取到锁
                //加入队列
                blockList.offer(Thread.currentThread());
                //挂起线程
                LockSupport.park();   
            }
        }
    }

上面是加锁的流程:

1、先判断当前锁是否可用,若是可用则获取锁。
2、若获取成功则退出,否则加入阻塞队列,并将自己挂起。

线程唤醒

再来看看,如何唤醒一个被挂起的线程:

        @Override
        public void unlock() {
            if (x.get() > 0) {
                //说明当前持有锁
                //释放锁
                x.compareAndSet(1, 0);
                //从阻塞队列里取出线程唤醒
                //此处取队头元素
                Thread thread = blockList.poll();
                if (thread != null) {
                    LockSupport.unpark(thread);
                }
            }
        }

上面是解锁流程:

1、先判断当前是否持有锁,若是则释放锁。
2、释放锁后,从阻塞队列里取出线程唤醒。

持有锁的线程从阻塞队列唤醒阻塞的线程后,被唤醒的线程继续尝试竞争锁。

LockSupport 细节请移步:Java Unsafe/CAS/LockSupport 应用与原理

5、线程同步策略

上述分析的是多个线程互斥访问临界区,这些线程对临界区的操作仅仅是互斥,并没有其它依赖关系,想象一种场景:

1、线程1对变量a进行自增操作,当a增加到10的时候暂停自增。
2、线程2对变量b进行自减操作,当a减到0的时候暂停自减。

结合Java 线程基础,我们知道线程1和线程2具有同步关系,也知道线程同步需要加锁对条件变量进行互斥访问。因此,有如下关系:

image.png

从图上可以看出:

1、当需要等待的时候,走红色线条部分。将线程放入到等待队列里、释放锁,并唤醒阻塞队列里的线程。
2、当需要通知的时候,走绿色线条部分,将线程从等待队列里移除,并将之加入到阻塞队列里。
3、线程同步的过程比线程互斥过程新增了等待队列,该队列存储着因某种条件而挂起等待的线程。当另一个线程发现条件满足后,通知等待队列里的线程,让它继续做事。

在线程互斥代码的基础上,增加同步相关的代码:
首先
先抽象出等待-通知接口:

    //抽象同步的等待、通知机制
    interface Condition {
        void waitCondition();

        void notifyCondition();
    }

其次
实现该接口:

    static class MyLock implements Lock {
        AtomicInteger x = new AtomicInteger(0);

        //阻塞队列
        LinkedBlockingQueue<Thread> blockList = new LinkedBlockingQueue<>();

        public Condition newCondition() {
            return new MyCondition();
        }

        @Override
        public void lock() {
            //while 循环是为了线程被唤醒后继续竞争锁
            while (true) {
                if (x.get() > 0) {
                    //说明已经有线程正在持有锁
                    //此处暂时不考虑重入
                } else {
                    //无锁状态
                    if (x.compareAndSet(0, 1)) {
                        //成功获取锁
                        return;
                    } else {
                        //获取锁失败
                    }
                }
                //走到此,说明没获取到锁
                //加入队列
                blockList.offer(Thread.currentThread());
                //挂起线程
                LockSupport.park();
            }
        }

        @Override
        public void unlock() {
            if (x.get() > 0) {
                //说明当前持有锁
                //释放锁
                x.compareAndSet(1, 0);
                //从阻塞队列里取出线程唤醒
                //此处取队头元素
                Thread thread = blockList.poll();
                if (thread != null) {
                    LockSupport.unpark(thread);
                }
            }
        }

        class MyCondition implements Condition {
            LinkedBlockingQueue<Thread> waitList = new LinkedBlockingQueue<>();

            //等待-通知
            @Override
            public void waitCondition() {
                //加入等待队列
                waitList.add(Thread.currentThread());

                //释放锁,让其它线程有机会获得锁
                x.compareAndSet(1, 0);
                Thread thread = blockList.poll();
                if (thread != null) {
                    LockSupport.unpark(thread);
                    //挂起当前线程
                    LockSupport.park();
                }
            }

            @Override
            public void notifyCondition() {
                //收到通知后,说明等待的条件满足了
                //将线程从等待队列里移除
                Thread thread = waitList.poll();
                //并将该线程加入到阻塞队列里
                if (thread != null)
                    blockList.offer(thread);
            }
        }
    }

并提供获取该接口的方法:newCondition()。

最后
来看看如何使用它:

    public static void main(String args[]) {
        try {
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    inc();
                }
            }, "t1");

            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    sub();
                }
            }, "t2");

            t1.start();
            t2.start();

        } catch (Exception e) {

        }
    }
    static int a = 0;
    //构造Lock对象
    static MyLock lock = new MyLock();
    static Condition conditionInc = lock.newCondition();
    static Condition conditionSub = lock.newCondition();


    private static void inc() {
        try {
            while (true) {
                //获取锁
                lock.lock();
                {
                    System.out.println("lock suc in " + Thread.currentThread().getName());
                    if (a < 10) {
                        a++;
                        conditionSub.notifyCondition();
                        System.out.println("notify: a:" + a + " in " + Thread.currentThread().getName());
                    } else {
                        //阻塞等待,并释放锁
                        System.out.println("wait: a:" + a + " in " + Thread.currentThread().getName());
                        conditionInc.waitCondition();
                    }
                }
                //释放锁
                lock.unlock();
            }
        } catch (Exception e) {

        }
    }

    private static void sub() {
        try {
            while (true) {
                lock.lock();
                System.out.println("lock suc in " + Thread.currentThread().getName());
                if (a == 0) {
                    System.out.println("wait: a:" + a + " in " + Thread.currentThread().getName());
                    conditionSub.waitCondition();
                } else {
                    a--;
                    conditionInc.notifyCondition();
                    System.out.println("notify: a:" + a + " in " + Thread.currentThread().getName());
                }
                lock.unlock();
            }
        } catch (Exception e) {

        }
    }

该Demo实现功能:

1、线程1对共享变量a进行自增,当a<10时一直自增,并通知线程2对a自减。当a>=10时,不再自增,在原地等待a的值变小。
2、线程2对共享变量进行自减,当a>0时一直自减,并通知线程1对a自增。
当a=0时,不再自减,在原地等待a的值变大。
3、通过waitCondition/notifyCondition 实现了两个线程之间的有序协作。实际上就是个典型的生产者-消费者模型。

需要注意的时:

当调用waitCondition 时候释放了锁,并唤醒了阻塞队列上的线程。
当调用notifyCondition 时候并没有释放锁,也没有唤醒阻塞队列上的线程。

6、总结

以上从线程互斥访问共享变量、CAS访问共享变量、线程互斥访问临界区、线程互斥锁、最后到线程同步,一步步阐述了"锁"的由来,以及如何实现"锁"。
当然这个"锁"只是一个最基本最简单的"锁",没有考虑到重入、中断取消、公平与非公平抢占、一些竞争锁的性能提升等方面的问题。但是却是囊括了"锁"的基本思想:

线程挂起/唤醒 + CAS + 阻塞/等待队列

了解了基本思想,再去看系统提供的锁实现-->AQS、Synchronized,相信就能够轻松看懂了。
下篇正式进入AQS、Synchronized 的分析。

您若喜欢,请点赞、关注,您的鼓励是我前进的动力

持续更新中,和我一起步步为营系统、深入学习Android

1、Android各种Context的前世今生
2、Android DecorView 一窥全貌(上)
3、Android DecorView 一窥全貌(下)
4、Window/WindowManager 不可不知之事
5、View Measure/Layout/Draw 真明白了
6、Android事件分发全套服务
7、Android invalidate/postInvalidate/requestLayout 彻底厘清
8、Android Window 如何确定大小/onMeasure()多次执行原因
9、Android事件驱动Handler-Message-Looper解析
10、Android 键盘一招搞定
11、Android 各种坐标彻底明了
12、Android Activity/Window/View 的background
13、Android IPC 之Service 还可以这么理解
14、Android IPC 之Binder基础
15、Android IPC 之Binder应用
16、Android IPC 之AIDL应用(上)
17、Android IPC 之AIDL应用(下)
18、Android IPC 之Messenger 原理及应用
19、Android IPC 之获取服务(IBinder)
20、Android 存储基础
21、Android 10、11 存储完全适配(上)
22、Android 10、11 存储完全适配(下)
23、Java 并发系列不再疑惑

上一篇下一篇

猜你喜欢

热点阅读