okHttp源码阅读笔记

2022-01-25  本文已影响0人  bridegg

OkHttpClient的装饰着直接略过,直接分析开始网络请求
源码基于3.14.9

网络请求队列

  @Override public Call newCall(Request request) {
//将封装的参数和当前创建的client传入,创建一个新的RealCall
    return RealCall.newRealCall(this, request, false /* for web socket */);
  }

RealCall.class

  static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    //真正的创建realCall对象 此时realCall中持有 client,封装的request
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    //创建 Transmitter对象,让RealCall和Transmitter互相持有对方的实例
    call.transmitter = new Transmitter(client, call);
    return call;
  }

此时
RealCall 持有 Transmitter,client,request
transmitter持有client,RealCall

然后执行同步或者异步请求,先看同步请求execute

 @Override public Response execute() throws IOException {
   synchronized (this) {
//加锁保持原子性
     if (executed) throw new IllegalStateException("Already Executed");
     executed = true;
   }
//开始超时记时,初始化 transmitter的时候初始化的
   transmitter.timeoutEnter();
//回调事件监听 详见Builder eventListener
   transmitter.callStart();
   try {
//将当前call放到队列里,不为啥,就是记录,网上说队列里做请求的都是扯淡
     client.dispatcher().executed(this);
//好戏开始准备套娃执行插值器了
     return getResponseWithInterceptorChain();
   } finally {
     client.dispatcher().finished(this);
   }
 }

这段代码执行完毕以后,核心逻辑在getResponseWithInterceptorChain
在getResponseWithInterceptorChain 执行之前transmitter目前只持有client,RealCall,和几个初始化就创建的不重要对象

  Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
//请求前插值器详见 Builder addInterceptor 提供给业务层,所以放到最前面了
    interceptors.addAll(client.interceptors());
//从故障中恢复,重定向。如果调用被取消,它可能会抛出一个IOException。
    interceptors.add(new RetryAndFollowUpInterceptor(client));
//链接层,构建网络请求
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
//缓存层,请求前提供缓存,请求后保存缓存
    interceptors.add(new CacheInterceptor(client.internalCache()));
//执行请求层,这里创建了真正的请求对象,
    interceptors.add(new ConnectInterceptor(client));
//newWebSocketCall 用的业务插值器
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
//真实请求发生的地方,也是整个链表的最末端
    interceptors.add(new CallServerInterceptor(forWebSocket));
//通过这个方法,把list转换成一个带有proceed(划重点)方法的链表
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
        originalRequest, this, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    boolean calledNoMoreExchanges = false;
    try {
//开始执行链表
      Response response = chain.proceed(originalRequest);
//这以下代码都是处理后事的,可以不看,看看也可以知道有时候发生的各种诡异异常时什么时期的
      if (transmitter.isCanceled()) {
        closeQuietly(response);
        throw new IOException("Canceled");
      }
      return response;
    } catch (IOException e) {
      calledNoMoreExchanges = true;
      throw transmitter.noMoreExchanges(e);
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null);
      }
    }
  }

okHttp通过RealInterceptorChain这个对象将一个顺序列表转化成了一个链表,这个然后通过proceed去执行插值器里的intercept方法,顺便把链表的后置节点传入
代码如下

