Android开发Android开发经验谈Android技术知识

OkHttp源码解析

2019-09-22  本文已影响0人  小村医

一、OkHttp同步网络请求

OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder().url("http://www.baidu.com")
        .get().build();
Call call = client.newCall(request);
try {
    Response response = call.execute();
    System.out.println(response.body().string());
} catch (IOException e) {
    e.printStackTrace();
}
  1. 创建OkHttpClient对象
    OkHttpClient是Okhttp请求的客户端类,很多的功能都需要通过这个客户端类来进行转发或实现
    有两种方式创建:
public Builder() {
    // 网络请求分发类
    dispatcher = new Dispatcher();
    protocols = DEFAULT_PROTOCOLS;
    connectionSpecs = DEFAULT_CONNECTION_SPECS;
    eventListenerFactory = EventListener.factory(EventListener.NONE);
    proxySelector = ProxySelector.getDefault();
    cookieJar = CookieJar.NO_COOKIES;
    socketFactory = SocketFactory.getDefault();
    hostnameVerifier = OkHostnameVerifier.INSTANCE;
    certificatePinner = CertificatePinner.DEFAULT;
    proxyAuthenticator = Authenticator.NONE;
    authenticator = Authenticator.NONE;
    // 连接池,客户端和服务器的每一个连接都放在连接池中统一管理,请求的url相同是就可以复用连接池中的连接 
    connectionPool = new ConnectionPool();
    dns = Dns.SYSTEM;
    followSslRedirects = true;
    followRedirects = true;
    retryOnConnectionFailure = true;
    connectTimeout = 10_000;
    readTimeout = 10_000;
    writeTimeout = 10_000;
    pingInterval = 0;
}
  1. 创建Request对象
    也是通过Builder模式来创建
public Builder() {
    // 指定请求方式
    this.method = "GET";
    // 保持header信息
    this.headers = new Headers.Builder();
}
public Request build() {
    if (url == null) throw new IllegalStateException("url == null");
    return new Request(this);
}

  Request(Builder builder) {
    this.url = builder.url;
    this.method = builder.method;
    this.headers = builder.headers.build();
    this.body = builder.body;
    this.tag = builder.tag != null ? builder.tag : this;
  }
  1. 将Request封装成Call对象
    Call代表一个实际的网络请求,是连接request和response的一个桥梁
OkHttpClient:
  @Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }

RealCall:
  static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.eventListener = client.eventListenerFactory().create(call);
    return call;
  }
  1. 调用Call的execute()方法发布同步请求
    发布请求后,就会进入阻塞状态,直到收到响应
  @Override public Response execute() throws IOException {
    synchronized (this) {
      // 同一个网络请求只能执行一次
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    try {
      //  把请求添加到分发器的同步请求队列中
      client.dispatcher().executed(this);
      // 调用拦截器链获取网络返回数据
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      eventListener.callFailed(this, e);
      throw e;
    } finally {
      // 回收同步请求
      client.dispatcher().finished(this);
    }
  }

Dispatcher中添加同步队列、回收同步请求

public final class Dispatcher {
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }

  void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
  }

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      // 从当前队列中移出同步请求
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      if (promoteCalls) promoteCalls();
      // 计算目前还在进行的网络请求数量
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }
  public synchronized int runningCallsCount() {
    return runningAsyncCalls.size() + runningSyncCalls.size();
  } 
}
调用过程.png

二、OkHttp异步网络请求

Request request = new Request.Builder().url("http://www.baidu.com")
        .get().build();
Call call = client.newCall(request);
call.enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
        System.out.println("Fail");
    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {
        System.out.println(response.body().string());
    }
});
  1. 创建OkHttpClient和Request对象
  2. 将Request封装成Call对象
  3. 调用Call的enquene()方法发布同步请求
    onResponse和onFailure两个回调方法都是在子线程中执行的

enqueue方法
判断当前call -> 封装成一个AsyncCall对象 ->client.dispatcher().enqueue()

  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

把传入的responseCallback封装成AsyncCall,AsyncCall继承自NamedRunnable,其实就是一个Runnable

