(一 初步了解并发编程)
一 并发编程相关
引用了一些博客的资料,并非全部原创。
前言:
-
只要数据需要被跨线程共享,就需要进行恰当的同步。否则可能会访问到过期值。java允许将64位的读或写操作划分为两个32位的操作,如果读和写发生在不同的线程,可能会得到一个值的高32位和另一个值的低32位。
-
锁不仅是关于同步与互斥的,也是关于内存可见的。为了保证所有线程能够看到共享的,可变变量的最新值,读取和写入线程必须使用公共的锁进行同步。当线程B执行到与A相同的锁监视的同步块时,A在同步块之中或之前所作的一切都是对B可见的。当访问一个共享的可变变量时,要保证所有线程都由同一个锁进行同步。
1 并发的常见问题
无状态对象永远是线程安全的,因为两个线程不共享状态,它们如同在访问不同的实例。
当计算的正确性依赖于运行时中相关的时序或者多线程的交替时,会产生竞争条件;换句话说,想得到正确的答案,要依赖于“幸运的”时序。(使用潜在的过期观察值来作决策或执行计算,这种竞争条件被称作检查再运行)。
检查再运行的常见方法:惰性初始化
2 Atomicxxx
Atomicxxxx类本身的一个方法是原子性的,如add(貌似也能保证可见性),是用很底层的方法实现,所以效率很高。
但是Atomic的多个方法并不构成原子性,还是要加锁。所以为了保护状态的一致性,要在单一的原子操作中更新相互关联的状态变量。
3 synchronized
- synchronized 锁定的是对象;
- 代码块是原子操作,不可分。
- synchronize作为多线程关键字,是一种同步锁,它可以修饰以下几种对象:
代码块:被修饰的代码块称为同步语句块,其作用的范围是大括号{ }里的代码,作用的对象是调用这个代码块的对象;
方法:被修饰的方法称为同步方法,其作用的范围是整个方法,作用的对象是调用这个方法的对象
静态方法:作用的范围是整个静态方法,作用的对象是这个类的所有对象
类:作用的范围是synchronize后面括号里的部分,作用的对象是这个类的所有对象.
对象锁:对象只有一个,可作为一个锁
1.1、代码块形式:手动定义一个锁对象,比如
Object lock = new Object();
synchronized(lock ){
//代码块
......
}
1.2、方法锁形式:synchronized指定修饰普通方法,锁对象默认为this
2、类锁:Java类可以实例多个对象,但只有一个class类,所以可以作为一个类锁
2.1、synchronized加在static静态方法上,,static为静态修饰类,加载实例时这个方法只有一个class静态方法。
2.2、synchronized(*.class)类申明
以下博客对synchronized的用法讲的很详细,不再赘述。
- 无论通过正常控制路径退出,还是从块中抛出异常,线程都在放弃synchronized块的控制时自动释放锁。获得内部锁的唯一途径是:进入这个内部锁保护的同步块或方法。内部锁在java中扮演了互斥锁的角色,意味着至多只有一个线程可以拥有锁,当线程A尝试请求一个被线程B占有的锁时,线程A必须等待或者阻塞,直到B释放它。如果B永远不释放锁,A将永远等下去。同一时间,只有一个线程可以运行特定锁保护的代码块。因此,由同一个锁保护的synchronized块会各自原子的执行,不会互相干扰。但是这种方法在统一时间内只有一个线程可以进入service方法,所以太过极端,完全禁止多个用户同时使用,所以会导致糟糕的响应性。要尽量从synchronized块中分离耗时的且不影响共享状态的操作。
4 关于Reentrancy(重进入)
内部锁是可重入的,基于“每线程”,而不是“每调用”,重进入的实现是通过为每个锁关联一个请求计数和一个占有它的线程。当计数为0时,认为锁是未被占有的。线程请求,则jvm封锁占有者,并记为1,若同一线程再次请求,计数将递增。每次线程退出,计数递减。如:子类调用父类。一个同步方法可以调用另外一个同步方法,因为在同一个线程中,相当于在自己的锁上加了数字,由1变成2,可重入。子类的同步方法可以调用父类的同步方法,可重入的第二种。
5 关于volatile(无锁同步)
由于各线程有自己的内存或缓冲区,因为jmm的设置,所以线程会把修改或读操作放入CPU的缓冲区,把数据放入自己CPU缓冲区后便不会再从内存读,即使另外的线程改变了自己线程内的值,它也不会知道。
(不过如果有一点空闲的时候,CPU可能就会去内存里刷新数据,(有可能,不确定)但是这样不太好,还是要保证线程之间可见性),但是只能让线程1及时知道线程2对共享数据的更改,却并不能保证线程1执行操作基于的数据有没有被刷掉,
总而言之,
-
java语言也提供了选择,即一种弱同步的形式:volatile变量。它确保对一个变量的更新以可预见的方式告知其他的线程。当一个域被声明为volatile类型后,编译器与运行时会监控这个变量,它是共享的,而且它的操作不会与其他的内存操作一样被重排序。volatile变量不会缓存在寄存器或者缓存在对其他处理器隐蔽的地方,所以访问volatile类型的变量时,总会返回由某一线程所写入的最新值。保证了两种特性:1 可见性,2 禁止指令重排序。
volatile(只保证了可见性)是自己线程内部的数据被其他线程修改后,会有机制专门通知自己的缓冲区,自己再重新读。 -
访问volatile的变量的操作不会加锁,也就不会引起执行线程的阻塞,这使得volatile变量相对于sychronized而言,只是轻量级的同步机制。
-
但是volatile的语义不支持互斥,满足不了 count++的原子操作,除非只有一个线程。即volatile并不能保证多个线程修改某一字段所带来的不一致问题。
因为volatile只是保证读回来的是对的(如100),但是写回去时候(101)不会再检查是否已经超过100。
若想要保证原子性,则要用synchronized(可见性,原子性),但是这个太重了,效率比volatile低很多。
加锁可以保证可见性与原子性;volatile变量只能保证可见性。
6 wait,notify,sleep
java的锁基本上只能锁在堆里,不能锁栈,因为是直接锁在对象上,并不是锁引用。
wait()方法可以使调用该线程的方法释放持有当前对象的锁,然后从运行状态退出,进入等待队列,直到再次被唤醒。
notify()方法可以随机唤醒等待队列中等待的一个线程,并使得该线程退出等待状态,进入可运行状态.但是调用之前必须保证对象已经加锁了,才能调用。
public class ThreadNotify {
private Object lock;
public ThreadNotify(Object lock) {
this.lock = lock;
}
public void testNotify() {
try {
synchronized (lock) {
System.out.println("start notify........");
lock.notify();
System.out.println("end notify........");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class ThreadWait {
private Object lock;
public ThreadWait(Object lock) {
this.lock = lock;
}
public void testWait() {
try {
synchronized (lock) {
System.out.println("start wait........");
lock.wait();
System.out.println("end wait........");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class ThreadWaitNotifyDemo {
public static void main(String[] args) throws Exception {
Object lock = new Object();
Thread waitThread = new Thread(() -> {
ThreadWait threadWait = new ThreadWait(lock);
threadWait.testWait();
});
Thread notifyThread = new Thread(() -> {
ThreadNotify threadNotify = new ThreadNotify(lock);
threadNotify.testNotify();
});
waitThread.start();
/**
* 保证waitThread一定会先开始启动
*/
Thread.sleep(1000);
notifyThread.start();
}
}
打印结果:
start wait........
start notify........
end notify........
end wait........
死循环监测有时很耗CPU,wait会释放锁,notify不会释放锁,sleep不释放锁。所以自己占着锁的情况下,要等自己先执行完。notify会叫醒其中一个等待的线程。
notify一般叫醒别的线程,但是也可以叫醒自己。不能指定线程。
wait会让出锁对象。
7 CountDownLatch
只涉及通信,不涉及同步时候可以采用CountDownLatch;
调用countDown往下数,当初始化的数变成0时,门闩就开了,就可以继续往下执行。
- CountDownLatch 提供 #await() 方法,来使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断.
- CountDownLatch 提供 #countDown() 方法,递减锁存器的计数。如果计数到达零,则唤醒所有等待的线程。
- getCount()
public class CountDownLatchTest {
private static CountDownLatch countDownLatch = new CountDownLatch(5);
/**
* Boss线程,等待员工到达开会
*/
static class BossThread extends Thread{
@Override
public void run() {
System.out.println("Boss在会议室等待,总共有" + countDownLatch.getCount() + "个人开会...");
try {
//Boss等待
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("所有人都已经到齐了,开会吧...");
}
}
// 员工到达会议室线程
static class EmpleoyeeThread extends Thread{
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ",到达会议室....");
//员工到达会议室 count - 1
countDownLatch.countDown();
}
}
public static void main(String[] args){
//Boss线程启动
new BossThread().start();
for(int i = 0 ; i < countDownLatch.getCount() ; i++){
new EmpleoyeeThread().start();
}
}
}
输出结果:
image.png
8 Reentrantlock(更灵活)
jdk中独占锁的实现除了使用关键字synchronized外,还可以使用ReentrantLock。虽然在性能上ReentrantLock和synchronized没有什么区别,都是独占锁且都是可重入的,但ReentrantLock相比synchronized而言功能更加丰富,使用起来更为灵活,也更适合复杂的并发场景。
- 独占锁且可重入:
public class ReentrantLockTest {
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
for (int i = 1; i <= 3; i++) {
lock.lock();
}
for(int i=1;i<=3;i++){
try {
} finally {
lock.unlock();
}
}
}
}
- reentrantlock是手工锁,必须要手动上锁,手动释放;
- 在使用syn锁定的话如果遇到异常,jvm会自动释放锁,但是lock必须手动释放锁,因此经常在finally中进行锁的释放。
- syschronized如果等不到锁,会死等下去;但是reentrantlock(比syschronized更灵活一些,但是这两个效率差不多)可以有选择的,尝试的等,也可以指定等待时间,如果等不到,就去干别的事情。
- trylock可以去尝试获得锁,写不同情况的逻辑。
- lockInterruptibly可打断,如以下代码由于thread的退让,使得产生中断而不会产生死锁。
public class ReentrantLockTest {
static Lock lock1 = new ReentrantLock();
static Lock lock2 = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new ThreadDemo(lock1, lock2));//该线程先获取锁1,再获取锁2
Thread thread1 = new Thread(new ThreadDemo(lock2, lock1));//该线程先获取锁2,再获取锁1
thread.start();
thread1.start();
thread.interrupt();//是第一个线程中断
}
static class ThreadDemo implements Runnable {
Lock firstLock;
Lock secondLock;
public ThreadDemo(Lock firstLock, Lock secondLock) {
this.firstLock = firstLock;
this.secondLock = secondLock;
}
@Override
public void run() {
try {
firstLock.lockInterruptibly();
TimeUnit.MILLISECONDS.sleep(10);//更好的触发死锁
secondLock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
firstLock.unlock();
secondLock.unlock();
System.out.println(Thread.currentThread().getName()+"正常结束!");
}
}
}
}
- reentrantlock可以初始化公平锁。线程1执行完,等待时间长的是线程2,所以线程2执行。同理,接着线程1执行。
public class ReentrantLockTest {
static Lock lock = new ReentrantLock(true);//true即为公平锁
public static void main(String[] args) throws InterruptedException {
for(int i=0;i<5;i++){
new Thread(new ThreadDemo(i)).start();
}
}
static class ThreadDemo implements Runnable {
Integer id;
public ThreadDemo(Integer id) {
this.id = id;
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
for(int i=0;i<2;i++){
lock.lock();
System.out.println("获得锁的线程:"+id);
lock.unlock();
}
}
}
}
运行结果:
image.png
- 使用entrantlock ,中的lock condition可以精确的指定哪些线程被叫醒,哪些不叫醒。Condition可以实现多路通知功能,也就是在一个Lock对象里可以创建多个Condition(即对象监视器)实例,线程对象可以注册在指定的Condition中,从而可以有选择的进行线程通知,在调度线程上更加灵活。
如下是使用condition的小例:
public class ConditionTest {
static ReentrantLock lock = new ReentrantLock();
static Condition condition = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
lock.lock();
new Thread(new SignalThread()).start();
System.out.println("主线程等待通知");
try {
condition.await();
} finally {
lock.unlock();
}
System.out.println("主线程恢复运行");
}
static class SignalThread implements Runnable {
@Override
public void run() {
lock.lock();
try {
condition.signal();
System.out.println("子线程通知");
} finally {
lock.unlock();
}
}
}
}
image.png
用condition实现简单的阻塞队列:
public class MyBlockingQueue<E> {
int size;//阻塞队列最大容量
ReentrantLock lock = new ReentrantLock();
LinkedList<E> list=new LinkedList<>();//队列底层实现
Condition notFull = lock.newCondition();//队列满时的等待条件
Condition notEmpty = lock.newCondition();//队列空时的等待条件
public MyBlockingQueue(int size) {
this.size = size;
}
public void enqueue(E e) throws InterruptedException {
lock.lock();
try {
while (list.size() ==size)//队列已满,在notFull条件上等待
notFull.await();
list.add(e);//入队:加入链表末尾
System.out.println("入队:" +e);
notEmpty.signal(); //通知在notEmpty条件上等待的线程
} finally {
lock.unlock();
}
}
public E dequeue() throws InterruptedException {
E e;
lock.lock();
try {
while (list.size() == 0)//队列为空,在notEmpty条件上等待
notEmpty.await();
e = list.removeFirst();//出队:移除链表首元素
System.out.println("出队:"+e);
notFull.signal();//通知在notFull条件上等待的线程
return e;
} finally {
lock.unlock();
}
}
}
测试代码
public static void main(String[] args) throws InterruptedException {
MyBlockingQueue<Integer> queue = new MyBlockingQueue<>(2);
for (int i = 0; i < 10; i++) {
int data = i;
new Thread(new Runnable() {
@Override
public void run() {
try {
queue.enqueue(data);
} catch (InterruptedException e) {
}
}
}).start();
}
for(int i=0;i<10;i++){
new Thread(new Runnable() {
@Override
public void run() {
try {
Integer data = queue.dequeue();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}