浅析 OkHttp 调度器 Dispatcher
概述
OkHttp 支持异步发起请求,可以不需要使用者自己创建线程池管理异步请求,它有内置实现。
比如这边发起一个异步请求
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url("http://publicobject.com/helloworld.txt")
.build();
client.newCall(request).enqueue(new Callback() {
...
});
跟踪 RealCall 中的 enqueue 代码中:
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
最后可以看到使用内部的一个 Dispatcher 对象来执行请求。在进入调度器前,会被包装成一个 NameRunnable 的实现类 AsyncCall,这个实现类也是 RealCall 的一个内部类,可以拿到 RealCall 的引用和各种资源。在 AsyncTask 的 execute 中,会调用 OkHttp 的拦截器栈,传入 Request,返回 Response。
final class AsyncCall extends NamedRunnable {
...
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
...
} catch (IOException e) {
...
} finally {
client.dispatcher().finished(this);
}
}
}
而 NameRunnable 是 Runnable 的实现类,在 Runnable 中增加了一个功能,在任务执行期间修改了线程名为任务名,任务执行结束后再恢复旧的线程名。这样在 DDMS 查看线程的时候,我们就可以很方便地查阅到正在运行的线程是在执行哪个任务。
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
这个请求在什么时候执行,使用哪个线程执行,调用了 Dispatcher 的 enqueue 方法后由 Dispatcher 正式接管。
client.dispatcher().enqueue(new AsyncCall(responseCallback));
线程池的配置
为了实现请求的并发,Dispatcher 配置了一个线程池,具体如下:
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
可以看到 OkHttp 的线程池配置如下:
- 核心线程数:0
- 最大线程数:
Iteger.MAX_VALUE
,其实就是不限制 - 空闲线程保活时间:60s
- 任务队列:SynchronousQueue,为有界队列,且队列大小为 0,不存 Runnable,只是用来进行生产者和消费者之间的传递任务。
是不是很眼熟?和我们在 Executors 使用的 缓存线程池的配置完全一样,缓存线程池的代码如下:
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
所以 OkHttp 自己创建的这个线程池,和缓存线程池的区别,仅仅是修改了线程名。
我们简要分析一下缓存线程池的运行机制,进入到 ThreadPoolExecutor 中的 execute
方法的源码中,假设线程池处于运行状态,会有这样的流程:
- 新过来一个任务,由于核心线程数为 0,不需要创建核心线程,所以尝试加入任务队列
- 如果线程池中有线程刚好空闲可以接收任务,因为任务队列的类型是 SynchronousQueue,实际大小为 0,只做消费者和生产者的中转站,所以这时候,就可以入列成功,通过 SynchronousQueue 中转任务给空闲的线程执行。
- 如果当前线程池所有线程工作饱和,会入列失败。这时候 ThreadPoolExecutor 会调用
addWorker(command, false)
来创建并且启动新线程。
这样子,就会产生疑问,那么如果一次性同时发起大量的请求,不就会产生大量线程了?比如我一次性发起 100 个请求,那么是不是会发出 100 个线程?
是的,如果直接使用该线程池而没有其他的策略的话,是有这样的问题。
所以在 OkHttp 中线程池只是一个辅助作用,仅仅是用来做线程缓存,便于复用的。真正对这些请求的并发数量限制,执行时机等等都是调度器 Dispatcher 承担的。在 OkHttp 这里,线程池只是个带缓存功能的执行器,而真正的调度是外部包了一个调度策略的。
调度策略
这里简单说明一下 OkHttp 的请求的具体调度策略。
首先,是线程数量的约束。会一次性同时并发大量请求吗?不会的,它做了这样的限制:
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
最大并发数 maxRequests 默认为 64,但个域名最大请求数 maxRequestsPerHost 默认为 5 个。所以极端情况下,才会开启 64 个线程。这个场景非常罕见。
这两个约束是如何做到的呢?
在执行异步请求 enqueue
的时候有这样处理:
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
可以看到,在并发执行的请求数量小于 64,并且当前域名的请求书小于 5 的情况下,才会调用 executorService().execute(call);
。因为是缓存线程池,有空闲线程直接执行,没有的话创建新的线程执行。
如果超过了约束,则会放入一个等待队列 readyAsyncCalls 中:
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
那么,等待队列中的请求什么时候会执行呢?
回到之前提到的 AsyncCalls 中,在 execute 结束的时候会再调用一下 Dispatcher 的 finished 方法
client.dispatcher().finished(this);
跟踪代码,可以看到会执行到 promoteCalls 方法,和 enqueue 的方法一样,也是按照 maxRequests 和 maxRequestsPerHost 的约束,执行等待队列中的任务。
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
所以,OkHttp 的调度器默认配置下,在保证每个域名可以快速响应请求的情况下,还限制了每个域名的并发数和总体的并发数。实际每个应用的请求用到的域名都不多。线程池仅仅做线程缓存功能,调度策略应该外部自己去实现。
自定义配置
OkHttp 默认最大并发数 64,单域名最大并发 5,但这个是灵活的,是允许使用者去按照自己的复杂场景做相应的配置。比如某个应用才一个域名,但请求非常频繁,就可以调整 Dispatcher 的配置,比如设置单域名最大并发 10,最大并发 10。然后把自己配置的结果在 OkHttpClient 构造的时候传入即可:
Dispatcher dispatcher = new Dispatcher();
dispatcher.setMaxRequests(10);
dispatcher.setMaxRequestsPerHost(10);
OkHttpClient client = new OkHttpClient.Builder()
...
.dipatcher(dipatcher)
.build();
...
又或者,本来应用线程就很多很紧张了,然后请求的域名也挺多有几十个,网络请求的优先级比较低,就可以适当的把最大并发数调低等等,具体情况具体分析,按需配置。
一般情况下,OkHttp 提供的默认配置已经足够,不需要使用者再折腾了。