十一、Java高级特性(阻塞队列和线程池)

2021-05-31  本文已影响0人  大虾啊啊啊

一、概念、生产者消费者模式

二、JDK中实现的阻塞队列

在JDK中BlockingQueue类封装了阻塞队列的接口
其中包含了几个核心的方法:

    boolean add(E e);
    boolean remove(Object o);
    boolean offer(E e);
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

E take() throws InterruptedException;
void put(E e) throws InterruptedException;

三、JDK中常用的阻塞队列

以上的阻塞队列都实现了BlockingQueue接口,也都是线程安全的。

在使用阻塞队列的时候,尽量使用有界的阻塞队列,有界的阻塞队列,规定了最大容量,当队列满的时候,往里面插入元素,会被阻塞。而无界的阻塞队列,可以一直往里插入元素,会占用内存,最终总会使得内存溢出。

缓存系统的设计

可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue。一旦可以从队列中查到元素,表示缓存有效期到了。

四、线程池

Java中的线程池是运用场景最多的并发框架。合理的使用线程池能够带来以下3个好处:
(1)降低资源的消耗:通过重复利用线程池中的线程,降低线程创建和销毁带来的开销。
(2)提高响应速度:当提交一个任务的时候,不需要等待线程的创建就能立即执行。
(3)提高线程的可管理性:在操作系统中,线程是非常稀缺的资源,如果无限制的创建线程,不仅会消耗系统资源,还会降低系统的稳定性。

1、JDK中线程池的实现

在JDK中使用ThreadPoolExecutor 作为线程池的核心类,用来执行被提交的任务。

ThreadPoolExecutor 的使用举例
package com.it.test.thread.consumer_product.retranlock;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LockTest {
    public static void main(String[] args) {
        LinkedBlockingQueue queue = new LinkedBlockingQueue();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 30, TimeUnit.MINUTES, queue, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r);
            }
        }, new ThreadPoolExecutor.DiscardOldestPolicy());
        executor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":你好");
            }
        });
        executor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":世界");
            }
        });
        executor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":hello");

            }
        });
        executor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+":world");
            }
        });
        System.out.println("阻塞队列的长度:"+queue.size());
    }
}

pool-1-thread-1:你好
pool-1-thread-2:世界
pool-1-thread-2:world
pool-1-thread-1:hello
阻塞队列的长度:2
ThreadPoolExecutor 类的解析

(1)构造方法
构造方法中包含了7个核心参数

  public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 

(1)AbortPolicy:直接抛出异常,默认策略;
(2)CallerRunsPolicy:用调用者所在的线程来执行任务;
(3)DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
(4)DiscardPolicy:直接丢弃任务;

线程池的工作机制

(1)如果运行的线程池少于核心线程数,提交任务的时候将创建新的线程来执行任务。
(2)如果运行的线程数等于核心线程数,提交任务的时候将会把任务存放到阻塞队列中(BlockingQueue)。
(3)如果阻塞队列满的时候,无法再添加,则创建新的线程来执行任务。
(4)如果创建的线程超过最大线程数,将会被拒绝。调用RejectedExecutionHandler.rejectedExecution()方法

提交任务

execute提交任务,不需要任务返回值。所以无法判断任务是否被线程池执行成功
submit提交任务,有返回值,返回一个future类型的对象。可以通过future对象判断任务是否执行成功,通过future的get方法,获取任务的返回值。在调用get方法的时候,会阻塞当前线程,直到任务完成为止。也可以在get方法设置返回时间。

关闭线程池

可以通过shutdown和shutdownNow来关闭线程池。

合理的配置线程池

合理配置线程池之前首先要分析任务的特性:
(1)CPU密集型
核心线程数 = Ncpu+1,
(2)IO密集型
核心线程数 = 2*Ncpu
(3)混合型
可以根据不同的任务类型对线程池进行拆分多个。

ThreadPoolExecutor源码分析
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
private boolean addWorker(Runnable firstTask, boolean core) {
      .......
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();

                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.getState() != Thread.State.NEW)
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        workerAdded = true;
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

(1)w = new Worker(firstTask);
创建一个Worker对象,在Worker构造方法中创建一个线程,传入Worker当前对象,Worker实现了Runnable接口

   Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

(2)添加Worker到HashSet中(HashSet<Worker> workers),添加成功则启动woker中的线程

 workers.add(w);
....
  if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }

因为Worker实现了Runnable接口,并且在创建线程的时候传入了当前Worker对象,所以启动线程的时候调用了Worker中的run方法

  public void run() {
            runWorker(this);
        }

接着调用runWorker方法

   final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

runWorker方法中,开启了一个while 循环,获取第一次传进的任务或者是
getTask(),阻塞队列中的任务。然后调用 task.run();这样就回调到了我们提交任务的时候实现的run方法。

源码小结

在ThreadPoolExecutor中用一个HashSet用来存放woker,而woker的构造方法中创建了线程,因此可以理解为woker存放了核心线程。当HashSet的长度等于核心线程数的时候,则将任务提交到阻塞队列。
在执行任务的时候获取当前提交的task或者从阻塞队列中取出task执行。

上一篇 下一篇

猜你喜欢

热点阅读