线程

2021-05-26  本文已影响0人  Drew_MyINTYRE

Java 不提供强制停止线程的能力

线程正在写入一个文件,这时收到终止信号,它就需要根据自身业务判断,是选择立即停止,还是将整个文件写入成功后停止,
而如果选择立即停止就可能造成数据不完整,不管是中断命令发起者,还是接收者都不希望数据出现问题。

我们一旦调用某个线程的 interrupt() 之后,这个线程的中断标记位就会被设置成 true。每个线程都有这样的标记位,当线程执行时,应该定期检查这个标记位,如果标记位被设置成 true,就说明有程序想终止该线程。

public class StopThread implements Runnable {
 
    @Override
    public void run() {
        int count = 0;

        //这种就属于通过 interrupt 正确停止线程的情况。
        while (!Thread.currentThread().isInterrupted() && count < 1000) {
            num++;
            try {
               //sleep、wait 等可以让线程进入阻塞的方法使线程休眠了
               //正在休眠的子线程能收到 29 行发出的中断信号,并且会抛出一个 InterruptedException 异常,同时清除中断信号,将中断标记位设置成 false
               //即便线程还在休眠,仍然能够响应中断通知,并抛出异常。
               Thread.sleep(1000);
            } catch (InterruptedException e) {
                //我们在实际开发中不能盲目吞掉中断,如果不在方法签名中声明,也不在 catch 语句块中再次恢复中断,而是在 catch //中不作处理,我们称这种行为是“屏蔽了中断请求”。如果我们盲目地屏蔽了中断请求,会导致中断信号被完全忽略,最终导致线程无法正确停止。

                //正确的处理方式是
                Thread.currentThread().interrupt();
                e.printStackTrace();    


            }
        }
    }
 
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new StopThread());
        thread.start();
        Thread.sleep(5000);
        
        //主线程休眠 5 毫秒后,通知子线程中断
        thread.interrupt();
    }
}

如果我们在面试中被问到“你知不知道如何正确停止线程”这样的问题,我想你一定可以完美地回答了,首先,从原理上讲应该用 interrupt 来请求中断,而不是强制停止,因为这样可以避免数据错乱,也可以让线程有时间结束收尾工作。

而 volatile 这种方法在某些特殊的情况下,比如线程被长时间阻塞的情况,就无法及时感受中断,所以 volatile 是不够全面的停止线程的方法。


thread.join()


thread1.start();


thread2.start();
thread2.join();

thread3.start();

//上面的代码意思是 必须等thread2执行结束之后才能继续往下执行thread3。但是,在执行thread2的过程中可以执行thread1(往前执行线程)。

#### thread.yield()

使线程暂停并允许执行其他线程


当多个线程访问一个对象时,如果不用考虑这些线程在运行时环境下的调度和交替执行问题,也不需要进行额外的同步,而调用这个对象的行为都可以获得正确的结果,那这个对象便是线程安全的。

如果某个对象是线程安全的,那么对于使用者而言,不需要考虑不能同时写入或读写不能并行的问题,也不需要考虑任何额外的同步问题,比如不需要额外自己加 synchronized 锁,那么它才是线程安全的,可以看出对线程安全的定义还是非常苛刻的。

在多线程下,CPU 的调度是以时间片为单位进行分配的,每个线程都可以得到一定量的时间片。但如果线程拥有的时间片耗尽,它将会被暂停执行并让出 CPU 资源给其他线程,这样就有可能发生线程安全问题。

因为 students 这个成员变量是在构造函数中新建的线程中进行的初始化和赋值操作,而线程的启动需要一定的时间,但是我们的 main 函数并没有进行等待就直接获取数据,导致 getStudents 获取的结果为 null,这就是在错误的时间或地点发布或初始化造成的线程安全问题。

线程安全问题主要有 3 种,i++ 等情况导致的运行结果错误,通常是因为并发读写导致的,第二种是对象没有在正确的时间、地点被发布或初始化,而第三种线程安全问题就是活跃性问题,包括死锁、活锁和饥饿。

