OkHttp3 Flow Analysis (Part 1)
OkHttp3 has already been dissected by plenty of experts; I'm writing my own walkthrough to reinforce my memory.
There is a lot of material, so it is split into two parts.
OkHttp Analysis
Basic Flow
OkHttpClient client = new OkHttpClient(); // create the client instance
String run(String url) throws IOException {
Request request = new Request.Builder() // build the request
.url(url)
.build();
try (Response response = client.newCall(request).execute()) {// create a Call, execute it, and get the Response
return response.body().string();
}
}
//The code above runs on (and blocks) a single thread, because execute() is synchronous. There is also an asynchronous variant:
client.newCall(request).enqueue(responseCallback); // enqueue(Callback responseCallback)
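For reference, here is a minimal sketch of the asynchronous form; the anonymous Callback below is an illustrative placeholder, not code from OkHttp itself.
client.newCall(request).enqueue(new Callback() {
  @Override public void onFailure(Call call, IOException e) {
    e.printStackTrace(); // the request could not be executed
  }

  @Override public void onResponse(Call call, Response response) throws IOException {
    try (ResponseBody body = response.body()) {
      System.out.println(body.string()); // runs on a dispatcher thread, not the calling thread
    }
  }
});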
Below we analyze OkHttp3 by following this basic flow.
Request
Nothing complicated here; let's look at its basic fields.
final HttpUrl url;// the parsed URL
final String method;// the HTTP method
final Headers headers;// request headers
final @Nullable RequestBody body;// request body
final Map<Class<?>, Object> tags;// user-supplied tags, which can later be used to cancel the request
private volatile CacheControl cacheControl;// cache control
A Request is constructed through its Builder.
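As a hedged usage sketch (URL, header value and tag are placeholders), building a Request looks like this:
Request request = new Request.Builder()
    .url("https://example.com/api")                       // placeholder URL
    .header("User-Agent", "okhttp-demo")                  // goes into headers
    .tag("my-request-tag")                                // can later be used to find and cancel the call
    .cacheControl(CacheControl.FORCE_NETWORK)             // fills the cacheControl field
    .post(RequestBody.create(MediaType.parse("application/json; charset=utf-8"), "{}"))
    .build();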
Call
Call is an interface whose concrete implementation is RealCall.
Calling client.newCall(request) creates a RealCall instance.
//OkHttpClient
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
//RealCall
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);// create the call instance via the constructor
call.eventListener = client.eventListenerFactory().create(call);// create an EventListener and attach it to the call to observe the request lifecycle
return call;
}
First, let's look at some of RealCall's fields:
final OkHttpClient client;// the client instance
final RetryAndFollowUpInterceptor retryAndFollowUpInterceptor;// the interceptor responsible for retries and follow-ups (e.g. redirects)
private EventListener eventListener;// the event listener
final Request originalRequest;// the original request we just created
final boolean forWebSocket;// whether this call is for a WebSocket
// Guarded by this.
private boolean executed;// whether the call has already been executed
Then come the execute and enqueue methods.
Let's look at the synchronous case first.
execute
//execute
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);// notify the listener that the call has started
try {
client.dispatcher().executed(this);// tell the dispatcher this call is now executing
Response result = getResponseWithInterceptorChain();// obtain the response through the interceptor chain
if (result == null) throw new IOException("Canceled");
return result;// return the response
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);// finally, tell the dispatcher this call has finished
}
}
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
//build the interceptor chain
return chain.proceed(originalRequest);// drive the request through the chain and return the response
}
The synchronous path is fairly simple:
1 Update the executed flag and notify the event listener.
2 Tell the dispatcher that this call has started executing.
3 Build the interceptor chain, run the request through it, and return the result.
4 Tell the dispatcher that this call has finished.
Next, let's look at the asynchronous enqueue path.
enqueue
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
1 Update the executed flag and notify the event listener.
2 Wrap the callback in an AsyncCall and pass it to the dispatcher's enqueue method.
Let's first take a look at AsyncCall.
AsyncCall
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
···
//the interesting part is its execute method
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
AsyncCall is really a Runnable: when its run method is invoked it calls execute, temporarily renaming the current thread while it does so; no need to dwell on that. Inside execute it does almost the same things as the synchronous path:
1 Build the interceptor chain, run the request through it, and deliver the result via the callback; the one difference is that it checks whether the call has been canceled.
2 Tell the dispatcher that this call has finished.
That wraps up the Call class for now. Next we look at the dispatcher that has been mentioned several times above.
Dispatcher
First, some of its basic fields:
private int maxRequests = 64;// maximum number of concurrent asynchronous requests
private int maxRequestsPerHost = 5;// maximum number of concurrent requests per host
private @Nullable Runnable idleCallback;// callback invoked when the dispatcher becomes idle
/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService;// thread pool that runs async calls
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();// async calls waiting to run
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();// async calls currently running
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();// sync calls currently running
For a synchronous request, the dispatcher's executed method is called:
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);// simply add the call to the running-sync queue
}
For an asynchronous request, enqueue is called with the wrapped AsyncCall:
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {// if running async calls are below maxRequests and calls to this host are below maxRequestsPerHost
runningAsyncCalls.add(call);// add it to the running-async queue
executorService().execute(call);// run it on the thread pool; we analyzed what happens there above
} else {
readyAsyncCalls.add(call);// otherwise park it in the ready queue for now
}
}
Finally, when a call finishes running, the dispatcher's finished method is called.
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);// remove this call and promote the next waiting async call
}
/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
finished(runningSyncCalls, call, false);// remove this call
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");// remove the finished call from its queue
if (promoteCalls) promoteCalls();// promote and run the next waiting async call
runningCallsCount = runningCallsCount();// count the calls still running
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {// if nothing is running and an idle callback has been set
idleCallback.run();// run the idle callback
}
}
public synchronized int runningCallsCount() {
return runningAsyncCalls.size() + runningSyncCalls.size();
}
//promote and run the next waiting async request
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // already at max async capacity
if (readyAsyncCalls.isEmpty()) return; // nothing is waiting
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {// walk the ready queue
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {// if calls to this host are still below maxRequestsPerHost
i.remove();// remove it from the ready queue
runningAsyncCalls.add(call);// add it to the running-async queue
executorService().execute(call);// submit it to the thread pool
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
The default executor behaves like a CachedThreadPool; let's look at its parameters.
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
- corePoolSize = 0 and maximumPoolSize = Integer.MAX_VALUE, so the number of threads is effectively unbounded;
- idle threads are terminated after 60 seconds;
- the workQueue is a SynchronousQueue, which works like a baton pass: every enqueue must be matched by a simultaneous dequeue. Since thread creation is unbounded there is never a task waiting in a queue, which is why SynchronousQueue is used;
- this setup is geared toward handling a large number of short-lived tasks quickly (a sketch of swapping in a custom executor follows below).
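If the defaults don't fit, both the executor and the limits can be replaced; a hedged sketch using Dispatcher's public constructor (the pool size and limits below are arbitrary example values):
Dispatcher dispatcher = new Dispatcher(Executors.newFixedThreadPool(16)); // custom executor instead of the lazy CachedThreadPool-like default
dispatcher.setMaxRequests(32);        // overall async concurrency limit
dispatcher.setMaxRequestsPerHost(8);  // per-host limit
OkHttpClient client = new OkHttpClient.Builder()
    .dispatcher(dispatcher)
    .build();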
That concludes the dispatch logic. Next, let's look at how a request chain is assembled.
Interceptor&Chain
First, let's go back to the logic inside Call:
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
As mentioned earlier, an interceptor chain is created here, and it is this chain that actually performs the request and produces the response.
Let's look at the chain's concrete implementation.
RealInterceptorChain
In practice a Chain object can be viewed as a single node of the chain.
First, the basic fields and constructor:
private final List<Interceptor> interceptors;// the list of interceptors
private final StreamAllocation streamAllocation;// the stream allocation
private final HttpCodec httpCodec;// the HTTP codec (encoder/decoder)
private final RealConnection connection;// the connection
private final int index;// index of this node's interceptor within the list
private final Request request;// the request built by the user
private final Call call;// the call for this request
private final EventListener eventListener;
private final int connectTimeout;// connect timeout
private final int readTimeout;// read timeout
private final int writeTimeout;// write timeout
private int calls;// how many times proceed has been called on this node
public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
HttpCodec httpCodec, RealConnection connection, int index, Request request, Call call,
EventListener eventListener, int connectTimeout, int readTimeout, int writeTimeout) {
···
}
Don't worry about the classes you haven't seen yet; they aren't needed for now and will be analyzed in detail later.
When the chain is first constructed, StreamAllocation, HttpCodec and RealConnection are all passed as null, because these objects are only created while the interceptor chain is running.
Following the earlier logic, the response is obtained through the proceed method; let's analyze it in detail.
@Override public Response proceed(Request request) throws IOException {
return proceed(request, streamAllocation, httpCodec, connection);
}
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();// the interceptor index for this node is out of bounds
calls++;// bump the proceed-call count
//sanity checks omitted
···
// create the next node
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);// get the interceptor for the current node
Response response = interceptor.intercept(next);// the current interceptor produces the response, typically by calling proceed on the next node
//sanity checks omitted
···
return response;
}
With the sanity checks stripped out, the logic is quite clear: each interceptor receives the next node and calls proceed on it to obtain a response from the layers below it.
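To make the mechanism concrete, here is a hedged sketch of a minimal application interceptor (TimingInterceptor is a made-up name, not part of OkHttp): it receives the chain node, calls proceed on it, and passes the response back up.
class TimingInterceptor implements Interceptor {
  @Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();            // the request handed down from the previous node
    long start = System.nanoTime();
    Response response = chain.proceed(request);   // hand the request to the next node and wait for its response
    long tookMs = (System.nanoTime() - start) / 1_000_000;
    System.out.println(request.url() + " took " + tookMs + " ms");
    return response;                              // pass the response back up the chain
  }
}
// Registered through OkHttpClient.Builder#addInterceptor, which is what client.interceptors() returns above:
OkHttpClient client = new OkHttpClient.Builder()
    .addInterceptor(new TimingInterceptor())
    .build();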
Interceptor
A basic request passes through the following interceptors:
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
RetryAndFollowUpInterceptor
Naturally, the first thing to look at is the intercept method.
@Override public Response intercept(Chain chain) throws IOException {
//pull what we need from the chain
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();
//create a StreamAllocation instance; don't worry about what it does for now
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
int followUpCount = 0;
Response priorResponse = null;
while (true) {// loop until we return a response or throw
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response;
boolean releaseConnection = true;// whether the connection needs to be released
try {
response = realChain.proceed(request, streamAllocation, null, null);// call proceed on the next node to get the response
releaseConnection = false;// success, no need to release the connection
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getFirstConnectException();// unrecoverable: rethrow, and let finally release the connection
}
releaseConnection = false;// recoverable: skip the rest and retry on the next loop iteration
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;// recoverable: skip the rest and retry on the next loop iteration
} finally {
if (releaseConnection) {// an exception was thrown above, so release the connection here
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {// if a prior response exists, attach it (without its body) to the current response
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}
Request followUp;// the follow-up (retry/redirect) request
try {
followUp = followUpRequest(response, streamAllocation.route());// compute the follow-up request, e.g. for redirects or auth challenges
} catch (IOException e) {
streamAllocation.release();
throw e;
}
if (followUp == null) {// no follow-up is needed
if (!forWebSocket) {
streamAllocation.release();
}
return response;// return the response as-is
}
closeQuietly(response.body());// close the body before following up
if (++followUpCount > MAX_FOLLOW_UPS) {// too many follow-ups
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
if (followUp.body() instanceof UnrepeatableRequestBody) {// the body cannot be replayed, so following up is not allowed
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}
if (!sameConnection(response, followUp.url())) {// the follow-up targets a different host, so a new StreamAllocation is needed
streamAllocation.release();// release the old one
streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(followUp.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}
request = followUp;// loop again with the follow-up request
priorResponse = response;// remember this response as the prior one
}
}
Since the goal here is just to understand the overall flow, the remaining helper methods are not covered.
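For completeness, the behaviors this interceptor implements are governed by a few client-level switches; a hedged configuration sketch (all of these default to true):
OkHttpClient client = new OkHttpClient.Builder()
    .retryOnConnectionFailure(true)   // consulted by recover(...) when deciding whether to retry
    .followRedirects(true)            // consulted by followUpRequest(...) when deciding whether to redirect
    .followSslRedirects(true)         // allows redirects that cross between http and https
    .build();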
BridgeInterceptor
1. Build a new request from the original one, fill in the headers, cookies and other attributes it needs, and use that request for the network call (a simplified sketch follows below)
2. Receive cookies from the returned data and build a new response from the network response to hand back to the layer above (OkHttp does not persist cookies by default; implement your own CookieJar if you need that)
3. If Content-Encoding == gzip, decode the body before returning it
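A simplified sketch of step 1 (an abridged illustration, not the full BridgeInterceptor; the real code also fills Content-Type, Content-Length/Transfer-Encoding, Host, Connection and Cookie headers):
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
  transparentGzip = true;
  requestBuilder.header("Accept-Encoding", "gzip"); // ask for gzip; step 3 then decodes the body transparently
}
Response networkResponse = chain.proceed(requestBuilder.build()); // continue with the enriched request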
CacheInterceptor
@Override public Response intercept(Chain chain) throws IOException {
//look up a cached response for this request
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
long now = System.currentTimeMillis();
//compute the cache strategy
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
//the network request and the cached response chosen by the strategy
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
if (cache != null) {
cache.trackResponse(strategy); // record cache hit/miss statistics
}
//there is a cached candidate, but it is not usable
if (cacheCandidate != null && cacheResponse == null) {
//close the cached body
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}
// If we're forbidden from using the network and the cache is insufficient, fail.
//network not permitted and no usable cache: return 504 (Unsatisfiable Request)
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_BODY)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
// If we don't need the network, we're done.
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
Response networkResponse = null;
try {
//fetch from the network
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
if (validate(cacheResponse, networkResponse)) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers())) // merge the headers
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response); // update the cache entry
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
if (HttpHeaders.hasBody(response)) {
CacheRequest cacheRequest = maybeCache(response, networkResponse.request(), cache);
response = cacheWritingResponse(cacheRequest, response);
}
return response;
}
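Note that the cache this interceptor reads from is opt-in; a hedged configuration sketch (the directory and size are placeholders):
Cache cache = new Cache(new File(cacheDir, "http_cache"), 10L * 1024 * 1024); // cacheDir is a placeholder directory; 10 MiB limit
OkHttpClient client = new OkHttpClient.Builder()
    .cache(cache)
    .build();
// Per-request behaviour is driven by CacheControl; forcing a cache-only answer exercises the 504 branch above:
Request cachedOnly = new Request.Builder()
    .url("https://example.com/api")                // placeholder URL
    .cacheControl(CacheControl.FORCE_CACHE)
    .build();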
ConnectInterceptor
//connect to the target server and pass the request on to the next interceptor
@Override
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);// create the HTTP codec (and, under the hood, the connection and stream)
RealConnection connection = streamAllocation.connection();// get the connection
return realChain.proceed(request, streamAllocation, httpCodec, connection);// hand everything to the final interceptor
}
CallServerInterceptor
@Override public Response intercept(Chain chain) throws IOException {
···
httpCodec.writeRequestHeaders(request);// write the request headers
···
Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
//the request has a body
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();// flush the headers and wait for an "HTTP/1.1 100 Continue" response before sending the body
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(true);// read the response headers; returns null if the code was 100
}
if (responseBuilder == null) {// the expectation was met (or there was no Expect header), so the body may be written
// Write the request body if the "Expect: 100-continue" expectation was met.
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));// create the sink for the request body
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);// wrap it in a write buffer
request.body().writeTo(bufferedRequestBody);// write the request body to the sink
bufferedRequestBody.close();// close the sink
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} else if (!connection.isMultiplexed()) {// we sent Expect: 100-continue but did not get the expected result, so prevent this HTTP/1 connection from being reused
streamAllocation.noNewStreams();
}
}
httpCodec.finishRequest();// the request has been fully written
if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(false);// read the response headers into a Response.Builder
}
Response response = responseBuilder// build a response
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
if (code == 100) {
// server sent a 100-continue even though we did not request one.
// try again to read the actual response
responseBuilder = httpCodec.readResponseHeaders(false);// read the real response headers and rebuild the response
response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
code = response.code();
}
realChain.eventListener()
.responseHeadersEnd(realChain.call(), response);
if (forWebSocket && code == 101) {// WebSocket upgrade
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))// open the response body for reading
.build();
}
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();// the connection must not be reused, so prevent new streams on it
}
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {// protocol error
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
The code above involves the 100 (Continue) HTTP status code. Before sending the request body, HTTP/1.1 lets the client first ask, based on the request headers alone, whether the server is willing to accept the message body. In other words, before POSTing a (large) body the two sides can perform a small handshake, and only if the server agrees does the client start sending the data. The rationale is that if the client sent the body straight away and the server then rejected the request, a lot of bandwidth and resources would be wasted.
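A hedged sketch of opting a large upload into this handshake (the URL and payload below are placeholders):
byte[] largePayload = new byte[8 * 1024 * 1024];   // placeholder body
Request upload = new Request.Builder()
    .url("https://example.com/upload")             // placeholder URL
    .header("Expect", "100-continue")              // triggers the wait-for-100 branch in CallServerInterceptor
    .post(RequestBody.create(MediaType.parse("application/octet-stream"), largePayload))
    .build();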
Caching
For this part, see my separate article on DiskLruCache.
Networking
At last, the exciting final part: this is arguably the core of OkHttp3.
Let's start with the HttpCodec we just encountered.
HttpCodec
HttpCodec is an interface whose concrete implementations are Http1Codec and Http2Codec. Let's look at the interface first.
public interface HttpCodec {
int DISCARD_STREAM_TIMEOUT_MILLIS = 100;
//create a sink for writing the request body
Sink createRequestBody(Request request, long contentLength);
//write the request headers
void writeRequestHeaders(Request request) throws IOException;
//flush the request down to the underlying socket
void flushRequest() throws IOException;
//flush and finish the request
void finishRequest() throws IOException;
//parse the response headers
Response.Builder readResponseHeaders(boolean expectContinue) throws IOException;
//open a stream for reading the response body
ResponseBody openResponseBody(Response response) throws IOException;
//cancel the request
void cancel();
}
Next, let's look at the concrete implementations for HTTP/1.x and HTTP/2.
Http1Codec
First, the basic fields and constructor:
final OkHttpClient client;
/** The stream allocation that owns this stream. May be null for HTTPS proxy tunnels. */
final StreamAllocation streamAllocation;
final BufferedSource source;// the source used to read the response
final BufferedSink sink;// the sink used to write the request
int state = STATE_IDLE;
private long headerLimit = HEADER_LIMIT;
public Http1Codec(OkHttpClient client, StreamAllocation streamAllocation, BufferedSource source,
BufferedSink sink) {
this.client = client;
this.streamAllocation = streamAllocation;
this.source = source;
this.sink = sink;
}
Following the order of a network request that we saw earlier, the first step is writing the request headers.
@Override public void writeRequestHeaders(Request request) throws IOException {
String requestLine = RequestLine.get(
request, streamAllocation.connection().route().proxy().type());
writeRequest(request.headers(), requestLine);
}
public void writeRequest(Headers headers, String requestLine) throws IOException {
if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
sink.writeUtf8(requestLine).writeUtf8("\r\n");
for (int i = 0, size = headers.size(); i < size; i++) {
sink.writeUtf8(headers.name(i))
.writeUtf8(": ")
.writeUtf8(headers.value(i))
.writeUtf8("\r\n");
}
sink.writeUtf8("\r\n");
state = STATE_OPEN_REQUEST_BODY;
}
The logic here is simple: concatenate the request line and headers and write them out in order; an illustration of the resulting bytes follows.
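For a GET over a direct (non-proxy) route, the bytes written to the sink look roughly like this (an illustrative example, not taken from a real capture; the Host and Accept-Encoding headers were added earlier by BridgeInterceptor):
GET /index.html HTTP/1.1\r\n
Host: example.com\r\n
Accept-Encoding: gzip\r\n
\r\n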
The next step is to obtain a sink for writing the request body.
@Override public Sink createRequestBody(Request request, long contentLength) {
if ("chunked".equalsIgnoreCase(request.header("Transfer-Encoding"))) {
return newChunkedSink();// unknown length: chunked encoding
}
if (contentLength != -1) {
return newFixedLengthSink(contentLength);// known, fixed length
}
throw new IllegalStateException(
"Cannot stream a request body without chunked encoding or a known content length!");
}
public Sink newChunkedSink() {
if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
state = STATE_WRITING_REQUEST_BODY;// check and advance the current state
return new ChunkedSink();
}
public Sink newFixedLengthSink(long contentLength) {
if (state != STATE_OPEN_REQUEST_BODY) throw new IllegalStateException("state: " + state);
state = STATE_WRITING_REQUEST_BODY;
return new FixedLengthSink(contentLength);
}
Depending on whether the length is fixed, one of two sinks is created and the state is advanced.
Let's look at the unknown-length (chunked) one first.
private final class ChunkedSink implements Sink {
private final ForwardingTimeout timeout = new ForwardingTimeout(sink.timeout());
private boolean closed;
ChunkedSink() {
}
@Override public Timeout timeout() {
return timeout;
}
@Override public void write(Buffer source, long byteCount) throws IOException {
if (closed) throw new IllegalStateException("closed");
if (byteCount == 0) return;
sink.writeHexadecimalUnsignedLong(byteCount);
sink.writeUtf8("\r\n");
sink.write(source, byteCount);
sink.writeUtf8("\r\n");
}
@Override public synchronized void flush() throws IOException {
if (closed) return; // Don't throw; this stream might have been closed on the caller's behalf.
sink.flush();
}
@Override public synchronized void close() throws IOException {
if (closed) return;
closed = true;
sink.writeUtf8("0\r\n\r\n");
detachTimeout(timeout);
state = STATE_READ_RESPONSE_HEADERS;
}
}
As you can see, the sink held by the codec is wrapped once more: each write is framed with the chunk size in hexadecimal and CRLF terminators, so the bytes end up looking like the illustration below.
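For example, if the caller writes the 5 bytes "Hello" and then closes the sink, the underlying socket receives roughly the following (an illustrative example of the chunked framing produced by the code above):
5\r\n
Hello\r\n
0\r\n
\r\n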
The fixed-length sink follows the same pattern.
private final class FixedLengthSink implements Sink {
private final ForwardingTimeout timeout = new ForwardingTimeout(sink.timeout());
private boolean closed;
private long bytesRemaining;
FixedLengthSink(long bytesRemaining) {
this.bytesRemaining = bytesRemaining;
}
@Override public Timeout timeout() {
return timeout;
}
@Override public void write(Buffer source, long byteCount) throws IOException {
if (closed) throw new IllegalStateException("closed");
checkOffsetAndCount(source.size(), 0, byteCount);
if (byteCount > bytesRemaining) {
throw new ProtocolException("expected " + bytesRemaining
+ " bytes but received " + byteCount);
}
sink.write(source, byteCount);
bytesRemaining -= byteCount;
}
@Override public void flush() throws IOException {
if (closed) return; // Don't throw; this stream might have been closed on the caller's behalf.
sink.flush();
}
@Override public void close() throws IOException {
if (closed) return;
closed = true;
if (bytesRemaining > 0) throw new ProtocolException("unexpected end of stream");
detachTimeout(timeout);
state = STATE_READ_RESPONSE_HEADERS;
}
}
The expected length is passed in at construction time and checked on every write and on close.
After the request body has been written, flushRequest flushes everything down to the underlying socket.
@Override public void flushRequest() throws IOException {
sink.flush();
}
Then the response headers are parsed.
@Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
if (state != STATE_OPEN_REQUEST_BODY && state != STATE_READ_RESPONSE_HEADERS) {// state check
throw new IllegalStateException("state: " + state);
}
try {
StatusLine statusLine = StatusLine.parse(readHeaderLine());// read and parse the status line
Response.Builder responseBuilder = new Response.Builder()// build a Response.Builder from it
.protocol(statusLine.protocol)
.code(statusLine.code)
.message(statusLine.message)
.headers(readHeaders());// read the remaining header lines
if (expectContinue && statusLine.code == HTTP_CONTINUE) {// decide what to return based on the expectContinue flag and the status code
return null;
} else if (statusLine.code == HTTP_CONTINUE) {
state = STATE_READ_RESPONSE_HEADERS;
return responseBuilder;
}
state = STATE_OPEN_RESPONSE_BODY;
return responseBuilder;
} catch (EOFException e) {
// Provide more context if the server ends the stream before sending a response.
IOException exception = new IOException("unexpected end of stream on " + streamAllocation);
exception.initCause(e);
throw exception;
}
}
//read one header line
private String readHeaderLine() throws IOException {
String line = source.readUtf8LineStrict(headerLimit);// read a single line
headerLimit -= line.length();
return line;
}
//read the remaining lines
public Headers readHeaders() throws IOException {
Headers.Builder headers = new Headers.Builder();
// parse the result headers until the first blank line
for (String line; (line = readHeaderLine()).length() != 0; ) {// read lines in a loop and add them to the headers until a blank line
Internal.instance.addLenient(headers, line);
}
return headers.build();
}
Finally, the response body is opened.
@Override public ResponseBody openResponseBody(Response response) throws IOException {
streamAllocation.eventListener.responseBodyStart(streamAllocation.call);
String contentType = response.header("Content-Type");
if (!HttpHeaders.hasBody(response)) {
Source source = newFixedLengthSource(0);
return new RealResponseBody(contentType, 0, Okio.buffer(source));
}
if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
Source source = newChunkedSource(response.request().url());
return new RealResponseBody(contentType, -1L, Okio.buffer(source));
}
long contentLength = HttpHeaders.contentLength(response);
if (contentLength != -1) {
Source source = newFixedLengthSource(contentLength);
return new RealResponseBody(contentType, contentLength, Okio.buffer(source));
}
return new RealResponseBody(contentType, -1L, Okio.buffer(newUnknownLengthSource()));
}
In this method the source is wrapped twice: first into the appropriate source depending on whether the length is known, and then in a buffering layer.
For the first layer, the classes ChunkedSource, FixedLengthSource and UnknownLengthSource all extend AbstractSource.
That class provides the logic that, once the data has been fully read, closes the cache entry and frees the socket for reuse.
protected final void endOfInput(boolean reuseConnection, IOException e) throws IOException {
if (state == STATE_CLOSED) return;// state check
if (state != STATE_READING_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
detachTimeout(timeout);// detach the timeout
state = STATE_CLOSED;// mark the codec closed
if (streamAllocation != null) {
streamAllocation.streamFinished(!reuseConnection, Http1Codec.this, bytesRead, e);// finish the stream
}
}
Because an HTTP/1 connection can carry only one request at a time while HTTP/2 multiplexes many requests over a single connection, Http2Codec is implemented quite differently from Http1Codec. Let's take a look.
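Which codec is used depends on the protocol negotiated for the connection (via ALPN during the TLS handshake); the candidate protocols are configured on the client. A hedged sketch:
OkHttpClient client = new OkHttpClient.Builder()
    .protocols(Arrays.asList(Protocol.HTTP_2, Protocol.HTTP_1_1)) // HTTP_1_1 must remain in the list as a fallback
    .build();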
Http2Codec
First, the basic fields:
private final Interceptor.Chain chain;
final StreamAllocation streamAllocation;
private final Http2Connection connection;
private Http2Stream stream;
private final Protocol protocol;
Unlike Http1Codec, there are several objects here we haven't seen before. Let's follow the request flow first and then analyze them one by one.
As before, the first step is writing the request headers:
@Override public void writeRequestHeaders(Request request) throws IOException {
if (stream != null) return;// this codec already has a stream, so the headers have already been written
boolean hasRequestBody = request.body() != null;
List<Header> requestHeaders = http2HeadersList(request);// convert the request into a list of HTTP/2 headers
stream = connection.newStream(requestHeaders, hasRequestBody);// ask the connection for a new stream
stream.readTimeout().timeout(chain.readTimeoutMillis(), TimeUnit.MILLISECONDS);// set timeouts
stream.writeTimeout().timeout(chain.writeTimeoutMillis(), TimeUnit.MILLISECONDS);
}
The remaining methods are not shown here; essentially, operations on the stream replace the direct manipulation of the connection's source and sink that we saw in Http1Codec. Read them if you are interested.
Next, let's focus on Http2Connection and Http2Stream.
Http2Connection
First, some of its fields and its constructor.
final Map<Integer, Http2Stream> streams = new LinkedHashMap<>();// the streams multiplexed over this connection, keyed by stream id
final String hostname;// host name
int lastGoodStreamId;// id of the last good stream
int nextStreamId;// id to use for the next stream
boolean shutdown;// whether the connection has been shut down
···
final Socket socket;// the underlying socket
final Http2Writer writer;
Http2Connection(Builder builder) {
pushObserver = builder.pushObserver;
client = builder.client;
listener = builder.listener;
// http://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-5.1.1
nextStreamId = builder.client ? 1 : 2;
if (builder.client) {
nextStreamId += 2; // In HTTP/2, 1 on client is reserved for Upgrade.
}
// Flow control was designed more for servers, or proxies than edge clients.
// If we are a client, set the flow control window to 16MiB. This avoids
// thrashing window updates every 64KiB, yet small enough to avoid blowing
// up the heap.
if (builder.client) {
okHttpSettings.set(Settings.INITIAL_WINDOW_SIZE, OKHTTP_CLIENT_WINDOW_SIZE);
}
hostname = builder.hostname;
writerExecutor = new ScheduledThreadPoolExecutor(1,
Util.threadFactory(Util.format("OkHttp %s Writer", hostname), false));
if (builder.pingIntervalMillis != 0) {
writerExecutor.scheduleAtFixedRate(new PingRunnable(false, 0, 0),
builder.pingIntervalMillis, builder.pingIntervalMillis, MILLISECONDS);
}
// Like newSingleThreadExecutor, except lazy creates the thread.
pushExecutor = new ThreadPoolExecutor(0, 1, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
Util.threadFactory(Util.format("OkHttp %s Push Observer", hostname), true));
peerSettings.set(Settings.INITIAL_WINDOW_SIZE, DEFAULT_INITIAL_WINDOW_SIZE);
peerSettings.set(Settings.MAX_FRAME_SIZE, Http2.INITIAL_MAX_FRAME_SIZE);
bytesLeftInWriteWindow = peerSettings.getInitialWindowSize();
socket = builder.socket;
writer = new Http2Writer(builder.sink, client);
readerRunnable = new ReaderRunnable(new Http2Reader(builder.source, client));
}
As you can see, a single HTTP/2 connection carries multiple streams for its requests. When we write the request headers, newStream is used to obtain a stream instance.
private Http2Stream newStream(
int associatedStreamId, List<Header> requestHeaders, boolean out) throws IOException {
boolean outFinished = !out;
boolean inFinished = false;
boolean flushHeaders;
Http2Stream stream;
int streamId;
synchronized (writer) {
synchronized (this) {
if (nextStreamId > Integer.MAX_VALUE / 2) {// stream ids are nearly exhausted; start draining this connection
shutdown(REFUSED_STREAM);
}
if (shutdown) {
throw new ConnectionShutdownException();// the connection has already been shut down; fail
}
streamId = nextStreamId;
nextStreamId += 2; // client-initiated stream ids increase by 2
stream = new Http2Stream(streamId, this, outFinished, inFinished, requestHeaders);// create the stream
flushHeaders = !out || bytesLeftInWriteWindow == 0L || stream.bytesLeftInWriteWindow == 0L;// whether the headers should be flushed to the socket immediately
if (stream.isOpen()) {// if the stream opened successfully, register it in the map
streams.put(streamId, stream);
}
}
if (associatedStreamId == 0) {// write the headers frame
writer.synStream(outFinished, streamId, associatedStreamId, requestHeaders);
} else if (client) {
throw new IllegalArgumentException("client streams shouldn't have associated stream IDs");
} else { // HTTP/2 has a PUSH_PROMISE frame.
writer.pushPromise(associatedStreamId, streamId, requestHeaders);
}
}
if (flushHeaders) {
writer.flush();
}
return stream;
}
Let's look at this together with Http2Stream.
Http2Stream
First, the basic fields and constructor:
long bytesLeftInWriteWindow;// flow-control window size
final int id;
final Http2Connection connection;
/** Request headers. Immutable and non null. */
private final List<Header> requestHeaders;// request headers
/** Response headers yet to be {@linkplain #takeResponseHeaders taken}. */
private List<Header> responseHeaders;// response headers
/** True if response headers have been sent or received. */
private boolean hasResponseHeaders;// whether response headers have been sent or received
private final FramingSource source;// source that reads incoming data frames
final FramingSink sink;// sink that writes outgoing data frames
In HTTP/2, data is carried in frames and a flow-control scheme is layered on top. See the relevant specifications for the details; they are not covered here.
Http2Stream(int id, Http2Connection connection, boolean outFinished, boolean inFinished,
List<Header> requestHeaders) {
if (connection == null) throw new NullPointerException("connection == null");
if (requestHeaders == null) throw new NullPointerException("requestHeaders == null");
this.id = id;
this.connection = connection;
this.bytesLeftInWriteWindow =
connection.peerSettings.getInitialWindowSize();
this.source = new FramingSource(connection.okHttpSettings.getInitialWindowSize());
this.sink = new FramingSink();
this.source.finished = inFinished;
this.sink.finished = outFinished;
this.requestHeaders = requestHeaders;
}
The source and sink instances are created in the constructor.
private final class FramingSource implements Source {
/** Buffer to receive data from the network into. Only accessed by the reader thread. */
private final Buffer receiveBuffer = new Buffer();
/** Buffer with readable data. Guarded by Http2Stream.this. */
private final Buffer readBuffer = new Buffer();
/** Maximum number of bytes to buffer before reporting a flow control error. */
private final long maxByteCount;
/** True if the caller has closed this stream. */
boolean closed;
/**
* True if either side has cleanly shut down this stream. We will receive no more bytes beyond
* those already in the buffer.
*/
boolean finished;
FramingSource(long maxByteCount) {
this.maxByteCount = maxByteCount;
}
@Override public long read(Buffer sink, long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
long read = -1;
ErrorCode errorCode;
synchronized (Http2Stream.this) {
waitUntilReadable();
if (closed) {
throw new IOException("stream closed");
}
errorCode = Http2Stream.this.errorCode;
if (readBuffer.size() > 0) {
// Move bytes from the read buffer into the caller's buffer.
read = readBuffer.read(sink, Math.min(byteCount, readBuffer.size()));
unacknowledgedBytesRead += read;
}
if (errorCode == null
&& unacknowledgedBytesRead >= connection.okHttpSettings.getInitialWindowSize() / 2) {
// Flow control: notify the peer that we're ready for more data! Only send a WINDOW_UPDATE
// if the stream isn't in error.
connection.writeWindowUpdateLater(id, unacknowledgedBytesRead);
unacknowledgedBytesRead = 0;
}
}
if (read != -1) {
// Update connection.unacknowledgedBytesRead outside the stream lock.
updateConnectionFlowControl(read);
return read;
}
if (errorCode != null) {
// We defer throwing the exception until now so that we can refill the connection
// flow-control window. This is necessary because we don't transmit window updates until the
// application reads the data. If we throw this prior to updating the connection
// flow-control window, we risk having it go to 0 preventing the server from sending data.
throw new StreamResetException(errorCode);
}
return -1; // This source is exhausted.
}
private void updateConnectionFlowControl(long read) {
assert (!Thread.holdsLock(Http2Stream.this));
connection.updateConnectionFlowControl(read);
}
/** Returns once the source is either readable or finished. */
private void waitUntilReadable() throws IOException {
readTimeout.enter();
try {
while (readBuffer.size() == 0 && !finished && !closed && errorCode == null) {
waitForIo();
}
} finally {
readTimeout.exitAndThrowIfTimedOut();
}
}
void receive(BufferedSource in, long byteCount) throws IOException {
assert (!Thread.holdsLock(Http2Stream.this));
while (byteCount > 0) {
boolean finished;
boolean flowControlError;
synchronized (Http2Stream.this) {
finished = this.finished;
flowControlError = byteCount + readBuffer.size() > maxByteCount;
}
// If the peer sends more data than we can handle, discard it and close the connection.
if (flowControlError) {
in.skip(byteCount);
closeLater(ErrorCode.FLOW_CONTROL_ERROR);
return;
}
// Discard data received after the stream is finished. It's probably a benign race.
if (finished) {
in.skip(byteCount);
return;
}
// Fill the receive buffer without holding any locks.
long read = in.read(receiveBuffer, byteCount);
if (read == -1) throw new EOFException();
byteCount -= read;
// Move the received data to the read buffer to the reader can read it.
synchronized (Http2Stream.this) {
boolean wasEmpty = readBuffer.size() == 0;
readBuffer.writeAll(receiveBuffer);
if (wasEmpty) {
Http2Stream.this.notifyAll();
}
}
}
}
@Override public Timeout timeout() {
return readTimeout;
}
@Override public void close() throws IOException {
long bytesDiscarded;
synchronized (Http2Stream.this) {
closed = true;
bytesDiscarded = readBuffer.size();
readBuffer.clear();
Http2Stream.this.notifyAll();
}
if (bytesDiscarded > 0) {
updateConnectionFlowControl(bytesDiscarded);
}
cancelStreamIfNecessary();
}
}
That's it for this part. The next part analyzes in detail the three core pieces of an OkHttp request: StreamAllocation, RealConnection, and ConnectionPool.