OkHttp Source Code Analysis: Network Requests Through the Interceptor Chain

2018-03-31  PuHJ

Preface:

Interceptors are one of OkHttp's most important features. The source documents them as follows:
/**
 * Observes, modifies, and potentially short-circuits requests going out and the corresponding
 * responses coming back in. Typically interceptors add, remove, or transform headers on the request
 * or response.
 */
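That is, an interceptor can observe and modify outgoing requests and the responses that come back, and it can short-circuit a call by returning a response without passing the request further along. Typically, interceptors add, remove, or transform headers on the request or response.
Interceptor chain (figure from the official site)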

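The application interceptors and network interceptors are the ones we define ourselves. The OkHttp core in the middle is supplied by the library, which provides five interceptors of its own. They are called in the following order: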


Order of the chain of responsibility (figure)

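Roughly what the five interceptors do:
RetryAndFollowUpInterceptor: retries failed requests and follows redirects
BridgeInterceptor: bridges application code and network code, filling in required request headers
CacheInterceptor: handles caching
ConnectInterceptor: obtains a usable connection
CallServerInterceptor: writes the request to the server and reads the server's response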

1. Chain of responsibility

getResponseWithInterceptorChain is the entry point called from outside; it starts the whole chain:

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    // Holds every interceptor: our own plus the five built-in ones
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    // retryAndFollowUpInterceptor is created when RealCall is constructed, because RealCall
    // needs to call some of its methods
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    // forWebSocket is passed in when OkHttpClient creates the RealCall
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));
    // Build the first chain. It needs:
    // - the full interceptor list and an index, which together locate the Interceptor to run
    // - originalRequest, plus the connect/read/write timeouts from OkHttpClient
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());
    // Finally, run the whole chain
    return chain.proceed(originalRequest);
  }
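
The order in which the list is assembled can be sketched without OkHttp itself. The block below is only an illustration: `ChainOrderSketch` and `buildOrder` are hypothetical names, and interceptors are represented by plain strings rather than real `Interceptor` objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of OkHttp): reproduces only the ORDER in which
// getResponseWithInterceptorChain() assembles its interceptor list.
class ChainOrderSketch {
    static List<String> buildOrder(List<String> appInterceptors,
                                   List<String> networkInterceptors,
                                   boolean forWebSocket) {
        List<String> order = new ArrayList<>();
        order.addAll(appInterceptors);            // client.interceptors()
        order.add("RetryAndFollowUpInterceptor");
        order.add("BridgeInterceptor");
        order.add("CacheInterceptor");
        order.add("ConnectInterceptor");
        if (!forWebSocket) {                      // skipped for WebSocket calls
            order.addAll(networkInterceptors);    // client.networkInterceptors()
        }
        order.add("CallServerInterceptor");       // always last
        return order;
    }
}
```

Application interceptors therefore see the request before any retry, bridging, or caching happens, while network interceptors run only once a connection exists.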

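After a few days with the code, this is how I understand the chain of responsibility:
1. The handlers process a request one after another, linked together as a chain.
2. To form a chain, two methods are needed: one to get the request and one to obtain the response.
3. How are the links joined, i.e. how is the next chain object created? While proceeding, the current chain creates the next chain and invokes the next interceptor with it.

To make this easier to follow, here is a small imitation of the interceptors: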

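// Interface definitions for the interceptor and the chain
public interface Interceptor {

    // Runs the interception; receives the chain for the next step so the links join up
    String intercept(Chain chain);

    // The interceptor chain
    interface Chain {

        // Returns the request
        int request();

        // Passes the request on to the next interceptor and returns its response
        String proceed(int request);
    }
}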


import java.util.List;

// Implementation of the chain
public class RealChain implements Interceptor.Chain {


    private int index;
    private int request;
    private List<Interceptor> interceptors;

    public RealChain(List<Interceptor> interceptors,int index,int request){
        this.index = index;
        this.interceptors = interceptors;
        this.request = request;
    }

    @Override
    public int request() {
        return request;
    }

    // When proceeding, create the next Chain and let the current interceptor handle it
    @Override
    public String proceed(int request) {

        request++;
        RealChain next = new RealChain(interceptors,index+1,request);

        return interceptors.get(index).intercept(next);
    }
}

Interceptor implementations

public class InterceptorOne implements Interceptor {

    @Override
    public String intercept(Chain chain) {
        int request = chain.request();
        return chain.proceed(request);
    }
}

public class InterceptorTwo implements Interceptor {
    @Override
    public String intercept(Chain chain) {
        int request = chain.request();
        return chain.proceed(request);
    }
}

// The last interceptor handles the request itself instead of delegating to a next one
public class InterceptorThree implements Interceptor {

    @Override
    public String intercept(Chain chain) {
        int request = chain.request();
        return "http response";
    }
}

Running the chain

    String getResponseWithInterceptorChain() {
        // Build a full stack of interceptors.
        List<com.phj.okhttp.Interceptor> interceptors = new ArrayList<>();
        com.phj.okhttp.Interceptor interceptor = new InterceptorOne();
        interceptors.add(interceptor);
        interceptors.add(new InterceptorTwo());
        interceptors.add(new InterceptorThree());

        RealChain chain = new RealChain(interceptors, 0, 1);
        return chain.proceed(1);
    }

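That is an imitation of OkHttp's interceptor chain. The chain-of-responsibility pattern has no single fixed form, but the variants are broadly the same: each handler just needs a reference to the next handler in order to pass the work on.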
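
One consequence of this design is that each interceptor runs code both before and after the rest of the chain, so calls nest like the layers of an onion. The following sketch records that order. Everything in it (`TraceChainSketch`, `Step`, `Next`) is a hypothetical name made up for the illustration; it is not OkHttp code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: a tiny chain whose interceptors record when they run.
class TraceChainSketch {
    interface Next { String proceed(String request); }
    interface Step { String intercept(String request, Next next); }

    static final List<String> log = new ArrayList<>();

    // Runs steps[index]; the Next handed to it runs steps[index + 1].
    static String run(List<Step> steps, int index, String request) {
        return steps.get(index).intercept(request, r -> run(steps, index + 1, r));
    }

    // An interceptor that does some work, delegates, then does more work.
    static Step passThrough(String name) {
        return (request, next) -> {
            log.add(name + ":before");
            String response = next.proceed(request);
            log.add(name + ":after");
            return response;
        };
    }

    // The last interceptor answers by itself and never calls next.proceed().
    static Step terminal() {
        return (request, next) -> {
            log.add("last:handle");
            return "response to " + request;
        };
    }

    static String demo() {
        log.clear();
        List<Step> steps = Arrays.asList(passThrough("one"), passThrough("two"), terminal());
        return run(steps, 0, "ping");
    }
}
```

After `demo()` the log reads one:before, two:before, last:handle, two:after, one:after, which is why an early interceptor such as the retry interceptor can inspect the final response.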

2. RetryAndFollowUpInterceptor

/**
 * This interceptor recovers from failures and follows redirects as necessary. It may throw an
 * {@link IOException} if the call was canceled.
 */
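In short, this interceptor recovers from failures and follows redirects where necessary. If the call is canceled, it may throw an IOException.

The main interception code is shown below.
A network request can go wrong in many ways: the status code may not be 200, the response may not be usable, or an exception may be thrown.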

  @Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Call call = realChain.call();
    EventListener eventListener = realChain.eventListener();
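    // StreamAllocation coordinates the components an HTTP request needs. It is created here
    // but not used here: ConnectInterceptor uses it to open the streams to the server.
    // Arguments: connection pool, Address of the target, the call, event listener, call stack trace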
    streamAllocation = new StreamAllocation(client.connectionPool(), createAddress(request.url()),
        call, eventListener, callStackTrace);

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      // If the call was canceled, throw
      if (canceled) {
        streamAllocation.release();
        throw new IOException("Canceled");
      }

      Response response;
      boolean releaseConnection = true;
      try {
        // Make the network request (run the rest of the chain)
        response = realChain.proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } catch (RouteException e) {
        // The attempt to connect via a route failed. The request will not have been sent.
        if (!recover(e.getLastConnectException(), false, request)) {
          throw e.getLastConnectException();
        }
        releaseConnection = false;
        continue;
      } catch (IOException e) {
        // An attempt to communicate with a server failed. The request may have been sent.
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, requestSendStarted, request)) throw e;
        releaseConnection = false;
        continue;
      } finally {
        // We're throwing an unchecked exception. Release any resources.
        if (releaseConnection) {
          streamAllocation.streamFailed(null);
          streamAllocation.release();
        }
      }

      // Attach the prior response if it exists. Such responses never have a body.
      if (priorResponse != null) {
        response = response.newBuilder()
            .priorResponse(priorResponse.newBuilder()
                    .body(null)
                    .build())
            .build();
      }
      // Decide whether a follow-up request is needed; null means no, so return the response
      Request followUp = followUpRequest(response);

      if (followUp == null) {
        if (!forWebSocket) {
          streamAllocation.release();
        }
        return response;
      }

      // Close response.body()
      closeQuietly(response.body());
      // Cap the number of follow-ups; past the limit, release streamAllocation and fail
      if (++followUpCount > MAX_FOLLOW_UPS) {
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }

      if (followUp.body() instanceof UnrepeatableRequestBody) {
        streamAllocation.release();
        throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
      }
      // Can the follow-up reuse the same connection? If not, allocate a new StreamAllocation
      if (!sameConnection(response, followUp.url())) {
        streamAllocation.release();
        streamAllocation = new StreamAllocation(client.connectionPool(),
            createAddress(followUp.url()), call, eventListener, callStackTrace);
      } else if (streamAllocation.codec() != null) {
        throw new IllegalStateException("Closing the body of " + response
            + " didn't close its backing stream. Bad interceptor?");
      }

      request = followUp;
      priorResponse = response;
    }
  }

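Summary:
1. Create the StreamAllocation object, which manages the components needed for the request and response
2. Call the next link of the chain to make the network request: response = realChain.proceed(request, streamAllocation, null, null);
3. Decide from the exception or the response whether the request needs to be made again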
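
The shape of that loop can be sketched in isolation. This is a simplified illustration under stated assumptions, not the real interceptor: `Reply` and `Transport` are hypothetical stand-ins for `Response` and "the rest of the chain", `maxRetries` is invented for the sketch (the real code decides through `recover()`), only 3xx redirects are treated as follow-ups, and the value of `MAX_FOLLOW_UPS` is assumed.

```java
import java.io.IOException;
import java.net.ProtocolException;

// A simplified sketch of the retry / follow-up loop.
class FollowUpLoopSketch {
    static final int MAX_FOLLOW_UPS = 20; // assumed limit for this sketch

    static class Reply {
        final int code;
        final String location; // redirect target, or null
        Reply(int code, String location) { this.code = code; this.location = location; }
    }

    interface Transport { Reply send(String url) throws IOException; }

    static Reply execute(String url, Transport transport, int maxRetries) throws IOException {
        int followUps = 0;
        int retries = 0;
        while (true) {
            Reply reply;
            try {
                reply = transport.send(url);
            } catch (IOException e) {
                // Treat the failure as recoverable until the retry budget is used up.
                if (++retries > maxRetries) throw e;
                continue;
            }
            boolean redirect = reply.code >= 300 && reply.code < 400 && reply.location != null;
            if (!redirect) return reply;          // no follow-up needed
            if (++followUps > MAX_FOLLOW_UPS) {
                throw new ProtocolException("Too many follow-up requests: " + followUps);
            }
            url = reply.location;                 // follow the redirect and loop again
        }
    }
}
```

The point of the structure is that both outcomes, an exception and a response that asks for another request, end up at the top of the same `while (true)` loop.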

3. BridgeInterceptor

/**
 * Bridges from application code to network code. First it builds a network request from a user
 * request. Then it proceeds to call the network. Finally it builds a user response from the network
 * response.
 */
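It is the bridge from application code to network code: it first builds a network request from the user's request, then calls the network, and finally builds a user response from the network response.

BridgeInterceptor fills in required request headers that the caller did not set, turning the application-level request into a network-level one, and converts the network response back into the form the application expects.

The intercept method is shown below.
cookieJar is what loads and saves cookies.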

 @Override public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    // Create a new Request.Builder from the user's request
    Request.Builder requestBuilder = userRequest.newBuilder();

    RequestBody body = userRequest.body();
    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }

      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }
    // Keep-Alive is the basis for connection reuse
    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive");
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    // Whether gzip was added transparently on the caller's behalf
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }
    // Cookie handling
    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies));
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", Version.userAgent());
    }
    // Make the network request (run the rest of the chain)
    Response networkResponse = chain.proceed(requestBuilder.build());
    // Save any cookies from the response
    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);

    // gzip handling, delegated to Okio
    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      // Wrap the body source in a GzipSource
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      String contentType = networkResponse.header("Content-Type");
      // Let Okio handle the decompression
      responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
    }

    return responseBuilder.build();
  }

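1. Fill in the missing headers so that the request is ready to be sent over the network
2. The connection defaults to Keep-Alive, which keeps the TCP connection open for a while so that it can be reused
3. Post-process the returned Response (gzip decompression, for example) to produce the Response the caller needs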
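
The "fill in only what is missing" rule can be sketched with a plain map. This is a hypothetical simplification: headers are modelled as a `Map` with exact-case names, whereas OkHttp's `Headers` class is case-insensitive, and the `User-Agent` value is a placeholder.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of BridgeInterceptor's request-side defaults.
class BridgeDefaultsSketch {
    /** Returns a copy of userHeaders with the missing defaults filled in. */
    static Map<String, String> withDefaults(Map<String, String> userHeaders, String host) {
        Map<String, String> out = new LinkedHashMap<>(userHeaders);
        out.putIfAbsent("Host", host);
        out.putIfAbsent("Connection", "Keep-Alive");
        if (wantsTransparentGzip(userHeaders)) {
            out.put("Accept-Encoding", "gzip");
        }
        out.putIfAbsent("User-Agent", "sketch-agent"); // placeholder value
        return out;
    }

    /** gzip is added (and later undone) only when the caller set neither header. */
    static boolean wantsTransparentGzip(Map<String, String> userHeaders) {
        return !userHeaders.containsKey("Accept-Encoding")
            && !userHeaders.containsKey("Range");
    }
}
```

Values the caller set explicitly are never overwritten, which is also why a caller who sets `Accept-Encoding` by hand has to decompress the body by hand.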

4. CacheInterceptor

/** Serves requests from the cache and writes responses to the cache. */
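In other words, it answers requests from the cache where it can and stores network responses in the cache.

How to use it:
Add a cache when building the OkHttpClient:

 OkHttpClient client = new OkHttpClient.Builder()
     .cache(new Cache(new File("cache"), 10 * 1024 * 1024)).build(); // 10 MiB on-disk cache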

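4.1 The Cache class

Let's look at the Cache class first, starting with its Javadoc.

Its job is to serve a stored Response from the cache while that response is still fresh. The Javadoc also describes how to bypass or force the cache: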
 * <h3>Force a Network Response</h3>
 *
 * <p>In some situations, such as after a user clicks a 'refresh' button, it may be necessary to
 * skip the cache, and fetch data directly from the server. To force a full refresh, add the {@code
 * no-cache} directive: <pre>   {@code
 *    // Skip the cache and force fresh data from the server
 *   Request request = new Request.Builder()
 *       .cacheControl(new CacheControl.Builder().noCache().build())
 *       .url("http://publicobject.com/helloworld.txt")
 *       .build();
 * }</pre>
 *
 * If it is only necessary to force a cached response to be validated by the server, use the more
 * efficient {@code max-age=0} directive instead: <pre>   {@code
 *   // Only ask the server to validate the cached response
 *   Request request = new Request.Builder()
 *       .cacheControl(new CacheControl.Builder()
 *           .maxAge(0, TimeUnit.SECONDS)
 *           .build())
 *       .url("http://publicobject.com/helloworld.txt")
 *       .build();
 * }</pre>

 *
 * <h3>Force a Cache Response</h3>
 *
 * <p>Sometimes you'll want to show resources if they are available immediately, but not otherwise.
 * This can be used so your application can show <i>something</i> while waiting for the latest data
 * to be downloaded. To restrict a request to locally-cached resources, add the {@code
 * only-if-cached} directive: <pre>   {@code
 *      // Read from the cache only
 *     Request request = new Request.Builder()
 *         .cacheControl(new CacheControl.Builder()
 *             .onlyIfCached()
 *             .build())
 *         .url("http://publicobject.com/helloworld.txt")
 *         .build();
 *     Response forceCacheResponse = client.newCall(request).execute();
 *     if (forceCacheResponse.code() != 504) {
 *       // The resource was cached! Show it.
 *     } else {
 *       // The resource was not cached.
 *     }
 * }</pre>

The Cache class holds an InternalCache implementation, and every one of its methods simply delegates to Cache itself.

 final InternalCache internalCache = new InternalCache() {
    @Override public Response get(Request request) throws IOException {
      return Cache.this.get(request);
    }

    @Override public CacheRequest put(Response response) throws IOException {
      return Cache.this.put(response);
    }

    @Override public void remove(Request request) throws IOException {
      Cache.this.remove(request);
    }

    @Override public void update(Response cached, Response network) {
      Cache.this.update(cached, network);
    }

    @Override public void trackConditionalCacheHit() {
      Cache.this.trackConditionalCacheHit();
    }

    @Override public void trackResponse(CacheStrategy cacheStrategy) {
      Cache.this.trackResponse(cacheStrategy);
    }
  };

With that in mind, here is Cache's put method:

 @Nullable CacheRequest put(Response response) {
    String requestMethod = response.request().method();

    if (HttpMethod.invalidatesCache(response.request().method())) {
      try {
        remove(response.request());
      } catch (IOException ignored) {
        // The cache cannot be written.
      }
      return null;
    }
    // Only GET responses are cached; nothing else is stored
    if (!requestMethod.equals("GET")) {
      // Don't cache non-GET responses. We're technically allowed to cache
      // HEAD requests and some POST requests, but the complexity of doing
      // so is high and the benefit is low.
      return null;
    }

    if (HttpHeaders.hasVaryAll(response)) {
      return null;
    }
    // The entry stored in the cache: Entry is just a wrapper around the Response
    Entry entry = new Entry(response);
    // The cache is backed by DiskLruCache (OkHttp uses its own modified version)
    DiskLruCache.Editor editor = null;
    try {
      // Turn the URL into a key by hashing it
      editor = cache.edit(key(response.request().url()));
      if (editor == null) {
        return null;
      }
      // Write to disk: the request and response headers, plus the handshake for HTTPS
      entry.writeTo(editor);
      // Expose the editor to the cache interceptor
      return new CacheRequestImpl(editor);
    } catch (IOException e) {
      abortQuietly(editor);
      return null;
    }
  }

Cache's get method looks up the stored Response by URL:

@Nullable Response get(Request request) {
    String key = key(request.url());
    DiskLruCache.Snapshot snapshot;
    Entry entry;
    try {
      snapshot = cache.get(key);
      if (snapshot == null) {
        return null;
      }
    } catch (IOException e) {
      // Give up because the cache cannot be read.
      return null;
    }

    try {
      // Rebuild the Entry object from the snapshot
      entry = new Entry(snapshot.getSource(ENTRY_METADATA));
    } catch (IOException e) {
      Util.closeQuietly(snapshot);
      return null;
    }
    // Assemble the Response and return it
    Response response = entry.response(snapshot);

    if (!entry.matches(request, response)) {
      Util.closeQuietly(response.body());
      return null;
    }

    return response;
  }
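
Two rules from put and get are easy to try in isolation: entries are keyed by a hash of the URL, and only GET responses are stored. The sketch below assumes the key is a hex-encoded MD5 digest of the URL string, which is what `key(...)` appears to compute in this version of OkHttp; treat that as an assumption. `CacheKeySketch` is a hypothetical class, not OkHttp's `Cache`.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch only: derives a cache key from a URL and applies the GET-only rule.
class CacheKeySketch {
    /** Hex-encoded MD5 of the URL, usable as a file-name-safe cache key. */
    static String key(String url) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(url.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 must be available on every Java platform implementation.
            throw new AssertionError("MD5 unavailable", e);
        }
    }

    /** Mirrors the rule in put(): only GET responses are stored. */
    static boolean isStorable(String method) {
        return "GET".equals(method);
    }
}
```

Hashing gives fixed-length keys that are safe to use as file names, which matters because DiskLruCache stores each entry in files named after its key.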

What the cache interceptor does

 @Override public Response intercept(Chain chain) throws IOException {
    // Try to get a cached response
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();
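    // The cache strategy holds a request and a response and decides whether to use the
    // network, the cache, or both (a conditional request that validates the cached copy)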
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;

    if (cache != null) {
      // Track cache hit statistics
      cache.trackResponse(strategy);
    }

    if (cacheCandidate != null && cacheResponse == null) {
      closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    // No network request is allowed and there is no cached response: build a 504 response
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

    // If we don't need the network, we're done.
    // A cached response exists and the network is not needed: return the cached result
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }

    Response networkResponse = null;
    try {
      // Make the network request
      networkResponse = chain.proceed(networkRequest);
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }

    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
      // The server answered 304 Not Modified: serve the response from the cache
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }

    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();

    if (cache != null) {
      if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        // The response has a body and is cacheable, so write it to the cache
        CacheRequest cacheRequest = cache.put(response);
        return cacheWritingResponse(cacheRequest, response);
      }

      if (HttpMethod.invalidatesCache(networkRequest.method())) {
        try {
          cache.remove(networkRequest);
        } catch (IOException ignored) {
          // The cache cannot be written.
        }
      }
    }

    return response;
  }
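
The branches of this method reduce to a small decision table. The sketch below is a hypothetical condensation (`CacheDecisionSketch` and `Outcome` are invented names) that keeps only the inputs the method actually branches on.

```java
// Hypothetical sketch of the branches in CacheInterceptor.intercept().
class CacheDecisionSketch {
    enum Outcome { FAIL_504, SERVE_CACHE, SERVE_CACHE_AFTER_304, SERVE_NETWORK }

    /**
     * @param needsNetwork true when strategy.networkRequest != null
     * @param hasCached    true when strategy.cacheResponse != null
     * @param networkCode  status code from the network; ignored when needsNetwork is false
     */
    static Outcome decide(boolean needsNetwork, boolean hasCached, int networkCode) {
        if (!needsNetwork && !hasCached) return Outcome.FAIL_504;    // only-if-cached miss
        if (!needsNetwork) return Outcome.SERVE_CACHE;               // fresh cache hit
        if (hasCached && networkCode == 304) {
            return Outcome.SERVE_CACHE_AFTER_304;                    // validated by the server
        }
        return Outcome.SERVE_NETWORK;                                // use the network response
    }
}
```

In the last case the real interceptor additionally offers the network response to the cache when it has a body and is cacheable.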

5. ConnectInterceptor

/** Opens a connection to the target server and proceeds to the next interceptor. */
That is, it opens a connection to the target server and then moves on to the next interceptor.

Here is the intercept method:

 @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    // Get the StreamAllocation created by RetryAndFollowUpInterceptor
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    // HttpCodec encodes the Request and decodes the Response
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    // RealConnection performs the actual network I/O; it is passed to the next interceptor
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }

streamAllocation.newStream(client, chain, doExtensiveHealthChecks) breaks down into two steps:

public HttpCodec newStream(
      OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    int connectTimeout = chain.connectTimeoutMillis();
    int readTimeout = chain.readTimeoutMillis();
    int writeTimeout = chain.writeTimeoutMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      // Find a healthy, usable RealConnection
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
      // Then create the HttpCodec from that RealConnection
      HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }

Finding a RealConnection
  /**
   * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
   * until a healthy connection is found.
   */
  private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
      throws IOException {
    while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          connectionRetryEnabled);

      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }

      return candidate;
    }
  }



private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      boolean connectionRetryEnabled) throws IOException {
    // (excerpt: only the three key steps are shown)

    // 1. Try to reuse the connection that is already allocated
    if (this.connection != null) {
      // We had an already-allocated connection and it's good.
      result = this.connection;
      releasedConnection = null;
    }

    // 2. Connect
    // Do TCP + TLS handshakes. This is a blocking operation.
    result.connect(
        connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled, call, eventListener);
    routeDatabase().connected(result.route());

    // 3. Put the connection into the pool
    // Pool the connection.
    Internal.instance.put(connectionPool, result);
}

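Summary:
1. Obtain a RealConnection object, reusing an existing one where possible
2. Connect in the appropriate way: a plain socket or a tunnel
3. Hand over to CallServerInterceptor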

OkHttp's connection pool: ConnectionPool

/**
 * Manages reuse of HTTP and HTTP/2 connections for reduced network latency. HTTP requests that
 * share the same {@link Address} may share a {@link Connection}. This class implements the policy
 * of which connections to keep open for future use.
 */
It manages RealConnection instances and lets connections be reused within the keep-alive window.

The get() and put() operations

  /**
   * Returns a recycled connection to {@code address}, or null if no such connection exists. The
   * route is null if the address has not yet been routed.
   */
  @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      // Check whether this connection can be used for the address
      if (connection.isEligible(address, route)) {
        streamAllocation.acquire(connection, true);
        return connection;
      }
    }
    return null;
  }

  public void acquire(RealConnection connection, boolean reportedAcquired) {
    assert (Thread.holdsLock(connectionPool));
    if (this.connection != null) throw new IllegalStateException();

    this.connection = connection;
    this.reportedAcquired = reportedAcquired;
    // Record this allocation in the connection's list of allocations
    connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
  }


The put method
  void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      // Start the cleanup task asynchronously
      executor.execute(cleanupRunnable);
    }
    // Add to the queue of RealConnections
    connections.add(connection);
  }
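
The get/put pair can be illustrated with a much smaller pool. The sketch below is hypothetical: a connection counts as eligible when its address string matches and it still has allocation capacity, which is far less strict than the real `isEligible()` check, and the default limit of one allocation per connection is an assumption made for HTTP/1.x.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, much-simplified connection pool.
class MiniPoolSketch {
    static class Conn {
        final String address;
        int allocations;          // how many callers currently hold this connection
        int allocationLimit = 1;  // assumed: one stream at a time, as with HTTP/1.x
        Conn(String address) { this.address = address; }
    }

    private final Deque<Conn> connections = new ArrayDeque<>();

    /** Returns a pooled connection for the address, or null if none is eligible. */
    synchronized Conn get(String address) {
        for (Conn c : connections) {
            if (c.address.equals(address) && c.allocations < c.allocationLimit) {
                c.allocations++;  // plays the role of streamAllocation.acquire(...)
                return c;
            }
        }
        return null;
    }

    synchronized void put(Conn connection) {
        connections.add(connection);
    }
}
```

A null result from `get` is the caller's signal to open a new connection and `put` it into the pool afterwards.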

Cleanup runs on a thread pool

  // A thread pool with no core threads
  private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

  // The actual cleanup work
  private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      // Loop, sleeping between cleanup passes
      while (true) {
        // Nanoseconds until the next cleanup is due
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              // Wait for waitMillis, plus the leftover nanoseconds
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

cleanup() works much like a mark-and-sweep garbage collector:
long cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

    // Find either a connection to evict, or the time that the next eviction is due.
    synchronized (this) {
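      // Walk the connections, pruning leaked allocations as we go
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

        // If the connection is in use, keep searching.
        // In-use is judged from the connection's list of weak references: entries whose
        // referent is null are pruned, and the remaining ones are counted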
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;
          continue;
        }

        idleConnectionCount++;

        // If the connection is ready to be evicted, we're done.
        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }

      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        // We've found a connection to evict. Remove it from the list, then close it below (outside
        // of the synchronized block).
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
        // A connection will be ready to evict soon.
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
        // All connections are in use. It'll be at least the keep alive duration 'til we run again.
        return keepAliveDurationNs;
      } else {
        // No connections, idle or in use.
        cleanupRunning = false;
        return -1;
      }
    }

    closeQuietly(longestIdleConnection.socket());

    // Cleanup again immediately.
    return 0;
  }
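
The decision at the end of cleanup() depends on only four numbers, so it can be tried on its own. `EvictionSketch` below is a hypothetical condensation that takes the idle durations as an array instead of walking real connections.

```java
// Hypothetical sketch of the decision made at the end of cleanup().
class EvictionSketch {
    /**
     * @param idleNanos idle duration of each idle connection, in nanoseconds
     * @param inUse     number of connections currently in use
     * @return 0 if the longest-idle connection should be evicted now, -1 if there is
     *         nothing left to manage, otherwise the nanoseconds to wait before the next pass
     */
    static long nextAction(long[] idleNanos, int inUse, long keepAliveNanos, int maxIdle) {
        long longestIdle = Long.MIN_VALUE;
        for (long idle : idleNanos) {
            longestIdle = Math.max(longestIdle, idle);
        }
        if (longestIdle >= keepAliveNanos || idleNanos.length > maxIdle) {
            return 0;                             // evict, then clean up again at once
        } else if (idleNanos.length > 0) {
            return keepAliveNanos - longestIdle;  // until the oldest idle one expires
        } else if (inUse > 0) {
            return keepAliveNanos;                // everything is busy: check back later
        } else {
            return -1;                            // the pool is empty
        }
    }
}
```

Returning 0 makes the cleanup loop run again immediately, so connections over the limit are evicted one per pass until the pool is back within bounds.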

6. CallServerInterceptor

/** This is the last interceptor in the chain. It makes a network call to the server. */
The last interceptor: it sends the actual network request to the server and reads the response the server returns.
 @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();

    realChain.eventListener().requestHeadersStart(realChain.call());
    // Write the request headers to the socket
    httpCodec.writeRequestHeaders(request);
    realChain.eventListener().requestHeadersEnd(realChain.call(), request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        realChain.eventListener().responseHeadersStart(realChain.call());
        responseBuilder = httpCodec.readResponseHeaders(true);
      }

      if (responseBuilder == null) {
        // Write the request body if the "Expect: 100-continue" expectation was met.
        realChain.eventListener().requestBodyStart(realChain.call());
        long contentLength = request.body().contentLength();
        CountingSink requestBodyOut =
            new CountingSink(httpCodec.createRequestBody(request, contentLength));
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
        // Write the request body to the socket
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
        realChain.eventListener()
            .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
      } else if (!connection.isMultiplexed()) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
        // from being reused. Otherwise we're still obligated to transmit the request body to
        // leave the connection in a consistent state.
        streamAllocation.noNewStreams();
      }
    }

    httpCodec.finishRequest();

    if (responseBuilder == null) {
      realChain.eventListener().responseHeadersStart(realChain.call());
      // Read the response headers from the network
      responseBuilder = httpCodec.readResponseHeaders(false);
    }
    // Build the Response
    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    realChain.eventListener()
        .responseHeadersEnd(realChain.call(), response);

    int code = response.code();
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }
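
What "writing the headers" and "reading the response headers" mean on the wire can be sketched for HTTP/1.1. The class below is a hypothetical illustration of the text format only; it is not OkHttp's codec and it leaves out bodies, chunking, and header parsing.

```java
import java.util.Map;

// Hypothetical sketch of the HTTP/1.1 text that a codec writes and reads.
class Http1WireSketch {
    /** Serialises the request line and headers, ending with the blank line. */
    static String requestHead(String method, String path, Map<String, String> headers) {
        StringBuilder out = new StringBuilder();
        out.append(method).append(' ').append(path).append(" HTTP/1.1\r\n");
        for (Map.Entry<String, String> h : headers.entrySet()) {
            out.append(h.getKey()).append(": ").append(h.getValue()).append("\r\n");
        }
        return out.append("\r\n").toString();
    }

    /** Extracts the status code from a status line such as "HTTP/1.1 200 OK". */
    static int statusCode(String statusLine) {
        String[] parts = statusLine.split(" ", 3);
        if (parts.length < 2 || !parts[0].startsWith("HTTP/")) {
            throw new IllegalArgumentException("Unexpected status line: " + statusLine);
        }
        return Integer.parseInt(parts[1]);
    }
}
```

The status code parsed here is what the interceptor later checks against 101, 204, and 205.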