面试题

多线程

2019-11-22  本文已影响0人  Teemo_fca4

JMM
JMM:java内存模型(Java Memory Model简称JMM),JMM本身是一种规范,并不真实存在,就像是十二生肖一样 也是一种概念。
JMM关于同步的规定

为什么说volatile是轻量级的同步机制?
JMM对于同步的要求有三个:

public class Test1 {
    //验证volatile的可见性
    public static void main(String[] args)   {
        Mydata mydata = new Mydata();
        new Thread(()->{
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"线程进入");
            mydata.addTO10();
        },"a").start();
        while (mydata.num==0){//如果一直为0就一直卡在这里

        }
        System.out.println("main线程结束");
    }
}
class  Mydata{
    volatile int num;

    public void  addTO10(){
        num = 10;
    }
}

如果num不加volatile 修饰,那么运行main函数后 结果是A线程跑完,main线程一直卡在while循环里面,因为主线程无法得到主内存中的num的 最新数据


image.png

加了volatile ,由于volatile 修改了主内存的数据后,会及时通知其他工作线程(例子里面的是main线程)刷新数据到工作内存里,这样main线程就能拿到最新数据了,main线程也就跑完程序了


image.png

volatile不保证原子性Demo

public class Test1 {
    //验证volatile的不保证原子性
    public static void main(String[] args)   {
        Mydata mydata = new Mydata();
        for (int i = 0; i < 20; i++) {
            new Thread(()->{
                for (int j = 0; j < 1000; j++) {
                    mydata.addPlusPlus();
                }
            },""+i).start();
        }
        while (Thread.activeCount()>2){//等上面的线程跑完, 两个线程是 main 和gc线程,
            Thread.yield();
        }
        System.out.println("20个线程各自执行1000次++后的结果是"+mydata.num);
    }
}
class  Mydata{
    volatile int num;

    public void  addPlusPlus(){
        num ++;
    }
}

运行结果,小于20*1000=20000这个数

image.png
分析
i++ 分为三步

volatile不保证原子性的分析:在一个线程t1将自己的工作内存的更新后的数据写入到主内存的之后,此时如果其他线程t2如果发生读操作(也就是上面的第一步), t2会去主内存加载最新数据到工作内存,但是 如果此时t2线程已经处于第二步了,那么t2得到的数据就是错误的,造成最终的数据不一致,这就是volatile不保证原子性的原因。

解决volatile不保证原子性的方法是使用automic类,为什么原子类能解决这个问题? 后面会有

public class Test1 {
    //验证volatile的不保证原子性
    public static void main(String[] args)   {
        Mydata mydata = new Mydata();
        for (int i = 0; i < 20; i++) {
            new Thread(()->{
                for (int j = 0; j < 1000; j++) {
                    mydata.addPlusPlus();
                    mydata.atomicAdd();
                }
            },""+i).start();
        }
        while (Thread.activeCount()>2){
            Thread.yield();
        }
        System.out.println("20个线程各自执行1000次++后-普通变量-的结果是"+mydata.num);
        System.out.println("20个线程各自执行1000次++后-atomic-的结果是"+mydata.num2);
    }
}
class  Mydata{
    volatile int num;

    public void  addPlusPlus(){
        num ++;
    }
    AtomicInteger num2 = new AtomicInteger();
    public void  atomicAdd(){
        num2.getAndIncrement();//获取并且自增
    }
}

执行结果

image.png
指令重排
计算机在执行程序时,为了提高性能,编译器和处理器常常会对指令做重新拍下的操作,指令重排一般分为以下三种:
image.png

处理器在进行重排时必须遵守的规则是数据的依赖性: 比如数据b 依赖于数据a的值,那么b的初始化肯定是要在a之后的
多线程环境下 线程交替执行,有序重排序的存在,两个线程的变量能否保住一致性是无法确定的,结果也是无法预测的
比如以下例子:

    public  void reSort(){
        int x  = 11;//语句1
        int y = 12;//语句2
        x = x+5;//语句3
        y = x * x;//语句4
        // 最终的执行命令顺序可能是1234,2134,1324(注意无论怎么排,4都不肯是第一步,违背的数据依赖性原则)
    }

多线程下的情况

image.png
volatile 实现禁止指令重排优化,从而避免多线程环境下的乱序执行现象。

内存屏障 Memory Brrrier: 也称为内存栅栏,它是一个CPU指令。作用有两项
1 保证特定操作的执行顺序。
2 保证某些变量的可见性(volatile 就是利用该特性实现可见性的)
如果来指令间插入Memory Brrrier,它就会告诉编译器和CPU,不管什么指令都不能和这条Memory Brrrier指令重排序,Memory Brrrier的另外一个作用就是强制刷出各种CPU的缓存数据,因此CPU的线程都能读取到数据的最新版本。

image.png
原子类为什么能够解决volatile原子性问题?