RealInterceptorChain.class
public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
      throws IOException {
//省略抛出异常代码
    calls++;
//省略抛出异常代码
    // If we already have a stream, confirm that this is the only call to chain.proceed().
//这段注释和异常留着是因为 有人不知道calls++干啥使,因为我看的时候也不知道,后来发现就是为了保证单链表的proceed只执行一次用的
    if (this.exchange != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
//整个列表转换链表核心,先分析index
//index在RealCall中传入的是0  next传入的就是 1,此处index正好对应RealCall中传入的插值器List的下标
//而且把持有的 transmitter, exchange(初始为null,后来在ConnectInterceptor创建)等等持有对象传入
    RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
        index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
//拿到当前插值器
    Interceptor interceptor = interceptors.get(index);
//执行插值器核心代码,这段Response是每个插值返回的,可以理解成一个递归操作,直到执行到CallServerInterceptor的intercept才会真正返回Response
    Response response = interceptor.intercept(next);
    // Confirm that the next interceptor made its required call to chain.proceed().
//以下省略一坨抛出异常的代码

    return response;
  }

看到这里不得不佩服OkHttp这段代码的精妙之处,通过插值器对应OSI七层模型,通过链表对应执行顺序,通过入参来逐步封装Request,通过返回值来封装Response。。6啊!
调用伪代码如下

->getResponse(Request)
{
  return  client.interceptors.intercept(Request,RetryAndFollowUpInterceptor)
      {
       ...//Request封装
       Response1=   RetryAndFollowUpInterceptor.intercept(Request,BridgeInterceptor)
          {
            ...//Request封装
             Response2= BridgeInterceptor.intercept(Request,CacheInterceptor)
            {
              ...//Request封装
              Response3 =ConnectInterceptor.intercept(Request,CacheInterceptor)
              {
                ...//Request封装
                    Response4= CallServerInterceptor.intercept(Request)
                ...//Response封装
                return Response4
              }
              ...//Response封装
              return Response3
            }
            ...//Response封装
            return Response2
          }
       ...//Response封装
       return Response1
      }
}

相当于各个插值器干了各种不同的事情,那就一个一个看都干了啥

RetryAndFollowUpInterceptor

RetryAndFollowUpInterceptor.class
  @Override public Response intercept(Chain chain) throws IOException {
//获取request
    Request request = chain.request();
//持有链表下个插值器
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
//拿 transmitter.
    Transmitter transmitter = realChain.transmitter();

    int followUpCount = 0;
    Response priorResponse = null;
//死循环走起,不达目的不罢休
    while (true) {
//transmitter初始化,主要目的是为了创建 exchangeFinder对象
//exchangeFinder 这玩意主要是做寻找已存在链接用的,详细可以看exchangeFinder类注释,主要在find方法可以获取健康的请求链接,如果没有会创建新的
//而exchangeFinder通过find可以得到RealConnection 而RealConnection 通过newCodec可以创建一个真正的网络请求链接对象ExchangeCodec
//ExchangeCodec 中持有http2Connection或者直接返回Http1ExchangeCodec
//Http2Connection里就是Socket链接了。。。
俄罗斯套娃呢!!!
//这句话就是重定向和复用链接的东西下面简单展开
      transmitter.prepareToConnect(request);
//如果取消直接抛出异常
      if (transmitter.isCanceled()) {
        throw new IOException("Canceled");
      }
//
      Response response;
      boolean success = false;
      try {
//这里开始执行链表套娃,在此处执行完毕以后其实就已经得到最终结果了,根据最终结果来确定是否需要重新执行或者重定向或者断开链接等等一系列的操作,具体在这行代码下面
        response = realChain.proceed(request, transmitter, null);
        success = true;
      } catch (RouteException e) {
        // 路由链接异常
        if (!recover(e.getLastConnectException(), transmitter, false, request)) {
          throw e.getFirstConnectException();
        }
        continue;
      } catch (IOException e) {
        // 重复请求异常

        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, transmitter, requestSendStarted, request)) throw e;
        continue;
      } finally {
        if (!success) {
      //发生异常了,把Exchange置空,Exchange在ConnectInterceptor中初始化的(就是链表套娃那)
          transmitter.exchangeDoneDueToException();
        }
      }

      // Attach the prior response if it exists. Such responses never have a body.
      if (priorResponse != null) {
        response = response.newBuilder()
            .priorResponse(priorResponse.newBuilder()
                    .body(null)
                    .build())
            .build();
      }

      Exchange exchange = Internal.instance.exchange(response);
      Route route = exchange != null ? exchange.connection().route() : null;
//这段代码来确定到底是真实拿到了返回结果,还是需要重新发起请求,还是直接返回异常信息
      Request followUp = followUpRequest(response, route);
//验证出现问题了,具体原因在followUpRequest
      if (followUp == null) {
        if (exchange != null && exchange.isDuplex()) {
          transmitter.timeoutEarlyExit();
        }
        return response;
      }
//这里应该是请求成功了
      RequestBody followUpBody = followUp.body();
      if (followUpBody != null && followUpBody.isOneShot()) {
        return response;
      }
//下面的都是处理当次请求的后事,清除各种缓存变量对象,判断是否超出最大请求数目
      closeQuietly(response.body());
      if (transmitter.hasExchange()) {
        exchange.detachWithViolence();
      }

      if (++followUpCount > MAX_FOLLOW_UPS) {
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }

      request = followUp;
      priorResponse = response;
    }
  }

