OkHttp Source Code Analysis -- Network Requests Through the Interceptor Chain
Preface:
Interceptors are one of OkHttp's most important features. The source code describes them as follows:
/**
* Observes, modifies, and potentially short-circuits requests going out and the corresponding
* responses coming back in. Typically interceptors add, remove, or transform headers on the request
* or response.
*/
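In other words, an interceptor can observe and modify an outgoing request and the response coming back, and it can short-circuit the call by returning a response without proceeding any further. Typically, interceptors add, remove, or transform headers on the request or response.
Interceptor chain (diagram from the official site)
Application interceptors and network interceptors are the ones we define ourselves. The OkHttp core in the middle is provided by the library, which supplies five built-in interceptors. They are called in the following order:
Order of the chain of responsibility
Roughly, the five interceptors do the following:
RetryAndFollowUpInterceptor: retries failed requests and follows redirects
BridgeInterceptor: bridges application and network code, filling in required request headers
CacheInterceptor: serves responses from the cache and writes responses to it
ConnectInterceptor: obtains a usable connection to the server
CallServerInterceptor: writes the request to the server and reads the server's response
1. The chain of responsibility
getResponseWithInterceptorChain is the entry point called from outside; it kicks off the whole chain of responsibility.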
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
// Collect every interceptor: the ones we define ourselves plus the five built-in ones
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
// retryAndFollowUpInterceptor is created when RealCall is initialized, because RealCall needs to call some of its methods
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
// forWebSocket is passed along from OkHttpClient
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
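// Build the first chain. It needs:
// the full interceptor list and an index, which together identify the interceptor to run;
// originalRequest (the original Request) and the connect, read, and write timeouts from OkHttpClient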
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
// Finally, run the whole chain
return chain.proceed(originalRequest);
}
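The assembly order can be illustrated without OkHttp at all. The following is only a minimal sketch, assuming each interceptor is represented by its name; InterceptorOrderSketch and buildStack are hypothetical names, not OkHttp API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: each interceptor is represented only by its name.
class InterceptorOrderSketch {
    static List<String> buildStack(List<String> applicationInterceptors,
                                   List<String> networkInterceptors,
                                   boolean forWebSocket) {
        // Application interceptors always come first.
        List<String> stack = new ArrayList<>(applicationInterceptors);
        stack.add("RetryAndFollowUpInterceptor");
        stack.add("BridgeInterceptor");
        stack.add("CacheInterceptor");
        stack.add("ConnectInterceptor");
        if (!forWebSocket) {
            // Network interceptors are skipped for web socket calls.
            stack.addAll(networkInterceptors);
        }
        // The interceptor that talks to the server is always last.
        stack.add("CallServerInterceptor");
        return stack;
    }

    public static void main(String[] args) {
        List<String> stack =
                buildStack(Arrays.asList("appLogging"), Arrays.asList("netLogging"), false);
        for (int i = 0; i < stack.size(); i++) {
            System.out.println(i + ": " + stack.get(i));
        }
    }
}
```

The index passed to RealInterceptorChain plays the same role as the loop variable here: it selects which entry of the list runs next.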
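After studying this for a few days, here is how I understand the chain of responsibility:
1. Handlers process the request one after another, linked into a chain.
2. A chain needs two methods: one to get the request and one to obtain the response.
3. The key question is how the chain is linked, that is, how the next chain object is created. Each time a chain proceeds, it creates the next chain and passes it to the current interceptor.
To make this easier to follow, I wrote a small imitation of the mechanism: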
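// Interface definitions for the interceptor and the chain
public interface Interceptor {
// Run the interception; the next chain is passed in so that the interceptors link into a chain
String intercept(Chain chain);
// The interceptor chain
interface Chain{
// Get the request
int request();
// Hand the request to the current interceptor, creating the next chain for it
String proceed(int request);
}
}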
// Implementation of the chain
public class RealChain implements Interceptor.Chain {
private int index;
private int request;
private List<Interceptor> interceptors;
public RealChain(List<Interceptor> interceptors,int index,int request){
this.index = index;
this.interceptors = interceptors;
this.request = request;
}
@Override
public int request() {
return request;
}
// When the chain proceeds, it creates the next Chain and lets the current interceptor handle it
@Override
public String proceed(int request) {
request++;
RealChain next = new RealChain(interceptors,index+1,request);
return interceptors.get(index).intercept(next);
}
}
Interceptor implementations:
public class InterceptorOne implements Interceptor {
@Override
public String intercept(Chain chain) {
int request = chain.request();
return chain.proceed(request);
}
}
public class InterceptorTwo implements Interceptor {
@Override
public String intercept(Chain chain) {
int request = chain.request();
return chain.proceed(request);
}
}
// The last interceptor handles the request itself instead of delegating to a next interceptor
public class InterceptorThree implements Interceptor {
@Override
public String intercept(Chain chain) {
int request = chain.request();
return "http response";
}
}
Building and invoking the chain:
String getResponseWithInterceptorChain() {
// Build a full stack of interceptors.
List<com.phj.okhttp.Interceptor> interceptors = new ArrayList<>();
com.phj.okhttp.Interceptor interceptor = new InterceptorOne();
interceptors.add(interceptor);
interceptors.add(new InterceptorTwo());
interceptors.add(new InterceptorThree());
RealChain chain = new RealChain(interceptors, 0, 1);
return chain.proceed(1);
}
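The code above imitates OkHttp's interceptor chain. The chain of responsibility pattern has no single fixed form, but the variants are broadly alike: each handler only needs a way to reach the next handler.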
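One property of this design is easy to miss: code written before proceed() runs on the way down the chain, and code written after it runs on the way back up, in reverse order. The following stand-alone sketch makes that visible. All names in it (ChainOrderDemo, named, run) are hypothetical, and the request is a plain String for simplicity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone demo; none of these types are OkHttp classes.
class ChainOrderDemo {
    interface Interceptor {
        String intercept(Chain chain);
    }

    static final class Chain {
        private final List<Interceptor> interceptors;
        private final int index;
        private final String request;

        Chain(List<Interceptor> interceptors, int index, String request) {
            this.interceptors = interceptors;
            this.index = index;
            this.request = request;
        }

        String request() {
            return request;
        }

        String proceed(String request) {
            if (index >= interceptors.size()) {
                throw new IllegalStateException("no interceptor left to handle the request");
            }
            // Create the chain for the next position, then run the current interceptor with it.
            Chain next = new Chain(interceptors, index + 1, request);
            return interceptors.get(index).intercept(next);
        }
    }

    static final List<String> log = new ArrayList<>();

    // An interceptor that records when it runs and appends its name to the request.
    static Interceptor named(String name) {
        return chain -> {
            log.add(name + " before");
            String response = chain.proceed(chain.request() + ">" + name);
            log.add(name + " after");
            return response;
        };
    }

    static String run() {
        log.clear();
        // The terminal interceptor answers by itself and never calls proceed().
        Interceptor terminal = chain -> "response to " + chain.request();
        List<Interceptor> interceptors = Arrays.asList(named("A"), named("B"), terminal);
        return new Chain(interceptors, 0, "req").proceed("req");
    }

    public static void main(String[] args) {
        System.out.println(run()); // response to req>A>B
        System.out.println(log);   // [A before, B before, B after, A after]
    }
}
```

This is why an interceptor such as BridgeInterceptor can both prepare the request and post-process the response in a single method.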
2. RetryAndFollowUpInterceptor
/**
* This interceptor recovers from failures and follows redirects as necessary. It may throw an
* {@link IOException} if the call was canceled.
*/
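In short, this interceptor recovers from failures and follows redirects when necessary. If the call is canceled, it may throw an IOException.
A lot can go wrong during a network request: the status code may not be 200, the response may be unusable, or an exception may be thrown.
The core of the interceptor is its intercept method: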
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();
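// StreamAllocation allocates the components needed to carry out the HTTP request. It is created here but not used here; ConnectInterceptor uses it to set up the input and output streams to the server.
// Arguments: the connection pool, the Address for the URL, the call, the event listener, and the call stack trace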
streamAllocation = new StreamAllocation(client.connectionPool(), createAddress(request.url()),
call, eventListener, callStackTrace);
int followUpCount = 0;
Response priorResponse = null;
while (true) {
// Throw if the call has been canceled
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response;
boolean releaseConnection = true;
try {
// Hand the request to the rest of the chain (the network request)
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), false, request)) {
throw e.getLastConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}
// Work out whether a follow-up request is needed; null means no further processing, so return the response
Request followUp = followUpRequest(response);
if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}
// Close response.body()
closeQuietly(response.body());
// Enforce the maximum number of follow-ups; release streamAllocation once it is exceeded
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}
// Can the follow-up request reuse the same connection?
if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(followUp.url()), call, eventListener, callStackTrace);
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}
request = followUp;
priorResponse = response;
}
}
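Summary:
1. Create the StreamAllocation object, which manages the components needed for the network request and response.
2. Call the next chain to perform the request: response = realChain.proceed(request, streamAllocation, null, null);
3. Decide from the exception or the response whether the request must be retried or followed up.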
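The shape of that loop can be reduced to a few lines. The sketch below is a simplification under stated assumptions, not OkHttp's code: Transport stands in for the rest of the chain, a simple retry budget (maxRecoveries) replaces the real recover() checks, and only 3xx responses with a redirect target count as follow-ups. The limit of 20 matches MAX_FOLLOW_UPS in OkHttp.

```java
import java.io.IOException;
import java.net.ProtocolException;

// Hypothetical sketch of the retry / follow-up loop; not OkHttp code.
class FollowUpLoopSketch {
    static final int MAX_FOLLOW_UPS = 20; // the same limit OkHttp uses

    static final class Response {
        final int code;
        final String location; // redirect target, or null

        Response(int code, String location) {
            this.code = code;
            this.location = location;
        }
    }

    /** Stands in for "the rest of the chain". */
    interface Transport {
        Response send(String url) throws IOException;
    }

    static Response execute(String url, Transport transport, int maxRecoveries)
            throws IOException {
        int followUpCount = 0;
        int recoveries = 0;
        while (true) {
            Response response;
            try {
                response = transport.send(url);
            } catch (IOException e) {
                // Assumption: a simple retry budget replaces OkHttp's recover() checks.
                if (++recoveries > maxRecoveries) throw e;
                continue;
            }
            // Assumption: only 3xx responses with a redirect target need a follow-up.
            boolean redirect = response.code >= 300 && response.code < 400
                    && response.location != null;
            if (!redirect) return response;
            if (++followUpCount > MAX_FOLLOW_UPS) {
                throw new ProtocolException("Too many follow-up requests: " + followUpCount);
            }
            url = response.location; // loop again with the follow-up request
        }
    }
}
```

The real interceptor decides whether a failure is recoverable from the exception type, the available routes, and the client's retryOnConnectionFailure setting rather than from a counter.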
3. BridgeInterceptor
/**
* Bridges from application code to network code. First it builds a network request from a user
* request. Then it proceeds to call the network. Finally it builds a user response from the network
* response.
*/
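In short, it is the bridge from application code to network code: it builds a network request from the user's request, proceeds to call the network, and finally builds a user response from the network response.
BridgeInterceptor fills in required request headers that the application left out, and converts between the application-level and network-level forms of the request and the response.
The intercept method is shown below.
cookieJar is what loads and saves cookies.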
@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
// Create a new Request.Builder from the user's request
Request.Builder requestBuilder = userRequest.newBuilder();
RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}
// Keep-Alive is the basis for connection reuse
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
// Whether we add gzip ourselves (and therefore must decompress the response ourselves)
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
// Cookie handling
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}
// Proceed down the chain (the network request)
Response networkResponse = chain.proceed(requestBuilder.build());
// Save the cookies from the response
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
// Handle gzip: decompression is delegated to Okio
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
// Wrap the body source in a GzipSource
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
// Okio decompresses the body as the buffered source is read
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
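Summary:
1. It fills in the missing headers so that the request is ready to be sent over the network.
2. The connection defaults to Keep-Alive, which keeps the TCP connection open for a while so that it can be reused.
3. It post-processes the returned Response, for example by decompressing gzip, to produce the Response the application needs.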
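The header defaults in step 1 can be sketched with a plain map. This is an illustration only: BridgeHeadersSketch and fillDefaults are hypothetical names, headers are modeled as a case-insensitive Map rather than OkHttp's Headers class, and cookies and User-Agent are left out.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the header defaults; headers are modeled as a plain map.
class BridgeHeadersSketch {
    /**
     * Returns a copy of userHeaders with the defaults filled in.
     *
     * @param contentLength body length in bytes, -1 if unknown, or null if there is no body
     */
    static Map<String, String> fillDefaults(Map<String, String> userHeaders,
                                            String host, Long contentLength) {
        // HTTP header names are case-insensitive, so use a case-insensitive map.
        Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        headers.putAll(userHeaders);
        if (contentLength != null) {
            if (contentLength != -1L) {
                // Known length: send Content-Length, never chunked.
                headers.put("Content-Length", Long.toString(contentLength));
                headers.remove("Transfer-Encoding");
            } else {
                // Unknown length: fall back to chunked transfer encoding.
                headers.put("Transfer-Encoding", "chunked");
                headers.remove("Content-Length");
            }
        }
        if (!headers.containsKey("Host")) headers.put("Host", host);
        if (!headers.containsKey("Connection")) headers.put("Connection", "Keep-Alive");
        // Ask for gzip only if the caller chose neither an encoding nor a byte range.
        if (!headers.containsKey("Accept-Encoding") && !headers.containsKey("Range")) {
            headers.put("Accept-Encoding", "gzip");
        }
        return headers;
    }
}
```

Values the caller set explicitly are never overwritten, which mirrors the userRequest.header(...) == null checks in the listing above.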
4. CacheInterceptor (the cache interceptor)
/** Serves requests from the cache and writes responses to the cache. */
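That is, it answers requests from the cache where possible and writes network responses into the cache.
How to use it:
Pass a Cache when building the OkHttpClient: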
cache(new Cache(new File("cache"),10*1024*1024))
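4.1 The Cache class
Let's look at the Cache class first, starting with its documentation.
Its job is to return a Response from the cache for as long as the cached entry remains valid.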
* <h3>Force a Network Response</h3>
*
* <p>In some situations, such as after a user clicks a 'refresh' button, it may be necessary to
* skip the cache, and fetch data directly from the server. To force a full refresh, add the {@code
* no-cache} directive: <pre> {@code
* // Skip the cache and force fresh data from the server
* Request request = new Request.Builder()
* .cacheControl(new CacheControl.Builder().noCache().build())
* .url("http://publicobject.com/helloworld.txt")
* .build();
* }</pre>
*
* If it is only necessary to force a cached response to be validated by the server, use the more
* efficient {@code max-age=0} directive instead: <pre> {@code
* // Only force the server to validate the cached response
* Request request = new Request.Builder()
* .cacheControl(new CacheControl.Builder()
* .maxAge(0, TimeUnit.SECONDS)
* .build())
* .url("http://publicobject.com/helloworld.txt")
* .build();
* }</pre>
*
* <h3>Force a Cache Response</h3>
*
* <p>Sometimes you'll want to show resources if they are available immediately, but not otherwise.
* This can be used so your application can show <i>something</i> while waiting for the latest data
* to be downloaded. To restrict a request to locally-cached resources, add the {@code
* only-if-cached} directive: <pre> {@code
* // Read from the cache only
* Request request = new Request.Builder()
* .cacheControl(new CacheControl.Builder()
* .onlyIfCached()
* .build())
* .url("http://publicobject.com/helloworld.txt")
* .build();
* Response forceCacheResponse = client.newCall(request).execute();
* if (forceCacheResponse.code() != 504) {
* // The resource was cached! Show it.
* } else {
* // The resource was not cached.
* }
* }</pre>
The Cache class holds an implementation of the InternalCache interface; every one of its methods simply delegates to Cache itself.
final InternalCache internalCache = new InternalCache() {
@Override public Response get(Request request) throws IOException {
return Cache.this.get(request);
}
@Override public CacheRequest put(Response response) throws IOException {
return Cache.this.put(response);
}
@Override public void remove(Request request) throws IOException {
Cache.this.remove(request);
}
@Override public void update(Response cached, Response network) {
Cache.this.update(cached, network);
}
@Override public void trackConditionalCacheHit() {
Cache.this.trackConditionalCacheHit();
}
@Override public void trackResponse(CacheStrategy cacheStrategy) {
Cache.this.trackResponse(cacheStrategy);
}
};
With that in mind, here is Cache's put method:
@Nullable CacheRequest put(Response response) {
String requestMethod = response.request().method();
if (HttpMethod.invalidatesCache(response.request().method())) {
try {
remove(response.request());
} catch (IOException ignored) {
// The cache cannot be written.
}
return null;
}
// Only GET responses are cached; responses to other methods are not
if (!requestMethod.equals("GET")) {
// Don't cache non-GET responses. We're technically allowed to cache
// HEAD requests and some POST requests, but the complexity of doing
// so is high and the benefit is low.
return null;
}
if (HttpHeaders.hasVaryAll(response)) {
return null;
}
// The entity stored in the cache: Entry is just a wrapper around the Response
Entry entry = new Entry(response);
// Caching is implemented with DiskLruCache (OkHttp uses its own modified version)
DiskLruCache.Editor editor = null;
try {
// Convert the URL into a key by hashing it (a digest, not encryption)
editor = cache.edit(key(response.request().url()));
if (editor == null) {
return null;
}
// Write to disk: the request and response headers, plus the handshake information for HTTPS
entry.writeTo(editor);
// Hand a CacheRequest back for the cache interceptor to use
return new CacheRequestImpl(editor);
} catch (IOException e) {
abortQuietly(editor);
return null;
}
}
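As far as I can tell from the OkHttp 3.x sources, key() is the MD5 digest of the URL string rendered as lowercase hex (computed there through Okio's ByteString). The sketch below reproduces that idea with only the JDK; CacheKeySketch is a hypothetical name and the URL is passed as a plain String.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical JDK-only sketch of deriving a cache key from a URL.
class CacheKeySketch {
    static String key(String url) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(url.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                // Mask to 0..255 so negative bytes still print as two hex digits.
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide MD5.
            throw new AssertionError(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(key("http://publicobject.com/helloworld.txt"));
    }
}
```

A fixed-length hex string is convenient here because the key also serves as a file name inside the DiskLruCache directory.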
Cache's get method reads the cached Response for a given URL:
@Nullable Response get(Request request) {
String key = key(request.url());
DiskLruCache.Snapshot snapshot;
Entry entry;
try {
snapshot = cache.get(key);
if (snapshot == null) {
return null;
}
} catch (IOException e) {
// Give up because the cache cannot be read.
return null;
}
try {
// Rebuild the Entry object from the snapshot's metadata
entry = new Entry(snapshot.getSource(ENTRY_METADATA));
} catch (IOException e) {
Util.closeQuietly(snapshot);
return null;
}
// Assemble the Response and return it
Response response = entry.response(snapshot);
if (!entry.matches(request, response)) {
Util.closeQuietly(response.body());
return null;
}
return response;
}
The cache interceptor's intercept method:
@Override public Response intercept(Chain chain) throws IOException {
// Try to fetch a candidate from the cache
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
long now = System.currentTimeMillis();
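// The cache strategy holds a request and a response, and decides whether to use the network, the cache, or both (a conditional request checked against the cache)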
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
if (cache != null) {
// Track statistics for the cache hit rate
cache.trackResponse(strategy);
}
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}
// If we're forbidden from using the network and the cache is insufficient, fail.
// Neither the network nor the cache can be used: build a 504 response
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
// If we don't need the network, we're done.
// There is a cached response and the network is not used: return the cached result directly
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
Response networkResponse = null;
try {
// Make the network request
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
// The server replied 304 Not Modified: serve the body from the cache
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
// The response has a body and is cacheable, so write it to the cache
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}
return response;
}
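The branches of this method boil down to a small decision table. The sketch below restates them as a pure function so that the four outcomes are easy to see. The names (CacheDecisionSketch, decide) and the String results are hypothetical; in the real code the inputs come from CacheStrategy.

```java
// Hypothetical restatement of the branches in CacheInterceptor.intercept.
class CacheDecisionSketch {
    /**
     * @param needNetwork mirrors "networkRequest != null"
     * @param haveCache   mirrors "cacheResponse != null"
     * @param networkCode status code the server returns; ignored when the network is unused
     */
    static String decide(boolean needNetwork, boolean haveCache, int networkCode) {
        if (!needNetwork && !haveCache) {
            // For example only-if-cached with nothing usable in the cache.
            return "504 Unsatisfiable Request (only-if-cached)";
        }
        if (!needNetwork) {
            return "cached response";
        }
        if (haveCache && networkCode == 304) {
            // Conditional GET: the server confirmed that the cached copy is still valid.
            return "cached response with headers refreshed from the 304";
        }
        // Everything else comes from the network and may then be written to the cache.
        return "network response";
    }

    public static void main(String[] args) {
        System.out.println(decide(false, false, 0));
        System.out.println(decide(false, true, 0));
        System.out.println(decide(true, true, 304));
        System.out.println(decide(true, true, 200));
    }
}
```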
5. ConnectInterceptor (the connect interceptor)
/** Opens a connection to the target server and proceeds to the next interceptor. */
It opens a connection to the target server and then proceeds to the next interceptor.
Here is its intercept method:
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
// Get the StreamAllocation that was created in RetryAndFollowUpInterceptor
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
// HttpCodec encodes the Request and decodes the Response
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
// The RealConnection performs the actual network I/O; it is passed on to the next interceptor
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
streamAllocation.newStream(client, chain, doExtensiveHealthChecks) works in roughly two steps:
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
// Step 1: find a healthy, usable RealConnection
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
// Step 2: create the HttpCodec from that RealConnection
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
How the RealConnection is obtained:
/**
* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
* until a healthy connection is found.
*/
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
// Excerpt only: the rest of the method is omitted.
// Try to reuse the existing connection
if (this.connection != null) {
// We had an already-allocated connection and it's good.
result = this.connection;
releasedConnection = null;
}
// ...
// Start connecting
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(
connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());
// ...
// Put the connection into the connection pool
// Pool the connection.
Internal.instance.put(connectionPool, result);
// ...
}
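Summary:
1. Obtain a RealConnection, reusing an existing one where possible.
2. Choose how to connect: a plain socket connection or a tunnel.
3. Proceed to the next interceptor, ending with CallServerInterceptor.
The OkHttp connection pool: ConnectionPool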
/**
* Manages reuse of HTTP and HTTP/2 connections for reduced network latency. HTTP requests that
* share the same {@link Address} may share a {@link Connection}. This class implements the policy
* of which connections to keep open for future use.
*/
The pool manages RealConnection instances and lets connections be reused within the keep-alive window.
The get() and put() operations:
/**
* Returns a recycled connection to {@code address}, or null if no such connection exists. The
* route is null if the address has not yet been routed.
*/
@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
// Check whether this pooled connection can be used for the address
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection, true);
return connection;
}
}
return null;
}
public void acquire(RealConnection connection, boolean reportedAcquired) {
assert (Thread.holdsLock(connectionPool));
if (this.connection != null) throw new IllegalStateException();
this.connection = connection;
this.reportedAcquired = reportedAcquired;
// Record this allocation in the connection's list of allocations
connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
}
The put method:
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
// Run the cleanup work asynchronously
executor.execute(cleanupRunnable);
}
// Add the connection to the pool's collection of RealConnections
connections.add(connection);
}
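The get/put pair can be pictured with a much smaller model. The sketch below is hypothetical throughout (PoolSketch, Conn, release): an address is a plain String, and eligibility is reduced to "same address and below the allocation limit", with a limit of 1 standing in for an HTTP/1.x connection that carries one stream at a time. The real isEligible performs more checks.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of connection reuse; not OkHttp's ConnectionPool.
class PoolSketch {
    static final class Conn {
        final String address;
        final int allocationLimit; // assumption: 1 models an HTTP/1.x connection
        int allocations;

        Conn(String address, int allocationLimit) {
            this.address = address;
            this.allocationLimit = allocationLimit;
        }

        boolean isEligible(String wanted) {
            return allocations < allocationLimit && address.equals(wanted);
        }
    }

    private final Deque<Conn> connections = new ArrayDeque<>();

    /** Returns a reusable connection to the address, or null if there is none. */
    synchronized Conn get(String address) {
        for (Conn connection : connections) {
            if (connection.isEligible(address)) {
                connection.allocations++; // plays the role of streamAllocation.acquire(...)
                return connection;
            }
        }
        return null;
    }

    synchronized void put(Conn connection) {
        connections.add(connection);
    }

    /** Marks one stream on the connection as finished so it can be reused. */
    synchronized void release(Conn connection) {
        connection.allocations--;
    }
}
```

When get() returns null, the caller has to create and connect a new connection and then put() it, which is the path shown in the findConnection excerpt.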
Cleanup runs on a thread pool:
// A thread pool with no core threads; idle threads are reclaimed after 60 seconds
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
// The actual cleanup work
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
// Loop, blocking between passes, until cleanup() returns -1
while (true) {
// Time to wait, in nanoseconds, before the next cleanup
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
// Wait for waitMillis milliseconds plus the remaining nanoseconds
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
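The arithmetic inside the loop splits a nanosecond duration into the two arguments that Object.wait(long, int) expects: whole milliseconds plus the leftover nanoseconds. A tiny sketch of just that step (WaitSplitSketch and split are hypothetical names):

```java
// Hypothetical sketch of the millisecond / nanosecond split used before wait().
class WaitSplitSketch {
    /** Returns {wholeMilliseconds, remainingNanoseconds} for a wait given in nanoseconds. */
    static long[] split(long waitNanos) {
        long waitMillis = waitNanos / 1_000_000L;
        long remainderNanos = waitNanos - waitMillis * 1_000_000L;
        return new long[] {waitMillis, remainderNanos};
    }

    public static void main(String[] args) {
        long[] parts = split(1_500_000L);
        System.out.println(parts[0] + " ms + " + parts[1] + " ns"); // 1 ms + 500000 ns
    }
}
```

The remainder is always below 1,000,000, so it fits the int range that wait() accepts for its nanosecond argument.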
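cleanup() borrows the idea of a garbage collector's mark-and-sweep algorithm: it first classifies each connection as in use or idle, then evicts the connection that has been idle the longest.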
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
// Mark phase: go through the connections and count which are in use and which are idle
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// If the connection is in use, keep searching.
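// How the least active connection is found: a connection is still in use if its collection of weak references has entries that are not null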
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
// If the connection is ready to be evicted, we're done.
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// We've found a connection to evict. Remove it from the list, then close it below (outside
// of the synchronized block).
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
cleanupRunning = false;
return -1;
}
}
closeQuietly(longestIdleConnection.socket());
// Cleanup again immediately.
return 0;
}
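The return value of cleanup() doubles as a scheduling instruction, which is easier to see with the bookkeeping stripped away. The sketch below is hypothetical (CleanupSketch, nextAction) and works on plain numbers instead of connections. The limits of 5 idle connections and 5 minutes are, to my knowledge, the defaults of the no-argument ConnectionPool constructor in OkHttp 3.x.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the decision made at the end of cleanup().
class CleanupSketch {
    static final int MAX_IDLE = 5;
    static final long KEEP_ALIVE_NS = TimeUnit.MINUTES.toNanos(5);

    /**
     * @param idleNs idle duration in nanoseconds of every idle connection
     * @param inUse  number of connections that still carry streams
     * @return 0 to evict now and run again, -1 to stop, otherwise nanoseconds to wait
     */
    static long nextAction(long[] idleNs, int inUse) {
        long longestIdle = Long.MIN_VALUE;
        for (long duration : idleNs) {
            longestIdle = Math.max(longestIdle, duration);
        }
        if (longestIdle >= KEEP_ALIVE_NS || idleNs.length > MAX_IDLE) {
            return 0; // evict the longest-idle connection, then clean up again at once
        }
        if (idleNs.length > 0) {
            return KEEP_ALIVE_NS - longestIdle; // time until the oldest idle one expires
        }
        if (inUse > 0) {
            return KEEP_ALIVE_NS; // nothing idle yet; check again after a full interval
        }
        return -1; // pool is empty, so the cleanup thread can exit
    }
}
```

For example, with a single connection that has been idle for one minute, the cleanup thread sleeps for the remaining four minutes before looking again.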
6. CallServerInterceptor
/** This is the last interceptor in the chain. It makes a network call to the server. */
This is the last interceptor. It sends the actual network request to the server and receives the response the server returns.
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
realChain.eventListener().requestHeadersStart(realChain.call());
// Write the request headers to the socket
httpCodec.writeRequestHeaders(request);
realChain.eventListener().requestHeadersEnd(realChain.call(), request);
Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(true);
}
if (responseBuilder == null) {
// Write the request body if the "Expect: 100-continue" expectation was met.
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
// Write the request body to the socket
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} else if (!connection.isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
streamAllocation.noNewStreams();
}
}
httpCodec.finishRequest();
if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
// Read the response headers returned by the server
responseBuilder = httpCodec.readResponseHeaders(false);
}
// Build the Response
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
realChain.eventListener()
.responseHeadersEnd(realChain.call(), response);
int code = response.code();
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
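For an HTTP/1.1 connection, "writing the request headers" and "reading the response headers" come down to a simple text format on the wire. The sketch below shows that format with plain strings. It is an illustration under assumptions, not what HttpCodec does internally: Http1WireSketch, requestHead, and statusCode are hypothetical names, and HTTP/2 uses binary framing instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the HTTP/1.1 text format for the request head and status line.
class Http1WireSketch {
    /** Builds the request line plus headers, terminated by an empty line. */
    static String requestHead(String method, String path, Map<String, String> headers) {
        StringBuilder out = new StringBuilder();
        out.append(method).append(' ').append(path).append(" HTTP/1.1\r\n");
        for (Map.Entry<String, String> header : headers.entrySet()) {
            out.append(header.getKey()).append(": ").append(header.getValue()).append("\r\n");
        }
        return out.append("\r\n").toString();
    }

    /** Extracts the status code from a status line such as "HTTP/1.1 200 OK". */
    static int statusCode(String statusLine) {
        String[] parts = statusLine.split(" ", 3);
        if (parts.length < 2 || !parts[0].startsWith("HTTP/")) {
            throw new IllegalArgumentException("Unexpected status line: " + statusLine);
        }
        return Integer.parseInt(parts[1]);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("Host", "example.com");
        headers.put("Connection", "Keep-Alive");
        System.out.print(requestHead("GET", "/", headers));
        System.out.println(statusCode("HTTP/1.1 200 OK")); // 200
    }
}
```

The status code parsed this way is what the interceptor inspects afterwards, for instance the 101, 204, and 205 checks near the end of the method.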