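OkHttp Source Code Analysis (Part 3)
In the previous chapter we traced OkHttp's request flow and concluded that both the request and its response are handled by Call, so this chapter looks at how Call works.
A Call is obtained with Call call = okHttpClient.newCall(request); here is the source of newCall: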
@Override public Call newCall(Request request) {
return new RealCall(this, request, false /* for web socket */);
}
As we can see, what is actually returned is a RealCall, so let's look at the RealCall constructor:
RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
final EventListener.Factory eventListenerFactory = client.eventListenerFactory();
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
// TODO(jwilson): this is unsafe publication and not threadsafe.
this.eventListener = eventListenerFactory.create(this);
}
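The constructor only initializes some state: the Request instance, the forWebSocket flag, the retry-and-redirect interceptor RetryAndFollowUpInterceptor, and the EventListener. Let's look at the asynchronous request first (once it is understood, the synchronous request is much the same).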
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
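The guard at the top of enqueue can be tried out in isolation. The following is a minimal sketch using only the JDK; OneShotCall is a hypothetical stand-in for RealCall, and it runs the work directly where the real code hands an AsyncCall to the dispatcher.

```java
// Simplified stand-in for RealCall's "execute at most once" guard; not OkHttp code.
final class OneShotCall {
  private boolean executed; // guarded by synchronized (this)

  /** Marks the call as started and runs the work; throws if it was already started. */
  void enqueue(Runnable work) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    work.run(); // assumption for the sketch: the real code passes an AsyncCall to the dispatcher here
  }

  public static void main(String[] args) {
    OneShotCall call = new OneShotCall();
    call.enqueue(() -> System.out.println("first enqueue runs"));
    try {
      call.enqueue(() -> System.out.println("never printed"));
    } catch (IllegalStateException e) {
      System.out.println("second enqueue rejected: " + e.getMessage());
    }
  }
}
```

Only the flag check is synchronized, so the lock is held briefly and the work itself runs outside it.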
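As we can see, a Call can only be executed once; a second attempt throws an IllegalStateException. The last statement is client.dispatcher().enqueue(new AsyncCall(responseCallback)); so what is the dispatcher?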
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
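The admission rule in this method can be modelled with two queues and two limits. The sketch below is a simplified model, not OkHttp code: MiniDispatcher and its methods are hypothetical names, calls are represented only by their host string, and the promotion of waiting calls in finished() is an assumption of the sketch, since that part is not shown in the excerpt.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Simplified model of the admission rule in Dispatcher.enqueue; not OkHttp code.
final class MiniDispatcher {
  private final int maxRequests;
  private final int maxRequestsPerHost;
  private final Deque<String> running = new ArrayDeque<>(); // hosts of running calls
  private final Deque<String> ready = new ArrayDeque<>();   // hosts of waiting calls

  MiniDispatcher(int maxRequests, int maxRequestsPerHost) {
    this.maxRequests = maxRequests;
    this.maxRequestsPerHost = maxRequestsPerHost;
  }

  /** Returns true if the call starts now, false if it has to wait. */
  synchronized boolean enqueue(String host) {
    if (running.size() < maxRequests && runningCallsForHost(host) < maxRequestsPerHost) {
      running.add(host);
      return true;
    }
    ready.add(host);
    return false;
  }

  /** Removes one finished call, then promotes waiting calls that now fit (sketch assumption). */
  synchronized void finished(String host) {
    running.remove(host); // removes the first matching entry
    for (Iterator<String> i = ready.iterator(); i.hasNext(); ) {
      if (running.size() >= maxRequests) return;
      String next = i.next();
      if (runningCallsForHost(next) < maxRequestsPerHost) {
        i.remove();
        running.add(next);
      }
    }
  }

  private int runningCallsForHost(String host) {
    int count = 0;
    for (String h : running) {
      if (h.equals(host)) count++;
    }
    return count;
  }

  synchronized int runningCount() { return running.size(); }
  synchronized int readyCount() { return ready.size(); }
}
```

With limits of 3 total and 2 per host, a third call to the same host waits even though the total limit has not been reached, which is the effect of the second half of the condition.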
This may not be entirely clear yet, so let's look at some of Dispatcher's fields and methods:
public final class Dispatcher {
// Maximum number of requests that may run concurrently.
private int maxRequests = 64;
// Maximum number of concurrent requests to the same host.
// URL layout: SCHEME :// HOST [ ":" PORT ] [ PATH [ "?" QUERY ]]
// e.g. for https://restapi.amap.com the host is restapi.amap.com
private int maxRequestsPerHost = 5;
private @Nullable Runnable idleCallback;
/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService;
/**
 * Ready async calls in the order they'll be run.
 * A double-ended queue: elements can be added or removed at either end, which makes removal easy.
 * This is the queue of asynchronous calls waiting to run.
 */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/**
 * Running asynchronous calls. Includes canceled calls that haven't finished yet.
 * This is the queue of asynchronous calls currently running.
 */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/**
 * Running synchronous calls. Includes canceled calls that haven't finished yet.
 * This is the queue of synchronous calls currently running.
 */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
......
}
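The executor configuration in executorService() is worth a closer look: zero core threads, no upper bound, a 60-second keep-alive, and a SynchronousQueue that never holds tasks. The sketch below uses only the JDK to observe what that combination does. DispatcherPoolDemo and threadsUsedFor are hypothetical names, and the default thread factory is used in place of Util.threadFactory.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Observes how a pool sized like the one in the excerpt reacts to concurrent tasks.
final class DispatcherPoolDemo {
  /** Builds a pool with the same sizing parameters as the excerpt (default thread factory). */
  static ThreadPoolExecutor newPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());
  }

  /** Submits n tasks that block until released; returns the largest pool size reached. */
  static int threadsUsedFor(int n) {
    ThreadPoolExecutor pool = newPool();
    CountDownLatch started = new CountDownLatch(n);
    CountDownLatch release = new CountDownLatch(1);
    try {
      for (int i = 0; i < n; i++) {
        pool.execute(() -> {
          started.countDown();
          try {
            release.await(); // keep this worker busy so it cannot pick up another task
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
      }
      started.await(); // every task is running at this point
      return pool.getLargestPoolSize();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      release.countDown();
      pool.shutdown();
    }
  }
}
```

Because the queue cannot buffer work and no worker is idle, every submitted task gets its own thread. The pool itself therefore imposes no limit, and the cap on concurrency comes from maxRequests and maxRequestsPerHost in the dispatcher.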
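The Dispatcher listing shows only some of its methods. Dispatcher is just a scheduler: it keeps track of OkHttp's calls (asynchronous calls that are running or waiting, and synchronous calls that are running), and executorService is the thread pool that runs the asynchronous ones. The actual work therefore happens in AsyncCall, so let's look at its source: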
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
AsyncCall extends NamedRunnable, so what is NamedRunnable?
/**
* Runnable implementation which always sets its thread name.
*/
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
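The effect of run() can be observed with a small JDK-only re-creation. In this sketch SketchNamedRunnable and NamedRunnableDemo are hypothetical names, String.format stands in for Util.format, and the task is run directly on the calling thread, which also shows that the class is a Runnable and not a thread.

```java
// Simplified re-creation of the NamedRunnable idea using only the JDK; not OkHttp code.
abstract class SketchNamedRunnable implements Runnable {
  private final String name;

  SketchNamedRunnable(String format, Object... args) {
    this.name = String.format(format, args); // stand-in for Util.format
  }

  @Override public final void run() {
    Thread current = Thread.currentThread();
    String oldName = current.getName();
    current.setName(name);
    try {
      execute();
    } finally {
      current.setName(oldName); // restore the name, since pooled threads are reused
    }
  }

  protected abstract void execute();
}

final class NamedRunnableDemo {
  /** Returns {thread name seen inside execute(), thread name after run() returns}. */
  static String[] observeNames(String url) {
    String[] seen = new String[2];
    Runnable task = new SketchNamedRunnable("OkHttp %s", url) {
      @Override protected void execute() {
        seen[0] = Thread.currentThread().getName();
      }
    };
    task.run(); // runs on the calling thread; no new thread is created
    seen[1] = Thread.currentThread().getName();
    return seen;
  }
}
```

While execute() runs, the thread carries the request-specific name, which makes thread dumps and logs easier to read; afterwards the original name is back.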
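NamedRunnable is a Runnable (not a thread itself) that sets the name of the thread running it, calls execute(), and restores the old name afterwards. In AsyncCall.execute() the result is obtained with Response response = getResponseWithInterceptorChain(); the code after it delivers the response or handles cancellation and exceptions, and finally the dispatcher is told that the call has finished. So the method to focus on is getResponseWithInterceptorChain():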
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
}
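As we can see, a number of interceptors are added to a List, which is then handed to a RealInterceptorChain that processes them and finally returns the result. Let's continue with RealInterceptorChain: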
/**
* A concrete interceptor chain that carries the entire interceptor chain: all application
* interceptors, the OkHttp core, all network interceptors, and finally the network caller.
*/
public final class RealInterceptorChain implements Interceptor.Chain {
private final List<Interceptor> interceptors;
private final StreamAllocation streamAllocation;
private final HttpCodec httpCodec;
private final RealConnection connection;
private final int index;
private final Request request;
private int calls;
public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
HttpCodec httpCodec, RealConnection connection, int index, Request request) {
this.interceptors = interceptors;
this.connection = connection;
this.streamAllocation = streamAllocation;
this.httpCodec = httpCodec;
this.index = index;
this.request = request;
}
@Override public Connection connection() {
return connection;
}
public StreamAllocation streamAllocation() {
return streamAllocation;
}
public HttpCodec httpStream() {
return httpCodec;
}
@Override public Request request() {
return request;
}
@Override public Response proceed(Request request) throws IOException {
return proceed(request, streamAllocation, httpCodec, connection);
}
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
// If we already have a stream, confirm that the incoming request will use it.
if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.httpCodec != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpCodec, connection, index + 1, request);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
return response;
}
}
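The full source of RealInterceptorChain is shown here because it is the core of OkHttp's design.
On each call to proceed, a new RealInterceptorChain pointing at the next index is created, the interceptor at the current index is taken from the interceptors list, and its intercept method is called with that new chain. This repeats until the last interceptor produces a Response, which is then returned back up through every interceptor.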
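The index-based hand-off is easier to follow with the networking removed. Below is a minimal sketch using only the JDK, where requests and responses are plain strings. SketchInterceptor, SketchChain, and ChainDemo are hypothetical names and not OkHttp types, and the checks on calls and httpCodec are left out.

```java
import java.util.Arrays;
import java.util.List;

// A stripped-down model of the index-based chain; names are hypothetical, not OkHttp types.
interface SketchInterceptor {
  String intercept(SketchChain chain);
}

final class SketchChain {
  private final List<SketchInterceptor> interceptors;
  private final int index;
  private final String request;

  SketchChain(List<SketchInterceptor> interceptors, int index, String request) {
    this.interceptors = interceptors;
    this.index = index;
    this.request = request;
  }

  String request() {
    return request;
  }

  /** Runs the interceptor at index, giving it a chain that points at index + 1. */
  String proceed(String request) {
    if (index >= interceptors.size()) throw new AssertionError();
    SketchChain next = new SketchChain(interceptors, index + 1, request);
    String response = interceptors.get(index).intercept(next);
    if (response == null) throw new NullPointerException("interceptor returned null");
    return response;
  }
}

final class ChainDemo {
  /** Runs three interceptors over "GET /" and records the order of events in log. */
  static String run(List<String> log) {
    SketchInterceptor logging = chain -> {
      log.add("logging: before");
      String response = chain.proceed(chain.request());
      log.add("logging: after");
      return response;
    };
    // Rewrites the request before passing it on.
    SketchInterceptor header = chain -> chain.proceed(chain.request() + " +header");
    // Last in the list: produces the response and never calls proceed.
    SketchInterceptor server = chain -> {
      log.add("server: got " + chain.request());
      return "200 OK for [" + chain.request() + "]";
    };
    List<SketchInterceptor> all = Arrays.asList(logging, header, server);
    return new SketchChain(all, 0, "GET /").proceed("GET /");
  }
}
```

Calling ChainDemo.run with an empty list returns "200 OK for [GET / +header]" and leaves three log entries in the order before, server, after. Each interceptor sees the request on the way in and the response on the way out, and the last one ends the recursion by not calling proceed.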
To go further we need to understand the Chain of Responsibility pattern, which the next chapter explains.