从上面代码来看,这个插值器就是弄了个大循环,不停的尝试每次请求返回的结果,如果正确就直接返回Response,如果链接异常则按照各种各样的问题抛出异常,或者重新请求,主要代码是followUpRequest来判断的这里就不深究了
继续看第二个俄罗斯套娃

BridgeInterceptor

代码如下

BridgeInterceptor.class

 @Override public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();
---------从此处开始------------
    RequestBody body = userRequest.body();
    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }

      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }

    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive");
    }
    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }

    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies));
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", Version.userAgent());
    }
--------到此处截止,都是处理header信息
//这里套娃,执行CacheInterceptor
    Response networkResponse = chain.proceed(requestBuilder.build());
//处理cookie
    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
开始封装   Response
    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);
//处理Gzip
    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      String contentType = networkResponse.header("Content-Type");
      responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
    }

    return responseBuilder.build();
  }

这个类没啥太多的东西主要是封装request和response的东西,所以叫桥
继续下个套娃

CacheInterceptor

CacheInterceptor.class
需要注意在RealCall中传参 说明cache是配置的
    interceptors.add(new CacheInterceptor(client.internalCache()));


@Override public Response intercept(Chain chain) throws IOException {
//拿配置的缓存
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();
//通过请求参数获取一个工厂对象,里面不用关心,否则看不完代码,越看越懵
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;
//如果配置的缓存对象不为空,就把这个缓存结果扔进去
    if (cache != null) {
      cache.trackResponse(strategy);
    }
//请求没对上解析,直接把缓存的body里的持有的东西处理了
    if (cacheCandidate != null && cacheResponse == null) {
      closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    // 这个是如果没网,直接返回,也不用走套娃了主要是networkRequest判断的没网,cacheResponse判断没有缓存
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

    // 这种情况是没网,但是没缓存的情况已经被上面的判断过滤了,所以这里就是没有网有缓存,直接返回缓存
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }
//开始套娃,拿真实的请求结果
    Response networkResponse = null;
    try {
      networkResponse = chain.proceed(networkRequest);
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }

    // 得到返回结果,而且是服务器告知和上次结果一致的前提下,直接返回缓存结果并且更新缓存
    if (cacheResponse != null) {
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }
//先将缓存结果赋予Response,如果服务器返回没有问题,直接赋值服务器返回结果
    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();

    if (cache != null) {
      if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        CacheRequest cacheRequest = cache.put(response);
        return cacheWritingResponse(cacheRequest, response);
      }

      if (HttpMethod.invalidatesCache(networkRequest.method())) {
        try {
          cache.remove(networkRequest);
        } catch (IOException ignored) {
          // The cache cannot be written.
        }
      }
    }

    return response;
  }

就是处理单次链接没网有网,服务器返回结果和缓存结果比较简单复杂逻辑主要在CacheStrategy中,不深究,主要是一些流的读写,请求比对等逻辑

下一个套娃 网络链接

ConnectInterceptor(这里初始化了很多东西需要仔细阅读)

ConnectInterceptor.class
  @Override public Response intercept(Chain chain) throws IOException {
//套娃持有一大坨    
RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    Transmitter transmitter = realChain.transmitter();

    //判断是否是get请求
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
//核心逻辑,创建 Exchange
    Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
//套娃
    return realChain.proceed(request, transmitter, exchange);
  }



这里展开transmitter.newExchange

Transmitter.class

  Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    synchronized (connectionPool) {
      if (noMoreExchanges) {
        throw new IllegalStateException("released");
      }
      if (exchange != null) {
        throw new IllegalStateException("cannot make a new request because the previous response "
            + "is still open: please call response.close()");
      }
    }
//通过exchangeFinder找到健康的ExchangeCodec,通过ExchangeCodec创建Exchange
    ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
    Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);

    synchronized (connectionPool) {
      this.exchange = result;
      this.exchangeRequestDone = false;
      this.exchangeResponseDone = false;
      return result;
    }
  }

显然核心代码在exchangeFinder.find和 new Exchange中,先看exchangeFinder.find