Dispatcher将请求添加到异步请求队列

  synchronized void enqueue(AsyncCall call) {
    // 正在进行的网络请求数量是否小于最大请求数量,每个主机的请求是否小于设定的最大数量
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      // 添加到正在运行的异步请求队列中
      runningAsyncCalls.add(call);
      // 通过线程池执行异步请求
      executorService().execute(call);
    } else {
      // 添加到ready队列
      readyAsyncCalls.add(call);
    }
  }

加入线程池后会调用AsyncCallexecute方法完成真正的操作

@Override protected void execute() {
    boolean signalledCallback = false;
    try {
         // 调用拦截器链获取网络返回数据
        Response response = getResponseWithInterceptorChain();
        // 重定向拦截器是否取消
        if (retryAndFollowUpInterceptor.isCanceled()) {
            signalledCallback = true;
            responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
            signalledCallback = true;
            responseCallback.onResponse(RealCall.this, response);
        }
    } catch (IOException e) {
       
    } finally {
        // 调用dispatcher的finished方法
        client.dispatcher().finished(this);
    }
}

三、OkHttp的任务调度

OkHttp的任务调度.png

发送的同步/异步请求都会在dispatcher中管理其状态

dispatcher的作用是维护请求的状态,并维护一个线程池,用于执行请求

public final class Dispatcher {
  /** 等待执行异步请求队列 */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** 正在执行的异步请求队列. 包括没有完成被取消的任务 */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** 正在执行的同步请求队列. 包括没有完成被取消的任务*/
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  // 处理异步请求的线程池
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }
 
  // 异步请求
  synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }
  // 同步请求
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }
  //call执行完成需要在runningAsyncCalls中移出这个call 
  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      // 从当前队列中移出同步请求
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      // 调整任务队列
      if (promoteCalls) promoteCalls();
      // 计算目前还在进行的网络请求数量
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }
    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    } 
  }

  private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        // 从readyAsyncCalls中取出call添加到runningAsyncCalls中
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

}

异步请求为什么需要两个队列

四、OkHttp拦截器

拦截器是OkHttp中提供的一个强大的机制,它可以实现网络监听、请求以及响应重写、请求失败重试等功能。


okhttp core.png

从上图可以看出有两种拦截器一种是application拦截器、一种是network拦截器 ,OkHttp Core中是系统内部的拦截器
下面这张图是系统内部提供的拦截器:


系统内部拦截器.png

1、拦截器链

从上面的源码分析可以知道网络请求是通过调用RealCallgetResponseWithInterceptorChain方法实现的

Response getResponseWithInterceptorChain() throws IOException {
    // 创建一个拦截器链,通过依次执行每一个拦截器来获取网络返回
    List<Interceptor> interceptors = new ArrayList<>();
    // 添加用户自定义的application拦截器
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
        // 添加用户自定义的network拦截器
        interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));
    //将拦截器传入拦截器链中 
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
            originalRequest, this, eventListener, client.connectTimeoutMillis(),
            client.readTimeoutMillis(), client.writeTimeoutMillis());
     // 调用拦截器链的proceed方法执行请求
    return chain.proceed(originalRequest);
}

下面接着看一下拦截器链的proceed方法的实现


  public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {

    // 创建一个拦截器的链,传入的index是index+1
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    // 执行拦截器的intercept方法 
    Response response = interceptor.intercept(next);

    // 确保下一个拦截器也调用了chain.proceed().
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    

    return response;
  }
  1. 创建一系列拦截器,并将其添加的拦截器list中
  2. 创建一个拦截器链RealInterceptorChain,并执行拦截器链的proceed方法
    proceed方法的核心是创建下一个拦截器链,这样就可以依次调用下一个拦截器的intercept方法

拦截器在发起网络请求前对request进行处理->调用下一个拦截器获取response->对response进行处理返回给上一个拦截器

