JAVA并发——线程同步器AQS

2021-07-19  本文已影响0人  海晨忆

个人博客:haichenyi.com。感谢关注

前言

  每一项技术的提出都是为了解决某一个问题,带着问题来理解技术,使得印象你对这个技术的理解印象更加深刻。

并发

  举个栗子:现在有一个需求,网络请求分两批(A,B两个批次),A批次并行请求,B批次串行请求按顺序一个一个请求,有一个总超时时间,B批次每一次请求都有一个超时时间,A批次并发请求先请求,在规定时间内没有返回,再开始请求B批次,谁先返回用谁的。

  这个是我们项目里面简化过后的一个逻辑,实际逻辑,比这个还要复杂。怎么实现这个功能呢?

  带着这个问题来进入我们的正题,什么是并发?

  1. 并发是一种现象:同时运行多个程序或者多个任务需要被处理的现象。
  2. 这些任务可能是并行的,也可能是串行的,和CPU的核心数无关,是操作系统进程调度和CPU上下文切换达到的结果。
  3. 解决并发的思路就是把分解,把一个大任务分解成多个小任务来执行

  像我们上面所说的需求就是分解成一个一个的网络请求,一部分并行请求,一部分串行请求。并行请求简单来说,对应我们应用里面就是多线程,多线程同时执行;串行请求对应我们应用里面就是单线程,一个线程执行完了,另一个线程才开始。这里不考虑多进程的问题。

并发为什么会造成线程不安全的问题

  我们先来聊聊cpu是怎么执行指令:

  1. 首先,cpu执行指令的过程中,不可避免会执行读写操作,而这个操作都是从主存(也就是物理内存)中去读写
  2. 但是,cpu执行指令速度很快,程序运行过程中的临时变量都是放在主存当中的,如果全部都是从主存中去读写,读写很耗时,这样就浪费了cpu的性能
  3. 最后,为了解决这个问题,就出现了高速缓存的概念。我们先把变量读取到告诉缓存中,然后,再高速缓存中操作完之后,再刷新到主存当中。

  java的内存模型规定,所有的变量都在主存当中,类似于物理内存,每个线程都有自己的工作空间,也就是对应上面的高速缓存,每一个线程都有一个自己的高速缓存。

  线程对变量的操作必须在自己的工作空间内,不能直接操作主存,而且,一个线程也不能访问另一个线程的工作空间。

  那么,我们如果多个线程同时对一个变量做加1操作,如下面的add1方法。我们thread1和thread2把a的值同时复制到自己的工作空间中时,都是0,然后同时进行加1操作,同时刷新到主存当中,那最后,我们获取到的最终的值就是1,而不是我们想要的2.这就是并发造成的线程不安全的问题

简单的同步器

  我们项目里面遇到的并发问题,基本上就是多线程访问同一变量的问题,比方说,简单的举个栗子,两个线程对同一个int值做加1操作,然后打印出来。

    public int a = 0;
    @org.junit.Test
    public void add1() {
        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                a++;
                System.out.println("thread1:a=" + a);
            }
        });

        Thread thread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                a++;
                System.out.println("thread2:a=" + a);
            }
        });
        thread1.start();
        thread2.start();
    }

  因为这里直接用的基本类型,极端情况下,线程1,线程2同时执行,里面的a++操作也是同时执行,那这里两个打印都是1,虽然,这里我没有复现出来。但是,这种情况肯定是存在的。那么,怎么避免这种情况呢?

  我们可以写一个简单的线程同步器,就是加锁操作,如下:

    private Object lock = new Object();
    public int a = 0;
    @org.junit.Test
    public void add1(){
        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                //先获取到lock 对象的锁
                synchronized (lock){
                    try {
                     //在lock对象上执行wait()方法,让其进入休眠,等待有人唤醒自己
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    a++;
                    System.out.println("thread1:a=" + a);
                }
            }
        });

        Thread thread2 = new Thread(new Runnable() {
            @Override
            public void run() {
            //获取lock的锁
                synchronized (lock){
                    a++;
                    System.out.println("thread2:a=" + a);
                    //唤醒正在lock对象上等待的线程
                    lock.notify();
                }
            }
        });
        thread1.start();
        thread2.start();
    }

  这里,我们用到的object类的wait和notify方法。等待和唤醒。当线程1执行到lock.wait();方法时,线程1会进入等待状态。当线程2执行lock.notify();时,会唤醒线程1,执行线程1的后续a++,打印操作。

  那么,这里会有一个问题,如果线程2先执行,线程1后执行,那么线程1将永远的等待下去,这也是这样写的一个弊端。这还只是2个线程,实际项目中往往比这个复杂多了。为了解决这弊端,就引出了我们的线程同步器AQS(AbstractQueuedSynchronizer)

  并且,这个同步器,java.util包下面都已经给好了实现类,比方说:Semaphore,ReentrantLock,CountDownLatch等等都是,并且,我们用到的java线程池ThreadPoolExecutor中的Worker的实现也是。如下图:

