线程安全问题的几种解决办法

2019-10-26  本文已影响0人  想54256

title: 线程安全问题
date: 2019/04/01 17:13


线程安全问题是由于多个线程同时操作同一全局变量或静态变量引起的。

由于线程休眠的特性,从哪休眠就从哪继续执行(一个线程的事情还没干完就被其他线程挤下去了),回来继续干就会导致操作的全局变量或静态变量出现问题。

image

为了解决这个问题,我们就需要让线程执行完毕(不能被其他线程挤下去),以下是几种解决办法。

一、锁的基本概念

1、锁的分类(宏观角度)

1.1 乐观锁

乐观锁是一种乐观思想,即认为读多写少遇到并发写的可能性低,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,采取在写时先读出当前版本号,然后加锁操作(比较跟上一次的版本号,如果一样则更新),如果失败则要重复读-比较-写的操作。

java中的乐观锁基本都是通过CAS操作实现的,CAS是一种更新的原子操作,比较当前值跟传入值是否一样,一样则更新,否则失败。

CAS(compare and swap 比较并交换):

CAS是原子操作的一种,可用于在多线程编程中实现不被打断的数据交换操作,从而避免多线程同时改写某一数据时由于执行顺序不确定性以及中断的不可预知性产生的数据不一致问题。该操作通过将内存中的值与指定数据进行比较,当数值一样时将内存中的数据替换为新的值。 —— Wiki

1.2 悲观锁

悲观锁是就是悲观思想,即认为写多,遇到并发写的可能性高,每次去拿数据的时候都认为别人会修改,所以每次在读写数据的时候都会上锁,这样别人想读写这个数据就会阻塞到拿到锁。java中的悲观锁就是synchronized。

继承AQS(AbstractQueuedSynchronizer)的锁则是先尝试cas乐观锁去获取锁,获取不到才会转换为悲观锁,如ReentrantLock。

1.3 采用悲观锁带来的性能问题

悲观锁 -> 线程需要不断的进行上下文切换 -> 需要操作系统的介入 -> 上下文切换时间比同步代码执行的时间都长

Java中的线程就是操作系统原生线程,如果要阻塞或唤醒一个线程就需要操作系统介入,需要在用户态与核心态之间切换,这种切换会消耗大量的系统资源,因为用户态与内核态都有各自专用的内存空间,专用的寄存器等,用户态切换至内核态需要传递给许多变量、参数给内核,内核也需要保护好用户态在切换时的一些寄存器值、变量等,以便内核态调用结束后切换回用户态继续工作。

synchronized会导致拿不到锁的线程进入阻塞状态,所以说它是重量级锁。为了缓解上述性能问题,JVM从1.5开始,引入了轻量锁与偏向锁,默认启用了自旋锁,他们都属于乐观锁。

2、Java中的锁

重量级锁、自旋锁、轻量级锁和偏向锁。随着锁的竞争,锁可以从偏向锁升级到轻量级锁,再升级的重量级锁(但是锁的升级是单向的,也就是说只能从低到高升级,不会出现锁的降级)。

不同的锁有不同特点,每种锁只有在其特定的场景下,才会有出色的表现,java中没有哪种锁能够在所有情况下都能有出色的效率,引入这么多锁的原因就是为了应对不同的情况;

重量级锁是悲观锁的一种,自旋锁、轻量级锁与偏向锁属于乐观锁。

2.1 自旋锁

自旋锁原理非常简单,如果持有锁的线程能在很短时间内释放锁资源,那么那些等待竞争锁的线程就不需要做内核态和用户态之间的切换进入阻塞挂起状态,它们只需要等一等(自旋),等持有锁的线程释放锁后即可立即获取锁,这样就避免了用户线程和内核的切换的消耗,从而提高运行的效率。

但是线程自旋是需要消耗cpu的,说白了就是让cpu在做无用功,如果一直获取不到锁,那线程也不能一直占用cpu自旋做无用功,所以需要设定一个自旋等待的最大时间

如果持有锁的线程执行的时间超过自旋等待的最大时间还没有释放锁,就会导致其它争用锁的线程在最大等待时间内还是获取不到锁,这时争用线程会停止自旋进入阻塞状态**。

2.1.1 优点

自旋锁尽可能的减少线程的阻塞,这对于锁的竞争不激烈,且占用锁时间非常短的代码块来说性能可以大幅度的提升,因为在这种情况下自旋的消耗会小于线程阻塞挂起再唤醒的操作的消耗,这些操作会导致线程发生两次上下文切换。

2.1.2 缺点

但是如果锁的竞争激烈,或者持有锁的线程需要长时间占用锁执行同步块,这时候就不适合使用自旋锁了,因为自旋锁在获取锁前一直都是占用cpu做无用功,如果同时有大量线程在竞争一个锁,会导致获取锁的时间很长线程自旋的消耗大于线程阻塞挂起操作的消耗,其它需要cpu的线程又不能获取到cpu,造成cpu的浪费。所以这种情况下我们要关闭自旋锁;