2、RetryAndFollowUpInterceptor 重试重定向拦截器

  @Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Call call = realChain.call();
    EventListener eventListener = realChain.eventListener();
     //用来建立执行http请求所需要的组件,真正的使用是在ConnectInterceptor中,主要用于获取连接服务端的connection和数据传输的输入输出流
    streamAllocation = new StreamAllocation(client.connectionPool(), createAddress(request.url()),
        call, eventListener, callStackTrace);

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      if (canceled) {
        streamAllocation.release();
        throw new IOException("Canceled");
      }

      Response response;
      boolean releaseConnection = true;
      try {
        response = realChain.proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } catch (RouteException e) {
        
      }
      // 根据response重新封装request请求 
      Request followUp = followUpRequest(response);

      if (followUp == null) {
        if (!forWebSocket) {
          streamAllocation.release();
        }
        return response;
      }

      closeQuietly(response.body());
      // 判断重试的次数
      if (++followUpCount > MAX_FOLLOW_UPS) {
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }

      if (followUp.body() instanceof UnrepeatableRequestBody) {
        streamAllocation.release();
        throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
      }

      if (!sameConnection(response, followUp.url())) {
        streamAllocation.release();
        streamAllocation = new StreamAllocation(client.connectionPool(),
            createAddress(followUp.url()), call, eventListener, callStackTrace);
      } else if (streamAllocation.codec() != null) {
        throw new IllegalStateException("Closing the body of " + response
            + " didn't close its backing stream. Bad interceptor?");
      }

      request = followUp;
      priorResponse = response;
    }
  }

  1. 创建StreamAllocation对象
  2. 调用RealInterceptorChain.proceed(...)进行网络请求
  3. 根据异常结果或响应结果判断是否要进行重试
  4. 调用下一个拦截器,对response进行处理,返回给上一个拦截器

3、BridgeInterceptor 桥接拦截器

主要是设置请求内容长度、编码方式、压缩、解压缩等 header相关处理

  @Override public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();

    RequestBody body = userRequest.body();
    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }

      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }

    if (userRequest.header("Connection") == null) {
       //保持网络请求连接状态
      requestBuilder.header("Connection", "Keep-Alive");
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }

    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies));
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", Version.userAgent());
    }
    // 调用拦截链的proceed
    Response networkResponse = chain.proceed(requestBuilder.build());

    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);
    // 解压缩
    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      String contentType = networkResponse.header("Content-Type");
      responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
    }

    return responseBuilder.build();
  }

  1. 负责将用户构建的一个Request请求转化为能够进行网络访问的请求
  2. 将这个符合网络请求的Request进行网络请求
  3. 将网络请求回来的响应Response转化为用户可用的Response

4、CacheInterceptor 缓存拦截器

如何使用OkHttp缓存
OkHttpClient client = new OkHttpClient
        .Builder()
        .cache(new Cache(new File("cache"), 24 * 1024 * 1024)).build();

Request request = new Request.Builder().url("http://www.baidu.com").build();

Call call = client.newCall(request);

try {
    Response response = call.execute();
    response.close();
} catch (IOException e) {
    e.printStackTrace();
}
保存网络请求数据

网络请求数据的保存和获取的实现方法在Cache类中

  @Nullable CacheRequest put(Response response) {
     // 获取请求方式
    String requestMethod = response.request().method();
     //判断是否有效的缓存
    if (HttpMethod.invalidatesCache(response.request().method())) {
      try {
        remove(response.request());
      } catch (IOException ignored) {
        // The cache cannot be written.
      }
      return null;
    }
    // 只有get方法才会缓存
    if (!requestMethod.equals("GET")) {
      return null;
    }
    // header中是否含有*号
    if (HttpHeaders.hasVaryAll(response)) {
      return null;
    }
    // 要写入缓存的部分的实例
    Entry entry = new Entry(response);
    // 最终的缓存由DiskLruCache来实现
    DiskLruCache.Editor editor = null;
    try {
      editor = cache.edit(key(response.request().url()));
      if (editor == null) {
        return null;
      }
      entry.writeTo(editor);
      return new CacheRequestImpl(editor);
    } catch (IOException e) {
      abortQuietly(editor);
      return null;
    }
  }

 public static boolean invalidatesCache(String method) {
    return method.equals("POST")
        || method.equals("PATCH")
        || method.equals("PUT")
        || method.equals("DELETE")
        || method.equals("MOVE");     // WebDAV
  }