worker图片.png

线程同步器AQS(AbstractQueuedSynchronizer)

  线程同步器就是为了解决并发引起的线程不安全的问题。线程安全的三大特性:原子性,可见性,有序性。

原理

  简单点来说就是,它维护一个状态state,还有一个CLH队列。

  CLH时一个双端队列,队列中每一个节点都放着正在等待获取资源的线程。当线程现在通过CAS原子算法比较预期值的方式去获取资源,也就是判断这里的state状态,是不是有等待获取资源的线程可以使用,如果时有,那就直接使用,如果没有,那就会将这个线程封装成一个节点Node,插入到CLH队列的尾部等待被唤醒。其他线程执行完之后,调用release释放一部分资源,那么,正在等待的队列就会被唤醒,去执行自己的任务。大致是这个意思,当然,AQS还有中断等其他的操作

简单的AQS同步器

  还是类似于上面那个例子,不过,都是访问同一个变量,业务背景换一下,换成一个库存秒杀,通过访问服务器,一共10件物品,我现在有20个线程去同时请求,哪些能抢到,哪些不能抢到?

public int count = 10;

    @org.junit.Test
    public void test3() {
        for (int i = 1; i <= 20; i++) {
            final int finalI = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    buy(finalI);
                }
            }).start();
        }
    }


    public void buy(int i) {
        if (count != 0) {
            count--;
            System.out.println("第" + i + "个用户抢到了," + "库存还剩:" + count + "件");
        } else {
            System.out.println("第" + i + "个用户抢到了," + "已经被抢光了");
        }
    }

  就像上面这样模拟一个简单的秒杀场景,库存10件,20个用户抢,实际情况肯定不止。我们看一下这样写的打印,如下图:

秒杀图1.png

  我们看到这个最终虽然有10个用户的确抢到了,但是,我们看一下打印,打印是从8个开始的,不是从9开始的。我们理想的应该是如下这个图:

秒杀图2.png

  这个图是我把buy方法加上了synchronized关键字,我给它锁住了。当执行buy方法的时候,另一个线程如果也进来执行buy,它会等待,等待前一个buy方法执行完,它才开始执行。

  那么,直接用这个关键字就好了呀,还要啥自行车?天真,存在即合理。synchronized关键字锁比较重,不适合这种秒杀场景。

  言归正传,上面两个从8开始,结合我们前面说的并发的问题,是不是就可以联想到,如果,多个线程同时,同一时刻访问,然后,数据库的库存同时减1,是不是就会出现一件商品,卖给多个人的情况?

  有同学就会想,那么这么巧,同时访问,同一时刻,那你想想天猫双十一,那些节假日的秒杀场景,会出现什么问题?

  这个时候线程同步器就出现了,我们不能对用户做限制,我们不能说张三你必须在某一个时间内访问,李四在某一个时间内访问,所以,我们能控制的只有服务器,也就是这里的buy方法。

  也就是说,我们这里用的是悲观锁的方式,进入buy方法就立刻加锁,运行完buy方法就解锁。后面应用里面再聊这个悲观锁,乐观锁之类的。我们就把buy方法改成如下这样了:

    MyOwnLock ownLock = new MyOwnLock();

    public void buy(int i) {
        //加锁
        ownLock.lock();
        if (count != 0) {
            count--;
            System.out.println("第" + i + "个用户抢到了," + "库存还剩:" + count + "件");
        } else {
            System.out.println("第" + i + "个用户抢到了," + "已经被抢光了");
        }
        //解锁
        ownLock.unLock();
    }
    
