Android开发Android开发Android知识

Okhttp3源码解析

2019-03-04  本文已影响17人  求闲居士

一 前言

Retrofit + Okhttp + RxJava,可以说是现在最火的网络请求组合了,而它们背后的设计模式或设计思想,是它们成功的重大原因之一。

分析源码,也是为了学习它们的设计思想,提升自己的编码能力。在上篇分析Retrofit的文章中,外观模式,代理模式,装饰器模式,工厂模式,策略模式等,都是在源码解析中加强了认识。

总体分四步:

  1. 创建OkHttpClient客户端对象
  2. 创建请求消息Request对象。
  3. 创建网络请求的调用封装对象RealCall
  4. 执行网络请求,会获取响应消息Response对象给回调方法

二 OkHttpClient

用建造者模式创建对象。和Retrofit一样,主要功能生成默认配置,且属性很多,这时用建造者模式选择需要配置的属性创建对象就方便了许多。

public OkHttpClient() {
    this(new Builder());
}

public Builder() {
  dispatcher = new Dispatcher();    //执行网络请求的任务调度器
  protocols = DEFAULT_PROTOCOLS;    //默认的协议 http2 http1.1
  connectionSpecs = DEFAULT_CONNECTION_SPECS; // 设置连接时支持的tls层协议以及不进行数据加密
  eventListenerFactory = EventListener.factory(EventListener.NONE);
  proxySelector = ProxySelector.getDefault();
  cookieJar = CookieJar.NO_COOKIES;
  socketFactory = SocketFactory.getDefault();   // socket生产工厂
  hostnameVerifier = OkHostnameVerifier.INSTANCE;
  certificatePinner = CertificatePinner.DEFAULT;
  proxyAuthenticator = Authenticator.NONE;
  authenticator = Authenticator.NONE;
  connectionPool = new ConnectionPool();    //连接池 支持多路复用
  dns = Dns.SYSTEM;
  followSslRedirects = true;
  followRedirects = true;
  retryOnConnectionFailure = true;  //是否重定向
  connectTimeout = 10_000;      //连接超时时间
  readTimeout = 10_000;     
  writeTimeout = 10_000;
  pingInterval = 0;
}

无参构造器是用Builder为参数创建的,相当于还是用Builder实现对象创建。参数是Builder构造器默认创建的。

它是在配置全局发送请求中所需要的各种定制化的参数,并且持有各个参数引用对象。

Dispatcher主要用来管理网络请求的线程池。

可以看出每个OkHttpClient对象对线程池和连接池都有管理,所有OkHttpClient最好做成用单例模式创建。创建多个OkHttpClient对象会占用更多内存。

2.1 Dispatcher

我看网上有说法是使用享元模式,享元共厂内部用线程池实现享元池。用于减少创建对象的数量,以减少内存占用和提高性能。但这里只有一种类型,没有分组对象。

public final class Dispatcher {
  private int maxRequests = 64;
  private int maxRequestsPerHost = 5;
  private @Nullable Runnable idleCallback;

  //请求网络的线程池,懒汉式单例
  private @Nullable ExecutorService executorService;

  //准备异步队列,当运行队列满时存储任务
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  //运行的异步队列
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  //运行的同步队列
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
  
  //单例获取线程池
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }
  //异步进行网络请求,AsyncCall是一个Runnable
  synchronized void enqueue(AsyncCall call) {
    //运行时异步队列未超过maxRequests且相同host的请求数量不超过maxRequestsPerHost,加入运行时队列并执行它
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
        //加入等待队列
      readyAsyncCalls.add(call);
    }
  }
  //同步网络请求,直接加入同步队列
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }
  //网络请求结束后,会将请求结束的Call从运行队列中移除
  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      //如果是异步队列,还要再根据运行时异步队列和同host的请求数量操控运行时异步队列和等待队列
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }
  
  //从准备队列把Call已到运行队列来
  private void promoteCalls() {
    //运行队列已满,则结束
    if (runningAsyncCalls.size() >= maxRequests) return; // 准备队列为空,怎结束
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();
      //如果这个Call的同host请求在运行队列中不超过maxRequestsPerHost,则加入运行队列
      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }
    //如果运行队列已满,则退出
      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }
}

