深入Java线程(二)

2018-08-21  本文已影响10人  WilsonMing

为了理解可先看深入Java线程(一)内容在看本篇。

线程生命周期

线程生命周期

关于线程生命周期的不同状态,在 Java 5 以后,线程状态被明确定义在其公共内部枚举类型 java.lang.Thread.State 中,分别是:

public final native void wait(long timeout) throws InterruptedException;

线程安全基本特性

Monitor 实现的三种锁:

所谓锁的升级、降级,就是 JVM 优化 synchronized 运行的机制,当 JVM 检测到不同的竞争状况时,会自动切换到适合的锁实现,这种切换就是锁的升级、降级。

我注意到有的观点认为 Java 不会进行锁降级。实际上据我所知,锁降级确实是会发生的,当 JVM 进入安全点(SafePoint)的时候,会检查是否有闲置的 Monitor,然后试图进行降级


image.png
public class RepeLock implements Runnable {
    private Lock lock = new ReentrantLock();
    public void get() {
        try {
            lock.lock();
            System.out.println(Thread.currentThread().getId());
            set();
        } finally {
            lock.unlock();
        }
    }
    public void set() {
        try {
            lock.lock();
            System.out.println(Thread.currentThread().getId());
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void run() {
        get();
    }
    public static void main(String[] args) {
        RepeLock repeLock = new RepeLock();
        new Thread(repeLock).start();
        new Thread(repeLock).start();
        new Thread(repeLock).start();
    }
}

线程池

Executors 目前提供了 5 种不同的线程池创建配置:

线程池构造方法几个参数

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
  1. corePoolSize : 该线程池中核心线程数最大值
    核心线程:线程池新建线程的时候,如果当前线程总数小于 corePoolSize ,则新建的是核心线程;如果超过corePoolSize,则新建的是非核心线程。核心线程默认情况下会一直存活在线程池中,即使这个核心线程啥也不干(闲置状态)。如果指定ThreadPoolExecutor的 allowCoreThreadTimeOut 这个属性为true,那么核心线程如果不干活(闲置状态)的话,超过一定时间( keepAliveTime),就会被销毁掉.
  2. maximumPoolSize :该线程池中线程总数的最大值
    线程总数计算公式 = 核心线程数 + 非核心线程数。
  3. keepAliveTime :该线程池中非核心线程闲置超时时长
    注意:一个非核心线程,如果不干活(闲置状态)的时长,超过这个参数所设定的时长,就会被销毁掉。但是,如果设置了 allowCoreThreadTimeOut = true,则会作用于核心线程。
  4. unit :(时间单位)
    首先,TimeUnit是一个枚举类型,翻译过来就是时间单位,我们最常用的时间单位包括:
    MILLISECONDS : 1毫秒 、SECONDS : 秒、MINUTES : 分、HOURS : 小时、DAYS : 天
  5. BlockingQueue<Runnable> workQueue :( Blocking:阻塞的,queue:队列),主要有四种队列
    • SynchronousQueue:(同步队列)这个队列接收到任务的时候,会直接提交给线程处理,而不保留它(名字定义为 同步队列),所有该队列跟设置的corePoolSize无效。但有一种情况,假设所有线程都在工作怎么办?这种情况下,SynchronousQueue就会新建一个线程来处理这个任务。所以为了保证不出现(线程数达到了maximumPoolSize而不能新建线程)的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大,去规避这个使用风险。
    • LinkedBlockingQueue(链表阻塞队列):这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize
    • ArrayBlockingQueue(数组阻塞队列):可以限定队列的长度(既然是数组,那么就限定了大小),接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误
    • DelayQueue(延迟队列):队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务
  6. ThreadFactory threadFactory = > 创建线程的方式,这是一个接口,new它的时候需要实现他的Thread newThread(Runnable r)方法
  7. RejectedExecutionHandler handler = > 这个主要是用来抛异常的
    当线程无法执行新任务时(一般是由于线程池中的线程数量已经达到最大数或者线程池关闭导致的),默认情况下,当线程池无法处理新线程时,会抛出一个RejectedExecutionException。

以 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue 为例,我们一起来分析一下,根据需求可以从很多方面考量:

ArrayBlockingQueue ,LinkedBlockingQueue ,SynchronousQueue

重点源码分享

okhttp

 public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }
 synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

AsyncTask

 private static final BlockingQueue<Runnable> sPoolWorkQueue =
            new LinkedBlockingQueue<Runnable>(128);
    /**
     * An {@link Executor} that can be used to execute tasks in parallel.
     */
    public static final Executor THREAD_POOL_EXECUTOR;
    static {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                sPoolWorkQueue, sThreadFactory);
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        THREAD_POOL_EXECUTOR = threadPoolExecutor;
    }
private static class SerialExecutor implements Executor {
        final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
        Runnable mActive;

        public synchronized void execute(final Runnable r) {
            mTasks.offer(new Runnable() {
                public void run() {
                    try {
                        r.run();
                    } finally {
                        scheduleNext();
                    }
                }
            });
            if (mActive == null) {
                scheduleNext();
            }
        }

        protected synchronized void scheduleNext() {
            if ((mActive = mTasks.poll()) != null) {
                THREAD_POOL_EXECUTOR.execute(mActive);
            }
        }
    }
上一篇下一篇

猜你喜欢

热点阅读