互联网科技

超硬核!11000字多线程全覆盖总结,还愣着干嘛?赶紧收藏啊!

2020-09-08  本文已影响0人  风平浪静如码

线程

线程和进程的区别

进程是一个可执行的程序,是系统资源分配的基本单位。线程是进程内部相对独立的可执行单元,是操作系统进行任务调度的基本单位。

多线程的优缺点

优点:

线程有几种状态

一共五种状态:分别是新建,就绪,运行,阻塞和死亡状态。详细见下图:

经典题:一个线程OOM了,其他线程是否还能运行?

答案是还能运行。虽然说堆是线程共享的区域,一个线程堆抛出OOM异常,你可能会觉得其他线程也会抛出OOM异常。但其实不然,当一个线程抛出OOM异常后,它所占据的内存会全部释放掉,从而不会影响其他线程的运行。另外如果主线程异常了,子线程还能运行吗?这个问题也是可以运行的。线程不像进程,一个进程之间的线程之间是没有父子之分的,都是平级关系。即线程都是一样的,退出了一个不会影响另外一个。

创建线程的几种方法

sleep和wait方法的区别

yield和join区别

死锁

什么叫死锁

死锁是指两个或两个以上的线程在执行过程中,因争夺资源而造成的一种互相等待的现象。若无外力作用,他们都将无法进行运行下去。

死锁产生的原因

死锁产生的必要条件

如何避免死锁

ThreadLocal

ThreadLocal是什么

ThreadLocalMap的内存泄露问题

我们知道ThreadLocalMap使用ThreadLocal的弱引用作为key,如果一个ThreadLocal没有外部强引用时,在GC的时候,这个ThreadLocal必然会被回收,但是对应的value不会被回收掉直到线程结束才会被回收。如果当前线程一直处于运行中,那么这些Entry对象中的value就可能一直无法回收,就会发生内存泄露。实际开发中,会使用线程池维护线程的创建和复用,线程为了复用是不会主动结束的,那么ThreadLocal设置的value值就一直被引用,就会发生内存泄露。

线程池

概念

创建线程池

线程池可以通过ThreadPoolExecutor来创建,看下构造函数:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,
   BlockingQueue<Runnable> workQueue,
   ThreadFactory threadFactory,
   RejectedExecutionHandler handler) 

几个核心参数的作用:

线程池的执行流程

文字表述就是:

其实线程池为了执行你的任务也是操碎了心有木有,先用核心线程执行你的任务,核心线程没空,把你加到核心线程的任务队列里(核心线程空下来就执行你的任务)。核心线程的排期也满了,不能再加了,还是想办法在最大线程数的范围内创建非核心的线程去执行你的任务,如果你的任务太多,那不好意思,只能用拒绝策略招待你了。

四种拒绝策略

线程池的工作队列

ArrayBlockingQueue

有界队列,是一个用数组实现的有界阻塞队列,按FIFO排序。

LinkedBlockingQueue

基于链表实现的阻塞队列,按FIFO排序任务,容量可以设置,不设置的话将是一个无边界的阻塞队列(最大长度是Integet.Max_VLAUE),吞吐量通常要高于ArrayBlockingQueue;newFixedThreadPool线程池使用了这个队列。

DelayQueue

DelayQueue是一个任务定时周期延迟执行的队列。根据指定的执行从小到大排序,否则根据插入到队列的先后顺序。newScheduledThreadPool线程池使用了这个队列。

PriorityBlockingQueue

优先级队列是具有优先级的无界阻塞队列。

SynchronousQueue

同步队列,一个不存储元素的阻塞队列,每个插入操作都必须等待另一个线程调用移除操作,否则插入操作将一直处于阻塞状态。吞吐量通常要高于LinkedBlockingQueue,newCachedThreadPool使用了这个队列。

几种常用的线程池

newFixedThreadPool

  public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

线程池特点

newCachedThreadPool

   public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

线程池特点:

执行流程参照ThreadPoolExecutor和池特点自行得出。这个池也有一个问题:当提交任务的数量大于处理任务的数量时,每次提交一个任务必然会创建一个非核心线程,极端情况下会创建过多的线程(最大Integer.Max_VALUE),耗尽CPU和内存资源。当然如果没有任务了,这些空闲的非核心线程在存活60s后被回收。使用场景:用于并发量大执行大量短期的小任务。

newSingleThreadExecutor

  public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

线程池特点:

提交任务的执行流程可自行得出。其实这个池子里干活的只有一个人,不管你往里塞多少任务,都是它按部就班的从队列里获取任务执行,适用于串行执行任务的情景,一个任务接一个任务的执行。

newScheduledThreadPool

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

线程池特点:

使用场景:周期性的执行任务的场景,做一些简单的定时调度。

线程池状态

线程池有这几个状态:RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED

   //线程池状态
   private static final int RUNNING    = -1 << COUNT_BITS;
   private static final int SHUTDOWN   =  0 << COUNT_BITS;
   private static final int STOP       =  1 << COUNT_BITS;
   private static final int TIDYING    =  2 << COUNT_BITS;
   private static final int TERMINATED =  3 << COUNT_BITS;

RUNNING

SHUTDOWN

STOP

TIDYING

TERMINATED

该状态表明线程池彻底终止或死亡

四种并发工具类

这里按照使用频率简单说下四种并发工具类CountDownLatch,CyclicBarrier,Semaphore和Exchanger的区别和实现。

CountDownLatch和CyclicBarrier

这两太容易弄混了。

下面给两个Demo
一个老师(主线程)等10个学生(其他线程)都来教室了才开始上课。是CountdownLatch模型。

public class CountDownLatchDemo {

    public static final Integer STUDENT_COUNT = 10;