对于异步请求,调用enqueue方法,将要执行的AsyncCall放入运行或等待队列并交予线程池执行,执行完成后,再调用finished来移除请求。

对于同步请求,调用executed方法,将RealCall放入运行同步队列,在执行完成后调用finished将Call移除队列。

当为异步任务时,会调用promoteCalls方法。遍历准备队列,如果运行队列未满,且运行队列中同一主机请求不超过5个,则将其从准备队列移入运行队列;否则不移动。

三 Request

这个对象是为了配置http请求的请求消息Request,而请求消息分为四部分:请求行(request line)、请求头部(header)、空行和请求数据。

从请求头的组成就可知Request有哪些配置。

Request request = new Request.Builder()
    .url(baseUrl + "top250")
    .build();
    
public static class Builder {
    HttpUrl url;    //请求的url
    String method;  //请求方法
    Headers.Builder headers;    //请求头
    RequestBody body;   //请求数据
    Object tag;

    public Builder() {
      this.method = "GET";
      this.headers = new Headers.Builder();
    }
}

很明显,就是HTTP请求消息Request的组成

四 RealCall

okhttp3.Call call = client.newCall(request);

@Override public Call newCall(Request request) {
    return new RealCall(this, request, false /* for web socket */);
}

RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    final EventListener.Factory eventListenerFactory = client.eventListenerFactory();

    this.client = client;
    this.originalRequest = originalRequest;
    this.forWebSocket = forWebSocket;
    this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);

    // TODO(jwilson): this is unsafe publication and not threadsafe.
    this.eventListener = eventListenerFactory.create(this);
  }

创建的Call对象实际是RealCall实现的,RealCall创建就是初始化了几个属性,OkHttpClient,Request,retryAndFollowUpInterceptor。

retryAndFollowUpInterceptor是重定向拦截器,和先前的任务调度器Dispatcher一样,是实现网络请求的关键之一。

五 call.enqueue

前面都是初始化和配置设置工作,终于到实现网络请求的步骤了,enqueue是上面介绍的RealCall实现的。

@Override public void enqueue(Callback responseCallback) {
    //只能执行一次
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    //使用dispatcher用线程池执行网络请求
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }
  
final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }
    
     @Override protected void execute() {
      boolean signalledCallback = false;
      try {
      //获取返回值
        Response response = getResponseWithInterceptorChain();
        //对返回值进行回调
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
}

call.enqueue中,调用了OkHttpClient的Dispatcher的enqueue,将AsyncCall将于线程池执行,线程真正执行的内容在AsyncCall的execute中。

通过getResponseWithInterceptorChain获取到网络请求的返回值,那实现网络请求的重点就在getResponseWithInterceptorChain中了。

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);//在网络请求失败后进行重试
    interceptors.add(new BridgeInterceptor(client.cookieJar()));//桥接拦截器,对头部信息进行一系列设置
    interceptors.add(new CacheInterceptor(client.internalCache()));//缓存拦截器
    interceptors.add(new ConnectInterceptor(client));//连接拦截器
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    return chain.proceed(originalRequest);
  }

getResponseWithInterceptorChain里添加了一系列的拦截器,再创建RealInterceptorChain开始通过进行责任链模式实现网络请求。

而责任链的顺序:

  1. OkHttpClient的addInterceptor的拦截器
  2. BridgeInterceptor
  3. CacheInterceptor
  4. ConnectInterceptor
  5. OkHttpClient的addNetworkInterceptor的拦截器
  6. CallServerInterceptor

5.1 Interceptor

5.1.1 RealInterceptorChain

那从责任链模式模式的角度先来分析它的是如何实现的。这种模式为请求创建了一个接收者对象的链,通常每个接收者都包含对另一个接收者的引用。如果一个对象不能处理该请求,那么它会把相同的请求传给下一个接收者,依此类推。