2.1.3 自旋锁的时间阈值(1.6之后由JVM控制了)

自旋锁的目的是为了占着CPU的资源不释放,等到获取到锁立即进行处理。但是如何去选择自旋的执行时间呢?如果自旋执行时间太长,会有大量的线程处于自旋状态占用CPU资源,进而会影响整体系统的性能。因此自旋的周期选的额外重要!

JVM对于自旋周期的选择,jdk1.5这个限度是一定的写死的,在1.6引入了适应性自旋锁,适应性自旋锁意味着自旋的时间不在是固定的了,而是由前一次在同一个锁上的自旋时间以及锁的拥有者的状态来决定,基本认为一个线程上下文切换的时间是最佳的一个时间,同时JVM还针对当前CPU的负荷情况做了较多的优化。

2.2 重量级锁(synchronized)

在JDK1.5之前都是使用synchronized关键字保证同步的,Synchronized的作用相信大家都已经非常熟悉了;

2.2.1 synchronized的实现

图:monitor对象的内部

基本概念:

2.2.2 获取锁的过程

注:Synchronized是非公平锁。 Synchronized在线程进入ContentionList之前,线程会先尝试自旋获取锁,如果获取不到就进入ContentionList,这明显对于已经进入队列的线程是不公平的,还有一个不公平的事情就是自旋获取锁的线程还可能直接抢占OnDeck线程的锁资源。

2.2.3 加锁流程(通俗版)

image

堆内存中的对象包含:

当在对象上加锁时,数据是记录在对象头中。当执行synchronized同步方法或同步代码块时,会在对象头中记录锁标记,锁标记指向的是monitor对象(monitor对象也称为管程或监视器锁)的起始地址。每个对象都存在着一个monitor与之关联,对象与其monitor之间的关系存在多种实现方式,如monitor可以与对象一起创建销毁,也可以当线程试图获取对象锁时自动生成,但当一个monitor被某个线程持有后,它便处于锁定状态。

monitor对象是保存在栈上,不在堆内存中也不在栈帧中

每一个线程对应一个monitor对象,当线程拿这个对象进行加锁的时候,会将对象头中的锁信息指向monitor对象,当B线程拿这个对象进行加锁时,发现它已经有一个monitor对象了(monitor对象是互斥的)那么这个B线程将进入锁池状态。

在Java虚拟机(HotSpot)中,monitor(接口 java native interface)是由ObjectMonitor(实现类)实现的。

ObjectMonitor中有两个队列(_WaitSet和_EntryList)和_Owner标记。其中_WaitSet 是用于管理等待队列(obj.wait())线程的,_EntryList是用于管理锁池阻塞线程的,_Owner 标记用于记录当前执行线程。线程状态图如下:

image

当多线程并发访问同一个同步代码时,首先会进入_EntryList,当线程获取锁标记后,monitor中的_Owner记录此线程,并在monitor中的计数器执行递增计算(+1),代表锁定,其他线程在_EntryList中继续阻塞。若执行线程调用wait方法,则monitor中的计数器执行赋值为0计算,并将_Owner标记赋值为null,代表放弃锁,执行线程进入_WaitSet 中阻塞。若执行线程调用 notify/notifyAll方法,_WaitSet中的线程被唤醒,进入_EntryList中阻塞,等待获取锁标记。若执行线程的同步代码执行结束,同样会释放锁标记,monitor中的_Owner标记赋值为null,且计数器赋值为0计算。

3、轻量级锁(针对synchronized进行优化)

“轻量级”是相对于使用操作系统互斥量来实现的传统锁而言的。但是,首先需要强调一点的是,轻量级锁并不是用来代替重量级锁的,它的本意是在没有多线程竞争的前提下,减少传统的重量级锁使用产生的性能消耗。在解释轻量级锁的执行过程之前,先明白一点,轻量级锁所适应的场景是线程交替执行同步块(2个线程)的情况,如果存在同一时间3个线程访问同一把锁的情况,就会导致轻量级锁膨胀为重量级锁。

3.1 轻量级锁的加锁/放锁过程

正确版

当偏向锁不满足,有2个线程并发访问,锁定同一个对象的时候,先提升为轻量级锁,它也是使用标记ACC_SYNCHRONIZED标记进行记录锁状态的,使用ACC_UNSYNCHRONIZED标记未获取到锁信息的线程。就是只有两个线程争抢锁标记的时候,优先使用轻量级锁。

但是两个线程也可能出现重量级锁。

轻量级锁示意图:

image

同一时间只有一个线程持有monitor,但同一时间一个monitor监视着记录多个线程

4、偏向锁(针对synchronized进行优化)

