Java多线程深度探索
- 线程
- 多线程操作
-
锁
- [1.锁类型](#1. 锁类型)
- 2.synchronized
- 3.concurrent包的锁机制
- 线程池
线程
1.优先级
每个线程有一个优先级,更高优先级的线程优先运行,优先的意思是只是在分配cpu时间段的时候,得到的概率高一些。
当在某个线程创建一个新的线程,这个线程有与创建线程相同的优先级。
setPriority(int newPriority)
2.守护线程
每个线程可能被标记为1个守护线程,当在某个线程创建一个新的线程,如果创建线程是守护线程则它也是。
当jvm中只剩下守护线程,守护线程不会支持jvm继续运行。
setDaemon(boolean on)
必须在start之前设置。
当一个jvm启动,这里通常有一个非守护线程(运行main函数),这个jvm继续执行线程们直到:
- Runtime类的exit方法被执行。
- 所有非守护线程都已死亡,或者从run中返回,或者抛出异常。
3.常用方法
Paste_Image.png4.线程状态:
jvm中的线程有以下状态(不是操作系统)
- NEW :还未开始的线程
- RUNNABLE :正在被jvm执行,但可能正在等待CPU调度
- BLOCKED : 等待获取锁以进入同步方法/代码块
-
WAITING : 线程在这个状态下等待其他线程执行特定操作。通常为当执行以下操作后
Object.wait
,
Thread.join
,
LockSupport.park
-
TIMED_WAITING :当线程开始等待一段时间。通常为当执行以下操作后
Thread.sleep(long)
,
Object.wait(long)
,
Thread.join(long)
,
LockSupport.parkNanos
,
LockSupport.parkUntil
- TERMINATED :线程已结束
Thread.join
的实现是基于Object.wait
的。就不与讨论。
Thread.sleep
一般不会用于线程同步。他也与锁无关,不会释放锁。
线程状态的改变主要有2种方式:
Object.wait
/Object.notify
这是基于jvm的线程同步机制与synchronized
关键字息息相关,wait时会释放锁,所以必须先通过synchronized
关键字获取锁。
LockSupport.park
是concurrent包新增的线程状态切换方式。与synchronized
的锁机制没有关系,直接改变线程状态,所以也不会有死锁问题。concurrent包所有线程状态切换全基于此。
多线程操作
1.volatile
线程工作区:
每个线程有各自的工作内存,用于给JVM大量的空间来优化线程内指令的执行。主存中的变量可以被拷贝到线程的工作内存中去单独执行,在执行结束后,结果可以在某个时间刷回主存。
可见性:
volatile关键字给与变量可见性,该变量在所有线程的工作区中都是一致的,线程中拿到的永远是最新的值,就是并发的时候每个线程总是能看到其他线程对这个volatile变量最后的写入 。
例:
public class Thread1 extends Thread {
private boolean flag = false;
public void run() {
while(!flag) {
}
}
public void close() {
flag = true;
}
}
调用close()根本停不下来。需要给flag加上volatile
。保存在所有线程值中一致。
原理:
如果对声明了volatile变量进行写操作,JVM就会向处理器发送一条Lock前缀的指令,将这个变量所在缓存行的数据写回到系统内存。但是就算写回到内存,如果其他处理器缓存的值还是旧的,再执行计算操作就会有问题,所以在多处理器下,为了保证各个处理器的缓存是一致的,就会实现缓存一致性协议,每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期了,当处理器发现自己缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置成无效状态,当处理器要对这个数据进行修改操作的时候,会强制重新从系统内存里把数据读到处理器缓存里。
2.Atomic
原子性:volatile并不保证原子性。在i++
的过程中最为明显。
private static volatile int value = 0;
private static class LoopVolatile implements Runnable {
public void run() {
int count = 0;
while (count < 1000) {
value++;
count++;
}
}
}
public static void main(String[] args) {
Thread t1 = new Thread(new LoopVolatile());
t1.start();
Thread t2 = new Thread(new LoopVolatile());
t2.start();
while (t1.isAlive() || t2.isAlive()) {
}
System.out.println("final val is: " + value);
}
Output:-------------
final val is: 1912
value++
这条指令也分为4条jvm指令,而这4条指令是不安全的。容易脏读丢失修改。所以进行了2000次加法但结果一般不会是2000。
Atomic类就是为了解决原子性问题而存在:
这些类实现也都是用
volatile
来保证可见性。UnSafe
类提供的底层实现原子操作,举个AtomicInteger 的例子:
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();//用来进行CAS的Unsafe类
private static final long valueOffset;//value的内存地址偏移Unsafe类操作需要
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;//真正的值
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
//…………
}
参考资料:
为什么volatile不能保证原子性而Atomic可以
3.CAS
CAS(Compare and Swap)即比较并替换,设计并发算法时常用到的一种技术。返回boolean值。
赋值操作即是新值替换旧值,所以先比较现在的值是否为自己所以为的旧值,如果不是,则说明自己发生了脏读。本次操作失败返回false。
CAS无锁的进行原子操作。Java的CAS由UnSafe
类提供,是底层实现的原子指令,但是不同的CPU架构可能提供的原子指令不一样,也有可能需要某种形式的内部锁,所以该方法不能绝对保证线程不被阻塞。
参考资料:
JAVA CAS原理深度分析
锁
1.锁类型
锁方式分:
- 自旋锁:用一个无限循环来停住线程,线程一直是运行态,占CPU但响应速度快。
- 阻塞锁:直接线程进入阻塞态,再恢复运行态,线程状态切换会比较耗资源。
- 自适应自旋锁:自旋锁的自旋上界由同一个锁上的自旋时间统计和锁的持有者状态共同决定。当自旋超过上界后,自旋锁就升级为阻塞锁。
公平性:
- 公平锁:线程进入等待的顺序决定了恢复顺序
- 非公平锁:恢复时乱序
悲观/乐观:
- 悲观锁:1次事务开始时就锁住全部相关资源。提交后解锁。效率低。能防脏读。
- 乐观锁:只是提交事务的时候先锁再改再解锁。不能防脏读。
独占/共享:
- 独占锁:只能有1个线程持有锁。
- 共享锁:可以同时有多个线程持有锁。
参考资料:
Java锁的种类以及辨析
2.synchronized
synchronized是jvm实现的锁机制。是一种可重入/非公平/悲观/独占锁。
可以把任何一个非null对象作为"锁",当synchronized作用在方法上时,锁住的便是对象实例(this);当作用在静态方法时锁住的便是对象对应的Class实例。
synchronized同步锁在jdk1.6之前非常慢,以至于被Lock完爆。1.6进行一系列优化后(偏向锁,轻量锁,锁粗化)性能与Lock差不多。下面说一说1.6之后的synchronized。
这是对象头的MarkWord部分。MarkWord会根据对象状态复用空间,一共有4种状态,无锁状态、偏向锁状态、轻量级锁状态、重量级锁状态:
MarkValue | status | 状态 |
---|---|---|
对象哈希码、对象分代年龄 | 01 | 无锁 |
锁记录的地址 | 00 | 轻量锁 |
Monitor的地址 | 10 | 重量锁 |
空,不需要记录信息 | 11 | GC标志 |
偏向线程ID、偏向时间戳、对象分代年龄 | 01 | 偏向锁 |
偏向锁
锁对象第一次被线程获取的时候,虚拟机把对象头的status设置为"01",偏向锁状态,当发生锁重入时,只需要检查MarkValue中的ThreadID是否与当前线程ID相同即可,相同即可直接重入。偏向锁的释放不需要做任何事情,这也就意味着加过偏向锁的MarkValue会一直保留偏向锁的状态,因此即便同一个线程持续不断地加锁解锁,也是没有开销的。
一般偏向锁是在有不同线程申请锁时升级为轻量锁,这也就意味着假如一个对象先被线程1加锁解锁,再被线程2加锁解锁,这过程中没有锁冲突,也一样会发生偏向锁失效,不同的是这回要先退化为无锁的状态,再加轻量锁。
轻量锁
对对象加轻量锁的条件是该对象当前没有被任何其他线程锁住。
加轻量锁时在当前线程的栈帧中生成一个锁记录,然后把MarkValue设为记录的地址,状态置00(CAS操作),如果这一系列操作成功,则表示轻量锁申请成功。如果不成功,说明有线程在竞争,则需要在当前对象上生成重量锁来进行多线程同步。
重量锁
重量锁在JVM中又叫对象监视器(Monitor)。
一旦锁升级成重量级锁,就不会再恢复到轻量级锁状态。
参考资料:
聊聊并发(二)——Java SE1.6中的Synchronized
深入JVM锁机制1-synchronized
JVM锁实现探究2:synchronized深探
3.concurrent包的锁机制
concurrent包是java拓展的多线程工具包。包括很多工具,事实上它的所有工具都基于下面的概念:
CLH队列锁:
CLH队列锁的概念很酷炫,广泛用于多种语言,Java的Lock锁体系全都基于此概念。
一个链表,每个节点包含一个线程与状态位。状态位默认true,释放解锁时为false。
lock时开始获取锁,把当前线程封装进节点,加进链表尾。并自旋等待前一节点的状态。
如果前一节点释放锁,状态变为false,本节点自旋结束,线程继续,直到释放锁。
是通过自旋锁和Atomic保证节点插入和移除的原子性。
参考资料:
JAVA并发编程学习笔记之CLH队列锁
AQS:
AQS(AbstractQueuedSynchronizer)是整个 java.util.concurrent包的基础。典型的模版方法模式。
AQS以CLH锁作为基础而设计。在等待机制上由原来的自旋改成阻塞唤醒。
-
等待机制:ASQ将线程封装到一个Node里面,并维护一个CHL Node FIFO队列。但不是无脑自旋。自旋里会用
LockSupport.park()
来暂停自己。当一个节点释放时会LockSupport.unpark(Thread t)
来唤醒下一个节点的线程。然后一次自旋做相应处理。 -
超时措施:使用了超时设置,则不会用
LockSupport.park()
阻塞。而是判断spinForTimeoutThreshold变量阀值,它是决定使用自旋方式消耗时间还是使用系统阻塞方式消耗时间的分割线,juc工具包作者通过测试将默认值设置为1000ns,即如果在成功插入等待队列后剩余时间大于1000ns则调用系统底层阻塞,否则不调用系统底层,取而代之的是仅仅让之在Java应用层不断循环消耗时间,属于优化的措施。超时后跳出循环。 -
公平:AQS可以设置是否公平,他虽然是一个FIFO但有一个闯入策略。新来线程会与头节点线程竞争。
-
status:整个AQS框架核心功能都是围绕着其32位整型属性state进行,不过是提供给你自由发挥,一般可以说它表示锁的数量,对同步状态的控制可以实现很多同步工具,例如重入锁、信号量、栅栏等等。
为了保证可见性此变量被声明为volatile,保证每次的原子更新都将及时反映到每条线程上。有如下方法操作:
getState()
普通的get
setState(int newState)
普通的set
compareAndSetState(int expect, int update)
CAS方式的硬件级别的原子更新。返回成功或失败。
参考资料:
ReentrantLock实现原理深入探究
并发新特性—Lock 锁与条件变量
ReentrantLock
ReentrantLock是1个可重入独占锁。完全拥有synchronized
的功能,并可以提供公平/非公平2种方式。
ReentrantLock内部拥有FairSync
与NonfairSync
2个AQS来负责公平与非公平的情况。
这是ReentrantLock的公平模式的lock时尝试获取锁的逻辑。
abstract static class Sync extends AbstractQueuedSynchronizer {……}
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//等于0即当前没有被锁,开始竞争!。
if (!hasQueuedPredecessors() &&/*队列是否已有元素(有表示竞争失败)*/
compareAndSetState(0, acquires)) {//compareAndSetState也成功就表示竞争成功
setExclusiveOwnerThread(current);//设置本线程为独占锁持有者线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//锁已被使用,但使用者的线程就是自己,这就是重入的情况。
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);//state+1
return true;
}
//其他情况则表示不能获取锁。
return false;
}
}
Condition
Condition是与Lock结合使用的,通过Lock#newCondition()
方法能够创建与Lock绑定的Condition实例。
await()
对应于Object#wait()
signal()
对应于Object#notify()
signalAll()
对应于Object#notifyAll()
。
当然这几个方法与synchronized
一样,需要先Lock#lock()
获取锁。
BlockingQueue
一个完全因生产消费者概念而出现的概念。
常用子类有ArrayBlockingQueue
,LinkedBlockingQueue
构造函数设置一个容量。默认为最大Int。
加入数据时检查是否达到容量上限,到达上限就阻塞直到被消费;
取出数据时检查是否已经没有数据,没有就阻塞直到添加数据。
内部使用ReentrantLock
+Condition
来实现。
ReentrantReadWriteLock
读写锁:允许读读,不允许读写,写写并发。是一个可重入共享锁。
Semaphore
信号量:操作系统的信号量是个很重要的概念,设定N个许可(i=N),线程拿1个许可(i--)去访问资源,访问结束后释放许可(i++)。i=0时就获取不到许可,就等待许可。
Java中的Semaphore也是通过AQS实现。构造方法传入许可数,通过简单的2个方法semp.acquire()
,semp.release()
。许可数为1时等同于ReentrantLock。
CountDownLatch
可以实现类似计数器的功能。用这个计数器阻塞的线程,会在计数变为0时解除阻塞。
构造方法传入计数,通过简单2个方法 await()
,countDown()
。
CyclicBarrier
回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。
CyclicBarrier的构造函数传入栅栏数量,使用await()
阻塞当前线程,当阻塞的线程数等于栅栏的数量时,同时放行所有线程。
线程池
线程池主要是为了解决线程创建与线程销毁带来的性能消耗。
Java通过Executors提供四种线程池,分别为:
- newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 - newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。 - newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。 - newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
实现原理其实也是无限循环处理Runable。多个线程共享一个任务队列,用LinkedBlockingQueue
实现。
翻译自void execute(Runnable command)
源码注释,稍作修改:
执行Runable分3步:
- 如果运行的线程比corePoolSize少,就尝试开始新建1个线程并把这个任务作为他的第一个任务,原子性的调用addWorker并检查runState 和 workerCount,不需要加进队列。
- 尝试加进队列,如果成功,我们还是需要进行双重检查,检查我们是否需要添加1个新线程(因为存在在后一次检查时线程池唯一一条线程却结束的情况),检查线程池是否已结束,已结束就拒绝掉当前任务。
- 如果我们不能加进队列,我们会尝试添加1个新的线程,如果失败我们就知道我们被关闭或者我们已经饱和。所以拒绝掉这个任务。
线程池里装的是Worker,Worker是1个封装了1个Thread的AQS。
按上面的规则直接新建1个带有初始任务的Worker或者把任务加进队列,新建1个空Worker。
new了Worker就会启动它的线程。这个线程的run()
里直接执行了下面这个方法(代码已简化):
void runWorker(Worker w){
w.unlock(); // allow interrupts
while (task != null || (task = getTask()) != null) {
w.lock(); // 简单利用AQS实现互斥锁,不能重入。
beforeExecute(wt, task);
try {
task.run();
}finally {
afterExecute(task, thrown);
}
w.unlock();
}
}
private Runnable getTask()
使用LinkedBlockingQueue
获取Runable。所以在没有待执行任务时会阻塞直到被唤醒。
参考资料:
Java四种线程池的使用