答案:CAS自旋
compareAndSet方法:与主内存比较并设置

    public static void main(String[] args) {
        //原子基本变量
        AtomicInteger atomicInteger = new AtomicInteger(10);//对象存于堆内,堆是共享区域,初始化为10
        //当期待值10与原始值10相等,则设置成功
        System.out.println(atomicInteger.compareAndSet(10,50)+",当前值"+atomicInteger.get());
        //当期待值10与原始值50不等,则设置失败
        System.out.println(atomicInteger.compareAndSet(10,1000)+",当前值"+atomicInteger.get());

        //原子引用
        User zhangsan = new User("张三",20);
        User lisi = new User("李四",25);
        AtomicReference<User> atomicReference = new AtomicReference<>();
        atomicReference.set(zhangsan);
        //如果主内存中是张三 那么改为李四 true
        System.out.println(atomicReference.compareAndSet(zhangsan,lisi)+",当前值"+atomicReference.get());
        //false
        System.out.println(atomicReference.compareAndSet(zhangsan,lisi)+",当前值"+atomicReference.get());
    }

cas保证原子性的原理:

        atomicInteger.getAndIncrement();

        //这里调用的是unsafe对象的方法
        public final int getAndIncrement() {
            //this 是当前对象  valueOffset是当前对象所处的内存地址偏移量,1位增量值
            return unsafe.getAndAddInt(this, valueOffset, 1);
        }

        //原子性 靠的是unsafe来保证的 var1是上面一步传入的this对象,var2是上面传来的地址偏移量,var4是要增加的值
        public final int getAndAddInt(Object var1, long var2, int var4) {
            int var5;
            do {
                //根据this对象和地址偏移量获取主内存的值
                var5 = this.getIntVolatile(var1, var2);//这里获取的值,其实是volatile修饰的,这样能保证每次获取的都是主存中的最新值
//                private volatile int value;
            } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));//使用var5去主内存中的值比较,如果一致则更新为var5 + var4,否则轮训

            return var5;
        }

CAS的全称是compare-and-swap,它是CPU的一条系统并发原语,这一过程是原子的,在这一过程中不会造成数据不一致的问题。CAS的核心类是Unsafe,由于java无法访问底层系统,需要通过本地方法(native)来访问,Unsafe类内大量的native方法就是来访问底层的,可以直接访问内存中的数据,Unsafe位于JDK中的rt.jar的sun.misc包下。多个线程访问的例子
1 假设现在有两个线程A,B来执行getAndAddInt(上面的方法)操作(分别泡在不同的CPU上)
2 atomicInteger的原始值为3,即主内存AtomicInteger的value值为3,根据JMM模型:线程A,B 均持有一份值为3的副本到自己的工作内存,
3 线程A通过 this.getIntVolatile(var1, var2)获取到的主内存值为3,这时线程A被挂起
4 线程A通过 this.getIntVolatile(var1, var2)获取到的主内存值为3,刚好这时B未被挂起,B执行compareAndSwapInt(),执行成功,主内存中值被修改为4,线程B操作完成,收工。
5 这时A恢复,执行compareAndSwapInt()方法,发现自己手里的3和主内存中的4不一致,说明这个值被别人提前动过了,所以线程A此次操作失败。
6 线程A现在重新再来,this.getIntVolatile(var1, var2)获取到的主内存值为4(这里因为AtomicInteger中的value是volatile修饰的,所以线程B修改后,线程A能感知到,所以这里获取的是4),拿这个4再去和主内存比较,比较成功 操作成功

这里为什么不使用synchonized:因为synchonized同一时刻只允许一个线程访问,cas这里允许多个访问,即保证了一致性也保证了并发性
CAS的缺点

ABA问题:就以上面的多个线程访问的例子来说吧,如果B将3修改成4之后又将4修改成3了,那么此时A线程依然会执行成功,虽然可以执行成功,但是这并不意外这一切OK,一切都没问题
ABA问题的解决思路:
加版本号,就好比 在value之外维护一个自增变量,每次操作主内存数据后,这个自增变量都自增1,然后当其他线程来访问的时候 既需要比较value,也需要比较版本号。

public class Test2_2 {
    static AtomicReference<Integer> atomicReference = new AtomicReference<>(100);//100是初始值

    static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100,1);//100是初始值 1是初始版本号
    public static void main(String[] args) {
        System.out.println("*******************下面操作会有ABA问题的出现********************");
        new Thread(()->{
            atomicReference.compareAndSet(100,101);//A->B
            atomicReference.compareAndSet(101,100);//B->A

        },"t1").start();

        new Thread(()->{
            //睡眠一秒 等待上面的ABA操作完成
            try {TimeUnit.SECONDS.sleep(1);}catch (Exception e){}
            atomicReference.compareAndSet(100,2019);//B->A
            System.out.println("atomicReference最终的值是\t"+atomicReference.get());
        },"t2").start();

        //睡眠3秒不然结果输出太混乱
        try {TimeUnit.SECONDS.sleep(2);}catch (Exception e){}
        System.out.println("*******************下面操作会有ABA问题的出现********************");

        new Thread(()->{
            int stamp = atomicStampedReference.getStamp();//最初的版本号是1
            System.out.println("t3第一次获取的版本号是\t"+stamp);
            //睡眠1秒 等待t4获取最初版本号
            try {TimeUnit.SECONDS.sleep(1);}catch (Exception e){}
            atomicStampedReference.compareAndSet(100,101,stamp,stamp+1);
            stamp = atomicStampedReference.getStamp();
            System.out.println("t3第二次获取的版本号是\t"+stamp);
            atomicStampedReference.compareAndSet(101,100,stamp,stamp+1);
            System.out.println("t3第三次获取的版本号是\t"+atomicStampedReference.getStamp());
        },"t3").start();


        new Thread(()->{
            int stamp = atomicStampedReference.getStamp();//最初的版本号是1
            System.out.println("t4第一次获取的版本号是\t"+stamp);
            //睡眠三秒 等待上面的ABA操作完成
            try {TimeUnit.SECONDS.sleep(3);}catch (Exception e){}
            boolean result = atomicStampedReference.compareAndSet(100,2019,stamp,stamp+1);
            System.out.println("最后的操作结果\t"+result+",最终的值是\t"+atomicStampedReference.getReference().longValue()+"最终的 版本号是"+atomicStampedReference.getStamp());
        },"t4").start();
    }
}