引入偏向锁是为了在无多线程竞争的情况下尽量减少不必要的轻量级锁执行路径,因为轻量级锁的获取及释放依赖多次CAS原子指令,而偏向锁只需要在置换ThreadID的时候依赖一次CAS原子指令(由于一旦出现多线程竞争的情况就必须撤销偏向锁,所以偏向锁的撤销操作的性能损耗必须小于节省下来的CAS原子指令的性能消耗)。上面说过,轻量级锁是为了在线程交替执行同步块时提高性能,而偏向锁则是在只有一个线程执行同步块时进一步提高性能

4.1 偏向锁的加锁/放锁过程

正确版

5、锁优化

以上介绍的锁不是我们代码中能够控制的,但是借鉴上面的思想,我们可以优化我们自己线程的加锁操作;

5.1 减少锁的时间

不需要同步执行的代码,能不放在同步快里面执行就不要放在同步快内,可以让锁尽快释放;

5.2 减少锁的粒度

它的思想是将物理上的一个锁,拆成逻辑上的多个锁,增加并行度,从而降低锁竞争。它的思想也是用空间来换时间;

拆锁的粒度不能无限拆,最多可以将一个锁拆为当前cup数量个锁即可;

Java中很多数据结构都是采用这种方法提高并发操作的效率:

5.2.1 ConcurrentHashMap(1.7版本之前 采用分段锁)

java中的ConcurrentHashMap在jdk1.8之前的版本,使用一个Segment数组

Segment< K,V >[] segments

Segment继承自ReenTrantLock,所以每个Segment就是个可重入锁,每个Segment有一个HashEntry<K,V>数组用来存放数据,put操作时,先确定往哪个Segment放数据,只需要锁定这个Segment,执行put,其它的Segment不会被锁定;所以数组中有多少个Segment就允许同一时刻多少个线程存放数据,这样增加了并发能力。

5.2.2 LongAdder

LongAdder 实现思路也类似ConcurrentHashMap,LongAdder有一个根据当前并发状况动态改变的Cell数组,Cell对象里面有一个long类型的value用来存储值;

开始没有并发争用的时候或者是cells数组正在初始化的时候,会使用cas来将值累加到成员变量base上,在并发争用的情况下,LongAdder会初始化cells数组,在Cell数组中选定一个Cell加锁,数组有多少个cell,就允许同时有多少线程进行修改,最后将数组中每个Cell中的value相加,在加上base的值,就是最终的值;cell数组还能根据当前线程争用情况进行扩容,初始长度为2,每次扩容会增长一倍,直到扩容到大于等于cpu数量就不再扩容,这也就是为什么LongAdder比cas和AtomicInteger效率要高的原因,后面两者都是volatile+cas实现的,他们的竞争维度是1,LongAdder的竞争维度为“Cell个数+1”为什么要+1?因为它还有一个base,如果竞争不到锁还会尝试将数值加到base上;

5.3 锁粗化

大部分情况下我们是要让锁的粒度最小化,锁的粗化则是要增大锁的粒度;

在以下场景下需要粗化锁的粒度:

假如有一个循环,循环内的操作需要加锁,我们应该把锁放到循环外面,否则每次进出循环,都进出一次临界区(虽然锁可重入,但是其中还是有一些操作的),效率是非常差的;

5.4 使用读写锁

ReentrantReadWriteLock是一个读写锁,读操作加读锁,可以并发读,写操作使用写锁,只能单线程写;

5.5 读写分离

CopyOnWriteArrayList 、CopyOnWriteArraySet

CopyOnWrite容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。

CopyOnWrite并发容器用于读多写少的并发场景,因为,读的时候没有锁,但是对其进行更改的时候是会加锁的,否则会导致多个线程同时复制出多个副本,各自修改各自的;

class CopyOnWriteArrayList {
    // volatile保证了获取到的数据都是最新的,而且防止了指令重排序。
    private volatile List<String> list = new ArrayList();

    // synchronized保证了添加不会乱
    public synchronized void add(int index, String a) {
        。。。
    }

    // 所以get操作还是可能会读取到脏数据的。
    public String get(int index) {
        return list.get(index);
    }
}

5.6 使用cas

如果需要同步的操作执行速度非常快,并且线程竞争并不激烈,这时候使用cas效率会更高,因为加锁会导致线程的上下文切换,如果上下文切换的耗时比同步操作本身更耗时,且线程对资源的竞争不激烈,使用volatile+cas操作会是非常高效的选择;

二、锁的使用

1、synchronized关键字

synchronized可以锁的是对象。它可以锁的对象包括:this、临界资源对象、Class类对象。

加锁的目的:就是为了保证操作的原子性

在商业开发中避免使用同步方法而选用同步代码块;细粒度解决同步问题,提高效率。

1.1 锁临界资源对象

public class Test_01 {
    private int count = 0;
    private final Object o = new Object();  // 临界资源对象:多个线程都能访问到的对象;所以理论上这个对象是不允许修改的(final)

    // 锁的o
    public void testSync1(){
        synchronized(o){
            System.out.println(Thread.currentThread().getName() + " count = " + count++);
        }
    }
}

