JAVA并发——线程同步器AQS
个人博客:haichenyi.com。感谢关注
前言
每一项技术的提出都是为了解决某一个问题,带着问题来理解技术,使得印象你对这个技术的理解印象更加深刻。
并发
举个栗子:现在有一个需求,网络请求分两批(A,B两个批次),A批次并行请求,B批次串行请求按顺序一个一个请求,有一个总超时时间,B批次每一次请求都有一个超时时间,A批次并发请求先请求,在规定时间内没有返回,再开始请求B批次,谁先返回用谁的。
这个是我们项目里面简化过后的一个逻辑,实际逻辑,比这个还要复杂。怎么实现这个功能呢?
带着这个问题来进入我们的正题,什么是并发?
- 并发是一种现象:同时运行多个程序或者多个任务需要被处理的现象。
- 这些任务可能是并行的,也可能是串行的,和CPU的核心数无关,是操作系统进程调度和CPU上下文切换达到的结果。
- 解决并发的思路就是把分解,把一个大任务分解成多个小任务来执行
像我们上面所说的需求就是分解成一个一个的网络请求,一部分并行请求,一部分串行请求。并行请求简单来说,对应我们应用里面就是多线程,多线程同时执行;串行请求对应我们应用里面就是单线程,一个线程执行完了,另一个线程才开始。这里不考虑多进程的问题。
并发为什么会造成线程不安全的问题
我们先来聊聊cpu是怎么执行指令:
- 首先,cpu执行指令的过程中,不可避免会执行读写操作,而这个操作都是从主存(也就是物理内存)中去读写
- 但是,cpu执行指令速度很快,程序运行过程中的临时变量都是放在主存当中的,如果全部都是从主存中去读写,读写很耗时,这样就浪费了cpu的性能
- 最后,为了解决这个问题,就出现了高速缓存的概念。我们先把变量读取到告诉缓存中,然后,再高速缓存中操作完之后,再刷新到主存当中。
java的内存模型规定,所有的变量都在主存当中,类似于物理内存,每个线程都有自己的工作空间,也就是对应上面的高速缓存,每一个线程都有一个自己的高速缓存。
线程对变量的操作必须在自己的工作空间内,不能直接操作主存,而且,一个线程也不能访问另一个线程的工作空间。
那么,我们如果多个线程同时对一个变量做加1操作,如下面的add1方法。我们thread1和thread2把a的值同时复制到自己的工作空间中时,都是0,然后同时进行加1操作,同时刷新到主存当中,那最后,我们获取到的最终的值就是1,而不是我们想要的2.这就是并发造成的线程不安全的问题
简单的同步器
我们项目里面遇到的并发问题,基本上就是多线程访问同一变量的问题,比方说,简单的举个栗子,两个线程对同一个int值做加1操作,然后打印出来。
public int a = 0;
@org.junit.Test
public void add1() {
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
a++;
System.out.println("thread1:a=" + a);
}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
a++;
System.out.println("thread2:a=" + a);
}
});
thread1.start();
thread2.start();
}
因为这里直接用的基本类型,极端情况下,线程1,线程2同时执行,里面的a++操作也是同时执行,那这里两个打印都是1,虽然,这里我没有复现出来。但是,这种情况肯定是存在的。那么,怎么避免这种情况呢?
我们可以写一个简单的线程同步器,就是加锁操作,如下:
private Object lock = new Object();
public int a = 0;
@org.junit.Test
public void add1(){
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
//先获取到lock 对象的锁
synchronized (lock){
try {
//在lock对象上执行wait()方法,让其进入休眠,等待有人唤醒自己
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
a++;
System.out.println("thread1:a=" + a);
}
}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
//获取lock的锁
synchronized (lock){
a++;
System.out.println("thread2:a=" + a);
//唤醒正在lock对象上等待的线程
lock.notify();
}
}
});
thread1.start();
thread2.start();
}
这里,我们用到的object类的wait和notify方法。等待和唤醒。当线程1执行到lock.wait();方法时,线程1会进入等待状态。当线程2执行lock.notify();时,会唤醒线程1,执行线程1的后续a++,打印操作。
那么,这里会有一个问题,如果线程2先执行,线程1后执行,那么线程1将永远的等待下去,这也是这样写的一个弊端。这还只是2个线程,实际项目中往往比这个复杂多了。为了解决这弊端,就引出了我们的线程同步器AQS(AbstractQueuedSynchronizer)
并且,这个同步器,java.util包下面都已经给好了实现类,比方说:Semaphore,ReentrantLock,CountDownLatch等等都是,并且,我们用到的java线程池ThreadPoolExecutor中的Worker的实现也是。如下图:
worker图片.png线程同步器AQS(AbstractQueuedSynchronizer)
线程同步器就是为了解决并发引起的线程不安全的问题。线程安全的三大特性:原子性,可见性,有序性。
原理
简单点来说就是,它维护一个状态state,还有一个CLH队列。
CLH时一个双端队列,队列中每一个节点都放着正在等待获取资源的线程。当线程现在通过CAS原子算法比较预期值的方式去获取资源,也就是判断这里的state状态,是不是有等待获取资源的线程可以使用,如果时有,那就直接使用,如果没有,那就会将这个线程封装成一个节点Node,插入到CLH队列的尾部等待被唤醒。其他线程执行完之后,调用release释放一部分资源,那么,正在等待的队列就会被唤醒,去执行自己的任务。大致是这个意思,当然,AQS还有中断等其他的操作
简单的AQS同步器
还是类似于上面那个例子,不过,都是访问同一个变量,业务背景换一下,换成一个库存秒杀,通过访问服务器,一共10件物品,我现在有20个线程去同时请求,哪些能抢到,哪些不能抢到?
public int count = 10;
@org.junit.Test
public void test3() {
for (int i = 1; i <= 20; i++) {
final int finalI = i;
new Thread(new Runnable() {
@Override
public void run() {
buy(finalI);
}
}).start();
}
}
public void buy(int i) {
if (count != 0) {
count--;
System.out.println("第" + i + "个用户抢到了," + "库存还剩:" + count + "件");
} else {
System.out.println("第" + i + "个用户抢到了," + "已经被抢光了");
}
}
就像上面这样模拟一个简单的秒杀场景,库存10件,20个用户抢,实际情况肯定不止。我们看一下这样写的打印,如下图:
秒杀图1.png我们看到这个最终虽然有10个用户的确抢到了,但是,我们看一下打印,打印是从8个开始的,不是从9开始的。我们理想的应该是如下这个图:
秒杀图2.png这个图是我把buy方法加上了synchronized关键字,我给它锁住了。当执行buy方法的时候,另一个线程如果也进来执行buy,它会等待,等待前一个buy方法执行完,它才开始执行。
那么,直接用这个关键字就好了呀,还要啥自行车?天真,存在即合理。synchronized关键字锁比较重,不适合这种秒杀场景。
言归正传,上面两个从8开始,结合我们前面说的并发的问题,是不是就可以联想到,如果,多个线程同时,同一时刻访问,然后,数据库的库存同时减1,是不是就会出现一件商品,卖给多个人的情况?
有同学就会想,那么这么巧,同时访问,同一时刻,那你想想天猫双十一,那些节假日的秒杀场景,会出现什么问题?
这个时候线程同步器就出现了,我们不能对用户做限制,我们不能说张三你必须在某一个时间内访问,李四在某一个时间内访问,所以,我们能控制的只有服务器,也就是这里的buy方法。
也就是说,我们这里用的是悲观锁的方式,进入buy方法就立刻加锁,运行完buy方法就解锁。后面应用里面再聊这个悲观锁,乐观锁之类的。我们就把buy方法改成如下这样了:
MyOwnLock ownLock = new MyOwnLock();
public void buy(int i) {
//加锁
ownLock.lock();
if (count != 0) {
count--;
System.out.println("第" + i + "个用户抢到了," + "库存还剩:" + count + "件");
} else {
System.out.println("第" + i + "个用户抢到了," + "已经被抢光了");
}
//解锁
ownLock.unLock();
}
/**
* @ClassName: MyOwnLock
* @Description:
* @Author: 海晨忆
* @Date: 2021/7/15 11:07
*/
public class MyOwnLock {
public void lock() {
}
public void unLock() {
}
}
如上代码,现在,我们要做的就是完善MyOwnLock类的加锁和解锁方法。
我们想想这个流程,多个用户同时下单,实际上对于代码来讲,就是多个线程同时请求服务器,调用这里的buy方法,去减少库存,我们需要做的就是保证这里减少库存不能出问题。
怎么保证这个减少库存不能出问题呢?就是我们前面说的并发的问题,也就是这里的线程安全的问题。线程安全的三大特性:原子性,可见性,有序性。我们保证这三个特性就好了。
第一个线程进来,我们就标记一下,已经有线程进来在执行了,我们就改变这个标记,后面的线程感知到这个改变之后,就必须等待。那这个标记,怎么让其他线程感知到呢?
volatile关键字的两大特性:可见性,有序性
可见性,就是可以让其他线程感知到。那么,就解决了这个问题,我们用计数器的方式来做这个标记。我们是悲观锁的方式,始终只能有一个线程访问,必须等这个线程访问完了,其他线程才能访问。
对应成代码就是,这个计数器变量初始化是0,加锁成功之后,就加1,后面的线程进来的时候,判断这个计数器是不是0,如果不是0,就表示有线程正在访问,不能进行加锁操作;如果是0,就表示没有,可以进行加锁操作。那么我们就开始写代码:
package com.example.myapplication;
/**
* @ClassName: MyOwnLock
* @Description:
* @Author: 海晨忆
* @Date: 2021/7/15 11:07
*/
public class MyOwnLock {
private volatile int state;
private Thread currentHolder;
public void lock() {
Thread currentThread = Thread.currentThread();
int state = getState();
if (state == 0) {
setCurrentHolder(currentThread);
}
}
public void unLock() {
}
public int getState() {
return state;
}
public void setState(int state) {
this.state = state;
}
public Thread getCurrentHolder() {
return currentHolder;
}
public void setCurrentHolder(Thread currentHolder) {
this.currentHolder = currentHolder;
}
}
直接用if(state==0)来判断就可以了吗?volatile关键字只能保证可见性,有序性,并不能保证原子性,所以,volatile并不是真正的线程安全,只是大多数情况下还是比较有用的,而,我们这里要保证线程安全,就需要保证原子性,原子性怎么保证呢?
这里就出现了关键的CAS算法了,Compare And Swap比较互换。这个算法java里面是怎么实现的呢?我们java里面有一个Unsafe类,他的里面全是native方法,提供的都是硬件级别的原子操作。我们用到的就是这个类里面的几个方法,CAS的操作也是这些方法实现的compareAndSwapXXX。具体的可以在网上搜一下这个类,然后仔细的看一下,这里,我简单的介绍一个:
/***
* Compares the value of the object field at the specified offset
* in the supplied object with the given expected value, and updates
* it if they match. The operation of this method should be atomic,
* thus providing an uninterruptible way of updating an object field.
* 在obj的offset位置比较object field和期望的值,如果相同则更新。这个方法
* 的操作应该是原子的,因此提供了一种不可中断的方式更新object field。
*
* @param obj the object containing the field to modify.
* 包含要修改field的对象
* @param offset the offset of the object field within <code>obj</code>.
* <code>obj</code>中object型field的偏移量
* @param expect the expected value of the field.
* 希望field中存在的值
* @param update the new value of the field if it equals <code>expect</code>.
* 如果期望值expect与field的当前值相同,设置filed的值为这个新值
* @return true if the field was changed.
* 如果field的值被更改
*/
public native boolean compareAndSwapObject(Object obj, long offset,
Object expect, Object update);
所以,加锁流程就变成了如下图的样子,
加锁流程图.png根据这个流程,代码就变成了如下的样子:
package com.example.myapplication;
/**
* @ClassName: MyOwnLock
* @Description:
* @Author: 海晨忆
* @Date: 2021/7/15 11:07
*/
public class MyOwnLock {
private volatile int state;
private Thread currentHolder;
public void lock() {
if (acquire()){
return;
}
}
public boolean acquire(){
Thread currentThread = Thread.currentThread();
int state = getState();
if (state == 0) {
if (compareAndSwapState(0,1)) {
setCurrentHolder(currentThread);
}
return true;
}
return false;
}
public void unLock() {
}
public int getState() {
return state;
}
public void setState(int state) {
this.state = state;
}
public Thread getCurrentHolder() {
return currentHolder;
}
public void setCurrentHolder(Thread currentHolder) {
this.currentHolder = currentHolder;
}
}
加锁流程就完了吗?这才哪到哪,我们现在加锁就第一个拿到锁的线程开始用了,那后面没有拿到锁的线程怎么办呢?难道全部丢掉不管吗?
当然不行。那怎么办呢?我们先用一个队列,把没有拿到锁的线程存起来,排好队,等第一个线程执行完了之后,释放锁的时候,再直接唤醒等待的线程即可;
把阻塞的线程全放进队列(并且要线程安全,高并发的情况下迅速 入队,出队)中,当T1释放锁是,直接唤醒T2。队列选择用ConcurrentLinkedQueue<Thread>(基于CAS算法,保证入队,出队安全)
那么,要怎么做呢?要怎么等待呢?最简单的就是写一个死循环,一直循环去判断锁有没有用完。但是,这样会一直占用CPU,消耗性能。
- 用sleep?Thread.sleep可能造成等待时间过长,你没法知道休眠的时间,如果,线程1执行完只需要200毫秒,你这里休眠了1000毫秒,那就浪费了800毫秒了。
- 那用Thread.yield()可以了吧?答案是不行,这个线程让步,虽然把cpu的时间片让出去给其他线程用了,但是,最后,我们需要唤醒的时候,怎么办呢?
我们最后唤醒,是要指定唤醒哪一个线程,这里我们还是用到刚才Unsafe里面的两个方法,park和unPark。park阻塞线程,让出cpu的使用权,unPark解除阻塞,唤醒某一个线程。
这里我们用它的包装类LockSupport。所以,加锁代码就变成这样子了:
private ConcurrentLinkedQueue<Thread> waiterQueue = new ConcurrentLinkedQueue<>();
public void lock() {
if (acquire()) {
return;
}
Thread currentThread = Thread.currentThread();
waiterQueue.add(currentThread);
for (; ; ) {
if (currentThread != waiterQueue.peek() && acquire()) {
waiterQueue.poll();
return;
}
LockSupport.park(currentThread);
}
}
获取锁的方法也要做相应的调整,如下:
public boolean acquire() {
Thread currentThread = Thread.currentThread();
int state = getState();
if (state == 0) {
boolean temp = waiterQueue.size() == 0 || currentThread == waiterQueue.peek();
if (temp && compareAndSwapState(0, 1)) {
setCurrentHolder(currentThread);
}
return true;
}
return false;
}
解锁就比较简单了
public void unLock() {
if (Thread.currentThread() != currentHolder) {
throw new RuntimeException("出错啦");
}
int state = getState();
if (compareAndSwapState(state, 0)) {
setCurrentHolder(null);
Thread firstThread = waiterQueue.peek();
if (firstThread != null) {
LockSupport.unpark(firstThread);
}
}
}
判断当前线程是不是持有锁的线程,如果不是,那就抛异常
如果是,那就获取状态state,通过cas算法,置换为0的状态。
最后运行完,结果如下:
最终运行结果图.png总结:上面我说的简单的线程同步器,只是AQS的一个主要的思想,CAS算法,计数器,线程队列。还有线程中断,独占模式,共享模式,公平锁,不公平锁等等。看完我这个,然后再去看源码应该会更容易懂。