entry.writeTo(editor)将数据写入缓存

    /**
     * <pre>{@code
     *   http://google.com/foo
     *   GET
     *   2
     *   Accept-Language: fr-CA
     *   Accept-Charset: UTF-8
     *   HTTP/1.1 200 OK
     *   3
     *   Content-Type: image/png
     *   Content-Length: 100
     *   Cache-Control: max-age=600
     * }</pre>
     */
    public void writeTo(DiskLruCache.Editor editor) throws IOException {
      BufferedSink sink = Okio.buffer(editor.newSink(ENTRY_METADATA));

      sink.writeUtf8(url)
          .writeByte('\n');
      sink.writeUtf8(requestMethod)
          .writeByte('\n');
      sink.writeDecimalLong(varyHeaders.size())
          .writeByte('\n');
      for (int i = 0, size = varyHeaders.size(); i < size; i++) {
        sink.writeUtf8(varyHeaders.name(i))
            .writeUtf8(": ")
            .writeUtf8(varyHeaders.value(i))
            .writeByte('\n');
      }

      sink.writeUtf8(new StatusLine(protocol, code, message).toString())
          .writeByte('\n');
      sink.writeDecimalLong(responseHeaders.size() + 2)
          .writeByte('\n');
      for (int i = 0, size = responseHeaders.size(); i < size; i++) {
        sink.writeUtf8(responseHeaders.name(i))
            .writeUtf8(": ")
            .writeUtf8(responseHeaders.value(i))
            .writeByte('\n');
      }
      sink.writeUtf8(SENT_MILLIS)
          .writeUtf8(": ")
          .writeDecimalLong(sentRequestMillis)
          .writeByte('\n');
      sink.writeUtf8(RECEIVED_MILLIS)
          .writeUtf8(": ")
          .writeDecimalLong(receivedResponseMillis)
          .writeByte('\n');

      if (isHttps()) {
        sink.writeByte('\n');
        sink.writeUtf8(handshake.cipherSuite().javaName())
            .writeByte('\n');
        writeCertList(sink, handshake.peerCertificates());
        writeCertList(sink, handshake.localCertificates());
        sink.writeUtf8(handshake.tlsVersion().javaName()).writeByte('\n');
      }
      sink.close();
    }

上面只是将请求和响应的header信息写入了缓存,那Response的body怎么写入的呢?
put方法中最后会返回一个CacheRequestImpl,CacheInterceptor中获取到网络数据后会调用Cache的put方法并得到CacheRequestImpl对象,然后通过CacheRequestImpl将body数据写入缓存

 private final class CacheRequestImpl implements CacheRequest {
    private final DiskLruCache.Editor editor;
    private Sink cacheOut;
    private Sink body;
    boolean done;

    CacheRequestImpl(final DiskLruCache.Editor editor) {
      this.editor = editor;
      this.cacheOut = editor.newSink(ENTRY_BODY);
      this.body = new ForwardingSink(cacheOut) {
        @Override public void close() throws IOException {
          synchronized (Cache.this) {
            if (done) {
              return;
            }
            done = true;
            writeSuccessCount++;
          }
          super.close();
           // close时把数据写入缓存
          editor.commit();
        }
      };
    }
}
获取缓存中的网络请求数据
  @Nullable Response get(Request request) {
    String key = key(request.url());
    DiskLruCache.Snapshot snapshot;
    Entry entry;
    try {
      snapshot = cache.get(key);
      if (snapshot == null) {
        return null;
      }
    } catch (IOException e) {
      // Give up because the cache cannot be read.
      return null;
    }

    try {
      entry = new Entry(snapshot.getSource(ENTRY_METADATA));
    } catch (IOException e) {
      Util.closeQuietly(snapshot);
      return null;
    }

    Response response = entry.response(snapshot);

    if (!entry.matches(request, response)) {
      Util.closeQuietly(response.body());
      return null;
    }

    return response;
  } 
    //缓存快照转换成Response
    public Response response(DiskLruCache.Snapshot snapshot) {
      String contentType = responseHeaders.get("Content-Type");
      String contentLength = responseHeaders.get("Content-Length");
      Request cacheRequest = new Request.Builder()
          .url(url)
          .method(requestMethod, null)
          .headers(varyHeaders)
          .build();
      return new Response.Builder()
          .request(cacheRequest)
          .protocol(protocol)
          .code(code)
          .message(message)
          .headers(responseHeaders)
          .body(new CacheResponseBody(snapshot, contentType, contentLength))
          .handshake(handshake)
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(receivedResponseMillis)
          .build();
    }
  }