1.2 锁当前对象

当线程执行到这段代码块的时候会获取当前对象并使用当前对象加锁,直到代码块执行结束;如果期间再有别的线程来的时候,拿到当前对象进行加锁,发现是不行的,就会等待(可以获取当前对象,但不能用当前对象加锁)

    // 锁的this:
    public void testSync2(){
        synchronized(this){
            System.out.println(Thread.currentThread().getName() + " count = " + count++);
        }
    }

    // 锁的也是this
    public synchronized void testSync3(){
        System.out.println(Thread.currentThread().getName() + " count = " + count++);
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

1.3 锁当前类对象

public class Test_02 {
    private static int staticCount = 0;

    // 静态方法上的synchronized,锁的是当前类型的类对象 在本代码中就是Test_02.class
    public static synchronized void testSync4() {
        System.out.println(Thread.currentThread().getName() + " staticCount = " + staticCount++);
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void testSync5() {
        synchronized (Test_02.class) {
            System.out.println(Thread.currentThread().getName() + " staticCount = " + staticCount++);
        }
    }
}

1.4 synchronized只影响锁定同一个锁对象的同步方法

同步方法只影响锁定同一个锁对象的同步方法。不影响其他线程调用非同步方法,或调用其他锁资源的同步方法。

public class Test_04 {

    // 真真切切在java堆中的同一个对象
    private final Object o = new Object();


    // 在这个demo中m1方法锁的是this(当前对象),m3方法锁的是o,当程序运行时m1和m3没有等待过程
    // 但如果他们锁的都是this,那么m3会在m1执行完后才会执行
    // 还是那句话,synchronized()中的对象锁住之后,可以拿到对象,但不能用这个对象加锁
    public synchronized void m1(){
        System.out.println("public synchronized void m1() start");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("public synchronized void m1() end");
    }
    
    public void m3(){
        synchronized(o){
            System.out.println("public void m3() start");
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("public void m3() end");
        }
    }
}

1.5 synchronized是可重入锁

可重入锁指的是可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,并且不发生死锁(前提得是同一个对象或者class),这样的锁就叫做可重入锁。

当线程持有锁时,会在monitor的计数器中执行递增计算,若当前线程调用其他同步代码,且同步代码的锁对象相同时,monitor中的计数器继续递增。每个同步代码执行结束,monitor中的计数器都会递减,直至所有同步代码执行结束,monitor中的计数器为0时,释放锁标记,并将_Owner标记赋值为null。

public class Test_06 {
    
    private synchronized void m1(){ // 锁this
        System.out.println("m1 start");
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 调用m2方法
        m2();
        System.out.println("m1 end");
    }

    private synchronized void m2(){ // 锁this
        System.out.println("m2 start");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("m2 end");
    }
}

1.6 同步方法的继承

子类同步方法覆盖父类同步方法。可以指定调用父类的同步方法。相当于锁的重入

public class Test_07 {
    
    synchronized void m(){
        System.out.println("Super Class m start");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Super Class m end");
    }
}

class Sub_Test_07 extends Test_07{
    synchronized void m(){
        System.out.println("Sub Class m start");
        super.m();
        System.out.println("Sub Class m end");
    }
}

1.7 同步方法撞上异常

当同步方法中发生异常的时候,自动释放锁资源。不会影响其他线程的执行。

如果在业务逻辑中遇到异常,要在catch中将数据恢复成原来的样子

public class Test_08 {
    private int i = 0;

    private synchronized void m(){
        System.out.println(Thread.currentThread().getName() + " - start");
        while(true){
            i++;
            System.out.println(Thread.currentThread().getName() + " - " + i);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if(i == 5){
                i = 1/0;
            }
        }
    }
}

1.8 锁对象变更问题

public class Test_13 {
    Object o = new Object();
    
    private void m(){
        System.out.println(Thread.currentThread().getName() + " start");
        synchronized (o) {
            while(true){
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " - " + o);
            }
        }
    }
    
    public static void main(String[] args) {
        Test_13 t = new Test_13();

        // 启动线程1,使用的是【o】进行上的锁
        new Thread(t::m(), "thread1").start();
        
        // 修改o引用的堆上的对象
        t.o = new Object();

        // 启动线程2,使用的是【新o】进行上的锁
        Thread thread2 = new Thread(t::m, "thread2").start();
    }
    
}

同步代码一旦加锁后,那么会有一个临时的锁引用连接到堆上的对象,和真实的引用无直接关联。
在锁未释放之前,修改真实的对象引用,不会影响同步代码的执行。

synchronized(o)是将当前堆上的o锁定了,并不是存的o的引用,所以上面的程序会出现两个线程同时执行的情况。

上面的可以与下面的demo是同样的道理

public int func() {
    int i = 0;

    try {
        return i;
    } finally {
        i = 10
    }
}

return的流程如下:

return i ->
int _returnValue = i;   // 0;  先定义了个临时变量
return _returnValue;    // return的是这个临时变量

1.9 常量问题

在定义同步代码块时,不要使用常量对象作为锁对象。一些字符串和数字在常量池中(常量池在方法区内),而new关键字,一定是在堆中创建一个新的对象

2、volatile关键字

作用:往常线程都是将某个变量复制一份到自己的线程中,经过volatile字段修饰后,就统一的从内存中读取

原理:volatile本质是在告诉jvm当前变量在寄存器(工作内存)中的值是不确定的,需要从主存中读取;

被volatile修饰的变量具有synchronized的可见性,但是不具备原子性

被volatile修饰的变量不能用作线程安全计数器。虽然增量操作(x++)看上去类似一个单独操作,实际上它是一个由【读取->修改->写入】操作序列组成的组合操作,必须以原子方式执行,而volatile不能提供必须的原子特性。

用处:如果读操作远远大于写操作,volatile变量还可以提供优于锁的性能优势

与synchronized字段的区别:

我们知道现代的CPU为了优化性能,计算时一般不与内存直接交互。一般先把数据从内存读取到CPU内部缓存再进行操作。而不同线程可能由不同的CPU内核执行,很可能会导致某变量在不同的处理器中保存着2个不同副本的情况,导致数据不一致,产生意料之外的结果。

3、互斥锁(ReentrantLock)

它也是重入锁,为了避免synchronized的使用而设计出来的(由于synchronized之前效率太低,但是jdk在1.7对其进行了优化,现在已经很好了)

使用互斥锁的时候必须手工释放锁标记。一般都是在 finally 代码块中定义释放锁标 记的 unlock 方法。

3.1 基本使用

public class Test_01 {
    private Lock lock = new ReentrantLock();
    
    private void m1(){
        try{
            lock.lock(); // 加锁,在对象上加标记
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            lock.unlock(); // 解锁
        }
    }
}

3.2 尝试锁

public class Test_02 {
    private Lock lock = new ReentrantLock();
    
    private void m1(){
        try{
            lock.lock();
            for(int i = 0; i < 10; i++){
                TimeUnit.SECONDS.sleep(1);
                System.out.println("m1() method " + i);
            }
        }catch(InterruptedException e){
            e.printStackTrace();
        }finally{
            lock.unlock();
        }
    }
    
    private void m2(){
        boolean isLocked = false;
        try{
            // 尝试锁,如果有锁,无法获取锁标记,返回false。如果获取锁标记,返回true
            // isLocked = lock.tryLock();
            
            // 阻塞尝试锁,阻塞参数代表的时长,尝试获取锁标记。如果超时,不等待。直接返回。
            isLocked = lock.tryLock(5, TimeUnit.SECONDS); 
            
            if(isLocked){
                System.out.println("m2() method synchronized");
            }else{
                System.out.println("m2() method unsynchronized");
            }
        }catch(Exception e){
            e.printStackTrace();
        }finally{
            if(isLocked){
                // 尝试锁在解除锁标记的时候,一定要判断是否获取到锁标记。如果当前线程没有获取到锁标记,关闭时会抛出异常。
                lock.unlock();
            }
        }
    }
    
    public static void main(String[] args) {
        final Test_02 t = new Test_02();
        new Thread(t::m1).start();

        new Thread(t::m2).start();
    }
}

3.3 拿到互斥锁的线程可被打断

Java中的3种阻塞状态:普通阻塞,等待队列,锁池队列。

public class Test_03 {
    private Lock lock = new ReentrantLock();
    
    private void m1(){
        try{
            lock.lock();
            for(int i = 0; i < 5; i++){
                TimeUnit.SECONDS.sleep(1);
                System.out.println("m1() method " + i);
            }
        }catch(InterruptedException e){
            e.printStackTrace();
        }finally{
            lock.unlock();
        }
    }
    
    private void m2(){
        try{
            // 如果锁被占用的话,在这会阻塞
            lock.lockInterruptibly(); // 可尝试打断该线程的阻塞状态;在阻塞等待锁的过程中,可以被其他的线程打断其阻塞状态(只能打断阻塞状态的线程)
            System.out.println("m2() method");
        } catch(InterruptedException e){    // 如果被打断,会抛出异常
            System.out.println("m2() method interrupted");
        } finally{
            try{
                lock.unlock();
            }catch(Exception e){
                e.printStackTrace();
            }
        }
    }
    
    public static void main(String[] args) {
        final Test_03 t = new Test_03();
        new Thread(t::m1).start();


        Thread t2 = new Thread(t::m2);
        t2.start();


        t2.interrupt();// 打断线程休眠。非正常结束阻塞状态的线程,都会抛出异常。
    }
}

3.4 公平锁/非公平锁

当锁释放的时候,操作系统不会让排队的线程直接拿到锁,而是让排队的线程竞争,这是不公平的;但是ReentrantLock可以实现公平锁。

synchronized是非公平锁

公平锁底层使用的是queue。公平锁尽量少用。

public class Test_04 {

    public static void main(String[] args) {
        TestReentrantlock t = new TestReentrantlock();
        //TestSync t = new TestSync();
        Thread t1 = new Thread(t);
        Thread t2 = new Thread(t);
        t1.start();
        t2.start();
    }
}

class TestReentrantlock extends Thread {
    // 定义一个公平锁
    private static ReentrantLock lock = new ReentrantLock(true);

    public void run() {
        for (int i = 0; i < 5; i++) {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " get lock");
            } finally {
                lock.unlock();
            }
        }
    }
}

