androidAndroid开发经验谈Android开发

OkHttp源码解析

2018-01-16  本文已影响928人  砺雪凝霜
OkHttp优点

OkHttp是一个高效的Http客户端,有如下的特点:


源码涉及的主要几个对象

源码解析

源码开始之前我先贴一段OkHttp请求网络的实例

   OkHttpClient mOkHttpClient = new OkHttpClient();

        final Request request = new Request.Builder()
                .url("https://www.jianshu.com/u/b4e69e85aef6")
                .addHeader("user_agent","22222")
                .build();
          Call call = mOkHttpClient.newCall(request);
          call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {

            }

            @Override
            public void onResponse(Call call, Response response) throws IOException {
                if(response != null )
                Log.i(TAG, "返回服务端数据:"+ String.valueOf(response.body().string()));
            }
        });

(1)OkHttp网络请求流程

Call call = mOkHttpClient.newCall(request);

首先会new一个Call对象出来,但其实真正new出来的对象是NewCall对象

 static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.eventListener = client.eventListenerFactory().create(call);
    return call;
  }

然后会执行call的enqueue方法

 @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

该方法中首先判断请求有没有被执行,如果请求已经执行,那么直接抛出异常,如果请求没有执行,就会执行Dispatcher对象的enqueue方法,Dispatcher的enqueue方法的源码如下所示:

synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

如果正在运行的异步请求数量小于最大的并发数,且正在运行的客户端实际数量请求小于规定的每个主机最大请求数量,那么就把该请求放进正在运行的异步请求队列中,否则就把该请求放进将要执行的异步请求队列中。


(2) Dispatcher任务调度

Dispatcher的各个参数的说明如下:

  //支持的最大并发请求数量
  private int maxRequests = 64;
 //每个主机的最大请求数量
  private int maxRequestsPerHost = 5;

  //请求线程池
  private @Nullable ExecutorService executorService;

  //将要运行的异步请求队列
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  //正在运行的异步请求队列
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  //正在运行的同步请求队列
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

继续看看Dispatcher的executorService方法,如下:

 public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

Dispatcher初始化了一个线程池,核心线程的数量为0 ,最大的线程数量为Integer.MAX_VALUE,空闲线程存在的最大时间为60秒,这个线程类似于CacheThreadPool,比较适合执行大量的耗时比较少的任务。同时我们Dispatcher也可以来设置自己线程池。

Dispatcher我们大概了解之后,回到之前说的,call的enqueue方法其实执行的使Dispatcher的enqueue方法,Dispatcher之后会把call放进请求队列中,最终执行由线程池来执行请求任务。下面来看看RealCall里究竟执行了什么任务。

  @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
       ...
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }

RealCall通过执行getResponseWithInterceptorChain()返回Response,如果请求被取消则在进行OnFailue回调,如果请求成功则进行onResponse的回调。
这里要注意两点:


(3) 拦截器

那么RealCall 的getResponseWithInterceptorChain方法中究竟干了些什么呢,它是如何返回Response的呢?

 Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());//1
    interceptors.add(retryAndFollowUpInterceptor);//2
    interceptors.add(new BridgeInterceptor(client.cookieJar()));//3
    interceptors.add(new CacheInterceptor(client.internalCache()));//4
    interceptors.add(new ConnectInterceptor(client));//5
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());//6 
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));//7

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);//8
  }

  1. 在配置 OkHttpClient 时设置的 interceptors ()

  2. 负责失败重试以及重定向的RetryAndFollowUpInterceptor

  3. 负责把用户构造的请求转换为发送到服务器的请求、把服务器返回的响应转为用户友好的响应的 BridgeInterceptor

  4. 负责读取缓存直接返回、更新缓存的 CacheInterceptor

  5. 负责和服务器建立连接的 ConnectInterceptor

  6. 配置 OkHttpClient 时设置的 networkInterceptors

  7. 负责向服务器发送请求数据、从服务器读取响应数据的 CallServerInterceptor

  8. 在 return chain.proceed(originalRequest),中开启链式调用

