Java并发(2)
1.如何暂停或恢复线程
在JDK中提供了以下两个方法(类Thread)用来暂停线程和恢复线程。
Øsuspend方法:暂停线程
Øresume方法:恢复线程
Østop方法:终止线程
这两个方法和stop方法一样是被废弃的方法,其用法和stop一样,暴力地暂停线程和恢复线程。这两个方法之所以废弃,主要有以下两个原因:
(1)线程持有锁定的公共资源的情况下,一旦被暂停,线程不会释放已经占有的资源(比如锁),而是占有着资源进入睡眠状态,这样容易引发死锁问题。
(2)线程强制暂停,导致该线程执行的操作没有执行完全,这时访问该线程的数据会出现数据不一致。
(3)强制结束线程,线程应该做的清理工作无法完成。
2.synchronized和Lock区别(ReentrantLock,可重入锁)
synchronized和ReentrantLock都可以用于线程同步,synchronized中使用的IntrinsicLock和ReentrantLock都是可重入的。它们之间的区别如下:
1)synchronized是Java语言的一个特性,得到虚拟机的直接支持,Lock是concurrent包下的类。
2)synchronized在进入同步方法或同步代码块时会自动获取锁,在返回同步方法或者退出同步代码块时会自动释放锁,但是ReentrantLock必须显式地获取锁,并且一定要在finally中显式释放锁,如果忘了显式释放,获取的锁无法被其他线程获取,有可能造成死锁。
3)ReentrantLock提供了更大的灵活性。
A)可以通过tryLock实现轮询或定时获取锁,可用于避免死锁的发生
线程在等待内置锁而阻塞时无法响应中断(因为线程认为它肯定可以获取锁,所以不会响应中断请求)。ReentrantLock的lockInterruptibly方法能够在获取锁的过程中保持对中断的响应,当获取到锁的线程被中断时,中断异常将会被抛出,同时锁会被释放。
B)synchronized方法和synchronized块都是基于块结构的加锁,ReentrantLock可用于非块结构加锁(例如ConcurrentHashMap中的分段锁)
C)ReentrantLock可以实现公平锁。synchronized使用的内置锁和ReentrantLock默认都是非公平的,ReentrantLock在构造时可选择公平锁。
公平锁是指如果有其它线程持有锁,或者阻塞队列中有其它线程,新请求锁的线程会被放到阻塞队列的末尾,按照先来先到的顺序;非公平锁是指只有锁被其它线程持有时,才会被放入阻塞队列,如果没有其它线程持有,所有线程都可以获取锁。
ReadWriteLock(读写锁)描述的是:一个资源能够被多个读线程访问,或者被一个写线程访问,但是不能同时存在读写线程。也就是说读写锁使用的场合是一个共享资源被大量读取操作,而只有少量的写操作(修改数据)。
3.Condition条件对象
条件对象是线程同步对象中的一种,主要用来等待某种条件的发生,条件发生后,可以唤醒等待在该条件上的一个线程或所有线程。条件对象要与锁一起协同工作。
条件变量调用Lock.newCondition()获得一个实例:
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
ReentrantLock里有个函数newCondition(),该函数得到一个锁上的“条件”,用于实现线程间的通信,条件变量很大程度上是为了解决Object.wait/notify/notifyAll难以使用的问题。
Condition拥有await、signal、signalAll。await对应于Object.wait,signal对应于Object.notify,signalAll对应于Object.notifyAll。特别说明的是Condition的接口改变名称就是为了避免与Object中的wait/notify/notifyAll的语义和使用上混淆,因为Condition同样有wait/notify/notifyAll方法因为任何类都拥有这些方法。
每一个Lock可以有任意数据的Condition对象,Condition是与Lock绑定的,所以就有Lock的公平性特性:如果是公平锁,线程为按照FIFO的顺序从Condition.await中释放,如果是非公平锁,那么后续的锁竞争就不保证FIFO顺序了。
4.AQS
AQS,即AbstractQueuedSynchronizer,队列同步器,它是Java并发用来构建锁和其他同步组件的基础框架。AQS是一个抽象类,主要是以继承的方式使用。AQS本身是没有实现任何同步接口的,它仅仅只是定义了同步状态的获取和释放的方法来供自定义的同步组件的使用。JUC中的许多可阻塞类,例如ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch、SynchronousQueue和FutureTask等,都是基于AQS构建的。
AQS原理简介
AQS的实现依赖内部的同步队列(FIFO双向队列),如果当前线程获取同步状态失败,AQS会将该线程以及等待状态等信息构造成一个Node,将其加入同步队列的尾部,同时阻塞当前线程,当同步状态释放时,唤醒队列的头节点。
上面说的有点抽象,来具体看下,首先来看AQS最主要的三个成员变量:
private transient volatile Nodehead;
private transient volatile Nodetail;
private volatile int state;
上面提到的同步状态就是这个int型的变量state. head和tail分别是同步队列的头结点和尾结点。假设state=0表示同步状态可用(如果用于锁,则表示锁可用),state=1表示同步状态已被占用(锁被占用)。
AbstractQueuedSynchronizer类底层的数据结构是使用双向链表,是队列的一种实现,故也可看成是队列,其中Sync queue,即同步队列,是双向链表,包括head结点和tail结点,head结点主要用作后续的调度。而Condition queue不是必须的,其是一个单向链表,只有当使用Condition时,才会存在此单向链表。并且可能会有多个Condition queue。下面举例说下获取和释放同步状态的过程:
获取同步状态
假设线程A要获取同步状态(这里想象成锁,方便理解),初始状态下state=0,所以线程A可以顺利获取锁,A获取锁后将state置为1。在A没有释放锁期间,线程B也来获取锁,此时因为state=1,表示锁被占用,所以将B的线程信息和等待状态等信息构成出一个Node节点对象,放入同步队列,head和tail分别指向队列的头部和尾部(此时队列中有一个空的Node节点作为头点,head指向这个空节点,空Node的后继节点是B对应的Node节点,tail指向它),同时阻塞线程B(这里的阻塞使用的是LockSupport.park()方法)。后续如果再有线程要获取锁,都会加入队列尾部并阻塞。
释放同步状态
当线程A释放锁时,即将state置为0,此时A会唤醒头节点的后继节点(所谓唤醒,其实是调用LockSupport.unpark(B)方法),即B线程从LockSupport.park()方法返回,此时B发现state已经为0,所以B线程可以顺利获取锁,B获取锁后B的Node节点随之出队。
5.ReentrantLock源码解析
ReentrantLock可重入锁内部有3个类,Sync、FairSync和NonfairSync。
Sync是一个继承AQS的抽象类,并发的控制就是通过Sync实现的(当然是使用AQS实现的,AQS是Java并发包的一个同步基础类),它复写了tryRelease方法,它有2个子类FairSync和NonfairSync,也就是公平锁和非公平锁。
由于Sync复写了tryRelease方法,它的2个子类公平锁和非公平锁没有再次复写这个方法,所以公平锁和非公平锁的释放锁操作是一样的,释放锁也就是唤醒等待队列中的第一个被挂起的线程。
虽然公平锁和非公平锁的释放锁方式一样,但是它们的获取锁方式不一样,公平锁获取锁的时候,如果1个线程获取到了锁,其他线程都会被挂起并且进入等待队列,后面来的线程的等待时间没有队列里的线程等待时间长的话,那么就放弃获取锁,进入等待队列。非公平锁获取锁的方式是一种抢占式的方式,不考虑等待时间的问题,无论哪个线程获取到了锁,其他线程就进入等待队列。
6.ThreadLocal为什么保证线程私有
ThreadLocal,也称线程本地变量。顾名思义,ThreadLocal为变量在每个线程中都创建了一个副本,那么每个线程可以访问自己内部的副本变量。其意义在于高并发场景时变量为各个线程所读时互不影响,有效地避免了线程安全问题,也避免了同步造成的性能开销。不过使用这种技巧的同时,也会伴随着一些不可避免的缺陷:由于在每个线程中都创建了副本,所以要考虑它对资源的消耗。换句话来说,ThreadLocal是以空间换时间的典型例子。ThreadLocal和线程同步机制都是为了解决多线程中相同变量的访问冲突问题。在同步机制中,通过对象的锁机制保证同一时间只有一个线程访问变量。这时该变量是多个线程共享的,使用同步机制要求程序慎密地分析什么时候对变量进行读写,什么时候需要锁定某个对象,什么时候释放对象锁等繁杂的问题,程序设计和编写难度相对较大。
ThreadLocal类的基本方法:
lvoid
set(Object value)设置当前线程的线程局部变量的值。
lpublic
Object get()该方法返回当前线程所对应的线程局部变量。
lpublic void
remove()将当前线程局部变量的值删除,目的是为了减少内存的占用,
lprotected
Object initialValue()返回该线程局部变量的初始值,该方法是一个protected的方法,显然是为了让子类覆盖而设计的。这个方法是一个延迟调用方法,在线程第1次调用get()或set(Object)时才执行,并且仅执行1次,ThreadLocal中的缺省实现直接返回一个null。
ThreadLocal实现机制
在每个线程Thread内部有一个ThreadLocal.ThreadLocalMap类型的成员变量threadLocals,这个threadLocals就是用来存储实际的变量副本的,内部由名为table的Entry数组维护,Entry的key为当前ThreadLocal变量,value为变量副本(即T类型的变量)。
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
应用场景
Spring利用ThreadLocal解决线程安全问题
Singleton表示该bean全局只有一个实例,Spring中bean的scope默认也是singleton。
Prototype表示该bean在每次被注入的时候,都要重新创建一个实例,这种情况适用于有状态的Bean。
Spring框架里的Bean,默认为单例模式,这是在多线程开发的时候要尤其注意的地方。当多用户同时请求一个服务时,容器会给每一个请求分配一个线程,这是多个线程会并发执行该请求多对应的业务逻辑(成员方法),此时就要注意了,如果该处理逻辑中有对该单例Bean状态的修改(体现为该单例Bean的成员属性),则必须考虑线程同步问题。
在一般情况下,只有无状态的Bean才可以在多线程环境下共享,在Spring中,绝大部分Bean都可以声明为singleton作用域。就是因为Spring对一些Bean中非线程安全“状态性对象”采用ThreadLocal进行封装,让它们也成为线程安全的“状态性对象”,因此有状态的Bean就可以在多线程中共享了。
7.Java中,编写多线程程序的时候你会遵循哪些最佳实践
a)给线程命名,这样可以帮助调试。
b)最小化同步的范围,而不是将整个方法同步,只对关键部分做同步。
c)如果可以,更偏向于使用volatile而不是synchronized。
d)使用更高层次的并发工具,而不是使用wait和notify来实现线程间通信,如BlockingQueue,CountDownLatch及Semeaphore。
e)优先使用并发集合,而不是对集合进行同步,并发集合提供更好的可扩展性。
8.ConcurrentHashMap(重点)
JDK 1.6
ConcurrentHashMap采用锁分段的机制,实现并发的更新操作,底层由Segment数组和HashEntry数组组成。Segment继承ReentrantLock用来充当锁的角色,每个Segment对象守护每个散列映射表的若干个桶。HashEntry用来封装映射表的键/值对;每个桶是由若干个HashEntry对象链接起来的链表。一个ConcurrentHashMap实例中包含由若干个Segment对象组成的数组,下面我们通过一个图来演示一下ConcurrentHashMap的结构:
JDK1.8
1.8的实现已经抛弃了Segment分段锁机制,利用CAS+Synchronized来保证并发更新的安全,底层依然采用数组+链表+红黑树的存储结构。
改进一:取消segments字段,直接采用transient
volatile HashEntry table保存数据,采用table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。
改进二:将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构。
put方法
1)hash算法
staticfinal int spread(int h) {return (h ^ (h >>> 16)) & HASH_BITS;}
2)table中定位索引位置,n是table的大小
intindex = (n - 1) & hash
3)获取table中对应索引的元素f。
Doug
Lea采用Unsafe.getObjectVolatile来获取,也许有人质疑,直接table[index]不可以么,为什么要这么复杂?
在java内存模型中,我们已经知道每个线程都有一个工作内存,里面存储着table的副本,虽然table是volatile修饰的,但不能保证线程每次都拿到table中的最新元素,Unsafe.getObjectVolatile可以直接获取指定内存的数据,保证了每次拿到数据都是最新的。
4)如果f为null,说明table中这个位置第一次插入元素,利用Unsafe.compareAndSwapObject方法插入Node节点。
Ø如果CAS成功,说明Node节点已经插入,随后addCount(1L,
binCount)方法会检查当前容量是否需要进行扩容。
Ø如果CAS失败,说明有其它线程提前插入了节点,自旋重新尝试在这个位置插入节点。
5)如果f的hash值为-1,说明当前f是ForwardingNode节点,意味有其它线程正在扩容,则一起进行扩容操作。
注:ForwardingNode:一个特殊的Node节点,hash值为-1,其中存储nextTable的引用。只有table发生扩容的时候,ForwardingNode才会发挥作用,作为一个占位符放在table中表示当前节点为null或则已经被移动。
6)其余情况把新的Node节点按链表或红黑树的方式插入到合适的位置,这个过程采用同步内置锁实现并发
9.CopyOnWriteArrayList和CopyOnWriteArraySet
Copy-On-Write简称COW,是一种用于程序设计中的优化策略。其基本思路是,从一开始大家都在共享同一个内容,当某个人想要修改这个内容的时候,才会真正把内容Copy出去形成一个新的内容然后再改,这是一种延时懒惰策略。从JDK1.5开始Java并发包里提供了两个使用CopyOnWrite机制实现的并发容器,CopyOnWriteArrayList和CopyOnWriteArraySet。
CopyOnWrite容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后在新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。
CopyOnWrite的应用场景
CopyOnWrite并发容器用于读多写少的并发场景。
CopyOnWrite的缺点
CopyOnWrite容器有很多优点,但是同时也存在两个问题,即内存占用问题和数据一致性问题。
(1)内存占用问题
因为CopyOnWrite的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象(注意:在复制的时候只是复制容器里的引用,只是在写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存)。
(2)数据一致性问题
CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器。
publicbooleanadd(Ee) {
finalReentrantLocklock=this.lock;
lock.lock();
try{
Object[] elements=getArray();
intlen=elements.length;
Object[] newElements=Arrays.copyOf(elements, len+1);
newElements[len]=e;
setArray(newElements);
returntrue;
}finally{
lock.unlock();
}
}
一致性又可以分为强一致性与弱一致性。
强一致性可以理解为在任意时刻,所有节点中的数据是一样的。同一时间点,你在节点A中获取到key1的值与在节点B中获取到key1的值应该都是一样的。
弱一致性包含很多种不同的实现,目前分布式系统中广泛实现的是最终一致性。
所谓最终一致性,就是不保证在任意时刻任意节点上的同一份数据都是相同的,但是随着时间的迁移,不同节点上的同一份数据总是在向趋同的方向变化。也可以简单的理解为在一段时间后,节点间的数据会最终达到一致状态。
CopyOnWriteArraySet是基于CopyOnWriteArrayList实现的。
10.阻塞队列
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列,这两个附加操作支持阻塞的插入和移除方法。支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直到队列不满为止。支持阻塞的移除方法:当队列为空时,获取元素的线程阻塞等待线程非空。
阻塞队列通常用于生产者和消费者的场景,生产者就是向队列里添加元素,而消费者就是从队列里取出元素。阻塞队列就是生产者存储元素而消费者用来获取元素的容器。
注意:如果是无界阻塞队列,队列永远都不会出现满的情况,所以使用put或者take方法永远都不会被阻塞,而且使用put方法时,该方法永远返回为true。
JDK提供的阻塞队列
JKD7提供了7个阻塞队列:
(1)ArrayBlockingQueue:由数组结构组成的有界阻塞队列
(2)LinkedBlockingQueue:由链表结构组成的有界阻塞队列
(3)PriorityBlockingQueue:支持优先级排序的无界阻塞队列
(4)DelayQueue:使用优先级队列队列实现的无界阻塞队列
(5)SynchronousQueue:不存储元素的阻塞队列
(6)LinkedTransferQueue:由链表结构组成的无界阻塞队列
(7)LinkedBlockingDeque:由链表结构组成的双向阻塞队列
ArrayBlockingQueue
ArrayBlockingQueue是一个用数组实现的有界队列,此队列按照先进先出的原则对元素进行排序。
默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。
LinkedBlockingQueue
LinkedBlockingQueue是一个用链表实现的有界阻塞队列,此队列默认最大长度为Integer.MAX_VALUE,按照先进先出(FIFO)的原则对元素进行排序。
PriorityBlockingQueue
PriorityBlockingQueue是一个支持优先级的无界阻塞队列,默认情况下元素采用自然排序升序排列,也可以自定义类实现compareTo()方法来指定元素排序规则或者初始化PriorityBlockingQueue时指定构造参数Comparator来对元素进行排序,需要注意的是不能保证同优先级的元素排序。
DelayQueue
DelayQueue是一个支持延时获取元素的无界阻塞队列,队列使用PriorityQueue来实现。队列中元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素,只有延迟期满时才能从队列中提出元素。
DelayQueue非常有用,可以将DelayQueue运用在一下场景:
l缓存系统的设计:可以送DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦从DelayQueue获取元素,就表示缓存到期了。
l定时任务调度:使用DelayQueue保存当前将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如TimeQueue就是使用DelayQueue实现的。
SynchronousQueue
SynchronousQueue是一个不存储元素的阻塞队列,每个put操作必须等待一个take操作,否则不能继续添加元素。
支持公平访问队列,默认情况下线程采用非公平性策略,使用带boolean参数的构造方法可以实现等待线程采用先进先出(FIFO)的顺序访问队列。
LinkedTransferQueue
LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列,相当于其他阻塞队列,LinkedTransferQueue多了一tryTransfer和transfer方法。
(1)transfer方法
如果当前有消费者正在等待接收元素(消费者使用take()方法或者带时间限制的poll方式时,transfer()方法可以把生产者传入的元素立即transfer(传输)给消费者,如果没有消费者在等待接收元素,transfer方法将元素存放在队列的tail节点,并等待该元素被消费者消费了才返回。
(2)tryTransfer方法
tryTransfer方法用来试探生产者传入元素是否能够直接传递给消费者,如果没有消费者等待接收元素则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收方法立即返回,而transfer需要等待消费者消费了才返回。
LinkedBlockingDeque
LinkedBlockingDeque是一由链表结构组成的双向阻塞队列,所谓双向队列指的是可以从队列两端插入和移除元素,双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一般竞争。相比其他阻塞队列,LinkedBlockingDeque多了addFirst,
addLast,offerFirst,offerLast,peekFirst,peekLast等方法。
在初始化LinkedBlockingDeque时可以设置容量防止其过渡膨胀。另外,双向阻塞队列可以运行在“工作窃取”模式中。
11.生产者消费者模式
classProducerimplementsRunnable {
privateBlockingQueueque;
publicProducer(BlockingQueueque) {
this.que=que;
}
@Override
publicvoidrun() {
try{
while(true) {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName()+"get
"+que.take());
}
}catch(InterruptedExceptione) {
e.printStackTrace();
}
}
}
classConsumerimplementsRunnable {
privateBlockingQueueque;
publicConsumer(BlockingQueueque) {
this.que=que;
}
privatestaticAtomicLongid=newAtomicLong(0);
@Override
publicvoidrun() {
try{
while(true) {
que.put(id.getAndIncrement());
System.out.println(Thread.currentThread().getName()+"put
"+id);
Thread.sleep(300);
}
}catch(InterruptedExceptione) {
e.printStackTrace();
}
}
}
12.Java中线程安全的实现方法
(1)互斥同步(悲观并发)
互斥包括的几种方式:临界区、互斥量、信号量。
在Java中最基本的互斥同步的手段就是synchronized关键字,其次有重入锁(ReentrantLock)
(2)非阻塞法同步(基于冲突检测的乐观并发)
互斥同步最主要的问题就是进行线程阻塞和唤醒带来的性能问题,因此这种同步也称为阻塞同步。
主要使用的就是CAS操作。
(3)无同步方案
要保证线程安全,并不是一定就要进行同步,两者没有因果关系。
Ø可重入代码
Ø线程本地变量(ThreadLocal)
13.同步和异步,阻塞和非阻塞
同步与异步
同步和异步关注的是消息通信机制(synchronous communication/ asynchronous communication)
阻塞与非阻塞
阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态。阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。
·同步:自己亲自出马持银行卡到银行取钱。
·异步:委托一小弟拿银行卡到银行取钱,然后给你。
·阻塞:ATM排队取款,你只能等待。
·非阻塞:柜台取款,取个号,然后坐在椅子上做其它事,等号广播会通知你办理。
14.Java中同步机制的实现方式
(1)synchronized关键字
(2)volatile关键字是线程同步的轻量级实现
(3)wait /notify(notifyAll)
(4)同步容器类
(5)ReentrantLock可重入锁
(6)Condition条件对象
条件对象的意义在于对于一个已经获取锁的线程,如果还需要等待其他条件才能继续执行的情况下,才会使用Condition条件对象。
(7)ThreadLocal
(8)并发包下的工具类
一般情况下,我们不会使用wait/notifyAll或者ReentrantLock这种比较底层的类,而是使用并发包下提供的一些工具类。
-Semaphore信号量被用于控制特定资源在同一个时间被访问的个数
-CountDownLatch
-CyclicBarrier
-AbstractQueuedSynchronizer
AQS是很多同步工具类的基础,比如ReentrentLock里的公平锁和非公平锁,Semaphore里的公平锁和非公平锁,CountDownLatch里的锁等他们的底层都是使用AbstractQueuedSynchronizer完成的。
15.同步容器类和并发容器类
同步容器
同步容器实现线程安全的方式是:将它们的状态封装起来,并对每个公有方法同步,使得每次只有一个线程能够访问容器的状态。
lVector和Hashtable以及继承自Vector的Stack。
lCollections.synchronizedXxx等工厂方法创建的类。
同步容器的问题
同步容器中的方法采用了synchronized进行了同步,这必然会影响到执行性能.同步容器将所有对容器状态的访问都串行化了,这样保证了线程的安全性,但代价就是严重降低了并发性,当多个线程竞争容器时,吞吐量严重降低。
对Vector等容器并发地进行迭代修改时,会报ConcurrentModificationException异常。
并发容器
Java5.0开始针对多线程并发访问设计,提供了并发性能较好的并发容器,引入了java.util.concurrent包。主要解决了两个问题:
1)根据具体场景进行设计,尽量避免synchronized,提供并发性。
2)定义了一些并发安全的复合操作,并且保证并发环境下的迭代操作不会出错。
ConcurrentHashMap用来替代同步且基于散列的Map
CopyOnWriteArrayList用于在遍历操作为主要操作的情况下代替同步的List
CopyOnWriteArraySet对应的非并发容器:HashSet;目标:代替synchronizedSet
ConcurrentLinkedQueue不会阻塞的队列;对应的非并发容器:Queue;原理:基于链表实现的FIFO队列(LinkedList的并发版本)
ConcurrentSkipListMap作为同步的SortedMap的并发替代品
ConcurrentSkipListSet作为同步的SortedSet的并发替代品
阻塞队列:
LinkedBlockingQueue:基于链表实现的可阻塞的FIFO队列
ArrayBlockingQueue:基于数组实现的可阻塞的FIFO队列
PriorityBlockingQueue:按优先级排序的队列
对应的非并发容器:BlockingQueue
特点:拓展了Queue,增加了可阻塞的插入和获取等操作
16.CountDownLatch和CyclicBarrier的区别
(1)CyclicBarrier可以循环使用,而CountDownLatch只能用一次。
(2)CountDownLatch是减计数方式,计数等于0时释放所有等待的线程;CyclicBarrier是加计数方式,计数达到构造方法中参数指定的值时释放所有等待的线程。
(3)应用场景不一样
lCountDownLatch应用场景:主/从任务模式。是一个或多个线程(主任务),等待另外N个线程(从任务)完成某个事情之后才能执行。
lCyclicBarrier应用场景:队友模式。一组N个线程(N个队友)相互等待,任意一个线程(某个队友)没有完成任务,所有线程都等着。直到这一组所有线程的任务完成,这组中每个线程才能继续往下运行。
(4)实现机制不一样
CountDownLatch底层使用的是共享锁,它有个内部类Sync,这个Sync继承AQS,实现了共享锁。CyclicBarrier底层使用的是ReentrantLock和这个lock的条件对象Condition。它是通过独占锁实现的
17.Future和FutureTask
Executor是Runnable和Callable的调度容器。
Future提供了三种功能:
1)判断任务是否完成;
2)能够中断任务;
3)能够获取任务执行结果。
Øcancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。
ØisCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回true。
ØisDone方法表示任务是否已经完成,若任务完成,则返回true。
Øget()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回。
Øget(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内还没获取到结果,就直接返回null。
RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值,管理任务。
18.同步工具类
(1)闭锁CountDownLatch
(2)FutureTask
(3)信号量Semaphore
信号量(Semaphore)是用来控制同时访问特定资源的线程数量,通过协调各个资源,以保证合理地使用公共资源。通过acquire()获取一个许可,如果没有就等待,而release()释放一个许可。
(4)栅栏CyclicBarrier
19.Executor框架
Executor框架是在Java5中引入的,可以通过该框架来控制线程的启动,执行,关闭,简化并发编程。Executor框架把任务提交和执行解耦,要执行任务的人只需要把任务描述清楚提交即可,任务的执行提交人不需要去关心。通过Executor框架来启动线程比使用Thread更好,更易管理,效率高,避免this逃逸问题。Executor的实现还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能监视等机制。
Executor基于生产者-消费者模式,提交任务的操作相当于生产者(生成待完成的工作单元),执行任务的线程则相当于消费者(执行完这些工作单元)。
Executor框架的两级调度模型
在Hotspot VM的线程模型中,Java线程被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。
在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。
应用程序通过Executor框架控制上层的调度;而下层的调度由操作系统内核控制,下层的调度不受应用程序的控制。
Executor框架由3大部分组成:
1)任务:被执行任务需要实现接口Runnable、Callable。
2)任务执行:任务执行机制的核心接口Executor,继承Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口,分别为ThreadPoolExecutor和ScheduledThreadPoolExecutor。
3)异步计算的结果:Future和实现了Future接口的FutureTask类。
Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。
ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。比Timer更灵活,功能更强大。
Future接口和实现Future接口的FutureTask类,代表异步计算的结果
Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或者ScheduledThreadPoolExecutor执行。
ExecutorService(可以理解为程序员提供了一堆操作Executor的API)
是一个接口,继承自Executor接口,提供了更多的方法,提供了生命周期的管理方法,以及可跟踪一个或多个异步任务执行状况的方法。
ExecutorService的生命周期包括三种状态:运行,关闭,终止。创建后便进入运行状态,当调用了shutdown()方法时,进入关闭状态,此时不再接受新任务,但是它还在执行已经提交了的任务,当所有的任务执行完后,便达到了终止状态。
方法execute(Runnable)接收一个java.lang.Runnable对象作为参数,并且以异步的方式执行它。
方法submit(Runnable)同样接收一个Runnable的实现作为参数,但是会返回一个Future对象。这个Future对象可以用于判断Runnable是否结束执行。
Executors
提供了一系列静态工厂方法用于创建各种线程池。返回的线程池都实现了ExecutorService接口。如果没有特殊要求,请尽量使用此类中提供的静态方法生成线程池。
运行过程
1)主线程首先要创建实现Runnable或者Callable接口的任务对象。
2)然后可以把Runnable对象直接交给ExecutorService执行(ExecutorService.execute(Runnable command)或者ExecutorService.submit(Runnable task)或者ExecutorService.submit(Callable
task))。如果执行ExecutorService::submit()那么将返回一个实现Future接口的对象。
3)最后,主线程可以执行FutureTask::get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel来取消次任务的执行。
20.线程池
线程池:是指管理一组同构工作线程的资源池。
线程池作用(线程复用,控制最大并发数,管理线程)
(1)重用线程池中的线程,减少因对象创建、销毁所带来的性能开销;
(2)能有效的控制线程的最大并发数,提高系统资源利用率,同时避免过多的资源竞争,避免堵塞;
(3)能够对多线程进行简单的管理,使线程的使用简单、高效;
(4)不会由于等待创建线程而延迟任务的执行,从而提高了响应性。
线程池类型和对应的应用场景
(1)newFixedThreadPool
创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化。如果某个线程由于发生了未预期的Exception而结束,那么线程池会补充一个新的线程。
(2)newCachedThreadPool
创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。
(3)newSingleThreadExecutor
一个单线程的Executor,它创建单个工作线程来执行任务,如果某个线程异常结束,会创建另一个线程来替代。newSingleThreadExecutor能确保依照任务在队列中的顺序来串行执行。
(4)newScheduledThreadPool
创建了一个固定长度的线程池,而且以延迟或定时的方式来执行任务。
线程池的工作过程如下:
(1)线程池刚创建时,里面没有一个线程,任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
(2)当调用execute()方法添加一个任务时,线程池会做如下判断:
a)如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;
b)如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;
c)如果这时候队列满了,而且正在运行的线程数量小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
d)如果队列满了,而且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会抛出异常RejectExecutionException。
(3)当一个线程完成任务时,它会从队列中取下一个任务来执行。
(4)当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小。
线程池的参数设置
对于maximumPoolSize:指的是线程池中最多允许有多少个线程。
对于corePoolSize:指的是线程池中正在运行的线程。
线程池实现原理
1)首先,各自存放线程和任务,其中,任务带有阻塞。
private final HashSet workers = newHashSet();
private final BlockingQueueworkQueue;
工作线程:线程池创建线程时,会将线程封装成工作线程worker,worker在执行完任务后,还会循环地获取工作队列里的任务来执行。workers是HashSet类型,即它是一个Worker集合。而一个Worker对应一个线程,也就是说线程池通过workers包含了"一个线程集合"。wokers的作用是,线程池通过它实现了“允许多个线程同时运行”。
workQueue是BlockingQueue类型,即它是一个阻塞队列。当线程池中的线程数超过它的容量的时候,线程会进入阻塞队列进行阻塞等待。
2)然后,在execute方法中进行addWorker(command,true),也就是创建一个线程,把任务放进去执行;或者是直接把任务放入到任务队列中。
3)接着如果是addWorker,那么就会new Worker(task),调用其中run方法,在Worker的run方法中,调用runWorker(this)方法,在该方法中就会具体执行我们的任务task.run(),同时这个runWorker方法相当于是个死循环,正常情况下就会一直取出任务队列中的任务来执行,这就保证了线程不会销毁。
关闭线程池
可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。
只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminated方法会返回true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,可以调用shutdownNow方法。
如何合理地设置线程池的大小
要想合理的配置线程池的大小,首先得分析任务的特性,可以从以下几个角度分析:
Ø任务的性质:CPU密集型任务、IO密集型任务、混合型任务。
Ø任务的优先级:高、中、低。
Ø任务的执行时间:长、中、短。
Ø任务的依赖性:是否依赖其他系统资源,如数据库连接等。
性质不同的任务可以交给不同规模的线程池执行。
对于不同性质的任务来说,CPU密集型任务应配置尽可能小的线程,如配置CPU个数+1的线程数,IO密集型任务应配置尽可能多的线程,因为IO操作不占用CPU,不要让CPU闲下来,应加大线程数量,如配置两倍CPU个数+1,而对于混合型的任务,如果可以拆分,拆分成IO密集型和CPU密集型分别处理,前提是两者运行的时间是差不多的,如果处理时间相差很大,则没必要拆分了。
若任务对其他系统资源有依赖,如某个任务依赖数据库的连接返回的结果,这时候等
待的时间越长,则CPU空闲的时间越长,那么线程数量应设置得越大,才能更好的利用CPU。
当然具体合理线程池值大小,需要结合系统实际情况,在大量的尝试下比较才能得出,以上只是前人总结的规律。
一个估算合理值的公式
最佳线程数目=((线程等待时间+线程CPU时间)/线程CPU时间)* CPU数目
这个公式进一步转化为:
最佳线程数目=(线程等待时间与线程CPU时间之比+ 1)* CPU数目