结果


image.png
集合类的线程不安全
    public  static void listNotSafe() {
        List<String> list = new ArrayList<>();
//        List<String>  list  = new Vector<>();
//        List<String>  list  = Collections.synchronizedList(new ArrayList<>());
//        CopyOnWriteArrayList<String>  list  = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 30; i++) {
            new Thread(() -> {
                list.add(UUID.randomUUID().toString().replace("-", "").substring(8));
                System.out.println(list);
            }, "t" + i).start();
        }
    }
    /**
     * 1 故障描述 多线程环境下会报错  java.util.ConcurrentModificationException
     *
     * 2 报错原因
     *  一个线程正在写,另外一个线程抢夺了资源,导致数据不一致异常,也就是并发修改异常
     *
     * 3 解决方法
     * 3.1 使用 new Vector<>()(效率低 使用synchronized来实现同步)
     * 3.2 使用 Collections.synchronizedList(new ArrayList<>());(效率也低,也是使用synchronized来实现同步)
     * 3.3 (推荐)使用 new CopyOnWriteArrayList<>();
     *     CopyOnWriteArrayList 写时复制,读的时候不复制,写的时候复制一个新数组,读的时候还是读老数组,这样既能保证安全也能保证效率,读写分离的思想
     *     CopyOnWriteArrayList add方法源码
     *     public void add(int index, E element) {
     *         final ReentrantLock lock = this.lock;
     *         lock.lock();//首先加锁
     *         try {
     *             Object[] elements = getArray();
     *             int len = elements.length;
     *             if (index > len || index < 0)
     *                 throw new IndexOutOfBoundsException("Index: "+index+
     *                                                     ", Size: "+len);
     *             Object[] newElements;
     *             int numMoved = len - index;
     *             if (numMoved == 0)
     *                 newElements = Arrays.copyOf(elements, len + 1);
     *             else {
     *                 newElements = new Object[len + 1];
     *                 System.arraycopy(elements, 0, newElements, 0, index);
     *                 System.arraycopy(elements, index, newElements, index + 1,
     *                                  numMoved);
     *             }
     *             newElements[index] = element;//在复制后的数组尾部加入数据
     *             setArray(newElements);
     *         } finally {
     *             lock.unlock();//解锁
     *         }
     *     }
     *
     *    并发修改异常也会出现在 set和 Map中 ,都可以使用CopyOnWriteArraySet和ConcurrentHashMap 来获得一个线程安全的版本
     *    其中Set的JUC解决方案中用 CopyOnWriteArraySet,值得注意的是 CopyOnWriteArraySet中维护的是CopyOnWriteArrayList
     *    Map的JUC解决方案中用 ConcurrentHashMap 来解决
     */
公平锁和非公平锁

并发包中的ReentrantLock创建可以在构造函数中指定为公平锁和非公平锁,默认为非公平锁
关于二者的区别

可重入锁(也叫递归锁)

指同一个线程,如果在外层函数获得锁之后,内层递归函数依然可以获取该锁的代码,也就是说在外层方法获取锁之后,可以进入任何一个他已经拥有锁的同步代码或者代码块。
ReentrantLock和Synchronized就是典型的可重入锁,可重入锁的最大作用是避免死锁。

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Phone implements Runnable{
    public synchronized  void sendSMS(){
        System.out.println(Thread.currentThread().getName()+"进入sendSMS方法");
        sendEmail();
    }

    public synchronized  void sendEmail(){
        System.out.println(Thread.currentThread().getName()+"进入sendEmail方法");
    }

    @Override
    public void run() {
        get();
    }
    Lock lock = new ReentrantLock();

    public void get(){
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName()+"进入get方法");
            set();
        }finally {
            lock.unlock();
        }
    }

    public void set(){
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName()+"进入set方法");
        }finally {
            lock.unlock();
        }
    }
}

