OkHttp原理解析1(框架流程篇)
一直想写一篇 简洁而不失内涵 的OKHTTP源码分析,甚至从19年春节前就开始翻阅OkHttp的源码。但是赶上春节事多心杂,没能将心中所想梳理出来。
现在疫情当前,节约了外出活动的时间,静心打磨了此文,希望对看本文的小伙伴有所帮助,更希望 武汉坚强,祖国安康。
本文源码基于OkHttp3.14.6,该版本是Java版最新的一版,后续的4.*全面使用了Kotlin,如有需要可再进行分析。
针对OkHttp我打算开两篇进行分析。
第1篇分析整体的框架设计,以及大方向的请求流程。
第2篇分析具体拦截器的功能逻辑。
好,我们开始表演。
在翻看源码之前,对OkHttp的了解只停留在使用层面,对Api使用非常的6。完全没关心过内部原理。更别提设计思想了。
有句 德玛西亚 名言咋说来着 新手拼的是英雄,技能,熟练度。高玩靠的是走位,意识,英雄池。
这句话你品,你细品~~~
本文争取以一个高玩的姿态去解读OkHttp的源码,至于有多高呢?
嗯~~~ 还是这么高。。。
image.png
我们再看源码的时候有一个技巧,不一定非要挨个类去逐行阅读,枯燥无味不说,关键是很难和功能相对应,可根据使用方式切入,根据逻辑调用链,层层跟进。
本文呢又是一篇深入浅出的题材,还是那迷人的姿势,还是那熟悉的套路。
从使用到原理,从架构到逻辑。
使用体现的是框架的能力,架构承载的是框架的灵魂。
我们将从以下几个问题出发,循循渐进,层层深入。
- OkHttp有什么优势?为啥要分析的是OkHttp而不是其他框架?
- OkHttp的使用方式有哪些?
- OkHttp的架构是什么样的?
- OkHttp的内部是咋实现的?
1. OkHttp有什么优势?为啥要分析的是OkHttp而不是其他框架?
其实Android的网络框架有很多,比如Android-Async-Http,Volley,OkHttp,Retrofit,那为啥偏要分析OkHttp呢?因为它屌啊~~~
我们先看一个这个对比图就明白了。
对比分析 | Android-Async-Http | volley | OkHttp | Retrofit |
---|---|---|---|---|
技术分析 | 基于HttpClient | 基于HttpUrlConnection | 基于Socket, 和HttpUrlConnection同级但并不属于HttpUrlConnection | 基于Okhttp |
优势 | 自动智能请求重试 ;持久化cookie存储 | 1.支持图片加载;网络请求的排序。 2.优先级处理缓存。 3.多级别取消请求。 4.生命周期控制,退出后自动取消请求。 5.可拓展性好;可支持HttpClient、HttpUrlConnection、和OkHttp |
1.高性能Http请求库。 2.支持SPDY,共享同一个Socket来处理同一个服务器所有的请求. 3.支持http2.0、websocket;支持同步、异步。 4.内部封装了线程池、数据转换、参数使用、错误处理等。 5.支持GZIP来减少数据流量。 6.基于NIO和OKio,性能更高。 |
1.Api牛逼 2.支持通过注解配置参数,url等信息。 3.支持Gson,Jackson,Protobuf。 4.支持RxJava。 5.代码简化;解耦彻底。 |
劣势 | Android5.0之后不推荐用HttpClient ,并且该框架通知维护。 | Volley的Request和Response都是把数据放到byte数组里,不支持输入输出流,把数据放到数组中,如果文件多了,数组就会大,消耗内存 | ... | ... |
目前相对主流的框架Retrofit,Glide中都是内置了OkHttp,而Retrift自己即是网络框架,且它都基于OkHttp,可见OkHttp是怎样一个地位。
2. OkHttp的使用方式有哪些?
接下来将秀8种操作,看好了您嘞。
- 1.同步Get请求
- 2.异步Get请求
- 3.同步Post请求
- 4.异步Post请求
- 5.Post提交文件
- 6.Post提交from表单
- 7.文件字符串复合提交
- 8.拦截器使用
/**
* 1.同步Get请求
* 2.异步Get请求
* 3.同步Post请求
* 4.异步Post请求
* 5.Post提交文件
* 6.Post提交from表单
* 7.文件字符串复合提交
* 8.拦截器使用
*/
OkHttpClient okHttpClient = new OkHttpClient();
/**
* 同步Get请求
*/
private void SyncGet() {
try {
Request request = new Request.Builder()
.url("url")
.build();
Call call = okHttpClient.newCall(request);
Response response = call.execute();
} catch (Exception e) {
}
}
/**
* 异步Gost请求
*/
private void AsyncGet() {
Request request = new Request.Builder()
.url("url")
.build();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
//请求失败
}
@Override
public void onResponse(Call call, Response response) throws IOException {
//请求成功
}
});
}
/**
* 同步Post请求
*/
private void SyncPost() {
try {
MediaType JSON = MediaType.parse("application/x-www-form-urlencoded; charset=utf-8");
String str = "通信数据";
Request request = new Request.Builder().url("url").post(RequestBody.create(JSON, str)).build();
Call call = okHttpClient.newCall(request);
Response response = call.execute();
} catch (Exception e) {
}
}
/**
* 异步Post请求
*/
private void AsyncPost() {
MediaType JSON = MediaType.parse("application/x-www-form-urlencoded; charset=utf-8");
String str = "通信数据";
Request request = new Request.Builder().url("url").post(RequestBody.create(JSON, str)).build();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
//请求失败
}
@Override
public void onResponse(Call call, Response response) throws IOException {
//请求成功
}
});
}
private void postFile() {
MediaType fileMediaType = MediaType.parse("text/x-markdown; charset=utf-8");
Request request = new Request.Builder()
.url("url")
.post(RequestBody.create(fileMediaType, new File("sd/mnt/a.png")))
.build();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
//请求失败
}
@Override
public void onResponse(Call call, Response response) throws IOException {
//请求成功
}
});
}
/**
* Post提交from表单
*/
private void postFrom() {
MediaType fileMediaType = MediaType.parse("text/x-markdown; charset=utf-8");
Request request = new Request.Builder()
.url("url")
.post(new FormBody.Builder().add("key", "value").build())
.build();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
//请求失败
}
@Override
public void onResponse(Call call, Response response) throws IOException {
//请求成功
}
});
}
/**
* Post提交组合数据(字符串+文件)
*/
private void postMultipartBody() {
MediaType fileMediaType = MediaType.parse("image/png");
RequestBody requestBody = RequestBody.create(fileMediaType, new File("sd/mnt/1.png"));
MultipartBody multipartBody = new MultipartBody.Builder()
.setType(MultipartBody.FORM)
.addPart(
Headers.of("Content-Disposition", "form-data; name=\"title\""),
RequestBody.create(null, "文字")//这样可以直接添加数据,无需单独创建RequestBody
)
.addPart(
Headers.of("Content-Disposition", "form-data; name=\"image\""),
RequestBody.create(fileMediaType, new File("sd/mnt/1.png"))//这样可以直接添加文件,无需单独创建RequestBody
)
.addFormDataPart("key", "value")//添加表单数据
.addFormDataPart("file", "fileName", requestBody)
.build();
Request request = new Request.Builder()
.url("url")
.post(multipartBody)
.build();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
//请求失败
}
@Override
public void onResponse(Call call, Response response) throws IOException {
//请求成功
}
});
}
/**
* 使用拦截器
*/
private void postAndInterceptor() {
OkHttpClient okHttpClient = new OkHttpClient.Builder()
.addInterceptor(new MyInterceptor())
.build();
MediaType fileMediaType = MediaType.parse("image/png");
RequestBody requestBody = RequestBody.create(fileMediaType, new File("sd/mnt/1.png"));
MultipartBody multipartBody = new MultipartBody.Builder()
.setType(MultipartBody.FORM)
.addFormDataPart("key", "value")//添加表单数据
.addFormDataPart("file", "fileName", requestBody)
.build();
Request request = new Request.Builder()
.url("url")
.post(multipartBody)
.build();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
//请求失败
}
@Override
public void onResponse(Call call, Response response) throws IOException {
//请求成功
}
});
}
/**
* 拦截器
*/
public class MyInterceptor implements Interceptor {
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
Response response = chain.proceed(request);
Log.d("TAG", "请求返回数据为:" + response.body().string());
return null;
}
}
3. OkHttp的架构是什么样的?
这个问题挺深入,唯有此图以示天下。
整体架构图.png上图主要以执行流程来划分,其主要涉及的类包含以下几个
- OkHttpClient 客户端对象
- OkHttpClient.Builder 以构件者模式创建OkHttpClient对象
- Request 请求对象
- HttpUrl url; 请求对象参数
- String method;请求对象参数
- Headers.Builder headers;请求对象参数
- RequestBody body;请求对象参数
- Map<Class<?>, Object> tags = Collections.emptyMap();请求对象参数
- Request.Builder 构建者模式用于创建Request
- Response 请求结果对象。
- Request request;返回对象参数
- Protocol protocol;返回对象参数
- int code;返回对象参数
- String message;返回对象参数
- Handshake handshake;返回对象参数
- Headers headers;返回对象参数
- ResponseBody body;返回对象参数
- Response networkResponse;返回对象参数
- Response cacheResponse;返回对象参数
- Response priorResponse;返回对象参数
- long sentRequestAtMillis;返回对象参数
- long receivedResponseAtMillis;返回对象参数
- Exchange exchange;返回对象参数
- RealCall implements Call 请求控制器,用于执行request
- Dispatcher 调度器,主要用于请求的策略处理。
- AsyncCall是一个Runnable,线程池调用。
- Callback 接收回调使用
- Interceptor 拦截器接口,所有的拦截器均实现该接口,责任链模式的每一个拦截器接口。
4. OkHttp的内部是咋实现的?
又到了你么最喜欢的讲道理摆姿势的环节了。
分析OkHttp的内部实现可以从使用角度出发,以使用逻辑分析框架入口,然后层层跟进。我们以普通的get请求为例,主要涉及以下几个入口。
-
1. OkHttpClient okHttpClient = new OkHttpClient();
-
2. Request request = new Request.Builder().url("url").build();
-
3. Call call = okHttpClient.newCall(request);
-
4. call.enqueue(new Callback() {... });
我们逐个分析一下。
4.1. OkHttpClient okHttpClient = new OkHttpClient();内部执行了哪些逻辑?我们看下源码
//OkHttpClient的构造函数
public OkHttpClient() {
this(new Builder());
}
//OkHttpClient的构造函数
OkHttpClient(Builder builder) {
this.dispatcher = builder.dispatcher;
this.proxy = builder.proxy;
this.protocols = builder.protocols;
this.connectionSpecs = builder.connectionSpecs;
this.interceptors = Util.immutableList(builder.interceptors);
this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
this.eventListenerFactory = builder.eventListenerFactory;
this.proxySelector = builder.proxySelector;
this.cookieJar = builder.cookieJar;
this.cache = builder.cache;
this.internalCache = builder.internalCache;
this.socketFactory = builder.socketFactory;
boolean isTLS = false;
for (ConnectionSpec spec : connectionSpecs) {
isTLS = isTLS || spec.isTls();
}
if (builder.sslSocketFactory != null || !isTLS) {
this.sslSocketFactory = builder.sslSocketFactory;
this.certificateChainCleaner = builder.certificateChainCleaner;
} else {
X509TrustManager trustManager = Util.platformTrustManager();
this.sslSocketFactory = newSslSocketFactory(trustManager);
this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
}
if (sslSocketFactory != null) {
Platform.get().configureSslSocketFactory(sslSocketFactory);
}
this.hostnameVerifier = builder.hostnameVerifier;
this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
certificateChainCleaner);
this.proxyAuthenticator = builder.proxyAuthenticator;
this.authenticator = builder.authenticator;
this.connectionPool = builder.connectionPool;
this.dns = builder.dns;
this.followSslRedirects = builder.followSslRedirects;
this.followRedirects = builder.followRedirects;
this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
this.callTimeout = builder.callTimeout;
this.connectTimeout = builder.connectTimeout;
this.readTimeout = builder.readTimeout;
this.writeTimeout = builder.writeTimeout;
this.pingInterval = builder.pingInterval;
if (interceptors.contains(null)) {
throw new IllegalStateException("Null interceptor: " + interceptors);
}
if (networkInterceptors.contains(null)) {
throw new IllegalStateException("Null network interceptor: " + networkInterceptors);
}
}
//OkHttpClient 中静态内部类Builder的构造方法
public Builder() {
dispatcher = new Dispatcher();
protocols = DEFAULT_PROTOCOLS;
connectionSpecs = DEFAULT_CONNECTION_SPECS;
eventListenerFactory = EventListener.factory(EventListener.NONE);
proxySelector = ProxySelector.getDefault();
if (proxySelector == null) {
proxySelector = new NullProxySelector();
}
cookieJar = CookieJar.NO_COOKIES;
socketFactory = SocketFactory.getDefault();
hostnameVerifier = OkHostnameVerifier.INSTANCE;
certificatePinner = CertificatePinner.DEFAULT;
proxyAuthenticator = Authenticator.NONE;
authenticator = Authenticator.NONE;
connectionPool = new ConnectionPool();
dns = Dns.SYSTEM;
followSslRedirects = true;
followRedirects = true;
retryOnConnectionFailure = true;
callTimeout = 0;
connectTimeout = 10_000;
readTimeout = 10_000;
writeTimeout = 10_000;
pingInterval = 0;
}
//OkHttpClient 中静态内部类Builder的构造方法
Builder(OkHttpClient okHttpClient) {
this.dispatcher = okHttpClient.dispatcher;
this.proxy = okHttpClient.proxy;
this.protocols = okHttpClient.protocols;
this.connectionSpecs = okHttpClient.connectionSpecs;
this.interceptors.addAll(okHttpClient.interceptors);
this.networkInterceptors.addAll(okHttpClient.networkInterceptors);
this.eventListenerFactory = okHttpClient.eventListenerFactory;
this.proxySelector = okHttpClient.proxySelector;
this.cookieJar = okHttpClient.cookieJar;
this.internalCache = okHttpClient.internalCache;
this.cache = okHttpClient.cache;
this.socketFactory = okHttpClient.socketFactory;
this.sslSocketFactory = okHttpClient.sslSocketFactory;
this.certificateChainCleaner = okHttpClient.certificateChainCleaner;
this.hostnameVerifier = okHttpClient.hostnameVerifier;
this.certificatePinner = okHttpClient.certificatePinner;
this.proxyAuthenticator = okHttpClient.proxyAuthenticator;
this.authenticator = okHttpClient.authenticator;
this.connectionPool = okHttpClient.connectionPool;
this.dns = okHttpClient.dns;
this.followSslRedirects = okHttpClient.followSslRedirects;
this.followRedirects = okHttpClient.followRedirects;
this.retryOnConnectionFailure = okHttpClient.retryOnConnectionFailure;
this.callTimeout = okHttpClient.callTimeout;
this.connectTimeout = okHttpClient.connectTimeout;
this.readTimeout = okHttpClient.readTimeout;
this.writeTimeout = okHttpClient.writeTimeout;
this.pingInterval = okHttpClient.pingInterval;
}
//通过静态内部类Builder的build()方法可创建 OkHttpClient 对象。
public OkHttpClient build() {
return new OkHttpClient(this);
}
我们分析下这一大段代码,其实逻辑内容非常简单,主要干了这么几件事。
- 设置了缓存、线程池、拦截器,超时等变量数据。
- OkHttpClient使用了构建者模式。
- new OkHttpClient()内部实际调用的是 new OkHttpClient(new Builder())。
- OkHttpClient还可通过new OkHttpClient.Builder().build();创建OkHttpClient对象。
- 5.为什么要使用构建者模式?当一个类的内部数据过于复杂的时候,要创建的话可能就需要了解这个类的内部结构以及相互关系等等。会大大提升框架的使用成本,因此创建的时候会有一个名为Builder的内部类模板,设置好了默认的值和逻辑关系。这种模板可以有多个,使得同样的构建过程可以创建不同的对象。使用户不了解内部逻辑的情况下也可以正常创建对象。大大降低的使用成本。
4.2. Request request = new Request.Builder().url("url").build();内部又干了啥?
从写法上看,Request 一样也使用了构建者模式,我们分开Request.Builder(),url("url"),build()一个一个看。
//Request构造函数
Request(Builder builder) {
this.url = builder.url;
this.method = builder.method;
this.headers = builder.headers.build();
this.body = builder.body;
this.tags = Util.immutableMap(builder.tags);
}
//Request的静态内部类Builder构造函数
public Builder() {
this.method = "GET";
this.headers = new Headers.Builder();
}
//Request的静态内部类Builder构造函数
Builder(Request request) {
this.url = request.url;
this.method = request.method;
this.body = request.body;
this.tags = request.tags.isEmpty()
? Collections.emptyMap()
: new LinkedHashMap<>(request.tags);
this.headers = request.headers.newBuilder();
}
//Request的静态内部类Builder设置url方法
public Builder url(HttpUrl url) {
if (url == null) throw new NullPointerException("url == null");
this.url = url;
return this;
}
/**
* Request的静态内部类Builder设置url方法
*
* @如果url是无效的则抛出 throws IllegalArgumentException 通过调用HttpUrl.get(url)避免这种异常,
* 无效的url返回null
*/
public Builder url(String url) {
if (url == null) throw new NullPointerException("url == null");
// Silently replace web socket URLs with HTTP URLs.
if (url.regionMatches(true, 0, "ws:", 0, 3)) {
url = "http:" + url.substring(3);
} else if (url.regionMatches(true, 0, "wss:", 0, 4)) {
url = "https:" + url.substring(4);
}
return url(HttpUrl.get(url));
}
/**
* Request的静态内部类Builder设置url方法
*
* @throws IllegalArgumentException if the scheme of {@code url} is not {@code http} or {@code
* https}.
*/
public Builder url(URL url) {
if (url == null) throw new NullPointerException("url == null");
return url(HttpUrl.get(url.toString()));
}
// Request的静态内部类Builder设置build方法
public Request build() {
if (url == null) throw new IllegalStateException("url == null");
return new Request(this);
}
我们总结下 new Request.Builder().url("url").build()干了几件事。
- 通过new Request.Builder()构造者模式设置了默认的请求方式GET,且通过headers的构建者创建了 headers。
- url()方法使用到了面向对象重载的方法,入参支持HttpUrl 、String 、URL三种类型。最终目的即是设置请求地址。
- .build()则通过以上设置的属性调用Request构造函数,创建Request对象。
4.3. Call call = okHttpClient.newCall(request);又是干什么的呢?
以上的4.1 , 4.2均是为了创建okHttpClient,Request对象,以及初始化一数据,并没有进行其他操作。
okHttpClient.newCall(request);又做了什么操作呢?
/**
* okHttpClient的newCall()
* 准备request,将在某个时间执行。
*/
@Override public Call newCall(Request request) {
//最后这个参数是否为web socket默认传false
return RealCall.newRealCall(this, request, false /* for web socket */);
}
/**
*RealCall.newRealCall(...)
*实例化RealCall对象,初始化RealCall.transmitter。
*/
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.transmitter = new Transmitter(client, call);
return call;
}
/**
*RealCall(...)
*/
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
}
/**
*Transmitter类构造函数 。
*/
public Transmitter(OkHttpClient client, Call call) {
this.client = client;
this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool());
this.call = call;
this.eventListener = client.eventListenerFactory().create(call);
this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
}
总结:这一堆居然还是在创建对象,真正的请求还没有开始。
- 通过okHttpClient.newCall()调用了RealCall.newRealCall()并把Request,okHttpClient作为参数传过去。
- RealCall.newRealCall()中通过构造函数创建了RealCall和Transmitter对象。
- 3.Transmitter主要用于设置连接池,事件监听器,以及超时等待时间。
- 3.1. 注意Transmitter中这句 Internal.instance.realConnectionPool(client.connectionPool()); Internal.instance只在OkHttpClient有实现,因此也只会调用OkHttpClient中的实现接口,目的是拿到connectionPool连接池。
- 3.2. client.eventListenerFactory()获取的EventListener.Factory其实是在OkHttpClient的Builder()中通过 EventListener.factory(EventListener.NONE);创建的。默认传的EventListener是一个空的。
4.4. call.enqueue(new Callback() {... });应该开始请求了吧...
前边准备的所有准备均为了最后这一步请求,我们看下逻辑是怎么操作的。
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
transmitter.callStart();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
//transmitter.callStart();
public void callStart() {
this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()");
eventListener.callStart(call);
}
这块有点复杂我们一点一点的分析,最后进行总结。
首先call.enqueue()中进行了执行状态赋值,防止重复执行请求。
其次调用了transmitter.callStart();启动堆栈的跟踪,以及eventListener的一些回调处理。
最后即是最关键的异步 client.dispatcher().enqueue(new AsyncCall(responseCallback));是真正的请求流程。
我继续深入分析
//Dispatcher.enqueue()
void enqueue(AsyncCall call) {
synchronized (this) {
//readyAsyncCalls是一个准备调用的AsyncCall队列
readyAsyncCalls.add(call);
// 如果不是WebSocket,将通过call.host查找是否有运行中的AsyncCall ,如果有将通过AtomicInteger类型共享到当前 AsyncCall 对象中。
if (!call.get().forWebSocket) {
AsyncCall existingCall = findExistingCallWithHost(call.host());
//异步请求,OkHttp会对有相同主机的call在请求时进行记数,通过AtomicInteger对象进行即时同步。
//这个计数对后续的请求有影响,我们后边再详细分析。
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
}
}
//真正执行操作
promoteAndExecute();
}
//循环runningAsyncCalls和readyAsyncCalls队列
@Nullable private AsyncCall findExistingCallWithHost(String host) {
for (AsyncCall existingCall : runningAsyncCalls) {
if (existingCall.host().equals(host)) return existingCall;
}
for (AsyncCall existingCall : readyAsyncCalls) {
if (existingCall.host().equals(host)) return existingCall;
}
return null;
}
/**
*readyAsyncCalls中满足条件的对象移动到runningAsyncCalls中 并且在 executor service上运行。
* 必须同步调用,因为要回调用户的代码
*
* 如果调度程序当前正在运行,则返回true
*/
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
//循环readyAsyncCalls队列。
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
//如果运行队列runningAsyncCalls超过了maxRequests直接break。(默认值为64)
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
//如果当前的主机计数器>5则continue,这个计数器就是上述enqueue()方法的计数器。
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
从readyAsyncCalls中移除
i.remove();
//相同主机的asyncCall计数器+1
asyncCall.callsPerHost().incrementAndGet();
executableCalls.add(asyncCall);
//添加到runningAsyncCalls队列中
runningAsyncCalls.add(asyncCall);
}
//运行状态赋值,异步/同步只要有一个在运行则为true。
isRunning = runningCallsCount() > 0;
}
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
//开始运行,具体的运行逻辑我们后续分析。
asyncCall.executeOn(executorService());
}
return isRunning;
}
ok,我们总结下Dispatcher.enqueue()干了什么事
- 将AsyncCall添加到准备队列readyAsyncCalls。
- 通过AsyncCall对象查找runningAsyncCalls和readyAsyncCalls队列中是否想相同主机的AsyncCall,如果有则通过AtomicInteger对象将AsyncCall.callsPerHost引用到一起,方便后续的计数统计。
- 调用promoteAndExecute()循环readyAsyncCalls队列,判断runningAsyncCalls队列大于64或asyncCall.callsPerHost>5均终止添加到runningAsyncCalls队列中。
- asyncCall.callsPerHost计数器+1,AsyncCall添加到runningAsyncCalls队列中,从readyAsyncCalls中移除。启动for循环调用 asyncCall.executeOn(executorService());进行请求。
那asyncCall.executeOn(executorService());中又干了啥?
//Dispatcher.executorService()
//如果executorService 为null则创建一个ThreadPoolExecutor线程池
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
/**
* AsyncCall.executeOn()
* 尝试通过 executorService启动执行AsyncCall
* 如果 executor已经关闭了则尝试清楚,并报失败
*/
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
//线程池执行AsyncCall
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
transmitter.noMoreExchanges(ioException);
//回调onFailure
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
//如果没成功,调用OkHttpClient.dispatcher().finished(this);
client.dispatcher().finished(this); // This call is no longer running!
}
}
}
/**
* Dispatcher.finished()
*失败
*/
void finished(AsyncCall call) {
//AsyncCall 中相同主机计数器-1。
call.callsPerHost().decrementAndGet();
finished(runningAsyncCalls, call);
}
//Dispatcher.finished(...)
private <T> void finished(Deque<T> calls, T call) {
Runnable idleCallback;
synchronized (this) {
//从runningAsyncCalls中移除call。
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
idleCallback = this.idleCallback;
}
//重新调用promoteAndExecute()
boolean isRunning = promoteAndExecute();
if (!isRunning && idleCallback != null) {
//如果同步异步均没有运行了并且idleCallback 不为空 则通知当前为空闲状态了。
idleCallback.run();
}
}
我们再总结下asyncCall.executeOn(executorService())到底干了啥。
- 通过executorService()创建了一个ThreadPoolExecutor线程池。
- 启动了executorService线程池。
- 如果线程池出现了异常则启动异常处理,通过 responseCallback回调onFailure
-
- 通过finally 关键字判断如果本次请求失败了则调用OkHttpClient.dispatcher().finished(this)
- 4.1. AsyncCall.callsPerHost 相同主机计数器-1。
- 4.2 . 从runningAsyncCalls中移除当前AsyncCall并重新调用promoteAndExecute()。
- 4.3 . 如果同步异步均没有运行了并且idleCallback 不为空 则通知当前为空闲状态了。
- executorService.execute(this);证明其实this为AsyncCall,类型为Runnable。
executorService.execute(this);是执行请求的逻辑内容,我们详细看下
executorService是一个线程池,而this代表的是AsyncCall这个类,现在其实可以发现AsyncCall实际上继承的是NamedRunnable,而NamedRunnable实现了Runnable,这下就简单了,我们直接找Runnable的run方法即可。
public abstract class NamedRunnable implements Runnable {
...
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
//主要执行了这个方法,这个方法实际是一个抽象方法,需要看实现类的实现。
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
//AsyncCall.execute()
@Override
protected void execute() {
boolean signalledCallback = false;
transmitter.timeoutEnter();
try {
Response response = getResponseWithInterceptorChain();
signalledCallback = true;
//回调结果
responseCallback.onResponse(RealCall.this, response);
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} catch (Throwable t) {
cancel();
if (!signalledCallback) {
IOException canceledException = new IOException("canceled due to " + t);
canceledException.addSuppressed(t);
responseCallback.onFailure(RealCall.this, canceledException);
}
throw t;
} finally {
//移除当前AsyncCall
client.dispatcher().finished(this);
}
}
我们观察以上代码大部分是异常处理的逻辑,而try中第一句即返回了Response对象,因此,我们主要看getResponseWithInterceptorChain()中的逻辑。
Response getResponseWithInterceptorChain() throws IOException {
// 建立一个完整的拦截器堆栈
List<Interceptor> interceptors = new ArrayList<>();
//将创建okhttpclient时的拦截器添加到interceptors
interceptors.addAll(client.interceptors());
//重试拦截器,负责处理失败后的重试与重定向
interceptors.add(new RetryAndFollowUpInterceptor(client));
//请求转化拦截器(用户请求转为服务器请求,服务器响应转为用户响应)
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//缓存拦截器。负责
//1.根据条件,缓存配置,有效期等返回缓存响应,也可增加到缓存。
//2.设置请求头(If-None-Match、If-Modified-Since等) 服务器可能返回304(未修改)
//3.可配置自定义的缓存拦截器。
interceptors.add(new CacheInterceptor(client.internalCache()));
//网络连接拦截器,主要负责和服务器建立连接。
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
//创建okhttpclient时设置的networkInterceptors
interceptors.addAll(client.networkInterceptors());
}
//数据流拦截器,主要负责像服务器发送和读取数据,请求报文封装和解析。
interceptors.add(new CallServerInterceptor(forWebSocket));
//责任链模式的创建。
Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
boolean calledNoMoreExchanges = false;
try {
//启动责任链
Response response = chain.proceed(originalRequest);
if (transmitter.isCanceled()) {
closeQuietly(response);
throw new IOException("Canceled");
}
return response;
} catch (IOException e) {
calledNoMoreExchanges = true;
throw transmitter.noMoreExchanges(e);
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null);
}
}
}
其实上述逻辑中,主要干了3件事。
- 将所有拦截器添加到了list中
- 通过RealInterceptorChain创建责任链。
- 通过chain.proceed(originalRequest)启动执行。
我们只要搞懂创建责任链的逻辑,以及启动责任链的逻辑就全都明白了,ok,我们看下具体代码
//通过RealInterceptorChain构造函数创建每一个责任对象
public RealInterceptorChain(List<Interceptor> interceptors, Transmitter transmitter,
@Nullable Exchange exchange, int index, Request request, Call call,
int connectTimeout, int readTimeout, int writeTimeout) {
this.interceptors = interceptors;
this.transmitter = transmitter;
this.exchange = exchange;
this.index = index;
this.request = request;
this.call = call;
this.connectTimeout = connectTimeout;
this.readTimeout = readTimeout;
this.writeTimeout = writeTimeout;
}
//启动责任链代码
@Override
public Response proceed(Request request) throws IOException {
return proceed(request, transmitter, exchange);
}
//启动责任链代码
public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
// 如果我们已经有一个流,请确认传入的请求将使用它。
if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}
// 如果我们已经有一个流,确认这是对链的唯一调用。
if (this.exchange != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}
// 调用链中的下一个拦截器,注意参数index+1,通过+1的方式循环interceptors list中的拦截器
RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
Interceptor interceptor = interceptors.get(index);
//执行当前拦截器逻辑,并设置下一个拦截器对象。
Response response = interceptor.intercept(next);
// 确认下一个拦截器对chain.proceed()进行了所需的调用。
if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
if (response.body() == null) {
throw new IllegalStateException(
"interceptor " + interceptor + " returned a response with no body");
}
return response;
}
虽然看着上述这段代码很长,但是大篇幅都在判断各种异常情况,实际做的事非常简单。
1.创建完RealInterceptorChain后,通过procee()判断各种异常,并获取当前Interceptor对象。
2.通过Interceptor.intercept(RealInterceptorChain)启动当前拦截器逻辑,并且触发下一个拦截器启动。
3.如果当前拦截器出现异常等错误,则终止责任链。
具体的情况我们还需要看一个拦截器内部的逻辑,我们以简单的ConnectInterceptor为例。
public final class ConnectInterceptor implements Interceptor {
public final OkHttpClient client;
public ConnectInterceptor(OkHttpClient client) {
this.client = client;
}
@Override public Response intercept(Chain chain) throws IOException {
//chain实际上是下一个责任对象。
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
Transmitter transmitter = realChain.transmitter();
// 我们需要网络来满足这个要求。可能用于验证条件GET。
boolean doExtensiveHealthChecks = !request.method().equals("GET");
Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
//执行下一个拦截器责任对象的proceed方法。
return realChain.proceed(request, transmitter, exchange);
}
}
可见,Interceptor.intercept(Chain chain)接收的是下一个拦截器责任对象。
该方法中执行了自己拦截器该有的逻辑,如果没异常则直接通过下一个拦截器责任对象的proceed()启动了下一个拦截器逻辑。
ok我们在回头看下,AsyncCall.execute()中的finally中的那句代码 client.dispatcher().finished(this);
//Dispatcher.finished()
void finished(AsyncCall call) {
call.callsPerHost().decrementAndGet();
finished(runningAsyncCalls, call);
}
private <T> void finished(Deque<T> calls, T call) {
Runnable idleCallback;
synchronized (this) {
//移除队列,如果失败则抛异常
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
idleCallback = this.idleCallback;
}
//这个方法是不是有点眼熟,主要用来将readyAsyncCalls符合条件的添加到runningAsyncCalls中,并运行。
boolean isRunning = promoteAndExecute();
//闲置调用
if (!isRunning && idleCallback != null) {
idleCallback.run();
}
}
ok至此所有的流程都非常清楚了。至于请求细节需要具体到了每一个拦截器里。我打算单独开一篇来分析。
看到此处,如果对你有一点帮助,麻烦给个赞鼓励一下。
其实写博客是一个自驱的学习管理方式。能给小伙伴说明白的,那你必定早已心中有数。翻阅伟大的框架代码就好比面对面跟这个伟人学习。