public ExchangeCodec find(
      OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    int connectTimeout = chain.connectTimeoutMillis();
    int readTimeout = chain.readTimeoutMillis();
    int writeTimeout = chain.writeTimeoutMillis();
    int pingIntervalMillis = client.pingIntervalMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();
//上面是各种参数赋值,主要是超时和重新链接
    try {
//这里去寻找健康的RealConnection,核心在这里
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
//这块的代码把上面的逻辑搞懂后再看
      return resultConnection.newCodec(client, chain);
    } catch (RouteException e) {
      trackFailure();
      throw e;
    } catch (IOException e) {
      trackFailure();
      throw new RouteException(e);
    }
  }

继续看findHealthyConnection


  private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
      boolean doExtensiveHealthChecks) throws IOException {
//又一个死循环,主要是判断是否健康那里,找到不健康的干掉,找到健康的就返回
    while (true) {
//核心寻找逻辑
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          pingIntervalMillis, connectionRetryEnabled);
//下面的都是判断健不健康

      synchronized (connectionPool) {
//纯新链接 逻辑不深究,
        if (candidate.successCount == 0 && !candidate.isMultiplexed()) {
          return candidate;
        }
      }

      // 判断健康不健康,注释说可能挺久的
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        candidate.noNewExchanges();
        continue;
      }

      return candidate;
    }
  }

继续看findConnection方法

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
    boolean foundPooledConnection = false;
    RealConnection result = null;
    Route selectedRoute = null;
    RealConnection releasedConnection;
    Socket toClose;
//锁链接池
    synchronized (connectionPool) {
//transmitter凉了,就直接抛出异常
      if (transmitter.isCanceled()) throw new IOException("Canceled");
      hasStreamFailure = false; // This is a fresh attempt.

      // 把当前存在的链接标记释放位,可能为空,也可能被锁定不能创建Exchange

      releasedConnection = transmitter.connection;
//如果被链接池锁定了,不能创建新的Exchange,标记到关闭
      toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
          ? transmitter.releaseConnectionNoEvents()
          : null;
//直接现在存在的链接,如果链接不为空就把释放位置空
      if (transmitter.connection != null) {
        result = transmitter.connection;
        releasedConnection = null;
      }
//从这里就接触到核心的代码了
      if (result == null) {
//这里主要是过滤connection.isEligible返回false的connect
//connection.isEligible= false 包含 该connection连词次数超过最大,没有地址
//如果过滤后还能拿到connection则通过transmitter.acquireConnectionNoEvents(connection);将connection赋与transmitter,返回ture,如果没有查到任何符合条件的connection就返回false
        if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
//这里找到了健康的存活的connection并且将connection赋予了transmitter中,让result指向该connection
          foundPooledConnection = true;
          result = transmitter.connection;
        } else if (nextRouteToTry != null) {
//如果没有找到connection判断nextRouteToTry
//这个东西在prepareToConnect的时候执行hasRouteToTry会赋值
//主要目的是第一次或者某次尝试链接后,如果当前路由或者还有没有尝试的下个路由,将当前路由或者下次准备尝试的路由作为路由目标
//为什么我自己写的中国话我自己都这么难理解
          selectedRoute = nextRouteToTry;
          nextRouteToTry = null;
        } else if (retryCurrentRoute()) {
//如果上面条件都不符合,那就直接去connection中寻找当前路由,如果connection为空,上面的条件也不成立
          selectedRoute = transmitter.connection.route();
        }
      }
    }
//在这里关闭了链接池锁定的connection
    closeQuietly(toClose);
//回调监听就是监听请求状态的那个监听,在httpClient里传入的
    if (releasedConnection != null) {
      eventListener.connectionReleased(call, releasedConnection);
    }
//同样回调监听
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
    }
//如果找到了,就直接返回了,第一次一般都是空
    if (result != null) {
      // If we found an already-allocated or pooled connection, we're done.
      return result;
    }

    // 判断当前如果没有路由,需要获取路由
//获取方式在创建ExchangeFinder的时候,会初始化
//    this.routeSelector = new RouteSelector(
        address, connectionPool.routeDatabase, call, eventListener);
//routeSelector这个对象可以根据地址获取路由列表
    boolean newRouteSelection = false;
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
      newRouteSelection = true;