public final class RealInterceptorChain implements Interceptor.Chain {
@Override public Response proceed(Request request) throws IOException {
    return proceed(request, streamAllocation, httpCodec, connection);
  }

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    
    // 1.创建下一阶段的链对象
    RealInterceptorChain next = new RealInterceptorChain(
        interceptors, streamAllocation, httpCodec, connection, index + 1, request);
    //2.获取这一阶段链的拦截者
    Interceptor interceptor = interceptors.get(index);
    //3.让这一阶段的拦截者处理请求
    Response response = interceptor.intercept(next);

    // 确保下一阶段的拦截者next,执行了chain.proceed()
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // 确保拦截器请求结果不为空
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    return response;
  }
}

在第一次调用chain.proceed(originalRequest)开启责任链处理请求时,index还是0,

RealInterceptorChainInterceptor关系就很明了了:RealInterceptorChain的proceed方法中,会创建链的下一阶段RealInterceptorChain对象,做为Interceptor的intercept方法参数,在intercept中会执行chain.proceed()进而到链的下一环。

简单来讲,proceed方法就是RealInterceptorChain根据index获取响应登记的Interceptor,调用intercept方法时又会调用传入的chain.proceed。

在proceed方法中,有四个参数:Request,StreamAllocation,HttpCodec,RealConnection。但在RealCall的getResponseWithInterceptorChain中

 Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);

只穿入了Request对象,其它对象则是在后续的拦截者中创建的。下面就根据这几个参数来解析这些的拦截者。

5.1.2 RetryAndFollowUpInterceptor

顾名思义,重试与重定向拦截器。也就是在网络请求失败的情况下,会自动进行重连,内部通过while(true)死循环来进行重试获取Response(有重试上限,超过会抛出异常)。

影响process方法中的StreamAllocation的创建。

@Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    //创建streamAllocation对象
    streamAllocation = new StreamAllocation(
        client.connectionPool(), createAddress(request.url()), callStackTrace);

    int followUpCount = 0;
    Response priorResponse = null;
    //1.循环获取请求结果
    while (true) {
    Response response = null;
      boolean releaseConnection = true;
      try {
        //2.责任链下一环去处理请求,获取请求结果
        response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } catch (RouteException e) {
        // 3.通过路由连接的尝试失败,这时请求没有发送出去。判断满足可恢复条件,满足则继续循环重试。
        if (!recover(e.getLastConnectException(), false, request)) {
          throw e.getLastConnectException();
        }
        releaseConnection = false;
        continue;
      } catch (IOException e) {
        // 尝试与服务器通信失败。请求可能已经发送。
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, requestSendStarted, request)) throw e;
        releaseConnection = false;
        continue;
      } finally {
        // 抛出异常如果没有被catch,或没有异常发生,释放streamAllocation资源
        if (releaseConnection) {
          streamAllocation.streamFailed(null);
          streamAllocation.release();
        }
      }
      
      //4.分析返回码,303的话需要重定向
      Request followUp = followUpRequest(response);
      //不需要重定向就返回请求结果
      if (followUp == null) {
        if (!forWebSocket) {
          streamAllocation.release();
        }
        return response;
      }
      //5.重定向次数不能超过MAX_FOLLOW_UPS
      if (++followUpCount > MAX_FOLLOW_UPS) {
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }
    }
  }
  1. 通过循环来获取请求返回值
  2. 责任链叫给下一级获取请求返回值
  3. 请求过程中发生RouteException,IOException异常,则进行重连请求。
  4. 重定向判断,如需要就重定向,不需要就返回请求值给上一级责任链。
  5. 重定向次数不能超过MAX_FOLLOW_UPS,20次。

5.1.3 BridgeInterceptor

适配器模式(Adapter Pattern)是作为两个不兼容的接口之间的桥梁。而BridgeInterceptor也是起着这样的作用,它是实现应用层和网络层直接的数据格式编码的桥,用于完善请求头。影响process方法中的Request参数。