缓存拦截器
  @Override public Response intercept(Chain chain) throws IOException {
    // 时候存在网络缓存
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();
    //生成缓存策略
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;
    // 缓存统计
    if (cache != null) {
      cache.trackResponse(strategy);
    }
    // 有缓存数据但是缓存策略不允许使用缓存,关闭缓存
    if (cacheCandidate != null && cacheResponse == null) {
      closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    // 禁止使用网络并且缓存为null是返回一个504错误
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

    // 不使用网络请求,直接返回缓存
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }

    //调用拦截器链的proceed方法
    Response networkResponse = null;
    try {
      networkResponse = chain.proceed(networkRequest);
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }

    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
      //存在缓存,并且网络请求返回code为304时使用缓存
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }

    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();

    if (cache != null) {
      if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
        // 将请求返回的数据保存到本地
        CacheRequest cacheRequest = cache.put(response);
        return cacheWritingResponse(cacheRequest, response);
      }

      if (HttpMethod.invalidatesCache(networkRequest.method())) {
        try {
          cache.remove(networkRequest);
        } catch (IOException ignored) {
          // The cache cannot be written.
        }
      }
    }

    return response;
  }

5、ConnectInterceptor 网络连接拦截器

  1. ConnectInterceptor获取RetryAndFollowUpInterceptor传过来的StreamAllocation,执行streamAllocation.newStream
  2. 将创建的用于网络IO的RealConnection对象,以及对于服务器交互最为关键的HttpCodec对象传递给后面的拦截器
  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    // HttpCodec用来编码R equest和解码Response
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    //RealConnection用来进行实际的网络io传输 
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }

来看一下streamAllocation.newStream 方法做了哪些事情

  public HttpCodec newStream(
      OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    try {
      // 调用findHealthyConnection生成一个实际的网络请求连接
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
      // 生成HttpCodec
      HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }

findHealthyConnection

  //找到一个连接,如果它是健康的,则返回它。如果不正常,则重复此过程,直到找到正常连接。
  private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
      throws IOException {
    while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          connectionRetryEnabled);

      // 如果这是一个全新的连接,可以跳过健康检查。
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }
      // 检查以确认连接仍然良好. 如果不健康把它从连接池中移出,然后重新查找
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }

      return candidate;
    }
  }

findConnection

  /**
   * 返回一个用来建立新的stream的连接. 如果存在现有连接那么复用现在的链接,然后是连接池,最后才会创建一个新的连接
   */
  private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      boolean connectionRetryEnabled) throws IOException {
   
    synchronized (connectionPool) {
      // 尝试使用已分配的连接.我们在这里要小心,因为已分配的连接可能已被限制创建新流
      releasedConnection = this.connection;
      toClose = releaseIfNoNewStreams();
      if (this.connection != null) {
        // 我们已经分配了一个连接,并且是健康的
        result = this.connection;
        releasedConnection = null;
      }

      if (result == null) {
        // 从连接池中获取一个连接
        Internal.instance.get(connectionPool, address, this, null);
        if (connection != null) {
          foundPooledConnection = true;
          result = connection;
        } else {
          selectedRoute = route;
        }
      }
    }
   
    if (result != null) {
      // 从已经分配的连接或连接池中找到了可复用连接直接返回
      return result;
    }

    // 如果我们需要一个路由选择
    boolean newRouteSelection = false;
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
      newRouteSelection = true;
      routeSelection = routeSelector.next();
    }

    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      if (newRouteSelection) {
        // 现在我们有了一组IP地址, 再次尝试从连接池中获取连接. 由于连接合并,这可能匹配到
        List<Route> routes = routeSelection.getAll();
        for (int i = 0, size = routes.size(); i < size; i++) {
          Route route = routes.get(i);
          Internal.instance.get(connectionPool, address, this, route);
          if (connection != null) {
            foundPooledConnection = true;
            result = connection;
            this.route = route;
            break;
          }
        }
      }

      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection.next();
        }
        // 创建连接
        refusedStreamCount = 0;
        result = new RealConnection(connectionPool, selectedRoute);
        acquire(result, false);
      }
    }
    // 进行TCP+TLS握手
    result.connect(
        connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled, call, eventListener);
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      reportedAcquired = true;
      // 将连接放到连接池中
      Internal.instance.put(connectionPool, result);
    }
    closeQuietly(socket);
    eventListener.connectionAcquired(call, result);
    return result;
  }