4、ThreadLocal

ThreadLocal就相当于一个map,key是当前线程,value是要保存的变量

内存问题 : 在并发量高的时候,可能有内存溢出。

所以使用ThreadLocal的时候,一定注意回收资源问题,每个线程结束之前,将当前线程保存的线程变量一定要删除;ThreadLocal.remove()方法。 参考文档

image

三、并发容器

并发容器包注重以下特性:

1、CopyOnWriteArrayList(COW模型)

1.1 并发读(会有脏读的问题)

cow模型来源于linux系统fork命令,Java中一种使用cow模型来实现的并发类是CopyOnWriteArrayList。相比于Vector,它的读操作是无需加锁的:

public E get(int index) {
     return (E) elements[index];
}

之所以有如此神奇功效,其采取的是空间换取时间的方法,查看其add方法:

public synchronized boolean add(E e) {
     Object[] newElements = new Object[elements.length + 1];
     System.arraycopy(elements, 0, newElements, 0, elements.length);
     newElements[elements.length] = e;
     elements = newElements;
     return true;
}

我们注意到,CopyOnWriteArrayList的add方法是需要加锁的,但其内部并没有直接对elements数组做操作,而是先copy一份当前的数据到一个新的数组,然后对新的数组进行修改和赋值操作。这样做就让get操作从同步中解脱出来。因为更改的数据并没有发生在get所需的数组中,而是放生在新生成的副本中。

