Okhttp主流程源码浅析(2)
上一篇Okhttp主流程源码浅析(1)分析到任务调度方面,接着把剩下的主流程分析.
当一个任务被执行起来,会调用getResponseWithInterceptorChain()
来获取到响应结果 response
//TODO: 责任链模式,拦截器链 执行请求
//TODO: 拿到回调结果
Response response = getResponseWithInterceptorChain();
getResponseWithInterceptorChain()源码:
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
//TODO 责任链
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
//TODO 处理重试与重定向
interceptors.add(retryAndFollowUpInterceptor);
//TODO 处理 配置请求头等信息
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//TODO 处理 缓存配置 根据条件(存在响应缓存并被设置为不变的或者响应在有效期内)返回缓存响应
//TODO 设置请求头
interceptors.add(new CacheInterceptor(client.internalCache()));
//TODO 连接服务器
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
//TODO 执行流操作(写出请求体、获得响应数据)
//TODO 进行http请求报文的封装与请求报文的解析
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
创建List<Interceptor> interceptors = new ArrayList<>()
集合来保存拦截器,平时我们自己给OkHttpClient添加的拦截器也是在这里处理.
例如添加log 拦截器:
//添加log拦截器
HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
OkHttpClient.Builder builder = new OkHttpClient.Builder()
.connectTimeout(10, TimeUnit.SECONDS)
.readTimeout(15, TimeUnit.SECONDS)
.writeTimeout(15, TimeUnit.SECONDS)
.retryOnConnectionFailure(true)
.addInterceptor(interceptor);
这些拦截器保存在OkHttpClient实例里面,到这里会调用interceptors.addAll(client.interceptors())
把我们的拦截器先添加到集合里面,然后再添加一些默认的拦截器来处理请求与响应
责任链的拦截器集合内容如下:
1.自定义拦截器
2.retryAndFollowUpInterceptor: 重试与重定向拦截器
3.BridgeInterceptor: 处理配置请求头等信息
4.CacheInterceptor: 处理缓存信息
5.ConnectInterceptor: 连接服务器
6.CallServerInterceptor: http报文封装与解析,与服务器执行流操作
责任链调用顺序图:
责任链调用顺序图:调用顺序:
当发起一个请求,首先会进入到自定义拦截器里面,然后再依次进入okhttp默认的各个拦截器里面,最终连接到服务器,进行流的读写, 拿到服务器返回的响应结果,再反过来,依次的回调到调用者.
在上面的getResponseWithInterceptorChain()方法里面,把拦截器添加完成之后,会执行如下代码:
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
创建RealInterceptorChain realInterceptorChain = new RealInterceptorChain
,然后开始执行责任链,调用chain.proceed(originalRequest)把请求传入进去,看下RealInterceptorChain的proceed方法
@Override public Response proceed(Request request) throws IOException {
return proceed(request, streamAllocation, httpCodec, connection);
}
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
//省略代码
...
//TODO: 创建新的拦截链,链中的拦截器集合index+1
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
//TODO: 执行当前的拦截器 默认是:retryAndFollowUpInterceptor
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
//省略代码
...
return response;
}
删除了部分判断的代码,这里看重点,在proceed()里面看到,会再创建一个新的对象RealInterceptorChain next = new RealInterceptorChain(...)并且把拦截器集合通过构造方法传入,还有一些其他信息.
注意: 这里的会把 index + 1再传入进去,下面会用到
然后再从拦截器集合里面拿到一个拦截器interceptors.get(index)
,并且去执行这个拦截器的方法拿到响应结果Response response = interceptor.intercept(next);
回到上面看下调用顺序图,如果我们有给okhttpClient加入自定义拦截器,这里就先调用自定义拦截器,否则就开始执行第一个默认的拦截器retryAndFollowUpInterceptor
看下retryAndFollowUpInterceptor.intercept(next)方法
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();
//TODO 核心 协调连接、请求/响应以及复用
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response;
boolean releaseConnection = true;
try {
//TODO 执行责任链 实际上就是下一个拦截器
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
} catch (IOException e) {
} finally {
}
...删除部分代码
return response;
}
}
这里会StreamAllocation streamAllocation = new StreamAllocation(...)
创建一个StreamAllocation对象,并且把连接池,和请求地址信息,call等等通过构造传入,这里只关心主流程,StreamAllocation的作用不说了
然后开了个while (true) {}
循环,在里面会调用response = realChain.proceed(request, streamAllocation, null, null);
继续执行责任链的下一个拦截器,这个realChain就是上面retryAndFollowUpInterceptor.intercept(next)
传入的这个next,就是上面提到的那个RealInterceptorChain对象,注意这个时候的index = 1然后继续执行proceed()方法
这里又回到了前面RealInterceptorChain.proceed()
@Override public Response proceed(Request request) throws IOException {
return proceed(request, streamAllocation, httpCodec, connection);
}
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
//省略代码
...
//TODO: 创建新的拦截链,链中的拦截器集合index+1
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
//TODO: 执行当前的拦截器 默认是:retryAndFollowUpInterceptor
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
//省略代码
...
return response;
}
这里的代码有点绕,第一次看有点晕头转向的感觉~~
没错,这里又回到上面的地方,不过有两点不同了
1.第一次进来的时候index = 0,streamAllocation = null
2.这一次index = 1,streamAllocation刚刚被 new 出来
然后再次创建一个新的RealInterceptorChain对象,把信息传入构造.
注意: 这个时候 index = 1, 所以传入next 里面的 index = 1+1
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
然后再调用Interceptor interceptor = interceptors.get(index);
从拦截器里头取下一个拦截器,因为index=1,这个时候取出的是BridgeInterceptor拦截器(处理配置请求头等信息),执行Response response = interceptor.intercept(next);
看下BridgeInterceptor拦截器的intercept()方法
@Override
public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}
//TODO 执行下一个拦截器
Response networkResponse = chain.proceed(requestBuilder.build());
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
代码挺多,但是逻辑也清晰,就是为我们的请求,按照http协议添加一些请求头信息,为后面使用 socket 写出数据做准备
配置一些请求头信息 ,例如:
1.Content-Type 请求中的媒体类型信息
2.Content-Length 请求体body的长度
3.Host 请求域名
4.Connection: "Keep-Alive" 是否保持长连接
5.Cookie 等等....
关于http的内容,大伙可以上网去搜一下对应的文章了解一下..
然后调用Response networkResponse = chain.proceed(requestBuilder.build());
跟上面一样,还是会回到RealInterceptorChain.proceed()方法里面,再次创建一个新的RealInterceptorChain对象,并且把 index+1,然后继续执行一下个拦截的intercept(next)方法,这里不贴出相同的源码了.然后会执行到CacheInterceptor拦截器
看一下CacheInterceptor.intercept(next)
@Override public Response intercept(Chain chain) throws IOException {
//TODO request对应缓存的Response
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
//TODO 执行响应缓存策略
long now = System.currentTimeMillis();
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
if (cache != null) {
cache.trackResponse(strategy);
}
//TODO 缓存无效 关闭资源
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}
// If we're forbidden from using the network and the cache is insufficient, fail.
//TODO 如果禁止使用网络,并且没有缓存,就返回失败
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
// If we don't need the network, we're done.
//TODO 不使用网络请求直接返回响应
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
//TODO 执行下一个拦截器
Response networkResponse = null;
try {
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
// If we have a cache response too, then we're doing a conditional get.
//TODO 如果存在缓存 更新
if (cacheResponse != null) {
//TODO 304响应码 自从上次请求后,请求需要响应的内容未发生改变
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
//TODO 缓存Response
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}
return response;
}
这个拦截器是处理缓存相关,具体细节这里不管,只分析主流程...
不过从注释可以清晰的看出一些大概的逻辑
1.从缓存里面取出响应结果Response cacheCandidate
2.判断缓存是否失效,如果失效就关闭资源
3.判断如果禁止使用网络,并且没有缓存,就返回失败
等等....
然后又继续调用networkResponse = chain.proceed(networkRequest);
执行下一个拦截器ConnectInterceptor
ConnectInterceptor.intercept(Chain chain)
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
//TODO 连接服务器/复用socket
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
//TODO 找到复用的 socket或者创建新的 socket,并且连接服务器,但是这里还没有把请求写出去
//TODO 然后执行下一个拦截器
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
这个拦截器负责连接服务器,会调用streamAllocation.newStream
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled,
doExtensiveHealthChecks);
//TODO HttpCodec 处理解析请求与响应的工具类
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
newStream()方法里面会调用findHealthyConnection()去寻找一个健康的可用的连接来进行socket连接
有关sockter连接部分还请大伙去查看对应的资料,这里只需要知道:
1.TPC/IP协议是传输层协议,主要规范数据如何在网络中传输
2.Socket则是对TCP/IP协议的封装和应用(程序员层面上)
3.Http 是应用层协议,解决如何包装数据,然后通过Socket发送到服务器
4.这里的RealConnection类是对Socket的封装,里面持有Socket的引用
5.这里的HttpCodec是处理解析请求与响应的工具类
回到上面,通过调用streamAllocation.newStream(...)
连接之后,拿到一个HttpCodec,然后继续执行下一个拦截器,也就是最后的拦截器CallServerInterceptor,通过socket和服务器进行I/O流读写操作
CallServerInterceptor.intercept()
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
...
//TODO 把请求通过 socket 以http的协议格式,通过I/O流的形式写出去
httpCodec.writeRequestHeaders(request);
realChain.eventListener().requestHeadersEnd(realChain.call(), request);
...
//TODO 请求写出完成
httpCodec.finishRequest();
if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(false);
}
//TODO 处理响应
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
...
return response;
}
删除了很多代码.具体实现先不去看,如果大伙想知道,可以去搜索对应方面的具体细节文章...
这里了解到:
1.在CallServerInterceptor这个拦截器,实现Socket与服务器进行TCP连接,然后进行I/O数据流的通信.
2.发送请求, Socket是以http协议的格式,把请求拆分,请求行,请求头,请求体等等,一行一行写出去的
3.接受响应, Socket也是以http协议的格式,把响应结果一行一行的读出来,然后再解析
Socket通信例子:
例如 : 用浏览器模拟 GET 去请求获取天气的接口,然后拿到结果
image.png
image.png
怎样用Socket来实现请求呢?
java的jdk为我们提供了具体的实现Socket,直接创建使用就行
static void doHttp() throws Exception {
//创建一个Socket ,传入 域名和端口,http默认是80
Socket socket = new Socket("restapi.amap.com", 80);
//接受数据的输入流
final BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
//发送数据 输出流
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
new Thread() {
@Override
public void run() {
while (true) {
String line = null;
try {
while ((line = br.readLine()) != null) {
System.out.println("recv :" + line);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}.start();
bw.write("GET /v3/weather/weatherInfo?city=%E9%95%BF%E6%B2%99&key=13cb58f5884f9749287abbead9c658f2 HTTP/1.1\r\n");
bw.write("Host: restapi.amap.com\r\n\r\n");
bw.flush();
}
1.创建一个Socket ,传入域名和端口,http默认是80
2.通过Socket 拿到接受数据的输入流和发送数据的输出流
3.按http协议规范,把请求写出去bw.write("GET /v3/weathe....
和bw.write("Host: restapi.amap.com\r\n\r\n");
4.接受服务器响应结果
打印结果:
recv :HTTP/1.1 200 OK
recv :Server: Tengine
recv :Date: Thu, 05 Jul 2018 09:47:14 GMT
recv :Content-Type: application/json;charset=UTF-8
recv :Content-Length: 253
recv :Connection: close
recv :X-Powered-By: ring/1.0.0
recv :gsid: 011178122113153078403472700229732459707
recv :sc: 0.010
recv :Access-Control-Allow-Origin: *
recv :Access-Control-Allow-Methods: *
recv :Access-Control-Allow-Headers: DNT,X-CustomHeader,Keep-Alive,User-Agent,X-Requested-With,
If-Modified-Since,Cache-Control,Content-Type,key,x-biz,x-info,platinfo,encr,enginever,gzipped,poiid
recv :
recv :{"status":"1","count":"1","info":"OK","infocode":"10000","lives":
[{"province":"广东","city":"深圳市","adcode":"440300","weather":
"云","temperature":"29","winddirection":"南","windpower":"5","humidity":"77",
"reporttime":"2018-07-05 17:00:00"}]}
响应也是一行一行读写出来
最后一个拦截器拿到服务器响应结果,然后再依次往上面的拦截器返回response,这个流程就不多说了,到此一次通信就完成...
小结:
okhttp采用责任链模式,每一个拦截器负责具体的每一块的功能,降低每个功能模块的耦合度,让整体框架更加灵活,我们可以轻松的加入自己自定义的拦截器.
okhttp维护着一个连接池,能做到相同Host的socket复用