源码分析->撕开OkHttp(10)高并发线程池

2020-11-10  本文已影响0人  杨0612
源码分析基于 3.14.4
关键字:OkHttp高并发线程池
OkHttp的线程池是怎么样的?

https://www.jianshu.com/p/3a1d5389b36a在这篇文章有提到,这个线程池很有意思:核心线程数为0,SynchronousQueue是一个无容量的集合,最大线程数为Integer.MAX_VALUE(但是OkHttp自己控制了请求最大并发数,所以不会达到这个值)。

public final class Dispatcher {
  ......
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }
如何达到高并发的呢?

简单说下线程池执行流程:
(1)当线程数小于核心线程数,则创建线程执行该任务,任务执行完会取队列中的任务执行;
(2)当线程数大于核心线程数,则将任务添加到队列当中,如果队列没有满则添加成功;
(3)当队列满则添加失败,且线程数小于最大线程数,则创建线程执行该任务,,任务执行完会取队列中的任务执行;
(4)上述(3)无法创建线程执行任务,则执行抛弃策略;
具体细节可以看下https://www.jianshu.com/p/95bd9bfbe04d

我们来分析下OkHttp的线程池,假设当前线程池没有任务下:
(1)当往线程池添加任务,当前线程数>=核心线程数(0>=0),则将任务添加到队列中;
(2)因为SynchronousQueue是无容量的,所以任务添加失败(有一种情况是会成功的,后面会介绍);
(3)当前线程数小于Integer.MAX_VALUE(一般也很难到达这个值),所以创建线程马上执行该任务;
因为SynchronousQueue是无容量,而且最大线程数为Integer.MAX_VALUE,也不会出现任务需要等待或者被抛弃的情况,只要任务到达线程池,就会马上被执行,也就是高并发。

最大线程数为Integer.MAX_VALUE,会不会撑爆虚拟机?

答案是不会。上面有说到,因为OkHttp自己控制了请求最大并发数,所以不会达到这个值。

为什么核心线程数为0?

是为了让所有线程可以timeout,默认情况下,核心线程是不会timeout的,除非线程池shutdown了。不过后来我想了想,通过allowCoreThreadTimeOut可以让核心线程也timeout,这样核心线程数就可以设置大于0,不过这时候就得考虑,核心线程数设置多少才合适呢,所以还是直接设置0比较简单;

为什么使用SynchronousQueue?

常用的BlockingQueue有ArrayBlockingQueue、LinkedBlockingQueue;
ArrayBlockingQueue跟LinkedBlockingQueue一样,必须设置大于0的容量,所以必须会导致任务不能马上被执行,只有SynchronousQueue适用;

思考题
(1)test1()、test2()执行有什么不同?

test1()、test2()分别创建线程池来执行两个打印任务,不同的是,前者线程池核心为0,后者核心数为1,test2()会先打印run1再打印run2,这应该是无疑,那么test1()呢?
答案是跟test2()一样,估计很多人会有疑问,核心数为0,打印run1的任务不应该先放到队列中,而打印run2的任务无法放入队列且没超过最大线程数,则会创建线程执行吗?
大体方向没有错,但是有一个关键点,打印run1的任务放到队列后,会接着判断线程池线程是否为0,是则创建线程执行队列任务,所以打印run1的任务就会被先执行。

    public void test1() {
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1));
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("run1");
            }
        });

        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("run2");
            }
        });
    }

    public void test2() {
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1));
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("run1");
            }
        });

        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("run2");
            }
        });
    }
ThreadPoolExecutor.execute

以下是添加到队列的逻辑,workerCountOf(recheck) == 0条件成立,则addWorker创建线程执行队列任务;

  public void execute(Runnable command) {
      ......
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
(2)以下代码会出现什么问题?

test1()逻辑跟上述差不多,只不过改成死循环打印run1,那么打印run2会被执行吗?
答案是不会。因为打印run2的任务被放入队列以后,线程池有一个线程在工作,就不会创建线程来执行打印run2,必须等待打印run1的任务执行完。

 public void test1() {
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1));
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                while(true){
                    System.out.println("run1");
                }
            }
        });

        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("run2");
            }
        });
    }
(3)SynchronousQueue.offer一定返回false吗?

答案:不是。
情景1:
以下代码,因为SynchronousQueue是无容量的,所以offer一定返回false,从日志上也可以看出;

    public void test1() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        new Thread() {
            @Override
            public void run() {
                boolean isSuccess = queue.offer("11111");
                System.out.println("isSuccess=" + isSuccess);
            }
        }.start();
    }

输出日志
isSuccess=false

请求2:
以下代码,SynchronousQueue.offer会返回true,因为在offer之前,线程1执行了take任务,并且处于阻塞等待,当有任务过来马上返回并且offer返回true;

 public void test1() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        //线程1,take任务
        new Thread() {
            @Override
            public void run() {
                try {
                    System.out.println("thread1");
                    String runnable = queue.take();//堵塞,等待有任务
                    System.out.println("runnable=" + runnable);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }.start();
        //主线程休眠100毫秒,为了保证线程1先执行
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //线程2,offer任务
        new Thread() {
            @Override
            public void run() {
                System.out.println("thread2");
                boolean isSuccess = queue.offer("123");
                System.out.println("isSuccess=" + isSuccess);
            }
        }.start();
    }

输出日志:
thread1
thread2
runnable=123
isSuccess=true
总结

(1)OkHttp利用这三个参数:核心线程数为0,SynchronousQueue是一个无容量的集合,最大线程数为Integer.MAX_VALUE,创建的线程池实现高并发;
(2)线程池核心线程数有时候设置0或1,效果是一样的;
(3)SynchronousQueue虽然是无容量的,但是offer不一定返回false,如果在offer之前,有线程处于take 阻塞状态,那么offer就返回true;

以上分析有不对的地方,请指出,互相学习,谢谢哦!

上一篇下一篇

猜你喜欢

热点阅读