1.2 并发遍历

CopyOnWriteArrayList的另一特点是允许多线程遍历,且其它线程更改数据并不会导致遍历线程抛出ConcurrentModificationException异常,来看下iterator():

public Iterator<E> iterator() {
     Object[] snapshot = elements;
     return new CowIterator<E>(snapshot, 0, snapshot.length);
}

CowIterator是ListIterator的子类,这个Iterator(指的是CowIterator)的特点是它并不支持对数据的更改操作

public void add(E object) {
     throw new UnsupportedOperationException();
}
public void remove() {
    throw new UnsupportedOperationException();
}
public void set(E object) {
    throw new UnsupportedOperationException();
}

这样做的原因也很容易理解,我们可以简单地的认为CowIterator中的snapshot是不可变数组,因为list中有数据更新都会生成新数组,而不会改变snapshot, 所以此时Iterator没办法再将更改的数据写回list了。同理,list数据有更新也不会反映在CowIterator中。CowIterator只是保证其迭代过程不会发生异常

2、ConcurrentHashMap<JDK1.8版>(CAS)

CAS是Compare and Swap的简写,即比较与替换,CAS操作将比较和替换封装为一组原子操作,不会被外部打断。这种原子操作的保证是由处理器层面提供支持。

2.1 ConcurrentHashMap的存储结构

ConcurrentHashMap与HashMap对数据的存储有着相似的地方,都采用数组+链表+红黑树的方式。基本逻辑是内部使用Node来保存map中的一项key,value结构,对于hash不冲突的key,使用数组来保存Node数据,而每一项Node都是一个链表,用来保存hash冲突的Node,当链表的大小达到一定程度会转为红黑树,这样会使在冲突数据较多时也会有比较好的查询效率。

2.2 ConcurrentHashMap中对tab进行操作的函数

了解了ConcurrentHashMap的存储结构后,我们来看下在这种结构下,ConcurrentHashMap是如何实现高效的并发操作,这得益于ConcurrentHashMap中的如下三个函数。

/**
 * 1、其中的U就是Unsafe的实例,这三个方法都通过Unsafe的方法保证了原子性
 * 2、tab就是上面说的链表/红黑树
 */

// 返回tab的第i项
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
// 对比tab第i项是否与第c项是否相等,相等的话将其设置为v。
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
// 将tab的第i项设置为v
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putOrderedObject(tab, ((long)i << ASHIFT) + ABASE, v);
}

2.3 put操作的流程