public class Test4 {
    public static void main(String[] args) {
        System.out.println("******************synchronized版本的可重入锁********************");
        Phone phone = new Phone();
        new Thread(()->{
            phone.sendSMS();
        },"t1").start();

        new Thread(()->{
            phone.sendSMS();
        },"t2").start();

        try {TimeUnit.SECONDS.sleep(3);}catch (Exception e){}
        System.out.println("******************ReentrantLock版本的可重入锁********************");


        Thread t3 = new Thread(phone,"t3");
        Thread t4 = new Thread(phone,"t4");

        t3.start();
        t4.start();
    }
}
image.png

无论执行多少次,结果都是连续的,t1 和 t2之间不会插队,t3 和 t4之间不会插队。也就说明了重入锁的情况,在sendSMS()中调用sendEmail(),是无需等待锁资源的。get()中调用set()也一样,因为都是一把锁。

另外:加锁与解锁必须要成双成对才行,即使重复加锁也需要成双成对。

image.png
image.png
image.png
自旋锁

尝试获取锁的线程如果获取失败不会立即阻塞,而是采用循环的方式继续去尝试获取锁,这样的好处是减少线程上下文的切换,缺点是消耗CPU

/**
* @Desc 通过CAS操作完成自旋锁,A线程先进来调用myLock获取锁,持有锁3秒,
*       B随后进来后发现AtomicReference中已被线程A占有,不是null,所以线程B只能自旋等待,直到A线程释放所资源B随后抢到
*/
public class MyRotateLock {
    AtomicReference<Thread> atomicReference = new AtomicReference<>();

    public void myLock(){
        Thread thread = Thread.currentThread();
        System.out.println(thread.getName()+"进入myLock方法");
        //如果获取失败,就一直循环去获取锁
        while (!atomicReference.compareAndSet(null,thread)){
//            System.out.println(thread.getName()+"尝试获取锁");
        }
    }

    public void myUnLock(){
        Thread thread = Thread.currentThread();

        //解锁
        atomicReference.compareAndSet(thread,null);
        System.out.println(thread.getName()+"进入myUnLock方法,解锁成功");
    }

    public static void main(String[] args) {
        MyRotateLock myRotateLock = new MyRotateLock();

        new Thread(()->{
            myRotateLock.myLock();
            //持有锁3秒时间
            try {TimeUnit.SECONDS.sleep(3);}catch (Exception e){}
            myRotateLock.myUnLock();
        },"AA").start();

        try {TimeUnit.MICROSECONDS.sleep(300);}catch (Exception e){}

        new Thread(()->{
            myRotateLock.myLock();
            try {TimeUnit.SECONDS.sleep(1);}catch (Exception e){}
            myRotateLock.myUnLock();
        },"BB").start();
    }
}
读写锁
public class MyReadWriteLock1 {
    private volatile Map<String,Object> map = new HashMap<>();

    public void put(String key,Object value){
        System.out.println(Thread.currentThread().getName()+"正在写入数据,写入的key是"+key);
        //模拟写过程
        try {TimeUnit.MICROSECONDS.sleep(300);}catch (Exception e){}
        map.put(key,value);
        System.out.println(Thread.currentThread().getName()+"写入完成");
    }


    public void get(String key){
        System.out.println(Thread.currentThread().getName()+"正在读取:"+key);
        //模拟读过程
        try {TimeUnit.MICROSECONDS.sleep(300);}catch (Exception e){}
        Object result =  map.get(key);
        System.out.println(Thread.currentThread().getName()+"读取完成:"+result);
    }

    public static void main(String[] args) {

        //模拟5个线程同时去写,五个线程同时去读
        MyReadWriteLock1 myReadWriteLock1  = new MyReadWriteLock1();
        for (int i = 1; i <=5 ; i++) {
            final  int tempInt = i;
            new Thread(()->{
                myReadWriteLock1.put(tempInt+"",tempInt+"");
            },i+"").start();
        }

        for (int i = 1; i <=5 ; i++) {
            final  int tempInt = i;
            new Thread(()->{
                myReadWriteLock1.get(tempInt+"");
            },i+"").start();
        }
    }
}
image.png

上面的代码无论怎么运行,都有错误,因为写的时候有其他线程去读,数据会错误,
多个线程去读一个资源是没问题的,但是如果有一个线程去写共享资源,不不能让其他线程来读或者写这个资源了,也就是写-独占,读-共享。写操作时必须保证写线程独占这个资源,并且写操作必须是原子的,不允许过程被打断。
读--读 可以同时进行 加读锁
读--写 不可以同时进行 加写锁
写--写 不可以同时进行 加写锁

public class MyReadWriteLock2 {
    private volatile Map<String,Object> map = new HashMap<>();

    //读写锁资源,默认非公平
    private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();