“检查与执行”并非原子性操作,在中间可能被打断,而检查之后的结果也可能在执行时已经过期、无效,换句话说,获得正确结果取决于幸运的时序。这种情况下,我们就需要对它进行加锁等保护措施来保障操作的原子性。

但多线程之间则需要调度以及合作,调度与合作就会带来性能开销从而产生性能问题。

Java 程序中的线程与操作系统中的线程是一一对应的

线程池用一些固定的线程一直保持工作状态并反复执行任务。线程池会根据需要创建线程,控制线程的总数量,避免占用过多内存资源。

我们可以看到实际上任务进来之后,线程池会逐一判断 corePoolSize、workQueue、maximumPoolSize,如果依然不能满足需求,则会拒绝任务

因为它们是核心线程,即便未来可能没有可执行的任务也不会被销毁。随着任务量的增加,在任务队列满了之后,线程池会进一步创建新线程,最多可以达到 maximumPoolSize 来应对任务多的场景,如果未来线程有空闲,大于 corePoolSize 的线程会被合理回收。所以正常情况下,线程池中的线程数量会处在 corePoolSize 与 maximumPoolSize 的闭区间内。

线程池只有在任务队列填满时才创建多于 corePoolSize 的线程,如果使用的是无界队列(例如 LinkedBlockingQueue),那么由于队列不会满,所以线程数不会超过 corePoolSize。

通过设置 corePoolSize 和 maximumPoolSize 为相同的值,就可以创建固定大小的线程池。

通过设置 maximumPoolSize 为很高的值,例如 Integer.MAX_VALUE,就可以允许线程池创建任意多的线程。

比如新建一个线程池,使用容量上限为 10 的 ArrayBlockingQueue 作为任务队列,并且指定线程池的核心线程数为 5,最大线程数为 10,假设此时有 20 个耗时任务被提交,在这种情况下,线程池会首先创建核心数量的线程,也就是5个线程来执行任务,然后往队列里去放任务,队列的 10 个容量被放满了之后,会继续创建新线程,直到达到最大线程数 10。此时线程池中一共有 20 个任务,其中 10 个任务正在被 10 个线程执行,还有 10 个任务在任务队列中等待,而且由于线程池的最大线程数量就是 10,所以已经不能再增加更多的线程来帮忙处理任务了,这就意味着此时线程池工作饱和,这个时候再提交新任务时就会被拒绝。

ExecutorService service = Executors.newCachedThreadPool();

ScheduledExecutorService service = Executors.newScheduledThreadPool(10);

SingleThreadExecutor 这种线程池由于只有一个线程,所以非常适合用于所有任务都需要按被提交的顺序依次执行的场景,而前几种线程池不一定能够保障任务的执行顺序等于被提交的顺序,因为它们是多线程并行执行的。

比如说主任务需要执行非常繁重的计算任务,我们就可以把计算拆分成三个部分,这三个部分是互不影响相互独立的,这样就可以利用 CPU 的多核优势,并行计算,然后将结果进行汇总。这里面主要涉及两个步骤,第一步是拆分也就是 Fork,第二步是汇总也就是 Join,到这里你应该已经了解到 ForkJoinPool 线程池名字的由来了。

你可以看到 ForkJoinPool 线程池和其他线程池很多地方都是一样的,但重点区别在于它每个线程都有一个自己的双端队列来存储分裂出来的子任务。ForkJoinPool 非常适合用于递归的场景,例如树的遍历、最优路径搜索等场景。

线程池内部的任务队列,作为一种缓冲机制,线程池会把当下没有处理的任务放入任务队列中,
由于多线程同时从任务队列中获取任务是并发场景,此时就需要任务队列满足线程安全的要求,所以线程池中任务队列采用 BlockingQueue 来保障线程安全。
任务,任务要求实现统一的接口,以便工作线程可以处理和执行。

线程池内部的阻塞队列类型