//此处拿到路由列表
      routeSelection = routeSelector.next();
    }

    List<Route> routes = null;
    synchronized (connectionPool) {
      if (transmitter.isCanceled()) throw new IOException("Canceled");

      if (newRouteSelection) {
        // 如果是新拿的路由,说明connection又得重新获取
//说实话这里我觉得写的有问题,应该可以优化

        routes = routeSelection.getAll();
        if (connectionPool.transmitterAcquirePooledConnection(
            address, transmitter, routes, false)) {
          foundPooledConnection = true;
          result = transmitter.connection;
        }
      }
//如果上面找健康的connection还是没找到,所有的东西重新创建吧
      if (!foundPooledConnection) {
//拿新路由
        if (selectedRoute == null) {
          selectedRoute = routeSelection.next();
        }

        // 创建新的连接
        result = new RealConnection(connectionPool, selectedRoute);
        connectingConnection = result;
      }
    }

//如果经历上面的两段代码发现拿到了,再告诉监听,直接返回
//相当于第一次直接拿Connection没拿到,换了个路由拿,拿到了,这里返回
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
      return result;
    }

    // 这里就是新建的链接了,先执行一次链接  TCP + TLS握手
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
        connectionRetryEnabled, call, eventListener);
    connectionPool.routeDatabase.connected(result.route());
//执行完上面的源码以后RealConnection已经拿到了
// rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP? address.socketFactory().createSocket() : new Socket(proxy);
 //source = Okio.buffer(Okio.source(rawSocket));
 //  sink = Okio.buffer(Okio.sink(rawSocket));
//此时已经执行完第一次握手
    Socket socket = null;
    synchronized (connectionPool) {
      connectingConnection = null;
      // OKhttp的团队干了件很奇怪的事,创建完新的链接以后又再池子里面找一遍
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
//这段注释让我笑了很久,自己辛辛苦苦创建了个链接,然后又从里面找竟然tm的找到了!!!那就把找到的链接返回回去
        // We lost the race! Close the connection we created and return the pooled connection.
        result.noNewExchanges = true;
        socket = result.socket();
        result = transmitter.connection;

        //这种情况找到的链接很可能非常不安全,通过nextRouteToTry标签重新走一遍上面的流程
        nextRouteToTry = selectedRoute;
      } else {
//没找到,说明真的没有链接,创建connection放到池子里,并且让transmitter持有
        connectionPool.put(result);
        transmitter.acquireConnectionNoEvents(result);
      }
    }
    closeQuietly(socket);
//回调监听
    eventListener.connectionAcquired(call, result);
    return result;
  }

这段代码非常的长,但是可见exchangeFinder主要的逻辑就在这里了,不停的调用transmitterAcquirePooledConnection寻找池子里的链接,如果各种情况尝试了都没找到才会新生成一个链接,更可怕的事,新生成链接执行握手之后,再次从池子里找了一遍。。。恐怖

继续深入fresult.connect(connectTimeout, readTimeout, writeTimeout,pingIntervalMillis,connectionRetryEnabled, call, eventListener);

这里只看简单流程了
public void connect(int connectTimeout, int readTimeout, int writeTimeout,
      int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
      EventListener eventListener) {
  //省略各种初始化判断代码
    while (true) {
      try {
        if (route.requiresTunnel()) {//是否通道链接https
          connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
        } else {///普通链接
          connectSocket(connectTimeout, readTimeout, call, eventListener);
        }
        establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);

      } catch (IOException e) {
        }
//省略各种异常处理
    if (http2Connection != null) {
      synchronized (connectionPool) {
        allocationLimit = http2Connection.maxConcurrentStreams();
      }
    }
  }
 private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout, Call call,
      EventListener eventListener) throws IOException {
创建通道请求
    Request tunnelRequest = createTunnelRequest();
    HttpUrl url = tunnelRequest.url();
//https循环21次,有啥说法吗?求大佬解析
    for (int i = 0; i < MAX_TUNNEL_ATTEMPTS; i++) {
//执行socket链接
      connectSocket(connectTimeout, readTimeout, call, eventListener);
//处理返回结果是否抛出异常
      tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url);
//省略代码
    }
  }
//这代码看看就行了,理解思路,再往下点就是网络链接的东西了
  private void connectSocket(int connectTimeout, int readTimeout, Call call,
      EventListener eventListener) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();

    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket()
        : new Socket(proxy);