    //写操作 加写锁
    public void put(String key,Object value){

        reentrantReadWriteLock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName()+"正在写入数据,写入的key是"+key);
            //模拟写过程
            try {TimeUnit.MICROSECONDS.sleep(300);}catch (Exception e){}
            map.put(key,value);
            System.out.println(Thread.currentThread().getName()+"写入完成");
        }catch (Exception e){

        }finally {
            reentrantReadWriteLock.writeLock().unlock();
        }

    }

    //读操作  加读锁
    public void get(String key){
        reentrantReadWriteLock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName()+"正在读取:"+key);
            //模拟读过程
            try {TimeUnit.MICROSECONDS.sleep(300);}catch (Exception e){}
            Object result =  map.get(key);
            System.out.println(Thread.currentThread().getName()+"读取完成:"+result);
        }catch (Exception e){

        }finally {
            reentrantReadWriteLock.readLock().unlock();
        }
    }

    public static void main(String[] args) {

        //模拟5个线程同时去写,五个线程同时去读
        MyReadWriteLock2 myReadWriteLock1  = new MyReadWriteLock2();
        for (int i = 1; i <=5 ; i++) {
            final  int tempInt = i;
            new Thread(()->{
                myReadWriteLock1.put(tempInt+"",tempInt+"");
            },i+"").start();
        }

        for (int i = 1; i <=5 ; i++) {
            final  int tempInt = i;
            new Thread(()->{
                myReadWriteLock1.get(tempInt+"");
            },i+"").start();
        }
    }
}
CountDownLatch 计数器闭锁

让一些线程阻塞到另外一些线程完成操作后才被唤醒,CountDownLatch主要有两个方法 ,所有调用await()方法的线程会被阻塞,直到计数器为才被唤醒。调用countDown()方法的线程不会被阻塞,countDown()方法会让计数器减1。
这种现象只会出现一次,因为计数器不能被重置,如果业务上需要一个可以重置计数次数的版本,可以考虑使用CycliBarrier

import lombok.AllArgsConstructor;
import lombok.Getter;

import java.util.concurrent.CountDownLatch;

public class Test2 {
    public static void main(String[] args) throws  Exception{
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 1; i <= 6 ; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"国灭亡");
                countDownLatch.countDown();
            },Country.list_Contry(i).getName()).start();
        }
        countDownLatch.await();
        System.out.println("战国时代结束,秦统一天下");
    }
}

@AllArgsConstructor
@Getter
enum Country{//使用枚举,枚举就相当于一个小型的简单数据库
    ONE(1,"赵"),TWO(2,"韩"),ThREE(3,"楚"),FOUR(4,"燕"),FIVE(5,"齐"),SIX(6,"魏");
    private Integer code;
    private String name;

    public static Country list_Contry(int index){
        Country[] list = Country.values();
        for (Country country : list) {
            if(index==country.getCode()){
                return country;
            }
        }
        return null;
    }
}
image.png
CyclicBarrier(字面意思:可循环使用的屏障)

CyclicBarrier是让一组线程到达的一个屏障点,直到最后一个线程到达屏障时,屏障才会开门,开门执行CyclicBarrier初始化的线程操作,这个操作完成之后,所有被拦截的线程才会被唤醒。

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

public class Test3 {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
            System.out.println("召唤神龙~~~");
            try {TimeUnit.SECONDS.sleep(3);}catch (Exception e){}
            System.out.println("召唤完成~~~");
        }); //数字达到7 就输出召唤神龙
        for (int i = 1; i <= 7 ; i++) {//这里可以弄成14,cyclicBarrier 会阻塞两次,因为它可以重用
            final int tempInt = i;
            new Thread(()->{
                System.out.println("收集到第"+tempInt+"龙珠");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("第"+tempInt+"龙珠继续运行");
            },i+"").start();
        }
    }
}
image.png
Semaphore (可以理解为简单版的阻塞队列)

信号量主要用于两个目的,一个是控制并发线程的数量,一个是多个共享资源互斥使用。

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class Test4 {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);//模拟三个停车位
        // 模拟6部车
        for (int i = 1; i <= 6 ; i++) {
            new Thread(()->{
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+"抢到了车位");
                    //占有车位三秒
                    try {TimeUnit.SECONDS.sleep(3);}catch (Exception e){}
                    System.out.println(Thread.currentThread().getName()+"离开了车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    semaphore.release();
                }
            },i+"").start();
        }
    }
}
image.png

阻塞队列 BlockingQueue

阻塞队列的特点:从空的阻塞队列中获取元素操作会被阻塞,同样,往满的阻塞队列中丢一个元素操作 也会被阻塞。
为什么需要阻塞队列:在生产者消费者模型中,一个比较难的重点是唤醒其他线程 这往往是很麻烦的,阻塞队列就不存在这个问题,唤醒线程的时机BlockingQueue给程序员包办了
可以用在一下三个方面,生产者消费者模式,线程池,消息队列。

image.png

从继承结构上来看阻塞队列BlockingQueue和我们CRUD工程师经常使用的ArrayLIst,LinkedList一样,属于同一个层级,只是大部分的业务比较简单,用不上BlockingQueue而已,BlockingQueue下有7个常用的子类,ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue,DelayQueue,PriorityBlockingQueue,LinkedTransferQueue,LinkedBlockingDeque,前三个比较常用。
ArrayBlockingQueue 由数组组成的有界阻塞队列(界限在构造函数上手动指定)。
LinkedBlockingQueue 由链表组成的有界队列(默认长度是Integer.MAX_VALUE,这个值其实也就相当于无界了)
SynchronousQueue 只存储一个元素的有界队列(这个可以做一个自旋锁的版本不?)
PriorityBlockingQueue 支持优先级的无界阻塞队列
DelayQueue 使用优先级队列实现的延迟无界阻塞队列
LinkedTransferQueue 由链表组成的无界阻塞队列
LinkedBlockingDeque 由链表组成的双向队列

