okhttp源码解析--dispatcher
1. dispatcher到底是什么、做了什么?
public final class Dispatcher {
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
private @Nullable Runnable idleCallback;
/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService;
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
通过上一篇的学习了解到不论是同步还是异步请求,最终调用的都是dispatcher中的方法,并分别将请求线程AsyncCall添加到异步队列(readyAsyncCalls、runningAsyncCalls)
和同步队列(runningSyncCalls)
中.
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
executorService()
创建了一个线程池,在通过上面两张代码,不难看出Dispatcher
主要工作就是维护请求队列和通过线程池来执行网络请求
1.1 Dispatcher的readyAsyncCalls、runningAsyncCalls等队列是怎样进行控制的?
从上一段代码中可以看到同步方法executed
很简单直接将请求添加到同步队列中(runningSyncCalls)
。异步请求也只有几行代码,判断了下是否小于最大请求数(maxRequests)
和小于正在进行中的网络请求最大值(maxRequestsPerHost)
,是--就添加到进行中异步队列(runningAsyncCalls)
并执行线程池,否者就添加到异步等待队列(readyAsyncCalls)
。
看到这里我就产生了一个疑问,这里只是对消息队列进行了简单的添加,并没有和我们的线程池产生联系啊!不着急,我们继续。上一段代码我们以异步请求enqueue
方法继续分析 :当请求满足条件时执行了
runningAsyncCalls.add(call);
executorService().execute(call);
这两步操作:第一步添加到执行中的异步请求队列(runningAsyncCalls)
,第二步线程池直接执行当前网络请求(注意executorService()
返回线程池、AsyncCall(继承自NamedRunnable)
就是一个线程哦,这俩在上一篇中已介绍过就不多说了)
/**
* Runnable implementation which always sets its thread name.
*/
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
当线程池执行网络请求时,实际就是执行了子线程的run()
方法,AsyncCall
继承的NamedRunnable
就是一个Runnable
,在它的run(
)方法中执行了它的抽象方法execute()
。
我们继续到AsyncCall
中去看看execute()
的具体实现:
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
retryAndFollowUpInterceptor.isCanceled()
首先判断请求是否取消,
取消就调用 responseCallback
的onFailure(...)
失败方法,
否则responseCallback
的onResponse(...)
返回数据。这样一个异步请求就完成了。
最终在finlly
看到了这段代码client.dispatcher().finished(this)
调用了dispatcher的finished
方法
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
首先将执行完成的网络请求从runningSyncCalls
中移除,第二步通过promoteCalls判断是否是异步请求,是就执行promoteCalls()
:
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
/** Returns the number of running calls that share a host with {@code call}. */
private int runningCallsForHost(AsyncCall call) {
int result = 0;
for (AsyncCall c : runningAsyncCalls) {
if (c.host().equals(call.host())) result++;
}
return result;
}
首先判断请求进行中的异步队列(runningAsyncCalls)
达到最大数量就return,异步等待队列(readyAsyncCalls)
是为空就return。
然后遍历异步等待队列(readyAsyncCalls)
:
-
判断现在最大请求数是否达到,没到就将请求从异步等待队列
(readyAsyncCalls)
移除,并添加到异步队列(runningAsyncCalls)
执行线程池,就又开始了下一次网络请求。 -
在此判断请求是否到达了最大值,因为在上面可能刚刚添加了一个网络请求
这样队列中的请求就完成了一个循环。队列也和线程池联系了起来
简单的总结一下:
executorService().execute(call)-->AsyncCall执行execute()-->dispatcher().finished(this)--> executorService().execute(call)