public final class BridgeInterceptor implements Interceptor {
    @Override public Response intercept(Chain chain) throws IOException {
        Request userRequest = chain.request();
        Request.Builder requestBuilder = userRequest.newBuilder();
    
        //1.完善请求头
        RequestBody body = userRequest.body();
        if (body != null) {
            MediaType contentType = body.contentType();
              if (contentType != null) {
                requestBuilder.header("Content-Type", contentType.toString());
              }
              ...
        }
        ...
        
        //2.责任链下一环,获取请求返回值
        Response networkResponse = chain.proceed(requestBuilder.build());
        
        //3.对返回值的消息报头进行转化
        HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

        Response.Builder responseBuilder = networkResponse.newBuilder()
            .request(userRequest);
        ...
    }
}

如注释所写的,分为三步

  1. 完善请求头
  2. 责任链下一环,获取请求返回值
  3. 对返回值的消息报头进行转化

就是对请求头和消息报头处理。

5.1.4 CacheInterceptor

要了解这里缓存如何实现,还是要先了解http缓存机制。但也要注意一点,有些缓存字符意义并不完全一样,这个解析CacheInterceptor代码时介绍。

5.1.4.1 http缓存机制

这里就不大篇幅描述,只做简要概述。

缓存其实可以分成两类,一种是不请求网络读取本地缓存的,一种是请求网络进行对比的缓存。Header中的Expires/Cache-Control属于第一种,Last-Modified / If-Modified-Since,Etag / If-None-Match属于第二种。

  1. 第一种缓存的流程:客户端请求数据,直接从本地缓存返回结果;本地缓存失效则请求网络,再将缓存请求结果。
  2. 第二种缓存流程:客户端请求数据,从缓存获取缓存里header的缓存标示,再请求服务器验证本地缓存是否失效,失效则返回结果并缓存;有效通知客户端缓存有效,客户端读取本地缓存。
5.1.4.1.1 Expires

Expires的值为服务端返回的到期时间,即下一次请求时,请求时间小于服务端返回的到期时间,直接使用缓存数据。

Expires 是HTTP 1.0的东西,现在默认浏览器均默认使用HTTP 1.1,所以它的作用基本忽略。所以HTTP 1.1 的版本,使用Cache-Control替代。

5.1.4.1.2 Cache-Control

请求头

指令 说明
no-cache 需要使用网络对比缓存来验证缓存数据
no-store 所有内容都不会缓存
max-age=[秒] 缓存的内容将在 xxx 秒后失效
max-stale=[秒] 可接受的最大过期时间
min-fresh=[秒] 询问再过[秒]时间后资源是否过期,若过期则不返回
only-if-cached 只获取缓存的资源而不联网获取,如果缓存没有命中,返回504

响应头还有public,private等,这些在特别情况下如响应状态码为302,307等情况作用缓存响应消息。

5.1.4.1.3 Last-Modified / If-Modified-Since
5.1.4.1.4 Etag / If-None-Match(优先级高于Last-Modified / If-Modified-Since)

5.1.4.2 CacheInterceptor

OKHttp的缓存策略,就是实现Http的缓存策略。

public final class CacheInterceptor implements Interceptor {
 @Override public Response intercept(Chain chain) throws IOException {
    //1.如果配置了缓存,获取同一请求的缓存Response
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();
    //2.根据请求头和缓存的响应,生成缓存策略
    //也就是对请求头和响应头中缓存相关的标示生成策略,看用哪种HTTP缓存方式
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    //网络请求,为空则不请求网络
    Request networkRequest = strategy.networkRequest;
    //缓存,为空则不获取缓存
    Response cacheResponse = strategy.cacheResponse;

    //记录网络请求或缓存结果获取的次数
    if (cache != null) {
      cache.trackResponse(strategy);
    }
    //如果有缓存这次请求却不用缓存,则释放缓存
    if (cacheCandidate != null && cacheResponse == null) {
      closeQuietly(cacheCandidate.body()); 
    }

    // 3.如果网络没有且无法从缓存中获取结果,返回504错误
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

    // 4.如果不需要网络请求,缓存结果有,则使用缓存获取结果
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }

    //5.走到这步,则说明有网络请求。通过责任链下一环获取响应值。
    Response networkResponse = null;
    try {
      networkResponse = chain.proceed(networkRequest);
    } finally {
      // 如果网络请求失败且缓存结果不为空,则释放缓存
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }

    // 6.,缓存结果不为空,且网络请求返回304,则读取本地缓存
    if (cacheResponse != null) {
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
      //合并网络请求返回结果和缓存结果,主要是合并网络请求结果的header
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // 6.1 将结果集更新到缓存中
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }

    //7.走到这,说明不需要使用缓存,则直接使用网络结果
    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();
    //8.如果有缓存,经过服务器校验缓存过期了
    if (cache != null) {
    //如果网络请求结果有响应正文,且根据请求和响应的Header判断需要保存缓存,就保存
      if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
        // 将网络结果保存到缓存中
        CacheRequest cacheRequest = cache.put(response);
        return cacheWritingResponse(cacheRequest, response);
      }
    //不需要保存缓存,且是POST,PATCH,PUT,DELETE,MOVE中的请求方法,则删除缓存
      if (HttpMethod.invalidatesCache(networkRequest.method())) {
        try {
          cache.remove(networkRequest);
        } catch (IOException ignored) {
          // The cache cannot be written.
        }
      }
    }
    //9.走到这里,说明网络结果过程都没用到缓存,是无缓存操作
    return response;
  }
}

这个方法,其实就是实现Http缓存,大部分可以按HTTP缓存来理解它。Header里缓存字段如何配置,决定了缓存要如何处理。这些上面介绍Http缓存时已经说过,就不再赘述。

说几个要注意的地方

但缓存实现也有与Http缓存介绍不同的地方,这需要到CacheStrategy中分析。

5.1.4.3 CacheStrategy

缓存策略的生成,它是通过工厂模式创建,和Retrofit的字符转换器网络适配器生成几乎一样的行式:创建工厂在调用get创建。

CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();

先分析Factory

public static class Factory {
    public Factory(long nowMillis, Request request, Response cacheResponse) {
      this.nowMillis = nowMillis;
      this.request = request;
      this.cacheResponse = cacheResponse;
      //将缓存结果的header分析,将缓存相关字符分离出来
      if (cacheResponse != null) {
        this.sentRequestMillis = cacheResponse.sentRequestAtMillis();
        this.receivedResponseMillis = cacheResponse.receivedResponseAtMillis();
        Headers headers = cacheResponse.headers();
        for (int i = 0, size = headers.size(); i < size; i++) {
          String fieldName = headers.name(i);
          String value = headers.value(i);
          if ("Date".equalsIgnoreCase(fieldName)) {
            servedDate = HttpDate.parse(value);
            servedDateString = value;
          } else if ("Expires".equalsIgnoreCase(fieldName)) {
            expires = HttpDate.parse(value);
          } else if ("Last-Modified".equalsIgnoreCase(fieldName)) {
            lastModified = HttpDate.parse(value);
            lastModifiedString = value;
          } else if ("ETag".equalsIgnoreCase(fieldName)) {
            etag = value;
          } else if ("Age".equalsIgnoreCase(fieldName)) {
            ageSeconds = HttpHeaders.parseSeconds(value, -1);
          }
        }
      }
    }
}

主要作用就是将缓存结果头中的缓存字符分离出来,为后面的缓存策略生成做准备。

public CacheStrategy get() {
      CacheStrategy candidate = getCandidate();
      //如果请求头有only-if-cached,说明只用缓存,但缓存又失效了,网络缓存都不能用
      if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) {
        return new CacheStrategy(null, null);
      }

      return candidate;
    }