/**
 * @ClassName: MyOwnLock
 * @Description:
 * @Author: 海晨忆
 * @Date: 2021/7/15 11:07
 */
public class MyOwnLock {
    public void lock() {

    }

    public void unLock() {

    }
}

  如上代码,现在,我们要做的就是完善MyOwnLock类的加锁和解锁方法。

  我们想想这个流程,多个用户同时下单,实际上对于代码来讲,就是多个线程同时请求服务器,调用这里的buy方法,去减少库存,我们需要做的就是保证这里减少库存不能出问题。

  怎么保证这个减少库存不能出问题呢?就是我们前面说的并发的问题,也就是这里的线程安全的问题。线程安全的三大特性:原子性,可见性,有序性。我们保证这三个特性就好了。

  第一个线程进来,我们就标记一下,已经有线程进来在执行了,我们就改变这个标记,后面的线程感知到这个改变之后,就必须等待。那这个标记,怎么让其他线程感知到呢?

  volatile关键字的两大特性:可见性,有序性

  可见性,就是可以让其他线程感知到。那么,就解决了这个问题,我们用计数器的方式来做这个标记。我们是悲观锁的方式,始终只能有一个线程访问,必须等这个线程访问完了,其他线程才能访问。

  对应成代码就是,这个计数器变量初始化是0,加锁成功之后,就加1,后面的线程进来的时候,判断这个计数器是不是0,如果不是0,就表示有线程正在访问,不能进行加锁操作;如果是0,就表示没有,可以进行加锁操作。那么我们就开始写代码:

package com.example.myapplication;

/**
 * @ClassName: MyOwnLock
 * @Description:
 * @Author: 海晨忆
 * @Date: 2021/7/15 11:07
 */
public class MyOwnLock {
    private volatile int state;
    private Thread currentHolder;

    public void lock() {
        Thread currentThread = Thread.currentThread();
        int state = getState();
        if (state == 0) {
            setCurrentHolder(currentThread);
        }
    }

    public void unLock() {

    }

    public int getState() {
        return state;
    }

    public void setState(int state) {
        this.state = state;
    }

    public Thread getCurrentHolder() {
        return currentHolder;
    }

    public void setCurrentHolder(Thread currentHolder) {
        this.currentHolder = currentHolder;
    }
}

  直接用if(state==0)来判断就可以了吗?volatile关键字只能保证可见性,有序性,并不能保证原子性,所以,volatile并不是真正的线程安全,只是大多数情况下还是比较有用的,而,我们这里要保证线程安全,就需要保证原子性,原子性怎么保证呢?

  这里就出现了关键的CAS算法了,Compare And Swap比较互换。这个算法java里面是怎么实现的呢?我们java里面有一个Unsafe类,他的里面全是native方法,提供的都是硬件级别的原子操作。我们用到的就是这个类里面的几个方法,CAS的操作也是这些方法实现的compareAndSwapXXX。具体的可以在网上搜一下这个类,然后仔细的看一下,这里,我简单的介绍一个:

  /***
   * Compares the value of the object field at the specified offset
   * in the supplied object with the given expected value, and updates
   * it if they match.  The operation of this method should be atomic,
   * thus providing an uninterruptible way of updating an object field.
   * 在obj的offset位置比较object field和期望的值,如果相同则更新。这个方法
   * 的操作应该是原子的,因此提供了一种不可中断的方式更新object field。
   * 
   * @param obj the object containing the field to modify.
   *    包含要修改field的对象 
   * @param offset the offset of the object field within <code>obj</code>.
   *         <code>obj</code>中object型field的偏移量
   * @param expect the expected value of the field.
   *               希望field中存在的值
   * @param update the new value of the field if it equals <code>expect</code>.
   *               如果期望值expect与field的当前值相同,设置filed的值为这个新值
   * @return true if the field was changed.
   *              如果field的值被更改
   */
  public native boolean compareAndSwapObject(Object obj, long offset,
                                             Object expect, Object update);

  所以,加锁流程就变成了如下图的样子,

加锁流程图.png

  根据这个流程,代码就变成了如下的样子:

package com.example.myapplication;