RealInterceptorChain的proceed方法源码如下:

  public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.httpCodec != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    if (response.body() == null) {
      throw new IllegalStateException(
          "interceptor " + interceptor + " returned a response with no body");
    }

    return response;
  }

理解了这段代码其实整个OkHttp核心流程你就基本掌握了,开始看的时候大家可能头都大了,可是当你debug一下你就豁然开朗了。这段代码核心在下面这部分:

 // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(ne
xt);

首先来了解一下拦截器吧,拦截器是一种能够监控、重写,重试调用的机制。通常情况下,拦截器用来添加、移除、转换请求和响应的头部信息。比如将域名替换为IP地址,在请求头中移除添加host属性;也可以添加我们应用中的一些公共参数,比如设备id、版本号,等等。

拦截器的基本代码结构如下:

public interface Interceptor {
  Response intercept(Chain chain) throws IOException;

  interface Chain {
    Request request();

    Response proceed(Request request) throws IOException;

}
}
okhttp拦截器.png

再来看看各个拦截器的源码:

  1. 在配置 OkHttpClient 时设置的 interceptors ()

  2. 负责失败重试以及重定向的RetryAndFollowUpInterceptor

  3. 负责把用户构造的请求转换为发送到服务器的请求、把服务器返回的响应转为用户友好的响应的 BridgeInterceptor

  4. 负责读取缓存直接返回、更新缓存的 CacheInterceptor

  5. 负责和服务器建立连接的 ConnectInterceptor

  6. 配置 OkHttpClient 时设置的 networkInterceptors

  7. 负责向服务器发送请求数据、从服务器读取响应数据的 CallServerInterceptor


 @Override public Response intercept(Chain chain) throws IOException {
  Request request = chain.request();
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Call call = realChain.call();
...

  followUpCount = 0;
  Response priorResponse = null;
 while (true) {
    if (canceled) {
      streamAllocation.release();
 throw new IOException("Canceled");
  }

    Response response;
 boolean releaseConnection = true;
 try {
      response = realChain.proceed(request, streamAllocation, null, null);
 ...

   Request followUp = followUpRequest(response);   
if (followUp == null) {
      if (!forWebSocket) {
        streamAllocation.release();
  }
      return response;
  }

...

 if (++followUpCount > MAX_FOLLOW_UPS) {
  streamAllocation.release();
 throw new ProtocolException("Too many follow-up requests: " + followUpCount); }
...
    
   request = followUp;
   priorResponse = response;
  }
}

整段代码就是在一个死循环

  @Override public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();

    RequestBody body = userRequest.body();
    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }

      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }

    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive");
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }

    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies));
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", Version.userAgent());
    }

    Response networkResponse = chain.proceed(requestBuilder.build());

    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);

    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      String contentType = networkResponse.header("Content-Type");
      responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
    }

    return responseBuilder.build();
  }

拦截request并读取缓存,该操作在proceed方法之前执行,也就是在请求的时候进行缓存判断。

 @Override public Response intercept(Chain chain) throws IOException {
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();

    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;

    if (cache != null) {
      cache.trackResponse(strategy);
    }

    if (cacheCandidate != null && cacheResponse == null) {
      closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

    // If we don't need the network, we're done.
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }

    Response networkResponse = null;
    try {
      networkResponse = chain.proceed(networkRequest);
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }

判断是否应该更新缓存

 // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }

    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();

    if (cache != null) {
      if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        CacheRequest cacheRequest = cache.put(response);
        return cacheWritingResponse(cacheRequest, response);
      }

      if (HttpMethod.invalidatesCache(networkRequest.method())) {
        try {
          cache.remove(networkRequest);
        } catch (IOException ignored) {
          // The cache cannot be written.
        }
      }
    }

    return response;
  }
 @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }

实际上建立连接就是创建了一个 HttpCodec 对象,它将在后面的步骤中被使用,那它又是何方神圣呢?它是对 HTTP 协议操作的抽象,有两个实现:Http1Codec和 Http2Codec,顾名思义,它们分别对应 HTTP/1.1 和 HTTP/2 版本的实现。