对only-if-cached字符加了道验证,策略实现的主要方法是getCandidate

    private CacheStrategy getCandidate() {
      // 1.没有缓存结果,创建的策略只用网络请求
      if (cacheResponse == null) {
        return new CacheStrategy(request, null);
      }

      // 2.当请求的协议是https的时候,如果cache没有TLS握手就丢弃缓存,只用网络请求
      if (request.isHttps() && cacheResponse.handshake() == null) {
        return new CacheStrategy(request, null);
      }

      // 3.cacheResponse和request的header的Cache-Control都要不是no-store,否则只用网络请求
      if (!isCacheable(cacheResponse, request)) {
        return new CacheStrategy(request, null);
      }
    
      //4.如果请求头里的Cache-Control值是no-cache或有If-Modified-Since,If-None-Match,则只用网络请求
      CacheControl requestCaching = request.cacheControl();
      if (requestCaching.noCache() || hasConditions(request)) {
        return new CacheStrategy(request, null);
      }
     
      //5.缓存有效性校验,根据缓存的缓存时间,缓存可接受最大过期时间等等HTTP协议上的规范来判断缓存是否可用。
      //这里是第一种不请求网络只用本地缓存的策略
      long ageMillis = cacheResponseAge();
      long freshMillis = computeFreshnessLifetime();

      if (requestCaching.maxAgeSeconds() != -1) {
        freshMillis = Math.min(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds()));
      }

      long minFreshMillis = 0;
      if (requestCaching.minFreshSeconds() != -1) {
        minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds());
      }

      long maxStaleMillis = 0;
      CacheControl responseCaching = cacheResponse.cacheControl();
      if (!responseCaching.mustRevalidate() && requestCaching.maxStaleSeconds() != -1) {
        maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds());
      }

      if (!responseCaching.noCache() && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
        Response.Builder builder = cacheResponse.newBuilder();
        if (ageMillis + minFreshMillis >= freshMillis) {
          builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"");
        }
        long oneDayMillis = 24 * 60 * 60 * 1000L;
        if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
          builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"");
        }
        return new CacheStrategy(null, builder.build());
      }
      
      // 6.这里是看请求是否为缓存网络比对。
      String conditionName;
      String conditionValue;
      if (etag != null) {
        conditionName = "If-None-Match";
        conditionValue = etag;
      } else if (lastModified != null) {
        conditionName = "If-Modified-Since";
        conditionValue = lastModifiedString;
      } else if (servedDate != null) {
        conditionName = "If-Modified-Since";
        conditionValue = servedDateString;
      } else {
        //7.如果比对缓存也不是,那只有常规的网络请求了
        return new CacheStrategy(request, null); 
      }

      Headers.Builder conditionalRequestHeaders = request.headers().newBuilder();
      Internal.instance.addLenient(conditionalRequestHeaders, conditionName, conditionValue);

      Request conditionalRequest = request.newBuilder()
          .headers(conditionalRequestHeaders.build())
          .build();
      //7.返回第二种网络比对的缓存策略
      return new CacheStrategy(conditionalRequest, cacheResponse);
    }

CacheControl是request和Response中为辅助缓存处理的类,通过分解header的缓存字符来创建。

其实创建的策略就三种,

only-if-cached不能单独使用,它只是限制网络请求,缓存有效的判断还是要另外设置。

缓存策略与Http缓存不同的点:

5.1.5 ConnectInterceptor