阻塞队列的四类常用方法

方法类型 抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
检查 element() peek()
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class Test1 {
    public static void main(String[] args)throws Exception {
//        test1();
//        test2();
//        test3();
        test4();
    }

    // 测试 --- 抛异常
    public static  void test1(){
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.add("1"));
        System.out.println(blockingQueue.add("2"));
        System.out.println(blockingQueue.add("3"));

        //比较暴力:前面已经放入了三个元素进去 这里再放一个就会抛异常java.lang.IllegalStateException: Queue full
//        blockingQueue.add("4");

        //返回队首第一个元素
        System.out.println(blockingQueue.element());
        System.out.println("*************");

        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        //同样的:前面已经弹出三个元素出来 这里再弹一个就会抛异常ava.util.NoSuchElementException
//        System.out.println(blockingQueue.remove());
    }

    // 测试--- 特殊值
    public static  void test2(){
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.offer("1"));
        System.out.println(blockingQueue.offer("2"));
        System.out.println(blockingQueue.offer("3"));

        //这里比较温和一点:放不进去就不放了  返回false
        System.out.println(blockingQueue.offer("4"));

        //返回队首第一个元素
        System.out.println(blockingQueue.peek());
        System.out.println("*************");

        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        //同样的:这里没元素返回了,返回null
        System.out.println(blockingQueue.poll());
    }

    // 测试---阻塞
    public static  void test3() throws  Exception{
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
        blockingQueue.put("1");
        blockingQueue.put("2");
        blockingQueue.put("3");

        System.out.println("*************");

        //这里放不进去就一直等待,一直阻塞,直到放进去为止
//        blockingQueue.put("3");


        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
        //同样的:一直等待,一直阻塞,直到弹出元素为止
        System.out.println(blockingQueue.take());
    }

    // 测试---超时阻塞
    public static  void test4() throws  Exception{
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.offer("1", 2, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("2", 2, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("3", 2, TimeUnit.SECONDS));


        System.out.println("*************");

        //2秒钟内数据都没放进去就返回false
        System.out.println(blockingQueue.offer("4", 2, TimeUnit.SECONDS));


        System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS));
        System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS));
        System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS));
        //同样的:2秒钟内数据都没取出来就返回null
        System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS));
    }
}

SynchronousQueue

每个put操作之后必须要等take完,才能进行下一次put

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class Test2 {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
        new Thread(()->{
            String threadName = Thread.currentThread().getName();
            try {
                System.out.println(threadName+"放入了元素1");
                blockingQueue.put("1");

                System.out.println(threadName+"放入了元素2");
                blockingQueue.put("2");

                System.out.println(threadName+"放入了元素3");
                blockingQueue.put("3");
            }catch (Exception e){

            }

        },"AAA").start();

        new Thread(()->{
            String threadName = Thread.currentThread().getName();
            try {
                TimeUnit.SECONDS.sleep(2);//模拟取数据的过程
                System.out.println(threadName+"取出了元素"+blockingQueue.take());
                TimeUnit.SECONDS.sleep(2);
                System.out.println(threadName+"取出了元素"+blockingQueue.take());
                TimeUnit.SECONDS.sleep(2);
                System.out.println(threadName+"取出了元素"+blockingQueue.take());
            }catch (Exception e){

            }

        },"BBB").start();
    }
}
image.png
生产者消费者模型
image.png
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

//线程操作资源类
class ShareData{
    private volatile int num =0;//保证资源可见性
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void increment()throws  Exception{
        lock.lock();
        try {
            while (num != 0){//当num不为0 生产者阻塞  这里一定要用while,用if就死定了
                condition.await();
            }
            num++;
            System.out.println("生产者生产了一次,num为:"+num);
            condition.signalAll();//唤醒其他线程去消费
        }catch (Exception e){

        }finally {
            lock.unlock();
        }
    }


    public void derement()throws  Exception{
        lock.lock();
        try {
            while (num == 0){//当num == 0 消费者阻塞 这里一定要用while,用if就死定了
                condition.await();
            }
            num --;
            System.out.println("消费者消费了一次,num为:"+num);
            condition.signalAll();//唤醒其他线程去生产
        }catch (Exception e){

        }finally {
            lock.unlock();
        }
    }

}

public class Test3 {

    public static void main(String[] args) {
        ShareData shareData = new ShareData();
        new Thread(()->{
            try {
                for (int i = 0; i <5 ; i++) {
                    shareData.increment();
                }
            }catch (Exception e){

            }

        },"AAA").start();

        new Thread(()->{
            try {
                for (int i = 0; i <5 ; i++) {
                    shareData.derement();
                }
            }catch (Exception e){

            }

        },"BBB").start();

        new Thread(()->{
            try {
                for (int i = 0; i <5 ; i++) {
                    shareData.increment();
                }
            }catch (Exception e){

            }

        },"CCC").start();

        new Thread(()->{
            try {
                for (int i = 0; i <5 ; i++) {
                    shareData.derement();
                }
            }catch (Exception e){

            }

        },"DDD").start();
    }
}
synchronized 和 Lock的区别
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 有三个线程  实现A 打印3次A,B打印3次B,C打印3次C,交替打印,,,按照这样顺序执行10轮
 */