在 Http1Codec中,它利用 Okio 对 Socket 的读写操作进行封装,它对 java.io和 java.nio 进行了封装,让我们更便捷高效的进行 IO 操作。

而创建 HttpCodec 对象的过程涉及到 StreamAllocation、RealConnection代码较长,这个过程概括来说,就是找到一个可用的 RealConnection,再利用 RealConnection 的输入输出(BufferedSource 和 BufferedSink)创建 HttpCodec 对象,供后续步骤使用。

 @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();

    realChain.eventListener().requestHeadersStart(realChain.call());
    httpCodec.writeRequestHeaders(request);
    realChain.eventListener().requestHeadersEnd(realChain.call(), request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        realChain.eventListener().responseHeadersStart(realChain.call());
        responseBuilder = httpCodec.readResponseHeaders(true);
      }

      if (responseBuilder == null) {
        // Write the request body if the "Expect: 100-continue" expectation was met.
        realChain.eventListener().requestBodyStart(realChain.call());
        long contentLength = request.body().contentLength();
        CountingSink requestBodyOut =
            new CountingSink(httpCodec.createRequestBody(request, contentLength));
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);

        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
        realChain.eventListener()
            .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
      } else if (!connection.isMultiplexed()) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
        // from being reused. Otherwise we're still obligated to transmit the request body to
        // leave the connection in a consistent state.
        streamAllocation.noNewStreams();
      }
    }

    httpCodec.finishRequest();

    if (responseBuilder == null) {
      realChain.eventListener().responseHeadersStart(realChain.call());
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    realChain.eventListener()
        .responseHeadersEnd(realChain.call(), response);

    int code = response.code();
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }
  1. 向服务器发送 request header;

  2. 如果有 request body,就向服务器发送;

  3. 读取 response header,先构造一个 Response对象;

  4. 如果有 response body,就在 3 的基础上加上 body 构造一个新的 Response对象;

这里我们可以看到,核心工作都由 HttpCodec 对象完成,而 HttpCodec 实际上利用的是 Okio,而 Okio 实际上还是用的 Socket,所以没什么神秘的,只不过一层套一层,层数有点多。

其实 Interceptor的设计也是一种分层的思想,每个 Interceptor 就是一层。为什么要套这么多层呢?分层的思想在 TCP/IP 协议中就体现得淋漓尽致,分层简化了每一层的逻辑,每层只需要关注自己的责任(单一原则思想也在此体现),而各层之间通过约定的接口/协议进行合作(面向接口编程思想),共同完成复杂的任务,这是典型的责任链设计模式

责任链模式是一种对象的行为模式。在责任链模式里,很多对象由每一个对象对其下家的引用而连接起来形成一条链。请求在这个链上传递,直到链上的某一个对象决定处理此请求。发出这个请求的客户端并不知道链上的哪一个对象最终处理这个请求,这使得系统可以在不影响客户端的情况下动态地重新组织和分配责任。

okhttp_full_process.png

(4)OkHttp的复用连接池

Http有一种叫做keepalive connections的机制,而okHttp支持5个并发socket连接,默认keepalive时间为5分钟,接下来我们学习okHttp是如何复用连接的。

  private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

  /** The maximum number of idle connections for each address. */
  private final int maxIdleConnections;
  private final long keepAliveDurationNs;
  private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

  private final Deque<RealConnection> connections = new ArrayDeque<>();
  final RouteDatabase routeDatabase = new RouteDatabase();
  boolean cleanupRunning;

主要变量说明一下:

 public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
    }
  }

通过构造方法可以看出CollectionPool默认空闲的socket最大连接数为5个,socket的keepalive时间为5分钟。CollectionPool实在OkHttpClient实例化的时候创建的

  OkHttpClient(Builder builder) {
    this.dispatcher = builder.dispatcher;

    ...
    this.connectionPool = builder.connectionPool;
    ...
 
  }