解析完缓存,责任链下一环就是ConnectInterceptor了,它通过StreamAllocation获取到HttpCodec和RealConnection。

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    //这是RetryAndFollowUpInterceptor创建并传入的
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    // HTTP/1.1生成Http1xStream;HTTP/2 生成Http2xStream ,实际上是通过RealConnection生成的
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    //从连接池中获取可用连接
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }
  
  public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
    try {
    //1.获取连接
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
    //通过连接获取HttpCodec
      HttpCodec resultCodec = resultConnection.newCodec(client, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }

HttpCodec里实际是利用Okio来实现读写。

先来看如何获取RealConnection吧

5.1.5.1 RealConnection的获取

findHealthyConnection -> findConnection

  private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      boolean connectionRetryEnabled) throws IOException {
    Route selectedRoute;
    //connectionPool 连接池
    synchronized (connectionPool) {
    
      // 1.复用已有连接
      RealConnection allocatedConnection = this.connection;
      //已有连接是否可用
      if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
        return allocatedConnection;
      }

      // 2.从连接池中获取连接,赋值给了connection
      Internal.instance.get(connectionPool, address, this, null);
      //连接池获到了connection,就使用
      if (connection != null) {
        return connection;
      }

      selectedRoute = route;
    }

    // 如果需要路由,就获取一个,里面有代理信息
    if (selectedRoute == null) {
      selectedRoute = routeSelector.next();
    }

    RealConnection result;
    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      // 3.加上路由再从连接池获取连接一次
      // This could match due to connection coalescing.
      Internal.instance.get(connectionPool, address, this, selectedRoute);
      if (connection != null) return connection;

      // 4.自己创建连接
      // 用一个异步cancel()来中断我们将要进行的握手。
      route = selectedRoute;
      refusedStreamCount = 0;
      result = new RealConnection(connectionPool, selectedRoute);
      acquire(result);
    }

    // 这里初始化初始化了BufferedSource和BufferedSink,与创建HttpCodec有关。
    result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      // 5.将新建的连接放入连接池
      Internal.instance.put(connectionPool, result);

      // If another multiplexed connection to the same address was created concurrently, then
      // release this connection and acquire that one.
      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    closeQuietly(socket);

    return result;
  }

RealConnection的获取,总得来说有四个优先级:

  1. StreamAllocation中已有的connection。
  2. 无需路由参数从连接池获取的connection。
  3. 有路由参数从连接池获取的connection。
  4. 自己创建的connection

RealConnection的connect方法里,调用connectSocket将实现网络请求的Socket和用Okio实现了读写,也就是说请求消息和响应消息都通过Okio来实现读写了,实现读写的关键BufferedSourceBufferedSink在这里初始化,也实现HttpCodec功能的重要参数。

对于从连接池获取connection,分析下连接池ConnectionPool就能明白怎么实现的了。

5.1.5.2 ConnectionPool

既然是连接池,那我们首要关心的是如果存储连接,和如何管理连接。

private final Deque<RealConnection> connections = new ArrayDeque<>();

在ConnectionPool中,是用双向队列Deque来存储连接。

再看放入连接方法

void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    //进行连接池清理
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    //放入队列
    connections.add(connection);
  }

在每次放入新的连接时,先清理存放的连接,再存放连接。

//连接存放的最大数量,默认为5
private final int maxIdleConnections;
//空闲连接存活时间,最多五分钟
  private final long keepAliveDurationNs;
//用线程池来进行连接清理
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
 private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
      //cleanup就是根据设置的存活条件,进行连接清理
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

在OKHttp3 的默认实现中,这些连接中最多只能存在 5 个空闲连接,空闲连接最多只能存活 5 分钟。

@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
    //根据传入的地址和路由,判断连接是否合格
      if (connection.isEligible(address, route)) {
        //acquire方法赋值连接给streamAllocation的connection
        streamAllocation.acquire(connection);
        return connection;
      }
    }
    return null;
  }
  //streamAllocation的acquire方法
  public void acquire(RealConnection connection) {
    assert (Thread.holdsLock(connectionPool));
    if (this.connection != null) throw new IllegalStateException();
    //进行赋值
    this.connection = connection;
    //用allocations来记录每个使用了connection的streamAllocation对象
    connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
  }

5.1.6 CallServerInterceptor

这里就是执行网络请求的拦截器了。