class Data{
    private   int num = 1; //A -1,B -2,C-3
    private Lock lock  = new ReentrantLock();
    private Condition condition1 = lock.newCondition();
    private Condition condition2 = lock.newCondition();
    private Condition condition3 = lock.newCondition();

    private void print(String str,int count){
        for (int i = 0; i <count ; i++) {
            System.out.println(str);
        }
    }

    public void Aprint(){
        lock.lock();
        try {
            while (num !=1){
                condition1.await();//不是A对应的1 那就在这里等着
            }
            print("A",3);
            num =2;//让B可以去执行
            condition2.signal();//唤醒B
        }catch (Exception e){

        }finally {
            lock.unlock();
        }
    }
    public void Bprint(){
        lock.lock();
        try {
            while (num !=2){
                condition2.await();
            }
            print("B",3);
            num =3;
            condition3.signal();
        }catch (Exception e){

        }finally {
            lock.unlock();
        }
    }

    public void Cprint(){
        lock.lock();
        try {
            while (num !=3){
                condition3.await();
            }
            print("C",3);
            num =1;
            condition1.signal();
        }catch (Exception e){

        }finally {
            lock.unlock();
        }
    }


}

public class Test4 {
    public static void main(String[] args) {
        Data data = new Data();
        new Thread(()->{
            try {
                for (int i = 0; i <10 ; i++) {
                    data.Aprint();
                }
            }catch (Exception e){

            }

        },"AAA").start();

        new Thread(()->{
            try {
                for (int i = 0; i <10 ; i++) {
                    data.Bprint();
                }
            }catch (Exception e){

            }

        },"BBB").start();

        new Thread(()->{
            try {
                for (int i = 0; i <10 ; i++) {
                    data.Cprint();
                }
                data.Cprint();
            }catch (Exception e){

            }

        },"CCC").start();
    }
}

volatile + atomicInteger +BlockingQueue 实现的高效生产者消费模型

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class MyResource{
    private  volatile  boolean FLAG = true;
    private AtomicInteger atomicInteger = new AtomicInteger();

    BlockingQueue<String> blockingQueue = null;

    public MyResource(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
        System.out.println("注入类型是"+blockingQueue.getClass().getName());
    }

    public void produce() throws  Exception{
        //不要在while里大量创建引用
        String threadName = Thread.currentThread().getName();
        String data = null;
        boolean result = false;
        while (FLAG){
             data = atomicInteger.incrementAndGet()+"";
             result =  blockingQueue.offer(data,2,TimeUnit.SECONDS);
            if(result){
                System.out.println(threadName+"线程放入元素"+data+"成功");
            }else{
                System.out.println(threadName+"线程放入元素"+data+"失败");
            }
        }
        System.out.println("开关关闭,生产动作结束");
    }

    public void consume() throws  Exception{
        //不要在while里大量创建引用
        String threadName = Thread.currentThread().getName();
        String result = null;
        while (FLAG){
            result = blockingQueue.poll(2,TimeUnit.SECONDS);
            if(result==null ||"".equals(result)){
//                FLAG = false;
                System.out.println("消费者2秒都没取到数据,消费者退出");
            }
            System.out.println(threadName+"线程取到元素元素:"+result+"------------");
        }
        System.out.println("开关关闭,消费动作结束");
    }

    public void stop() {
        this.FLAG = false;
    }
}


public class Test5 {

    public static void main(String[] args) {
        MyResource myResource = new MyResource(new ArrayBlockingQueue<>(3));
        new Thread(()->{
            try {
                myResource.produce();
            }catch (Exception e){

            }
        },"AAA").start();

        new Thread(()->{
            try {
                myResource.produce();
            }catch (Exception e){

            }
        },"CCC").start();

        new Thread(()->{
            try {
                myResource.consume();
            }catch (Exception e){

            }
        },"BBB").start();


        try {TimeUnit.SECONDS.sleep(2); }catch (Exception e){}
        System.out.println("关闭开关--结束");
        myResource.stop();
        System.out.println(myResource.blockingQueue);
    }
}
Callable与FutureTask
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

class  MyTHreahd implements Callable<Integer>{

    @Override
    public Integer call()  throws Exception{
        TimeUnit.SECONDS.sleep(2);
        System.out.println("进入call方法");
        return 100;
    }
}

public class Test1 {
    public static void main(String[] args) throws  Exception{
        FutureTask<Integer> futureTask =  new FutureTask<>(new MyTHreahd());
        new Thread(futureTask).start();//这里使用了适配器思想FutureTask 间接的实现了Runnable接口, futureTask的run()方法调用了call()方法,所以后面能获取到结果

//        int result2 = futureTask.get(); //get是获取计算结果,一旦调用那么当前线程就有可能阻塞,直到futureTask计算完,所以一般这句都是放到最后
        int result1 = 50;
        System.out.println("******************main******************");
        while (!futureTask.isDone()){
//            System.out.println("futureTask 正在计算");
        }
        int result2 = futureTask.get();
        System.out.println("计算结果"+(result1+result2));
    }
}
线程池