//各种监听回调
    eventListener.connectStart(call, route.socketAddress(), proxy);
    rawSocket.setSoTimeout(readTimeout);
    try {
//真正的链接操作
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
      ce.initCause(e);
      throw ce;
    }
//这个注释留下来了,因为okhttp说android7.0会崩溃。。。哈哈哈哈哈
    // The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
    // More details:
    // https://github.com/square/okhttp/issues/3245
    // https://android-review.googlesource.com/#/c/271775/
    try {
      source = Okio.buffer(Okio.source(rawSocket));
      sink = Okio.buffer(Okio.sink(rawSocket));
    } catch (NullPointerException npe) {
      if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
        throw new IOException(npe);
      }
    }
  }


//这个代码通过请求返回的参数来控制当前是http1还是http2
  private void establishProtocol(ConnectionSpecSelector connectionSpecSelector,
      int pingIntervalMillis, Call call, EventListener eventListener) throws IOException {
    if (route.address().sslSocketFactory() == null) {
      if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) {
        socket = rawSocket;
        protocol = Protocol.H2_PRIOR_KNOWLEDGE;
        startHttp2(pingIntervalMillis);
        return;
      }

      socket = rawSocket;
      protocol = Protocol.HTTP_1_1;
      return;
    }

    eventListener.secureConnectStart(call);
    connectTls(connectionSpecSelector);
    eventListener.secureConnectEnd(call, handshake);

    if (protocol == Protocol.HTTP_2) {
//在这里创建http2Connection
      startHttp2(pingIntervalMillis);
    }
  }

  private void startHttp2(int pingIntervalMillis) throws IOException {
    socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
    http2Connection = new Http2Connection.Builder(true)
        .socket(socket, route.address().url().host(), source, sink)
        .listener(this)
        .pingIntervalMillis(pingIntervalMillis)
        .build();
    http2Connection.start();
  }

看完上面的代码就可以回到find里了

  public ExchangeCodec find(
      OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    int connectTimeout = chain.connectTimeoutMillis();
    int readTimeout = chain.readTimeoutMillis();
    int writeTimeout = chain.writeTimeoutMillis();
    int pingIntervalMillis = client.pingIntervalMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
//通过前面的代码分析,此处已经获取到一个健康的connection了
//并且如果是http2的话,会创建一个 Http2Connection 对象的 http2Connection
//所以看看这段代码执行了什么
      return resultConnection.newCodec(client, chain);
    } catch (RouteException e) {
      trackFailure();
      throw e;
    } catch (IOException e) {
      trackFailure();
      throw new RouteException(e);
    }
  }

  ExchangeCodec newCodec(OkHttpClient client, Interceptor.Chain chain) throws SocketException {
    if (http2Connection != null) {
//如果是http2的请求,返回Http2ExchangeCodec
      return new Http2ExchangeCodec(client, this, chain, http2Connection);
    } else {
//否则返回Http1ExchangeCodec
      socket.setSoTimeout(chain.readTimeoutMillis());
      source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
      sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
      return new Http1ExchangeCodec(client, this, source, sink);
    }
  }

最后回到Transmitter newExchange里

  Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    synchronized (connectionPool) {
      if (noMoreExchanges) {
        throw new IllegalStateException("released");
      }
      if (exchange != null) {
        throw new IllegalStateException("cannot make a new request because the previous response "
            + "is still open: please call response.close()");
      }
    }

    ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
//这里让Exchange创建出来,并持有了一个已经创建好的Http2ExchangeCodec或者Http1ExchangeCodec
    Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);

    synchronized (connectionPool) {
      this.exchange = result;
      this.exchangeRequestDone = false;
      this.exchangeResponseDone = false;
      return result;
    }
  }

终于看完了ConnectInterceptor 此时Transmitter里已经初始化了几个重要的变量 RealConnection connection;Exchange exchange;Request request;ExchangeFinder exchangeFinder;OkHttpClient client;ealConnectionPool connectionPool;Call call;
下吗开始读最复杂的插值器CallServerInterceptor

CallServerInterceptor

源码不多但是逻辑可不少