@Override public Response intercept(Chain chain) throws IOException {
    //责任链
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    //从ConnectInterceptor中获取的httpCodec
    HttpCodec httpCodec = realChain.httpStream();
    //从retryAndFollowUpInterceptor获取的streamAllocation
    StreamAllocation streamAllocation = realChain.streamAllocation();
    //从ConnectInterceptor中获取的connection
    RealConnection connection = (RealConnection) realChain.connection();
    //在BridgeInterceptor进行组装Request的header
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();
    //将请求头放入
    httpCodec.writeRequestHeaders(request);

    Response.Builder responseBuilder = null;
    //对请求消息的请求数据不为空的类型进行操作
    //操作是对头包含“Expect: 100-continue”的请求,特别处理
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      //对头包含“Expect: 100-continue”的请求,先询问服务器是否愿意接受数据
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        //发送请求询问
        httpCodec.flushRequest();
        //获取返回值,如果code为100,则responseBuilder为空,表示服务器原因接收
        responseBuilder = httpCodec.readResponseHeaders(true);
      }
        
      if (responseBuilder == null) {
        // 通过请求消息,获得http的主体,httpCodec的内部类
        Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
        //获取RealBufferedSink对象,和RealConnection中创建的BufferedSink是同一个类
        //相当于迭代执行请求消息写入
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
        //将请求消息通过BufferedSink迭代写入,bufferedRequestBody->requestBodyOut->httpCodec.sink->Okio.sink->AsyncTimeout.sink->sink
        //最后一个sink是与Socket关联的类,是Okio的静态内部类
        request.body().writeTo(bufferedRequestBody);
        //迭代关闭BufferedSink
        bufferedRequestBody.close();
      } else if (!connection.isMultiplexed()) {
        // "Expect: 100-continue"询问结果为不接受body,所以关闭请求
        streamAllocation.noNewStreams();
      }
    }
    //将请求消息写入socket,socket发送请求消息;没有发送请求就会正常发送,发送过的就什么都不执行
    httpCodec.finishRequest();
    
    if (responseBuilder == null) {
        //读取响应头
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

    //根据响应头先创建Response
    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    int code = response.code();
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
    //将响应正文加入response,openResponseBody方法里会用Okio读取正文。
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }

对于网络请求真正的实现,还是利用Okio对Socket的读写操作进行封装,例如有请求数据时的Okio写入流程:

bufferedRequestBody->requestBodyOut->httpCodec.sink->Okio.sink->AsyncTimeout.sink->sink

最后一个sink是与Socket关联的类,是Okio的静态内部类

6.总结

总体流程: OKHttp流程.png

它们都用双向队列来存储管理数据。Dispatcher中有三个队列:Deque<AsyncCall> readyAsyncCalls,Deque<AsyncCall> runningAsyncCalls,Deque<RealCall> runningSyncCalls。ConnectionPool中有Deque<RealConnection> connections。

缓存策略与Http缓存不同的点:
CacheStrategy缓存策略,就是根据缓存字段来实现缓存算法。但有一点,request的header里包含If-Modified-Since或If-None-Match或no-cache,会直接走网络请求而不走缓存。这两个字符是要在缓存结果里有才会走缓存,这与上面介绍的HTTP缓存字段不同。具体看下面分析。
当只用本地缓存时,Http缓存会在本地缓存失效后再请求网络,而OKhttp只会请求本地缓存,不会再请求网络。

BufferedSource和BufferedSink是Okio连接网络的Socket实现其读写,RealConnection创建HttpCodec来统一管理网络请求的输入输出。例如有请求数据时的Okio写入流程:
bufferedRequestBody->requestBodyOut->httpCodec.sink->Okio.sink->AsyncTimeout.sink->sink->socket.getOutputStream()

7.参考

关于HTTP协议,一篇就够了

http缓存浅谈

彻底弄懂HTTP缓存机制及原理

HTTP/1.1 协议Expect: 100-continue

HTTP状态码

Okhttp3基本使用

Okhttp基本用法和流程分析

OkHttp3源码和设计模式-1

OKHTTP3源码2-连接池管理

OkHttp3源码详解(二整体流程)

okhttp3源码的使用详解

OkHttp3源码解析

在 Retrofit 和 OkHttp 中使用网络缓存,提高访问效率

上一篇下一篇

猜你喜欢

热点阅读