JUC 并发工具包
java.util.concurrent包:定义了并发编程中很常用的实用工具类。
一些基础比较
volatile 变量:轻量级多线程同步机制,不会引起上下文切换和线程调度。仅提供内存可见性保证,不提供原子性。
CAS 原子指令:轻量级多线程同步机制,不会引起上下文切换和线程调度。它同时提供内存可见性和原子化更新保证。
内部锁和显式锁:重量级多线程同步机制,可能会引起上下文切换和线程调度,它同时提供内存可见性和原子性。
概念总览:
volatile变量: 轻量级的锁
线程可见性:保证每一个线程可以实时知道volatile变量的值变化。对于多线程,不是一种互斥关系,不能保证变量状态的“原子性操作”
原子变量的基础:CAS算法
一种无锁的非阻塞算法的实现,首先,CPU 会将内存中将要被更改的数据与期望的值做比较。然后,当这两个值相等时,CPU 才会将内存中的数值替换为新的值。否则便不做操作。最后,CPU 会将旧的数值返回。这一系列的操作是原子的。
CAS 包含了3 个操作数:
- 需要读写的内存值V
- 进行比较的值A
- 拟写入的新值B
java.util.concurrent.atomic包下提供了一些原子操作的常用类:
- AtomicBoolean
- AtomicInteger
- AtomicLong
- AtomicReference
- AtomicIntegerArray
- AtomicLongArray
- AtomicMarkableReference
- AtomicReferenceArray
- AtomicStampedReference
核心方法: boolean compareAndSet(expectedValue, updateValue)
AQS(AbstractQueuedSynchronizer)
抽象队列同步器AQS:维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。
- state: 来维护同步状态,
- FIFO队列: 来完成资源获取线程的排队工作。
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
基于AQS的锁:
- Reentrant
- Semaphore
- CountDownLatch
- ReentrantReadWriteLock
- SynchronousQueue
- FutureTask
AQS通过getState、setState及compareAndSetState等protected类型方法进行状态转换。
- ReentrantLock用state表示所有者线程已经重复获取该锁的次数。
- Semaphore用state表示剩余的许可数量。
- CountDownLatch用state表示闭锁的状态,如关闭、打开。
- FutureTask用state表示任务的状态,如尚未开始、正在运行、已完成、已取消。
除了state,在同步器类中还可以自行管理一些额外的状态变量。如:
- ReentrantLock保存了锁的当前所有者的信息,这样就能区分某个获取操作是重入的还是竞争的。
- FutureTask用result表示任务的结果,该结果可能是计算得到的答案,也可能是抛出的异常。
并发容器
JUC 提供了多种并发容器类来改进同步容器的性能。如:ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet、 CopyOnWriteArrayList 和 CopyOnWriteArraySet。
ConcurrentHashMap
ConcurrentHashMap 在JDK 7之前是通过Lock和Segment(分段锁)实现并发安全,JDK 8之后改为CAS+synchronized来保证并发安全。
ConcurrentHashMap、HashMap和HashTable的区别:
Hashtable 是线程安全的哈希表,由于是通过内置锁 synchronized 来保证线程安全,在资源争用比较高的环境下,Hashtable 的效率比较低,不建议使用。
HashMap 是非线程安全的哈希表,常用于单线程程序中。
ConcurrentHashMap 是一个支持并发操作的线程安全的HashMap,但是他不允许存储空key或value。使用CAS+synchronized来保证并发安全,在并发访问时不需要阻塞线程,所以效率是比Hashtable 要高的。
ConcurrentHashMap并不是将每个方法在同一个锁上同步并使得每次只能有一个线程访问容器,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制称为“分段锁”。
segments 为整个ConcurrentHashMap,segment为分段,table时分段的具体内容,HashBucket是桶形式的hash存储内容,HashEntry是具体的数据节点。
ConcurrentHashMap的出现主要是提高了hashmap在多线程下的安全性,可以理解是hashmap在多线程上的替代。
CountDownLatch 闭锁
CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行
CountDownLatch是通过一个计数器来实现的,当我们在new 一个CountDownLatch对象的时候需要带入该计数器值,该值就表示了线程的数量。每当一个线程完成自己的任务后,计数器的值就会减1。当计数器的值变为0时,就表示所有的线程均已经完成了任务,然后就可以恢复等待的线程继续执行了。
使用场景:
- 需要等待某个条件达到要求后才能做后面的事情;
- 同时当线程都完成后也会触发事件,以便进行后面的操作。
CountDownLatch最重要的方法是
- countDown(): 计数减一
- await():等待计数为0,如果没有到达0,就只有阻塞等待了。
Callable 接口
执行Callable,需要FutureTask实现类的支持,用于接受运算的结果。FutureTask是Future接口的实现类。
示例:
//注入实现Callable接口的类,必须重写其中的call方法。
FutureTask<Integer> result = new FutureTask<>(threadDemo);
new Thread(result).start();
class ThreadDemo implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 1; i <= 100; i++) {
sum += i;
}
return sum;
}
}
Callable 接口类似于 Runnable,两者都可以实现多线程,其中Runnable无返回值,而Callable有返回值,但是Callable实现需要依赖Future接口实现的类。其中Callable重写call方法,Runnable重写run方法。
ReentrantLock 类
ReentrantLock 实现了 Lock 接口,ReentrantLock实现Lock有两种模式即公平模式和不公平模式
,参看java文档介绍如下。
一个可重入的互斥锁 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁相同的一些基本行为和语义,但功能更强大。
ReentrantLock 将由最近成功获得锁,并且还没有释放该锁的线程所拥有。当锁没有被另一个线程所拥有时,调用 lock 的线程将成功获取该锁并返回。如果当前线程已经拥有该锁,此方法将立即返回。可以使用 isHeldByCurrentThread() 和 getHoldCount() 方法来检查此情况是否发生。
此类的构造方法接受一个可选的公平 参数。当设置为 true 时,在多个线程的争用下,这些锁倾向于将访问权授予等待时间最长的线程。否则此锁将无法保证任何特定访问顺序。与采用默认设置(使用不公平锁)相比,使用公平锁的程序在许多线程访问时表现为很低的总体吞吐量(即速度很慢,常常极其慢),但是在获得锁和保证锁分配的均衡性时差异较小。不过要注意的是,公平锁不能保证线程调度的公平性。因此,使用公平锁的众多线程中的一员可能获得多倍的成功机会,这种情况发生在其他活动线程没有被处理并且目前并未持有锁时。还要注意的是,未定时的 tryLock 方法并没有使用公平设置。因为即使其他线程正在等待,只要该锁是可用的,此方法就可以获得成功。
最典型的代码:
class X {
private final ReentrantLock lock = new ReentrantLock();
// ...
public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
// 一定记得要在finally中释放锁。
lock.unlock()
}
}
}
ReentrantLock里面的功能函数主要有:
lock() //阻塞模式获取资源
lockInterruptibly() //如果当前线程未被 中断,则获取锁。
tryLock() //仅在调用时锁未被另一个线程保持的情况下,才获取该锁。
tryLock(time) //在一段时间内尝试获取资源
unlock() //释放资源,为了保证一定执行,放在finally中执行。
ReadWriteLock && ReentrantReadWriteLock
读-写锁ReadWriteLock:维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。只要没有writer,读取锁可以由多个reader 线程同时保持。写入锁是独占的。
ReadWriteLock 读取操作通常不会改变共享资源,但执行写入操作时,必须独占方式来获取锁。对于读取操作占多数的数据结构。ReadWriteLock 能提供比独占锁更高的并发性。而对于只读的数据结构,其中包含的不变性可以完全不需要考虑加锁操作。
Lock readLock() //返回用于读取操作的锁。
Lock writeLock() //返回用于写入操作的锁。
Condition 控制线程通信
Condition 是需要结合具体Lock实现类使用的一种通信类接口。
Condition实例实质上被绑定到一个锁上。要为特定Lock 实例获得Condition 实例,请使用其newCondition() 方法。
示例:
class BoundedBuffer {
final Lock lock = new ReentrantLock();
// 创建方法
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
常用方法:
void await()
// 造成当前线程在接到信号或被中断之前一直处于等待状态。
boolean await(long time, TimeUnit unit)
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
long awaitNanos(long nanosTimeout)
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
void awaitUninterruptibly()
// 造成当前线程在接到信号之前一直处于等待状态。
boolean awaitUntil(Date deadline)
// 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。
void signal()
// 唤醒一个等待线程。
void signalAll()
// 唤醒所有等待线程。
线程锁细论
静态方法公用一个锁: 可理解为类锁class.lock
非静态方法不同对象用不同锁: 柯理解为对象锁:this.lock
每一个类或者对象都只有一个锁,无论多少个线程在一个时刻,只能有一个线程获得相应的锁。也就是说,类有一个class锁,每一个对象都有且仅有一个属于自己的this锁。this锁和class锁相互独立。
线程池
线程池可以解决的问题:对比数据库连接池的概念理解,降低频繁的创建删除线程的开销。以下仅介绍几个关键类,具体的可以在实际使用时查询文档。
Executors
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ScheduledExecutorService singleThreadScheduledPool = Executors.newSingleThreadScheduledExecutor();
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(5);
ThreadPoolExecutor
线程调度
接口 ScheduledExecutorService, 可安排在给定的延迟后运行或定期执行的命令,所有的 schedule 方法都接受相对 延迟和周期作为参数,而不是绝对的时间或日期:
用法示例:
//以下是一个带方法的类,它设置了 ScheduledExecutorService ,在 1 小时内每 10 秒钟蜂鸣一次:
import static java.util.concurrent.TimeUnit.*;
class BeeperControl {
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
public void beepForAnHour() {
final Runnable beeper = new Runnable() {
public void run() { System.out.println("beep"); }
};
final ScheduledFuture<?> beeperHandle =
scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);
scheduler.schedule(new Runnable() {
public void run() { beeperHandle.cancel(true); }
}, 60 * 60, SECONDS);
}
}