用数组实现的,在新建对象的时候要求传入容量值,且后期不能扩容

存放任务没有容量的限制,也叫无界队列,所以对应的线程池只会创建核心线程数量的线程,所以此时的最大线程数对线程池来说没有意义,因为并不会触发生成多于核心线程数的线程。

容量为0,实际不存储任何任务,它只负责对任务进行中转和传递,所以效率比较高。
线程数可以无限扩展,一旦有任务被提交就直接转发给线程或者创建新线程来执行,而不需要另外保存它们。
对应的线程池中最大线程数是 Integer 的最大值,可以理解为线程数是可以无限扩展的

它对应的线程池分别是 ScheduledThreadPool 和 SingleThreadScheduledExecutor,这两种线程池的最大特点就是可以延迟执行任务,比如说一定时间后执行任务或是每隔一定的时间执行一次任务。
而延迟队列正好可以把任务按时间进行排序,方便任务的执行。
内部的任务会按照延迟的时间长短排序,内部采用的是“堆”的数据结构。
它同时也是一个无界队列

自动创建线程池先天不足

自己手动创建线程池会更好,因为我们可以更加明确线程池的运行规则,不仅可以选择适合自己的线程数量,更可以在必要的时候拒绝新任务的提交,避免资源耗尽的风险。

我们调整线程池中的线程数量的最主要的目的是为了充分并合理地使用 CPU 和内存等资源,从而最大限度地提高程序的性能。

合适的线程数量是多少?CPU 核心数和线程数的关系?

线程数 = CPU 核心数 *(1 + 平均等待时间/平均工作时间)

综上所述我们就可以得出一个结论:

线程复用原理

我们知道线程池会使用固定数量或可变数量的线程来执行任务,但无论是固定数量或可变数量的线程,其线程数量都远远小于任务数量,面对这种情况线程池可以通过线程复用让同一个线程去执行不同的任务。

public Object take() throws InterruptedException {
       lock.lock();
       try {
           while (queue.size() == 0) {
               notEmpty.await();
           }
           Object item = queue.remove();
           notFull.signalAll();
           return item;
       } finally {
           lock.unlock();
       }
   }

这里需要注意,我们在 take() 方法中使用 while( queue.size() == 0 ) 检查队列状态,而不能用 if( queue.size() == 0 )。为什么呢?大家思考这样一种情况,因为生产者消费者往往是多线程的,我们假设有两个消费者,第一个消费者线程获取数据时,发现队列为空,便进入等待状态;因为第一个线程在等待时会释放 Lock 锁,所以第二个消费者可以进入并执行 if( queue.size() == 0 ),也发现队列为空,于是第二个线程也进入等待;而此时,如果生产者生产了一个数据,便会唤醒两个消费者线程,而两个线程中只有一个线程可以拿到锁,并执行 queue.remove 操作,另外一个线程因为没有拿到锁而卡在被唤醒的地方,而第一个线程执行完操作后会在 finally 中通过 unlock 解锁,而此时第二个线程便可以拿到被第一个线程释放的锁,继续执行操作,也会去调用 queue.remove 操作,然而这个时候队列已经为空了,所以会抛出 NoSuchElementException 异常,这不符合我们的逻辑。而如果用 while 做检查,当第一个消费者被唤醒得到锁并移除数据之后,第二个线程在执行 remove 前仍会进行 while 检查,发现此时依然满足 queue.size() == 0 的条件,就会继续执行 await 方法,避免了获取的数据为 null 或抛出异常的情况。

另外一个线程因为没有拿到锁而卡在被唤醒的地方,拿到被第一个线程释放的锁,继续执行操作,也会去调用 queue.remove 操作,然而这个时候队列已经为空了,所以会抛出 NoSuchElementException 异常。

执行 remove 前仍会进行 while 检查,发现此时依然满足 queue.size() == 0 的条件,就会继续执行 await 方法,避免了获取的数据为 null 或抛出异常的情况

上一篇 下一篇

猜你喜欢

热点阅读