@Override public Response intercept(Chain chain) throws IOException {
//拿到链子,注意,这里拿完了以后没有再次调用chain.proceed,所以这里是链的最低端
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Exchange exchange = realChain.exchange();
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();

//写入请求头,这里执行了      codec.writeRequestHeaders(request);
//此处代码我们默认都创建了Http2ExchangeCodec,所以直接对Http2ExchangeCodec源码阅读
//Http2ExchangeCode.writeRequestHeaders中创建了Http2Stream对象,并且将header传入,用stream持有
    exchange.writeRequestHeaders(request);

    boolean responseHeadersStarted = false;
    Response.Builder responseBuilder = null;
//请求体不为空
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      //是否支持100-continue协议,不展开了,百度即可

      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        exchange.flushRequest();
        responseHeadersStarted = true;
        exchange.responseHeadersStart();
        responseBuilder = exchange.readResponseHeaders(true);
      }

      if (responseBuilder == null) {
//是否支持双工通讯,支持准备双工体
        if (request.body().isDuplex()) {
          //双工体就是收发可以同时进行,这里创建了一个套接字
          exchange.flushRequest();
          BufferedSink bufferedRequestBody = Okio.buffer(
              exchange.createRequestBody(request, true));
          request.body().writeTo(bufferedRequestBody);
        } else {
        //普通通讯 直接准备一个body流
          BufferedSink bufferedRequestBody = Okio.buffer(
              exchange.createRequestBody(request, false));
          request.body().writeTo(bufferedRequestBody);
          bufferedRequestBody.close();
        }
      } else {
        //responseBuilder不为空的话说明这个请求是个有问题的请求,下面都出处理异常,
        exchange.noRequestBody();
        if (!exchange.connection().isMultiplexed()) {
     //
          exchange.noNewExchangesOnConnection();
        }
      }
    } else {
    //请求体都没有,有问题,处理处理
      exchange.noRequestBody();
    }
//如果请求体时空或者不是双工,说明发起请求的串已经传输完毕了,可以停止request了
    if (request.body() == null || !request.body().isDuplex()) {
//执行slink的close->connection.writeSynReset 真正的写入
      exchange.finishRequest();
    }
//此时请求已经结束
//如果不是100协议,开始读取返回的header
    if (!responseHeadersStarted) {
//这里只是回调接口。。。这里逻辑没看懂,。。。
      exchange.responseHeadersStart();
    }
// 这里是真真的读取
    if (responseBuilder == null) {
    //简易流程Http2ExchangeCodec等待stream写入 也是真正的接口请求代码请求流程看我后面贴的图,此处发生耗时操作 body在stream的Source中
      responseBuilder = exchange.readResponseHeaders(false);
    }
//获得返回结果。。。至此一个完整的请求就已经结束了,下面就是对请求的封装了
    Response response = responseBuilder
        .request(request)
        .handshake(exchange.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
//通过code封装返回结果
    int code = response.code();
    if (code == 100) {
      // server sent a 100-continue even though we did not request one.
      // try again to read the actual response
      response = exchange.readResponseHeaders(false)
          .request(request)
          .handshake(exchange.connection().handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();

      code = response.code();
    }

    exchange.responseHeadersEnd(response);

    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
//这里将没有问题的返回结果封装返回
      response = response.newBuilder()
          .body(exchange.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      exchange.noNewExchangesOnConnection();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }
//至此网络请求将再次通过插值器一层一层返回
    return response;
  }

关于http2的流的请求过程


image.png

芜湖,请求流程全部完事了~以上是同步请求。。再来看看异步请求是什么逻辑
异步请求核心和同步请求一致,主要是线程管理回到代码的最一开始

异步请求

  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    transmitter.callStart();
//client会默认创建一个最大请求64个的调度器 负责记录和处理异步和同步请求的队列
//此处创建的AsyncCall 这个先留着,下面会分析
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }
Dispatcher.class
  void enqueue(AsyncCall call) {
    synchronized (this) {
      readyAsyncCalls.add(call);

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.get().forWebSocket) {
//非webSocket需要在已存在的队列中寻找 
//这里代码非常简单就是两个队列寻找,先找runningAsyncCalls再找readyAsyncCalls
//runningAsyncCalls是正在执行的,因为上面代码已经解释了call和connection的关系,所以可以通过已经存在的call直接找到健康的connnection,此处队列只是用于寻找已存在的call
        AsyncCall existingCall = findExistingCallWithHost(call.host());
//如果队列存在,将存在队列的AtomicInteger赋予新的call
//AtomicInteger是原子计数器,主要是多线程处理加减法的时候,能保持原子性,也就是说线程阻塞,一个线程持有的并修改时,另一个线程不可修改
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
      }
    }
