多线程编程
同步机制
Atomic类
- 原理
CAS+自旋,CAS依赖unsafe实现。缺点的高并发是自旋消耗cpu - jdk8的优化
新增了几个继承自Striped64的类,LongAdder、LongAccumulator、DoubleAdder、DoubleAccumulator,思想是CAS的目标分段,每个线程分别对应一个段,降低冲突,最后把所有段加起来返回
锁
- synchronized关键字
对象头中指向一个ObjectMonitor(c++实现)对象,monitor实现锁依赖于操作系统的Mutex Lock,使用时需要进行用户态到内核态的切换,所以效率较低。
锁优化过程:因为synchronized的效率问题,JDK1.6进行了优化,主要依靠对象头设计了三种不同类型的锁——偏向锁、轻量级锁、重量级锁。初始获得偏向锁,待其他线程竞争时,将偏向锁升级到轻量级锁,原持有锁的线程继续执行,竞争者自旋,一定次数后升级为重量级锁,竞争者阻塞。
详见:
深入理解Java并发之synchronized实现原理
Java性能 -- synchronized锁升级优化
- Lock
典型用法:
Lock l = ...;
l.lock();
try {
// access the resource protected by this lock
} finally {
l.unlock();
}
和synchronized的一些区别,除了使用方式上:
- try acquire
- 公平锁
- 可中断
- 读写锁
- 不保证优先级,支持公平/非公平
- 可重入
- 可降级
- 可中断
- 支持条件变量
官方示例:
class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}
class RWDictionary {
private final Map<String, Data> m = new TreeMap<String, Data>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
public Data get(String key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
public String[] allKeys() {
r.lock();
try { return m.keySet().toArray(); }
finally { r.unlock(); }
}
public Data put(String key, Data value) {
w.lock();
try { return m.put(key, value); }
finally { w.unlock(); }
}
public void clear() {
w.lock();
try { m.clear(); }
finally { w.unlock(); }
}
}
- 可重入实现
计数器 + 记录持有锁的线程
信号量
本质上是个计数器,可以限制能够访问资源的线程个数。常见场景如池的访问,官方示例:
class Pool {
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
// Not a particularly efficient data structure; just for demo
protected Object[] items = ... whatever kinds of items being managed
protected boolean[] used = new boolean[MAX_AVAILABLE];
protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null; // not reached
}
protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}
当计数器为1时,构成一个二元信号量,可以当做一个“锁”使用,这个“锁”可以由其他线程释放,也就是说信号量没有owner的概念,这一点在死锁恢复中很有用。
条件变量
获取锁之后,可以等待某个条件,等待时释放锁。使用时需要和锁绑定在一起。
与wait、notify/notifyAll的区别类似于synchronized和lock。
官方示例:
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
AQS
AQS是JUC中大部分同步类(如上面说的ReentrantLock、Semaphore等)的底层实现框架。通过继承AQS并实现几个必要的方法,我们可以很容易地实现自己的同步类。简单地说,AQS通过CAS操作和一个等待队列(CLH队列的变体)来实现同步功能,如下图:
详见美团技术团队的文章:
从ReentrantLock的实现看AQS的原理及应用
线程池
- Executor,顶层接口,只有execute方法
- ExecutorService,继承自Executor,增加了shutdown和返回future的能力,关闭示例:
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
线程池的五种状态:
线程池的状态
https://www.jianshu.com/p/03ecc5a4316c
- ThreadPoolExecutor,核心参数:
- Core and maximum pool sizes
线程数小于core size时,提交任务直接创建新线程,不管有没有idle线程;
线程数介于core size和maximum之间时,只有没有idle线程才会创建 - Keep-alive times
超过core size的线程存活时间 - 队列
线程数大于core size后,会优先扔进队列,队列满后会新建线程直到已经达到maximumPoolSIze,这时任务会被拒绝
常见的3种队列:- 直传,如使用SynchronousQueue,不存储,消费一个放一个,一般要求无界的maximumPoolSize防止拒绝任务。这种策略防止任务之间有依赖的时候卡住;
- 无界队列,如LinkedBLockingQueue,此时maximumPoolSize是无效的,也不会大于corePoolSize个数的线程被创建出来;
- 有界队列,如ArrayBlockingQueue,queue size和maximumPoolSize更难取舍,取决于cpu占用和吞吐量的取舍
- 拒绝任务
队列大小和最大线程数被触达后就会执行RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)
四种预置的拒绝策略包括:
- Core and maximum pool sizes
总结ThreadPoolExecutor的工作流程:
当线程数小于corePoolSize时,每次提交直接创建新线程;当corePoolSize达到后,再提交会放到队列,当队列满后,继续创建线程直到maximumPoolSIze,然后就是开始拒绝任务提交
如何设置线程池的参数?
- 机械化的公式
- IO型/计算型
- 监控 + 动态配置
-
Executors
工厂类,常见的四种线程池:-
newCachedThreadPool,可缓存线程
newCachedThreadPool -
newFixedThreadPool,定长,队列无界
newFixedThreadPool -
newScheduledThreadPool,定时,支持延迟,异常之后不会继续执行
newScheduledThreadPool
实现延时基于:DelayedWorkQueue,一般而言,延时队列基于优先级队列(堆)实现。
两种用法:- scheduleWithFixedDelay
固定延时,等上次执行完后,等待延时时间执行 - scheduleAtFixedRate
固定频率执行,如果任务执行时间超过定时,就立刻开始,否则等待固定时间到再执行
- scheduleWithFixedDelay
-
newSingleThreadExecutor
newSingleThreadExecutor
-
-
fork join
传统的线程池无法处理任务直接存在依赖的情况,也就是分治。fork join可以应用在这种场景中,类似map-reduce,fork把任务拆成小任务,join合并结果,多个线程有各自的队列,当自己空闲时会去其他线程队列偷任务执行。使用要注意不要阻塞父线程使其成为监工。
Java内存模型
- 线程和进程
进程是资源分配的单元,拥有独立的地址空间。线程是任务调度的单元,是进程内部的一个执行序列。一个进程至少有一个线程,进程之间切换开销大,通信困难,线程切换开销小,可以共享进程资源,同步方便。 -
线程的状态
NEW、RUNABLE、TERMINATED、WAITING、TIMED_WAITING、BLOCKED
线程状态
- JMM
Java内存模型,JMM,可以理解为一组规范、规则,屏蔽了底层硬件(cpu、寄存器、内存)的差异,为java程序提供了统一的内存访问模型。 - 三大特性
JMM围绕三大特性展开,包括原子性、可见性、有序性。加锁可以满足全部三条特性,volatile可以满足可见性和有序性(通过内存屏障) - happens-before原则
除了三大特性以外,还可以通过happens-before原则来推定程序的顺序:- 程序次序规则
- 锁定规则
- volatile变量规则
- 传递规则
- 线程启动规则
- 线程中断规则
- 线程终结规则
- 对象终结规则
理解happens-before原则:因为工作线程的缓存和主内存同步问题,先行发生的线程对内存的操作未必能被后续线程观测到,而如果满足hb原则则可以保证这一点
其他
- ThreadLocal
每个thread对象里有一个threadlocals属性,这是一个map,以ThreadLocal为key,set的值为value。所以对每个threadlocal,每个thread内部都有一个map存对应的值。