有了这三个函数就可以保证ConcurrentHashMap的线程安全吗?并不是的,ConcurrentHashMap内部也使用比较多的synchronized,不过与HashTable这种对所有操作都使用synchronized不同,ConcurrentHashMap只在特定的情况下使用synchronized,来较少锁的定的区域。来看下putVal方法(精简版):

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to embin
        }
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            synchronized (f) {
                    ....
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

流程:

2.4 get操作的流程

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

2.5 JDK1.7与JDK1.8中的区别

JDK6,7中的ConcurrentHashmap主要使用Segment来实现减小锁粒度,把HashMap分割成若干个Segment,在put的时候需要锁住Segment,get时候不加锁,使用volatile来保证可见性,当要统计全局时(比如size),首先会尝试多次计算modcount来确定,这几次尝试中,是否有其他线程进行了修改操作,如果没有,则直接返回size。如果有,则需要依次锁住所有的Segment来计算。

jdk7中ConcurrentHashmap中,当长度过长碰撞会很频繁,链表的增改删查操作都会消耗很长的时间,影响性能,所以jdk8中完全重写了concurrentHashmap,代码量从原来的1000多行变成了 6000多 行,实现上也和原来的分段式存储有很大的区别。

主要设计上的变化有以下几点:

3、LinkedBlockingQueue(读写分离)

还有一种实现线程安全的方式是通过将读写进行分离,这种方式的一种实现是LinkedBlockingQueue。LinkedBlockingQueue整体设计的也十分精巧,它的全局变量分为三类:

final型变量由于声明后就不会被修改,所以自然线程安全,Atomic型内部采用了cas模型来保证线程安全。对于普通型变量,LinkedBlockingQueue中只包含head与last两个表示队列的头与尾。并且私有,外部无法更改,所以,LinkedBlockingQueue只需要保证head与last的安全即可保证真个队列的线程安全。并且LinkedBlockingQueue属于FIFO型队列,一般情况下,读写会在不同元素上工作,所以,LinkedBlockingQueue定义了两个可重入锁,巧妙的通过对head与last分别加锁,实现读写分离,来实现良好的安全并发特性:

/** take,poll等操作需要持有的锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 等待队列中元素被拿走 */
private final Condition notEmpty = takeLock.newCondition();
/** put,offer等操作需要持有的锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** 等待队列中元素放置 */
private final Condition notFull = putLock.newCondition();

3.1 offer方法

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

3.2 poll方法

可见,在对队列进行添加元素时,只需要对putLock进行加锁即可,保证同一时刻只有一个线程可以对last进行插入。同样的,在从队列进行提取元素时,也只需要获取takeLock锁来对head操作即可:

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

LinkedBlockingQueue整体还是比较好理解的,但有几个点需要特殊注意:

四、线程池

1、Executor

线程池顶级接口。方法void execute(Runnable)是用于处理任务的一个服务方法。调用者提供Runnable接口的实现,线程池通过线程执行这个Runnable。该方法是无返回值的,原因是Runnable接口中的run方法无返回值。

常用方法:

2、ExecutorService

它是Executor接口的子接口,它提供了一个新的方法submit(),返回值为Future类型。

常见方法:

线程池状态:

3、Future

代表未来的结果

常用方法:

4、Executors

工具类型。可以快速的创建若干种线程池。如:固定容量的,无限容量的,容量为1 等各种线程池。

线程池是一个进程级的重量级资源。默认的生命周期和JVM一致。如果手工调用 shutdown方法,那么线程池执行所有的任务后,自动关闭。

5、FixedThreadPool

容量固定的线程池。活动状态和线程池容量是有上限的线程池。所有的线程池中,都有一个任务队列(BlockingQueue<Runnable>)。当任务数量大于线程池容量的时候,没有运行的任务保存在任务队列中当线程有空闲的,自动从队列中取出任务执行

使用场景: 大多数情况下推荐使用FixedThreadPool;由于OS系统和硬件是有线程支持上限。不能随意的无限制的提供线程。

线程池默认的容量上限是 Integer.MAX_VALUE。

常见的线程池容量: PC - 200。 服务器 - 1000~10000

queued tasks - 任务队列

completed tasks - 结束任务队列

public class Test_02_FixedThreadPool {
    
    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(5);

        for(int i = 0; i < 6; i++){
            service.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " - test executor");
            });
        }
        
        System.out.println(service);
        
        // 优雅关闭
        service.shutdown();
        // 是否已经结束, 相当于回收了资源。
        System.out.println(service.isTerminated());
        // 是否已经关闭, 是否调用过shutdown方法
        System.out.println(service.isShutdown());
        System.out.println(service);
        
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println(service.isTerminated());
        System.out.println(service.isShutdown());
        System.out.println(service);
    }

}

6、CachedThreadPool

缓存的线程池。容量不限(Integer.MAX_VALUE)。自动扩容。容量管理策略:如果线程池中的线程数量不满足任务执行,创建新的线程。每次有新任务无法即时处理的时候,都会创建新的线程。当线程池中的线程空闲时长达到一定的临界值(默认 60 秒),自动释放线程

ExecutorService service = Executors.newCachedThreadPool();

应用场景:

7、ScheduledThreadPool

计划任务线程池。可以根据计划自动执行任务的线程池。底层基于DelaydQueue

常用方法:

使用场景: 计划任务时选用,如:电信行业中的数据整理,每分钟整理,每天整理等。