ConnectionPool复用网络连接

获取网络连接
  @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    // 遍历连接池中的connections
    for (RealConnection connection : connections) {
      // 根据address和routep判断连接是否可用
      if (connection.isEligible(address, route)) {
        streamAllocation.acquire(connection, true);
        return connection;
      }
    }
    return null;
  }

调用streamAllocation.acquire获取可用的连接

  public void acquire(RealConnection connection, boolean reportedAcquired) {
    assert (Thread.holdsLock(connectionPool));
    if (this.connection != null) throw new IllegalStateException();
    //赋值给成员变量
    this.connection = connection;
    this.reportedAcquired = reportedAcquired;
    // 把StreamAllocation对象的弱引用添加到connection的allocations中,根据allocations可以判断当前连接持有的StreamAllocation数量
    connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
  }
保存网络连接
  void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      // 清理网络连接
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }
连接回收
  /**
   * 如果已超过keep alive限制或空闲连接限制,则删除空闲时间最长的连接。
   * 返回此方法的下一次计划调用的持续时间(以纳秒为单位). Returns -1 无需进一步清理。
   */
  long cleanup(long now) {
  
    // 查找要删除的连接,或者下一次清理时间。
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

        // 如果连接正在使用,继续搜索。
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;
          continue;
        }

        idleConnectionCount++;

        // 找到最长空闲时间的连接
        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }

      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        // 删除连接
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
        //连接将准备退出。
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
        // 所有连接都在使用中
        return keepAliveDurationNs;
      } else {
        // No connections, idle or in use.
        cleanupRunning = false;
        return -1;
      }
    }

    closeQuietly(longestIdleConnection.socket());

    // Cleanup again immediately.
    return 0;
  }

查找需要回收的连接,根据StreamAllocation的弱引用是否为空来判断是否需要回收

  private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    List<Reference<StreamAllocation>> references = connection.allocations;
    for (int i = 0; i < references.size(); ) {
      Reference<StreamAllocation> reference = references.get(i);

      if (reference.get() != null) {
        i++;
        continue;
      }

      // We've discovered a leaked allocation. This is an application bug.
      StreamAllocation.StreamAllocationReference streamAllocRef =
          (StreamAllocation.StreamAllocationReference) reference;
      String message = "A connection to " + connection.route().address().url()
          + " was leaked. Did you forget to close a response body?";
      Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);

      references.remove(i);
      connection.noNewStreams = true;

      // If this was the last allocation, the connection is eligible for immediate eviction.
      if (references.isEmpty()) {
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
      }
    }

    return references.size();
  }
  1. OkHttp使用了GC回收算法
  2. StreamAllocation的数量会渐渐变成0
  3. 被回收线程监测到并回收,这样就可以保持多个健康的keep-alive连接

6、CallServerInterceptor

发起真正的网络请求,并接收网络返回数据

 @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();

    realChain.eventListener().requestHeadersStart(realChain.call());
    httpCodec.writeRequestHeaders(request);
    realChain.eventListener().requestHeadersEnd(realChain.call(), request);

    Response.Builder responseBuilder = null;
    ......

    httpCodec.finishRequest();

    if (responseBuilder == null) {
      realChain.eventListener().responseHeadersStart(realChain.call());
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    realChain.eventListener()
        .responseHeadersEnd(realChain.call(), response);

    int code = response.code();
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }

上一篇 下一篇

猜你喜欢

热点阅读