/**
 * @ClassName: MyOwnLock
 * @Description:
 * @Author: 海晨忆
 * @Date: 2021/7/15 11:07
 */
public class MyOwnLock {
    private volatile int state;
    private Thread currentHolder;

    public void lock() {
        if (acquire()){
            return;
        }
        
    }

    public boolean acquire(){
        Thread currentThread = Thread.currentThread();
        int state = getState();
        if (state == 0) {
            if (compareAndSwapState(0,1)) {
                setCurrentHolder(currentThread);
            }
            return true;
        }
        return false;
    }

    public void unLock() {
    }

    public int getState() {
        return state;
    }

    public void setState(int state) {
        this.state = state;
    }

    public Thread getCurrentHolder() {
        return currentHolder;
    }

    public void setCurrentHolder(Thread currentHolder) {
        this.currentHolder = currentHolder;
    }

}

  加锁流程就完了吗?这才哪到哪,我们现在加锁就第一个拿到锁的线程开始用了,那后面没有拿到锁的线程怎么办呢?难道全部丢掉不管吗?

  当然不行。那怎么办呢?我们先用一个队列,把没有拿到锁的线程存起来,排好队,等第一个线程执行完了之后,释放锁的时候,再直接唤醒等待的线程即可;

  把阻塞的线程全放进队列(并且要线程安全,高并发的情况下迅速 入队,出队)中,当T1释放锁是,直接唤醒T2。队列选择用ConcurrentLinkedQueue<Thread>(基于CAS算法,保证入队,出队安全)

  那么,要怎么做呢?要怎么等待呢?最简单的就是写一个死循环,一直循环去判断锁有没有用完。但是,这样会一直占用CPU,消耗性能。

  1. 用sleep?Thread.sleep可能造成等待时间过长,你没法知道休眠的时间,如果,线程1执行完只需要200毫秒,你这里休眠了1000毫秒,那就浪费了800毫秒了。
  2. 那用Thread.yield()可以了吧?答案是不行,这个线程让步,虽然把cpu的时间片让出去给其他线程用了,但是,最后,我们需要唤醒的时候,怎么办呢?

  我们最后唤醒,是要指定唤醒哪一个线程,这里我们还是用到刚才Unsafe里面的两个方法,park和unPark。park阻塞线程,让出cpu的使用权,unPark解除阻塞,唤醒某一个线程。

  这里我们用它的包装类LockSupport。所以,加锁代码就变成这样子了:

    private ConcurrentLinkedQueue<Thread> waiterQueue = new ConcurrentLinkedQueue<>();

    public void lock() {
        if (acquire()) {
            return;
        }
        Thread currentThread = Thread.currentThread();
        waiterQueue.add(currentThread);
        for (; ; ) {
            if (currentThread != waiterQueue.peek() && acquire()) {
                waiterQueue.poll();
                return;
            }
            LockSupport.park(currentThread);
        }
    }

  获取锁的方法也要做相应的调整,如下:

    public boolean acquire() {
        Thread currentThread = Thread.currentThread();
        int state = getState();
        if (state == 0) {
            boolean temp = waiterQueue.size() == 0 || currentThread == waiterQueue.peek();
            if (temp && compareAndSwapState(0, 1)) {
                setCurrentHolder(currentThread);
            }
            return true;
        }
        return false;
    }

  解锁就比较简单了

    public void unLock() {
        if (Thread.currentThread() != currentHolder) {
            throw new RuntimeException("出错啦");
        }
        int state = getState();
        if (compareAndSwapState(state, 0)) {
            setCurrentHolder(null);
            Thread firstThread = waiterQueue.peek();
            if (firstThread != null) {
                LockSupport.unpark(firstThread);
            }
        }
    }

  判断当前线程是不是持有锁的线程,如果不是,那就抛异常
如果是,那就获取状态state,通过cas算法,置换为0的状态。

  最后运行完,结果如下:

最终运行结果图.png

总结:上面我说的简单的线程同步器,只是AQS的一个主要的思想,CAS算法,计数器,线程队列。还有线程中断,独占模式,共享模式,公平锁,不公平锁等等。看完我这个,然后再去看源码应该会更容易懂。

上一篇下一篇

猜你喜欢

热点阅读