坑:运行时间长了会有一定的误差

public class  Test_07_ScheduledThreadPool {
    
    public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
        System.out.println(service);
        
        // 定时完成任务。 scheduleAtFixedRate(Runnable, start_limit, limit, timeunit)
        // runnable - 要执行的任务。
        service.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            }
        }, 0, 300, TimeUnit.MILLISECONDS);
        
    }
}

8、SingleThreadExceutor

单一容量的线程池。

ExecutorService service = Executors.newSingleThreadExecutor();

使用场景: 保证任务顺序时使用。如: 游戏大厅中的公共频道聊天。秒杀。

9、ForkJoinPool

分支合并线程池(mapreduce类似的设计思想)。适合用于处理复杂任务。初始化线程容量与CPU核心数相关。

线程池中运行的内容必须是ForkJoinTask的子类RecursiveTask(有返回结果的分支合并任务或RecursiveAction(无返回结果的分支合)

ForkJoinPool 没有所谓的容量。默认都是 1 个线程。根据任务自动的分支新的子线程。 当子线程任务结束后,自动合并。所谓自动是根据 fork 和 join 两个方法实现的。

应用: 主要是做科学计算或天文计算的。数据分析的。

public class Test_08_ForkJoinPool {
    
    final static int[] numbers = new int[1000000];
    final static int MAX_SIZE = 50000;
    final static Random r = new Random();
    
    
    static{
        for(int i = 0; i < numbers.length; i++){
            numbers[i] = r.nextInt(1000);
        }
    }
    
    static class AddTask extends RecursiveTask<Long>{ // RecursiveAction
        int begin, end;
        public AddTask(int begin, int end){
            this.begin = begin;
            this.end = end;
        }
        
        // 
        protected Long compute(){
            if((end - begin) < MAX_SIZE){
                long sum = 0L;
                for(int i = begin; i < end; i++){
                    sum += numbers[i];
                }
                // System.out.println("form " + begin + " to " + end + " sum is : " + sum);
                return sum;
            }else{
                int middle = begin + (end - begin)/2;
                AddTask task1 = new AddTask(begin, middle);
                AddTask task2 = new AddTask(middle, end);
                task1.fork();// 就是用于开启新的任务的。 就是分支工作的。 就是开启一个新的线程任务。
                task2.fork();
                // join - 合并。将任务的结果获取。 这是一个阻塞方法。一定会得到结果数据。
                return task1.join() + task2.join();
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {
        long result = 0L;
        for(int i = 0; i < numbers.length; i++){
            result += numbers[i];
        }
        System.out.println(result);
        
        ForkJoinPool pool = new ForkJoinPool();
        AddTask task = new AddTask(0, numbers.length);
        
        Future<Long> future = pool.submit(task);
        System.out.println(future.get());
        
    }

}

10、WorkStealingPool

JDK1.8 新增的线程池,工作窃取线程池。线程池中每个线程都维护自己的任务队列。当自己的任务队列执行完成时,会帮助其他线程执行其中的任务。底层使用的是ForkJoinPool

初始化线程容量与 CPU 核心数相关。此线程池中维护的是精灵线程。

Executors.newWorkStealingPool();

11、ThreadPoolExecutor

除ForkJoinPool外,其他常用线程池底层都是使用ThreadPoolExecutor实现。

public ThreadPoolExecutor
(int corePoolSize, // 核心容量,创建线程池的时候,默认有多少线程。也是线程池保持 的最少线程数
int maximumPoolSize, // 最大容量,线程池最多有多少线程
long keepAliveT ime, // 生命周期,0 为永久。当线程空闲多久后,自动回收。
TimeUnit unit, // 生命周期单位,为生命周期􏰀供单位,如:秒,毫秒
BlockingQueue<Runnable> workQueue // 任务队列,阻塞队列。注意,泛型必须是 Runnable
);

使用场景: 默认提供的线程池不满足条件时使用。如:初始线程数据 4,最大线程数 200,线程空闲周期 30 秒。

public class Test_09_ThreadPoolExecutor {
    
    public static void main(String[] args) {
        // 模拟fixedThreadPool, 核心线程5个,最大容量5个,线程的生命周期无限。
        ExecutorService service = 
                new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, 
                        new LinkedBlockingQueue<Runnable>());
        
        for(int i = 0; i < 6; i++){
            service.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + " - test executor");
                }
            });
        }
        
        System.out.println(service);
        
        service.shutdown();
        System.out.println(service.isTerminated());
        System.out.println(service.isShutdown());
        System.out.println(service);
        
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        service.shutdown();
        System.out.println(service.isTerminated());
        System.out.println(service.isShutdown());
        System.out.println(service);
        
    }

}

本文参考

1、java 中的锁 -- 偏向锁、轻量级锁、自旋锁、重量级锁(建议看)

2、浅析几种线程安全模型

上一篇下一篇

猜你喜欢

热点阅读