Concurrent
JDK1.5前,JAVA使用synchronized实现对共享变量的同步,synchronized在字节码上加指令,实现依赖于底层JVM
JDK1.5之后,JAVA使用原生的JAVA代码实现了synchronized的语义,不使用机器指令,也不依赖于JDK编译的特殊处理;而是使用volatile保证共享资源的可见性,使用CAS保证更新共享资源的原子性,使用AQS模型构造线程等待队列,并实现线程入队、出队、阻塞、唤醒等操作
并发控制的核心是锁的获取和释放,concurrent包基于volatile、CAS和AQS模型
- volatile,保证在多处理器情况下,共享变量的可见性。当1个线程修改共享变量时,其它线程会重新从内存中读取共享变量修改后的新值,使所有线程读到的变量值是一致的
volatile有1个缺点,当变量的值依赖于旧值时,volatile不能保证复合操作的线程安全,例如i++操作,可以拆分为取i的值,将i值+1,将新值赋给i,volatile不能保证这3步组成的复合操作能够以原子的形式执行,但是CAS能够保证
- CAS保证更新操作的原子性,
CAS,Compare And Swap,涉及到3个值,内存值,预期值和新值。当且仅当内存值和预期值相等时,才将内存值修改为新值;如果内存值和预期值不相等,说明在此期间,有其它线程修改了内存值,那么丢弃本次操作,重新比较内存值和预期值,直到相等,将内存值修改为新值
自旋CAS可能出现3种问题:
(1)若长时间不成功,即内存值总是和预期值不相等,会消耗CPU资源
(2)只能保证1个共享资源的复合操作是原子性的
(3)可能出现ABA问题
CAS比较的是内存值和预期值,如果在线程A执行期间,其它线程先将内存值改为新值,再将新值改为旧值,那么线程A检测到内存值没有发生变化,但实际上内存值发生了变化
针对这个问题,可以采用添加version的方式解决,变量每次更新,它的版本号+1,那么ABA就变成了1A2B3A,再进行检测,CAS就会发现内存值和预期值不相等,丢弃本次操作,重试直到内存值和预期值相等
atomic包下的AtomicStampedReference类的compareAndSet(),提供了解决ABA问题的逻辑,不但会检测变量的值,还会检测变量的版本
AQS维护了1个
(1)volatile类型的state变量,用来代表共享资源,得到state
(2)FIFO线程等待队列,当多线程争抢资源,被阻塞时,会进入此队列等
AQS定义了1套多线程访问共享变量的同步器框架,是整个concurrent包的基础。Lock、CountDownLatch、CyclicBarrier、信号量Semaphore以及ThreadPoolExecutor都基于AQS实现。像Lock、CountDownLatch这种自定义同步器,只需要实现对共享资源state的获取和释放方式,至于对线程等待队列FIFO的维护(例如获取资源失败入队/唤醒出队/Condition类的await()和signal()),AQS已经实现好了(ReentrantLock的lock()底层调用tryAcquire()尝试获取共享资源,成功直接返回,失败会将现场加入到等待队列的尾部,加入到等待队列的线程会自旋,直到获取到锁)
一般来说,自定义同步器要么独占,要么共享,独占需要实现AQS的tryAcquire()和tryRelease(),共享需要实现tryAcquireShared()和tryReleaseShared()
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()会执行tryAcquire()独占锁,state+1;其它线程再次tryAcquire()时会失败;持有锁的线程可以再次重复获得锁,state+1,这就是可重入;持有锁的线程unlock()会执行tryRelease(),state-1;加了多少次锁,就需要释放多少次锁,当state为0时,其它线程才可以使用tryAcquire()获得锁
以CountDownLatch为例,任务以n个线程去执行,state初始化为n,n个线程并行执行,当线程执行完,state-1,所有线程执行完,state值为0。当state为0时,会唤醒主调用线程,主线程从await()方法返回,继续后续动作
乐观锁和悲观锁
悲观锁,总是假设最坏的情况,每次都认为其它线程会修改共享资源,所以每次都加锁,当线程访问被加锁的共享资源时,会阻塞排队等待释放锁。synchronized就是一种悲观锁
乐观锁,总是认为不会发生并发问题,因此不会加锁,但是在更新数据时会进行判定,如果其它线程没有修改共享变量,才执行更新操作,否则一直重试,直到操作成功。CAS自旋就是一种乐观锁
Lock和synchronized
- synchronized是悲观锁,Lock是乐观锁
- Lock是接口,synchronized是关键字,在字节码上加指令,由JVM实现
- 异常时,synchronized自动释放锁,不会导致死锁;Lock需要手动释放,一般在finally中释放锁
- Lock可以让等待锁的线程中断 lockInterruptibly(),synchronized会一直等待,不能中断
- Lock可以使用tryLock()尝试获取锁,synchronized不可以
- synchronized使用wait()、notify()和notifyAll()实现线程间通信,Lock使用Condition的await()、signal()和signalAll()
Condition的await()、signal()和signalAll()除了提供和Object的wait()、notify()和notifyAll()相同的功能外,还能对锁做更精确的控制,对于1个锁,可以创建多个Condition,在不同的情况下使用不同的Condition。例如多线程读写同1个缓冲区,写线程写入数据后,唤醒读线程;读线程读取数据后,唤醒写线程;缓冲区为空,读线程阻塞等待;缓冲区满了,写线程阻塞等待。在使用Object的notify()/notifyAll()唤醒线程时,无法指定唤醒哪个线程,但是使用Condition时,可以指定
Future
CountDownLatch
可以实现类似计数器的功能
典型用法: 主线程需要等待其他多个线程执行完后,才开始执行
(1)主线程创建CountDownLatch对象,构造函数中设置需要等待的线程数
(2)其它线程执行自己的业务逻辑,执行完毕后,使用CountDownLatch对象的countDown()将state的值-1
(3)主线程执行CountDownLatch对象的await(),等待其他线程执行完毕,也就是等待state的值变为0,当state值为0时,说明其它线程执行完毕,主线程开始执行自己的业务
CyclicBarrier
回环栅栏,让一组正在执行线程,达到某个barrier状态时等待,直到所有线程都达到barrier状态,再一起执行。当所有等待线程被释放之后,可以被重用
(1)创建CyclicBarrier对象,构造函数中设置parties,指定让多少个线程在barrier状态等待
(2)当正在执行的线程,达到barrier状态时,会调用CyclicBarrier对象的await(),使线程处于等待状态
(3)当所有的线程都达到barrier状态时,线程再一起并发执行
另外在创建CyclicBarrier对象时,还可以在构造函数中传入1个Runnable对象,当所有的线程达到barrier状态后,先从所有线程中选择1个线程执行Runnable对象,再同时执行所有线程的后续业务逻辑
Semaphore
信号量,控制同时运行的线程个数
(1)创建Semaphore对象,设置permit个数,permit代表同时运行的线程个数
(2)线程执行时,会使用acquire()尝试获取permit,得到permit的线程才能执行,得不到的线程阻塞,等待其他线程执行完毕,释放permit
典型用法: 资源个数小于线程个数,那么一定会有线程处于不工作的状态,创建Semaphore时设置的permit就是资源的个数,线程需要竞争资源,获取到permit的线程能够执行,获取不到,阻塞等待其他线程释放资源
ThreadLocal
线程本地变量,之所以会发生线程安全问题,是因为多个线程同时对共享资源进行操作;synchronized和concurrent使用悲观锁和乐观锁的方式,在多个线程访问共享资源时,做同步操作;和同步操作不同,ThreadLocal在每个线程本地保存一份共享资源的副本,对共享资源操作时,操作的是副本,从而从一定程度上避免了线程安全问题
[MyBatis用ThreadLocal了吗]
使用JDBC时,DB连接管理类,用来获取和关闭Connection对象;单线程下,没有问题,多线程下,会存在线程安全问题
如果未同步获取和关闭Connection对象的方法,多个线程获取时,会创建多个Connection对象;关闭时,如果1个线程正在使用Conncetion进行DB操作,而另一个线程关闭了Connection,会出错
出于线程安全考虑,必须对Conncetion进行同步,当1个线程执行时,其它线程等待,但这又影响了DB操作的效率
像Spring,对于DB连接就使用了单例+ThreadLocal的方式,使用单例保证Conncetion对象只有1个,使用ThreadLocal在每个线程本地都保存了Connection对象的副本,操作数据库时使用的是本地副本,使各个线程不会互相影响
[ThreadLocal如何使用]
ThreadPoolExecutor
-
好处
(1)如果并发线程数量较多,且执行时间都很多,会频繁的创建和销毁线程。创建和销毁线程需要时间,会销毁系统资源,导致性能下降
(2)线程池可以复用线程,当线程执行完任务后,不会被销毁,可以继续用来执行其他任务 -
构造函数
ThreadPoolExecutor有4个构造函数,创建ThreadPoolExecutor对象时,需要指定一些参数,包括:
(1)核心池大小corePoolSize
(a)创建完线程池后,线程池是没有线程的,当有任务到来时,才会创建线程用来执行任务;除非是有prestartCoreThread()或prestartAllCoreThreads()创建1个或参数corePoolSIze个线程
(b)当线程池中的线程达到corePoolSize大小时,再到来的任务会被放入等待队列
(c)换句话说,corePoolSize是线程池允许同时运行的最大线程数
(2)线程池最大线程个数maximumPoolSize
线程池的最大线程数,表示最多能创建多少个线程,>=corePoolSize
(3)线程空闲时间keepAliveTime
线程没有执行任务时的存活时间
默认情况下,只有线程池中的线程个数>corePoolSize,会回收空闲时间超过keepAliveTime的线程,直到线程池中的线程个数=corePoolSize
如果设置allowCoreThreadTimeOut,即运行核心池线程超时,只要线程的空闲时间超过keepAliveTime,就会回收,直到线程个数为0
(4)时间单位unit
keepAliveTime的时间单位,包括天、小时、分、秒、ms、微妙、纳秒
(5)阻塞队列workQueue
当线程池中的线程个数=corePoolSize时,再来的任务会被放入等待队列。等待队列分为ArrayBlockingQueue、LinkedBlockingQueue和SynchronousQueue
ArrayBlockingQueue,是基于数组的FIFO,创建时需指定数组大小
LinkedBlockingQueue,是基于链表的FIFO,若未指定大小,默认大小为Integer.MAX_VALUE
SynchronousQueue,不保存提交的任务
一般使用LinkedBlockingQueue
(6)线程工厂ThreadFactory
用来创建线程
(7)拒绝处理策略handler
[涉及到任务处理策略]
(a)当线程池中线程数量<corePoolSize时,每来1个任务,创建1个线程,用来执行任务
(b)当线程池中线程数量>=corePoolSize时,会把到来的任务放入的等待队列。如果放入成功,任务会等待被空闲的线程执行;如果放入失败,一般是等待队列满了,会尝试创建新的线程来执行任务;如果线程池中线程数量达到了最大线程数量maximumPoolSize,创建线程失败,此时采用拒绝策略
(c)拒绝策略分为4种,默认使用AbortPolicy
AbortPolicy,丢弃任务,抛出RejectedExecutionException
DiscardPolicy,只丢弃任务,不抛出异常
DiscardOldestPolicy,丢弃队列中最前面的任务,然后重试,不抛出异常
CallerRunsPolicy,在调用者线程执行当前任务,不抛出异常
3.线程池状态
线程池有5种状态,RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED
(1)创建完线程池,线程池处于RUNNING状态,可以接收新任务,对已有任务进行处理
(2)执行shutdown(),处于RUNNING状态的线程池会切换到SHUTDOWN状态。SHUTDOWN状态的线程池不接收新任务,但会执行任务队列中的任务
(3)执行shutdownNow(),处于RUNNING状态的线程池会切换到STOP状态。STOP状态的线程池不接收新任务,不执行任务队列中的任务,同时中断正在执行的任务
(4)处于STOP和SHUTDOWN状态的线程池,当线程池中的所有任务被中止,工作线程workCount为0,线程池切换到TIDYING状态。处于TIDYING状态的线程池会执行钩子方法terminated()
(5)线程池彻底终止,状态会变为TERMINATED
4.Executors
并不建议直接使用ThreadPoolExecutor,而是使用Executors提供的几个静态方法
(1)newCachedThreadPool创建数量可变的线程池
(2)newFixedThreadPool创建固定数量线程的线程池
(3)newSingleThreadExecutor创建1个线程的线程池
这几个方法,在底层调用ThreadPoolExecutor,只不过参数已经设置好了
newFixedThreadPool创建的线程池,corePoolSize和maximumPoolSize相等,使用LinkedBlockingQueue作为等待队列
newSingleThreadExecutor创建的线程池,corePoolSize和maximumPoolSize都为1,使用LinkedBlockingQueue作为等待队列
newCachedThreadPool创建的线程池,corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE,使用SynchronousQueue,该等待队列不保存任务,只要有任务到来就创建线程,执行任务,当线程空闲时间>60s,就销毁线程
阻塞队列
阻塞队列一般用来实现"生产者-消费者"模式,阻塞队列与一般队列的区别:
(1)当队列为空时,从队列中获取元素,会阻塞,直到队列中有元素再将1st元素取出
(2)当队列满了的时候,向队列中插入元素,会阻塞,直到队列有元素出列,再将元素插入
当然非阻塞队列也可以实现"生产者-消费者"模式,不过要在代码中使用做同步操作。(1)使用synchronized为队列做同步控制,当队列为空时,Consumer线程wait(),等待Producer线程插入元素,并执行notify()唤醒Consumer线程;当队列满时,Producer线程wait(),等待Consumer线程移除元素,并执行notify()唤醒Producer线程(2)使用Lock以及Condition类的await()和signal()
相比于非阻塞队列,阻塞队列的底层已经实现好了同步的逻辑,不需要程序员收到的做同步控制,更加方便
[以ArrayBlockingQueue为例]
ArrayBlockingQueue包括存放元素的数组、队首元素和队尾元素下标takeIndex和putIndex以及当前数组中元素个数count,还包括1个可重入锁ReentrantLock及2个等待条件notEmpty和notFull,默认是不公平的
向队列中插入元素和取出元素使用put()和take(),这两个方法都是基于ArrayBlockingQueue中的可重入锁ReentrantLock和2个Condition等待条件实现的,因此使用阻塞队列时,不需要考虑线程同步问题
put(),先获取锁,然后判断当前队列元素个数是否等于队列长度,等于说明已满,调用notFull.await()等待,当被其它线程唤醒时,会执行插入
take(),和put()类似,也是先获取锁,然后判断当前队列元素个数是否为0,为0说明队列为空,调用notEmpty.await()等待,被唤醒时,执行获取
(1)ArrayBlockingQueue
基于数组实现的FIFO,有界阻塞队列,需要指定数组大小,默认不保证公平性,即不保证等待时间最长的线程最先访问队列
(2)LinkedBlockingQueue
基于链表实现的FIFO,有界阻塞队列,若不指定大小,默认长度为Integer.MAX_VALUE
(3)PriorityBlockingQueue
非FIFO,按照元素的优先级排序,按照优先级顺序出队,每次出队的元素是优先级最高的元素。无界阻塞队列,容量没有上限,因此生产者向队列中插入元素永远不会阻塞,只有当队列为空时,消费者才会阻塞
(4)SynchronousQueue
主要方法
(1)插入元素,可使用add,offer,put
add 成功true,失败异常
offer 成功true,失败false
put 成功true,失败阻塞直到插入成功
(2)删除元素,可使用remove,poll,take
remove,成功true,失败异常
poll,成功返回元素,失败null
take,成功返回元素,失败阻塞直到删除成功
COW
Copy On Write,写时复制。写入数据时,复制1个副本,在副本上进行写操作;写的同时不影响并发的读操作;写入完成后,将新数据复制给变量
每次写时,都使用ReentrantLock使并发的写操作互斥,避免数据多次修改。实现读写分离,读写并发环境下,因为写操作后,需要将新的对象赋值给变量,所以读的对象和写的对象可能不一样,只能保证最终一致,不能保证实时一致
concurrent包下的COW包括CopyOnWriteArrayList和CopyOnWriteArraySet