    public static CountDownLatch countDownLatch = new CountDownLatch(STUDENT_COUNT);

    public static void main(String [] args) {
        Thread teacher = new Thread(new Teacher());
        teacher.start();
        for (int i=0; i<STUDENT_COUNT; i++) {
            Thread student = new Thread(new Student());
            student.start();
        }
    }

    public static class Teacher implements Runnable {

        @Override
        public void run() {
            System.out.println("老师"+Thread.currentThread().getName()+"来了,等所有同学都到了才上课...");
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("所有同学都到齐了,开始上课...");
        }
    }

    public static class Student implements Runnable {

        @Override
        public void run() {
            System.out.println("同学"+Thread.currentThread().getName()+"进入了教室...");
            countDownLatch.countDown();
        }
    }
}

一个任务需要10个工人(10个线程)都到岗之后才能开始进行,否则只能等待,是CyclicBarrier模型。

public class CyclicBarrierDemo {
    
    public static final Integer WORKER_COUNT = 10;

    private static CyclicBarrier cyclicBarrier;
    
    public static void main(String [] args) {
        cyclicBarrier = new CyclicBarrier(WORKER_COUNT, new Runnable() {
            @Override
            public void run() {
                System.out.println(WORKER_COUNT+"个工人已就位,可以开始干活了...");
            }
        });
        for (int i=0; i<WORKER_COUNT; i++) {
            Thread worker = new Thread(new Worker());
            worker.start();
        }
    }

    public static class Worker implements Runnable {
        @Override
        public void run() {
            System.out.println("工人"+Thread.currentThread().getName()+"就位...");
            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

Semaphore

Semaphore信号量,就是用来控制访问有限资源的线程数量,线程要访问资源首先要获得许可证,一个许可证对应一个资源,如果资源不足了,线程就要等待,如果其他线程释放了一个资源(许可证),那么信号量就通知等待的一个线程,分配给它一个许可证。内部实现这里不再赘述,下面是4个人看电影,但是电影院只有2个位置,也就是一次只能2个人在里面看(这里限制不能站着看),有人离开位置,后面的人才可以进来看电影。

public class SemaphoreDemo {

    public static final Integer SETS_NUMBER = 2;
    public static final Integer PERSON_NUMBER = 4;

    public static void main(String [] args) {
        WatchMovie watchMovie = new WatchMovie(SETS_NUMBER);
        for (int i=0; i<PERSON_NUMBER; i++) {
            Thread person = new Thread(new Person(watchMovie));
            person.start();
        }
    }

    public static class WatchMovie {

        private Semaphore semaphore;

        WatchMovie(int setCount) {
            semaphore = new Semaphore(setCount);
        }

        public void watchMovie() {
            try {
                //获取信号量
                semaphore.acquire();
                long time = (long) (Math.random()*10+1);
                System.out.println(Thread.currentThread().getName()+"看了"+time+"秒的电影");
                Thread.sleep(time);
                System.out.println(Thread.currentThread().getName()+"让出座位,离开了电影院");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                //释放信号量
                semaphore.release();
            }

        }
    }

    public static class Person implements Runnable{

        private WatchMovie watchMovie;

        Person(WatchMovie watchMovie) {
            this.watchMovie = watchMovie;
        }

        @Override
        public void run() {
            watchMovie.watchMovie();
        }
    }
}

Exchanger

Exchanger理解其意思就行,就是线程间的数据交换。是怎么交换呢?实际上是共用一个Exchange的多个线程,在全部都到达栅栏的时候才可以进行数据的交换,否则的话先到达的线程只能等待其他的线程达到栅栏。那怎么才算到达栅栏呢,就是线程内部调用exchanger.exchange方法。然后这个方法的返回值就是其他线程交换的数据,参数就是当前线程想要交还给其他线程的数据。

三大并发容器

CopyOnWrite

JDK里提供了CopyOnWriteArrayList和CopyOnWriteArraySet,它的实现机制就是读写分离,简单的说就是任何时候都可以读,写需要加锁,写时复制。什么是写复制呢?就是对容器的修改加锁后,通过copy一个新的容器来进行修改,修改完毕后将容器替换为新的容器。不贴源码了,自己去看吧。下面说下优缺点。

ConcurrentHashMap

这个实在是太重要了!!!但是几百个字也说不清楚,这里只点到为止,说一下ConcurrentHashMap在1.7和1.8中的结构,具体的源码和实现原理以后有机会再整理。

1.7的ConcurrentHashMap

1.8中的ConcurrentHashMap

jdk1.8的实现已经摈弃了Segment的概念,而是直接用Node数组+链表+红黑树的数据结构来实现,并发控制使用Synchronized和CAS来操作,整个看起来就像是优化过且线程安全的HashMap。

ConcurrentSkipListMap

ConcurrentSkipListMap内部使用跳表的数据结构来实现,他的结构很简单。跳表就是一个多层的链表,底层是一个普通的链表,然后逐层减少,通常通过一个简单的算法实现每一层元素是下一层的元素的二分之一,这样当搜索元素时从最顶层开始搜索,可以说是另一种形式的二分查找。在理论上它的查找、插入和删除时间复杂度为log(N)。在并发上,ConcurrentSkipListMap采用无锁的CAS+自旋来控制。也就是说整个插入的过程都是通过CAS来实现的。下面看下一个三层的跳表添加元素的过程:

插入值为15的元素

插入后:

写在最后

欢迎大家关注我的公众号【风平浪静如码】,海量Java相关文章,学习资料都会在里面更新,整理的资料也会放在里面。

觉得写的还不错的就点个赞,加个关注呗!点关注,不迷路,持续更新!!!

上一篇 下一篇

猜你喜欢

热点阅读