ConnectionPool提供对Deque<RealConnection>进行操作的方法分别为put,get,connectionBecameIdle和evictAll
这几个操作,分别对应放入连接,获取连接,移除连接和移除所有连接操作。这里我们只举例说明put和get操作。

  void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

再添加到Deque之前首先要清理空闲线程,这个后面会讲到。再来看看get操作:

  @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, route)) {
        streamAllocation.acquire(connection, true);
        return connection;
      }
    }
    return null;
  }

遍历connections缓存列表。当某个连接计数小于限制的大小,并且request的地址和缓存列表中此连接的地址完全匹配时,则直接复用缓存列表中的connection作为request的连接。

private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

线程不断地调用clearup方法进行清理,并返回下次需要清理的间隔时间,然后调用wait方法进行等待以释放锁与时间片。当等待时间到了后,再次进行清理,并返回下次需要清理的间隔时间,如此循环下去。接下来看看clearup方法,如下所示:

  long cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

   
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

     
        if (pruneAndGetAllocationCount(connection, now) > 0) {//注释<1>
          inUseConnectionCount++;
          continue;
        }

        idleConnectionCount++;

      
        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }

      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {//注释<2>
     
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
    
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
    
        return keepAliveDurationNs;
      } else {
        // No connections, idle or in use.
        cleanupRunning = false;
        return -1;//注释<3>
      }
    }

    closeQuietly(longestIdleConnection.socket());
    return 0;
  }

clearup方法所做的事情非常简单总结就是,根据连接中的引用计数来计算空闲连接数和活跃连接数,然后标记空闲的连接。

 private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    List<Reference<StreamAllocation>> references = connection.allocations;
    for (int i = 0; i < references.size(); ) {
      Reference<StreamAllocation> reference = references.get(i);

      if (reference.get() != null) {
        i++;
        continue;
      }

      // We've discovered a leaked allocation. This is an application bug.
      StreamAllocation.StreamAllocationReference streamAllocRef =
          (StreamAllocation.StreamAllocationReference) reference;
      String message = "A connection to " + connection.route().address().url()
          + " was leaked. Did you forget to close a response body?";
      Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);

      references.remove(i);
      connection.noNewStreams = true;

      // If this was the last allocation, the connection is eligible for immediate eviction.
      if (references.isEmpty()) {//注释<1>
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
      }
    }

    return references.size();
  }

pruneAndGetAllocationCount方法首先遍历传进来的RealConnection的StreamAllocation;如果StreamAllocation未被使用,则接下来遍历下一个StreamAllocation;如果StreamAllocation未被使用,则从列表中移除。在上面代码注释1处,如果列表为空,则说明此连接没有引用了,这时返回0,表示此连接时空闲连接;否则就返回非0的数,表示此连接时活跃连接。那么StreamAllocation是什么?怎么才能判断StreamAllocation使用与否?接着往下看。

 public void acquire(RealConnection connection, boolean reportedAcquired) {
    assert (Thread.holdsLock(connectionPool));
    if (this.connection != null) throw new IllegalStateException();

    this.connection = connection;
    this.reportedAcquired = reportedAcquired;
    connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
  }

RealConnection是socket物理连接的包装,它里面维护了
List<Reference<StreamAllocation>>的引用。List中StreamAllocation的数量也是socket被引用的计数。如果计数为0,则说明此连接没有被复用,也就是空闲的,需要通过下文的算法实现回收;如果计数不为0,则表示上层代码仍然在引用,就无需关闭连接。

可以看出此连接池复用的核心就是用Deque<RealConnection>来存储连接,通过put,getconnectionBecameIdle和evictAll几个操作来对Deque进行操作,另外通过判断连接中的计数对象StreamAllocation来进行自动回收连接。

创作不易,如果本文对您有用的话,记得点一个赞哦


(1)参考文章:https://blog.piasy.com/2016/07/11/Understand-OkHttp/
(2)《Android进阶之光》

上一篇下一篇

猜你喜欢

热点阅读