//这个处理从readyAsyncCalls里塞到runningAsyncCalls里
    promoteAndExecute();
  }
private boolean promoteAndExecute() {
//断言,没啥用
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
//循环readyAsyncCalls
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();
//如果当前运行的队列太多了就不执行了,后面代码几乎都不执行了,虽然break但是executableCalls没东西呀
        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
//通过每个请求的原子int去判断同一个域名最多能请求几个
        if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.

        i.remove();
//asyncCall的原子int自增,这里就会能知道每个asyncCall有执行了几次真实的请求
        asyncCall.callsPerHost().incrementAndGet();
//塞到要执行的临时队列
        executableCalls.add(asyncCall);
//塞到要负责记录的running队列
        runningAsyncCalls.add(asyncCall);
      }
//判断是否有正在运行的请求,用于返回值
      isRunning = runningCallsCount() > 0;
    }

    for (int i = 0, size = executableCalls.size(); i < size; i++) {
//把readyAsyncCalls能执行的接口全部执行了
      AsyncCall asyncCall = executableCalls.get(i);
//核心方法,需要看AsyncCall 
//executorService 通过线程池创建一个executorService没啥好说的
      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }

上面的代码只是做了个调度,用ready和running去处理接口管理,AsyncCall有个原子int不知道干啥的,可能在AsyncCall能得到答案,而且核心的请求应该在asyncCall.executeOn中

这个是AsyncCall继承类重写了run方法,在run里执行了execute
所以ExecutorService执行execute时会执行run里的execute
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}
再次声明下ExecutorService执行execute时会执行run里的execute NamedRunnable搞得
final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;
    private volatile AtomicInteger callsPerHost = new AtomicInteger(0);
//业务层需要的回调作为构造函数入参
    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }
//看原子int
    AtomicInteger callsPerHost() {
      return callsPerHost;
    }
//设置原子int
    void reuseCallsPerHostFrom(AsyncCall other) {
      this.callsPerHost = other.callsPerHost;
    }
//获取当前Call(不是AsyncCall)里的originalRequest.url
    String host() {
      return originalRequest.url().host();
    }
//获取当前Call(不是AsyncCall)里的originalRequest

    Request request() {
      return originalRequest;
    }
//获取当前Call(不是AsyncCall)
    RealCall get() {
      return RealCall.this;
    }

   //running里执行的
    void executeOn(ExecutorService executorService) {
      assert (!Thread.holdsLock(client.dispatcher()));
      boolean success = false;
      try {
//通过 executorService的run执行AsyncCall的execute方法
        executorService.execute(this);
        success = true;
//下面都是错误处理
      } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        transmitter.noMoreExchanges(ioException);
        responseCallback.onFailure(RealCall.this, ioException);
      } finally {
        if (!success) {
          client.dispatcher().finished(this); // This call is no longer running!
        }
      }
    }

    @Override protected void execute() {
      boolean signalledCallback = false;
      transmitter.timeoutEnter();
      try {
//在这里处理的插值器
        Response response = getResponseWithInterceptorChain();
        signalledCallback = true;
        responseCallback.onResponse(RealCall.this, response);
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } catch (Throwable t) {
        cancel();
        if (!signalledCallback) {
          IOException canceledException = new IOException("canceled due to " + t);
          canceledException.addSuppressed(t);
          responseCallback.onFailure(RealCall.this, canceledException);
        }
        throw t;
      } finally {
//最后执行finish
        client.dispatcher().finished(this);
      }
    }
  }

Dispatcher.class
  private <T> void finished(Deque<T> calls, T call) {
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      idleCallback = this.idleCallback;
    }
//再刷一次reday
    boolean isRunning = promoteAndExecute();
啥也没有了,调用空闲run
    if (!isRunning && idleCallback != null) {
      idleCallback.run();
    }
  }

至此okhttp的主要请求流程就全部分析完毕了。。。
看这坨代码竟然用了两天。。。。。

上一篇下一篇

猜你喜欢

热点阅读