线程池的特点就是 线程复用,控制最大并发数,管理线程

典型的三种线程池(真实情况不这样创建线程池)

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Test2{
    public static void main(String[] args) {
//        ExecutorService threadPool = Executors.newFixedThreadPool(5);//一池5线程
//        ExecutorService threadPool = Executors.newSingleThreadExecutor();//一池1线程
        ExecutorService threadPool = Executors.newCachedThreadPool();//一池N线程  线程数可以动态分配,如果任务多,执行时间长,就可能启动多个线程来执行任务

        try {
            for (int i = 0; i <10 ; i++) {
                threadPool.execute(()->{
                    try{TimeUnit.SECONDS.sleep(1);}catch (Exception e){}
                    System.out.println(Thread.currentThread().getName()+":执行任务");
                });
            }
        }catch (Exception e){

        }finally {
            threadPool.shutdown();
        }
    }
}

线程池核心构造方法

    public ThreadPoolExecutor(int corePoolSize,//常驻核心线程数
                              int maximumPoolSize,//线程池能够容纳最大的线程数
                              long keepAliveTime,//当线程池线程数量超过corePoolSize时,空闲线程的空闲时间达到keepAliveTime时,多余的空闲时间会被销毁到剩下corePoolSize个线程为止
                              TimeUnit unit,//keepAliveTime的单位
                              BlockingQueue<Runnable> workQueue,//任务缓冲队列:已经提交但是还未被执行的任务
                              ThreadFactory threadFactory,//生成工作线程的线程工厂,一般使用默认即可
                              RejectedExecutionHandler handler) {//拒绝策略,当线程数达到最大线程数,并且任务队列已经满的时候时,线程池拒绝新的任务提交策略
        if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;

        //四种拒绝策略
        ThreadPoolExecutor.AbortPolicy; //默认策略:直接抛出异常
        ThreadPoolExecutor.CallerRunsPolicy//调用者运行:既不抛异常,也不丢任务,而是将任务返回给调用者
        ThreadPoolExecutor.DiscardOldestPolicy//抛弃任务队列中等待最久的队列,然后把当前任务加入到任务队列
        ThreadPoolExecutor.DiscardPolicy//直接抛弃任务
    }

线程池底层工作原理

import java.util.concurrent.*;

public class Test2 {
    public static void main(String[] args) {
//        ExecutorService threadPool = Executors.newFixedThreadPool(5);//一池5线程
//        ExecutorService threadPool = Executors.newSingleThreadExecutor();//一池1线程
//        ExecutorService threadPool = Executors.newCachedThreadPool();
//        System.out.println(Runtime.getRuntime().availableProcessors()); 获取服务器核心数

        //这里 不使用JDK提供的Executors创建线程池的原因是,其设置的LinkedBlockingQueue 长度是Integer.MAX_VALUE 21亿,这简直坑爹,会造成OOM
        ExecutorService threadPool = new ThreadPoolExecutor(2,
                5,
                1,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        try {
            for (int i = 0; i <10 ; i++) {
                System.out.println(i);
                threadPool.execute(()->{
                    try {TimeUnit.SECONDS.sleep(1); }catch (Exception e){}
                    System.out.println(Thread.currentThread().getName()+"正在运行");
                });
// executorService.submit() 也可以调用submit方法,这个方法返回一个Futher对象,通过Futher对象get方法获取执行结果
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            try {TimeUnit.SECONDS.sleep(3); }catch (Exception e){}
            threadPool.shutdown();
        }
    }
}

线程数量的配置
CPU密集型任务:如果任务需要大量的计算,而CPU没有阻塞,一直全速运行,尽可能配置少的线程数量,,一般公式是 CPU核心数+1
IO密集型任务,像读DB呀,Redis呀,文件,都属于IO密集型。IO密集型会导致线程等待,浪费了CPU计算能力,所以尽可能多配置线程数 CPU核心数乘2,IO密集型时,大部分任务都阻塞时,参考公式:CPU核心数/(1-阻塞系数),比如8核心的服务器,8/(1-0.9) = 80个线程。

死锁及定位死锁代码

import java.util.concurrent.TimeUnit;

class DeadLock implements  Runnable{
    private  String lockA;
    private  String LockB;

    public DeadLock(String lockA, String lockB) {
        this.lockA = lockA;
        LockB = lockB;
    }


    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+"进入run方法");
        synchronized (lockA){
            try { TimeUnit.SECONDS.sleep(2);}catch (Exception e){e.printStackTrace();}
            synchronized (LockB){
                System.out.println("执行完成");

            }
        }
    }
}

public class Test3 {
    public static void main(String[] args) {
        String lockA = "lockA";
        String lockB = "lockB";

        new Thread(new DeadLock(lockA,lockB),"AAA").start();
        new Thread(new DeadLock(lockB,lockA),"BBB").start();
    }
}
上一篇 下一篇

猜你喜欢

热点阅读