Android篇

OkHttp源码分析:五大拦截器详解

2021-03-26  本文已影响0人  w达不溜w

OkHttp源码分析:五大拦截器详解

一、RetryAndFollowUpInterceptor(重试与重定向拦截器)

主要完成两件事:重试与重定向

@Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    //创建StreamAllocation对象(包含http请求组件)
    StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
        createAddress(request.url()), call, eventListener, callStackTrace);
    
    Response priorResponse = null;
    while (true) {
      if (canceled) {
        streamAllocation.release();
        throw new IOException("Canceled");
      }

      Response response;
      boolean releaseConnection = true;
      try {
        response = realChain.proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } catch (RouteException e) {
        // ① 路由异常,连接未成功,请求还没发出去
        if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
          throw e.getLastConnectException();
        }
        releaseConnection = false;
        continue;
      } catch (IOException e) {
        //②请求发出去了,但是和服务器通信失败了(socket流正在读写数据的时候断开连接)
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
        releaseConnection = false;
        continue;
      } finally {
        // ③不是前两种失败,直接关闭和清理所有资源
        if (releaseConnection) {
          streamAllocation.streamFailed(null);
          streamAllocation.release();
        }
      }
            //...
      //重定向
      Request followUp = followUpRequest(response, streamAllocation.route());
      if (followUp == null) {
        if (!forWebSocket) {
          streamAllocation.release();
        }
        //不需要重定向直接返回response终止循环
        return response;
      }
      //...
    }
  }

重试与重定向拦截器主要处理Response,可以看到RouteException和IOException都是调用了recover,返回true表示允许重试。允许重试—>continue—> while (true)—>realChain.proceed,这就完成了重试的过程。

private boolean recover(IOException e, StreamAllocation streamAllocation,
                        boolean requestSendStarted, Request userRequest) {
  streamAllocation.streamFailed(e);

  // ①OkHttpClient是否设置了不允许重试(默认允许),则一旦请求失败就不再重试——>全局配置
  if (!client.retryOnConnectionFailure()) return false;
  // ②针对某个请求配置是否需要重试 用户自己实现UnrepeatableRequestBody请求体——>单个请求配置
  if (requestSendStarted && userRequest.body() instanceof UnrepeatableRequestBody) return false;
  // ③是否属于重试的异常(协议异常、超时异常、证书异常、SSL握手未授权异常)
  if (!isRecoverable(e, requestSendStarted)) return false;
  // ④是否存在更多的路由
  if (!streamAllocation.hasMoreRoutes()) return false;

  return true;
}
重试.png

接着看重定向

Request followUp = followUpRequest(response, streamAllocation.route());
private Request followUpRequest(Response userResponse, Route route) throws IOException {
   
    int responseCode = userResponse.code();

    switch (responseCode) {
      //407 客户端使用了HTTP代理服务器,在请求头中添加"Proxy-Authorization"让代理服务器授权
      case HTTP_PROXY_AUTH:
        return client.proxyAuthenticator().authenticate(route, userResponse);
            //401 服务器接口需要验证使用者身份,在请求头中添加"Authorization"
      case HTTP_UNAUTHORIZED:
        return client.authenticator().authenticate(route, userResponse);
            //308 永久重定向
      case HTTP_PERM_REDIRECT:
      //307临时重定向
      case HTTP_TEMP_REDIRECT:
        //如果请求方式不是GET或HEAD,框架不会自动重定向请求
        if (!method.equals("GET") && !method.equals("HEAD")) {
          return null;
        }
      // 300 301 302 303 各种重定向
      case HTTP_MULT_CHOICE:
      case HTTP_MOVED_PERM:
      case HTTP_MOVED_TEMP:
      case HTTP_SEE_OTHER:
        // 客户端不允许重定向,返回null
        if (!client.followRedirects()) return null;
                //①从响应头取出location
        String location = userResponse.header("Location");
        if (location == null) return null;
        //②根据location配置新的请求url
        HttpUrl url = userResponse.request().url().resolve(location);
        // 取不出HttpUrl,返回null,不进行重定向
        if (url == null) return null;

        //如果重定向在http到https之间切换,检查用户是否允许(默认允许)
        boolean sameScheme = url.scheme().equals(userResponse.request().url().scheme());
        if (!sameScheme && !client.followSslRedirects()) return null;

        Request.Builder requestBuilder = userResponse.request().newBuilder();
        //请求不是get与head
        if (HttpMethod.permitsRequestBody(method)) {
          final boolean maintainBody = HttpMethod.redirectsWithBody(method);
          //除了 PROPFIND 请求之外都改成GET请求
          if (HttpMethod.redirectsToGet(method)) {
            requestBuilder.method("GET", null);
          } else {
            RequestBody requestBody = maintainBody ? userResponse.request().body() : null;
            requestBuilder.method(method, requestBody);
          }
          // 不是 PROPFIND 的请求,把请求头中关于请求体的数据删掉
          if (!maintainBody) {
            requestBuilder.removeHeader("Transfer-Encoding");
            requestBuilder.removeHeader("Content-Length");
            requestBuilder.removeHeader("Content-Type");
          }
        }

        // 在跨主机重定向时,删除身份验证请求头
        if (!sameConnection(userResponse, url)) {
          requestBuilder.removeHeader("Authorization");
        }
                //③构建Request
        return requestBuilder.url(url).build();
            //408 客户端请求超时
      case HTTP_CLIENT_TIMEOUT:
        // 判断用户是否允许重试
        if (!client.retryOnConnectionFailure()) {
          return null;
        }

        if (userResponse.request().body() instanceof UnrepeatableRequestBody) {
          return null;
        }
                //本次重试结果还是408,就放弃,不再重复请求。
        if (userResponse.priorResponse() != null
            && userResponse.priorResponse().code() == HTTP_CLIENT_TIMEOUT) {
          // We attempted to retry and got another timeout. Give up.
          return null;
        }
                //如果服务器告诉我们了 Retry-After 多久后重试,那框架不管了。
        if (retryAfter(userResponse, 0) > 0) {
          return null;
        }

        return userResponse.request();
            // 503 服务不可用
      case HTTP_UNAVAILABLE:
        //再次请求还是503,就放弃,不再重复请求。
        if (userResponse.priorResponse() != null
            && userResponse.priorResponse().code() == HTTP_UNAVAILABLE) {
          // We attempted to retry and got another timeout. Give up.
          return null;
        }
                //服务器告诉我们 Retry-After:0(意思就是立即重试) 才重请求
        if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) {
          // specifically received an instruction to retry without delay
          return userResponse.request();
        }

        return null;

      default:
        return null;
    }
  }

重定向总结

响应码 说明 重定向条件
407 代理需要授权:如付费代理,需要验证身份 通过proxyAuthenticator获得到了Request(添加"Proxy-Authorization"请求头)
401 服务器需要授权:如某些接口需要登录后才能使用(不安全) 通过authenticator获得到了Request(添加"Authorization"请求头)
300、301、302、303、307、308 重定向响应 307和308必须为GET/HEAD请求再继续判断①用户允许自动重定向(默认允许) ②能够获取到Location响应头,并且值为有效的url ③允许http到https切换(默认允许)
408 客户端请求超时 ①用户允许自动重定向(默认允许) ②本次重试结果不是408 ③服务器未响应Retry-After(稍后重试),或响应Retry-After为0
503 服务不可用 ①本次重试结果不是503②服务器响应Retry-After为0,立即重试

另附HTTP响应状态码分类:

分类 描述
1xx 信息,服务器收到请求,需要请求者继续执行操作
2xx 成功,操作被成功接收并处理
3xx 重定向,需要进一步的操作以完成请求
4xx 客户端错误,请求包含语法错误或无法完成请求
5xx 服务端错误,服务器在处理请求的过程中发生了错误

小结:RetryAndFollowUpInterceptor是整个责任链中的第一个,首次接触到Request和最后接收Response的角色,它的主要功能是判断是否需要重试与重定向。

重试的前提是出现了RouteException或IOException,会通过recover方法进行判断是否进行重试。

重定向是发生在重试判定后,不满足重试的条件,会进一步调用followUpRequest根据Response的响应码进行重定向操作。

二、BridgeInterceptor(桥接拦截器)
@Override public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();

    RequestBody body = userRequest.body();
    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }

      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }

    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive");
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }

    //读取对应的cookie数据设置进请求头
    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies));
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", Version.userAgent());
    }

    Response networkResponse = chain.proceed(requestBuilder.build());
        //保存cookie,默认cookieJar不提供的实现(我们可以通过okHttpClient.cookieJarokHttpClient.cookieJar(cookieJar)去设置)
    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);
     //如果使用gzip返回的数据,则使用 GzipSource 包装便于解析。
    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
    }

    return responseBuilder.build();
  }

补全请求头:

请求头 说明
Content-Type 请求体类型(如 application/x-www-form-urlencoded)
Content-Length/Transfer-Encoding 请求体解析方式
Host 请求的主机站点
Connection:Keep-Alive 默认保持长连接
Accept-Encoding:gzip 接受响应体使用gzip压缩
Cookie Cookie身份识别
User-Agent 用户信息,如操作系统、浏览器等

小结:BridgeInterceptor是连接应用程序和服务器的桥梁,它为我们补全请求头,将请求转化为符合网络规范的Request。得到响应后:1.保存Cookie,在下次请求会读取对应的cookie数据设置进请求头,默认cookieJar不提供的实现 2.如果使用gzip返回的数据,则使用 GzipSource 包装便于解析。

三、CacheInterceptor(缓存拦截器)
@Override public Response intercept(Chain chain) throws IOException {
  //cache不为空从cache中取(GET请求才有缓存)
  Response cacheCandidate = cache != null
      ? cache.get(chain.request())
      : null;

  long now = System.currentTimeMillis();
    //缓存策略
  CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
  Request networkRequest = strategy.networkRequest;
  Response cacheResponse = strategy.cacheResponse;

  if (cache != null) {
    cache.trackResponse(strategy);
  }

  if (cacheCandidate != null && cacheResponse == null) {
    closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
  }

  //没有网络也没有缓存 ——> 请求失败直接返回504
  if (networkRequest == null && cacheResponse == null) {
    return new Response.Builder()
        .request(chain.request())
        .protocol(Protocol.HTTP_1_1)
        .code(504)
        .message("Unsatisfiable Request (only-if-cached)")
        .body(Util.EMPTY_RESPONSE)
        .sentRequestAtMillis(-1L)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
  }

  // 没有请求,有缓存 ——> 使用缓存
  if (networkRequest == null) {
    return cacheResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .build();
  }

  //有网络,无缓存 ——> 去发起请求
  Response networkResponse = null;
  try {
    networkResponse = chain.proceed(networkRequest);
  } finally {
    // If we're crashing on I/O or otherwise, don't leak the cache body.
    if (networkResponse == null && cacheCandidate != null) {
      closeQuietly(cacheCandidate.body());
    }
  }
  // If we have a cache response too, then we're doing a conditional get.
  if (cacheResponse != null) {
    //有网络,有缓存 ——> 服务器返回304(表示无修改),则更新缓存响应并返回。
    if (networkResponse.code() == HTTP_NOT_MODIFIED) {
      Response response = cacheResponse.newBuilder()
          .headers(combine(cacheResponse.headers(), networkResponse.headers()))
          .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
          .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
          .cacheResponse(stripBody(cacheResponse))
          .networkResponse(stripBody(networkResponse))
          .build();
      networkResponse.body().close();

      // Update the cache after combining headers but before stripping the
      // Content-Encoding header (as performed by initContentStream()).
      cache.trackConditionalCacheHit();
      cache.update(cacheResponse, response);
      return response;
    } else {
      closeQuietly(cacheResponse.body());
    }
  }

  Response response = networkResponse.newBuilder()
      .cacheResponse(stripBody(cacheResponse))
      .networkResponse(stripBody(networkResponse))
      .build();

  if (cache != null) {
    if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
      //cache不为空,有请求头和缓存策略时,通过cache.put进行缓存
      // Offer this request to the cache.
      CacheRequest cacheRequest = cache.put(response);
      return cacheWritingResponse(cacheRequest, response);
    }

    if (HttpMethod.invalidatesCache(networkRequest.method())) {
      try {
        cache.remove(networkRequest);
      } catch (IOException ignored) {
        // The cache cannot be written.
      }
    }
  }

  return response;
}

缓存拦截器顾名思义处理缓存的,但是要建立在get请求的基础上,我们可以去通过okHttpClient.cache(cache)去设置。缓存拦截器的处理流程:

1.从缓存中取出对应请求的响应缓存

2.通过CacheStrategy判断使用缓存或发起网络请求,此对象中的networkRequest代表需要发起网络请求,cacheResponse表示直接使用缓存。

networkRequest cacheResponse 说明
null null 直接返回504(网关超时)
null not null 直接使用缓存
not null null 发起请求
not null not null 发起请求,若得到响应为304(表无修改),则更新缓存响应并返回

即:networkRequest存在则优先发起网络请求,否则使用cacheResponse缓存,若都不存在则请求失败。

  1. cache不为空,有请求头和缓存策略时,通过cache.put进行缓存

如果最终判定不能使用缓存,需要发起网络请求,则来到下一个拦截器ConnectInterceptor

四、ConnectInterceptor(连接拦截器)
Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    //从拦截器链获取StreamAllocation对象,这里的StreamAllocation对象是在第一个拦截器中初始化完成的(设置了连接池、url路径等),真正使用的地方在这里。
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    //创建HttpCodec对象  HttpCodec:用来编码HTTP request和解码HTTP response
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    //获取RealConnection
    RealConnection connection = streamAllocation.connection();
        //执行下一个拦截器,返回response
    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }

StreamAllocation对象是在第一个拦截器RetryAndFollowUpInterceptor中初始化完成的(设置了连接池、url路径等),当一个请求发出,需要建立连接,建立连接之后需要使用流来读取数据,这个StreamAllocation就是协调请求、连接与数据流三者之前的关系,它负责为一次请求寻找连接,然后获得流来实现网络通信。

public final class StreamAllocation {
    private final ConnectionPool connectionPool;
  private RealConnection connection;
    //...
  
  public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
    int connectTimeout = client.connectTimeoutMillis();
    int readTimeout = client.readTimeoutMillis();
    int writeTimeout = client.writeTimeoutMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
      HttpCodec resultCodec = resultConnection.newCodec(client, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }
}

StreamAllocation对象有两个关键角色:

真正的连接是在RealConnection中实现的,连接由ConnectionPool管理。

//ConnectionPool.java
public ConnectionPool() {
  //连接池最多保存5个连接的keep-alive,每个时长5分钟
  this(5, 5, TimeUnit.MINUTES);
}

public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
  }

void put(RealConnection connection) {
  assert (Thread.holdsLock(this));
  if (!cleanupRunning) {
    cleanupRunning = true;
    //清理无效的连接
    executor.execute(cleanupRunnable);
  }
  connections.add(connection);
}

private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

//获取可复用的连接
RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
  assert (Thread.holdsLock(this));
  for (RealConnection connection : connections) {
    //要拿到的连接与连接池中的连接,连接的配置(DNS、代理、SSL证书、服务器域名、端口等)一致,就可以复用
    if (connection.isEligible(address, route)) {
      streamAllocation.acquire(connection, true);
      return connection;
    }
  }
  return null;
}

long cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

    // Find either a connection to evict, or the time that the next eviction is due.
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();
                //检查连接是否正在使用
        // If the connection is in use, keep searching.
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;
          continue;
        }
                //否则记录闲置连接数
        idleConnectionCount++;

        // If the connection is ready to be evicted, we're done.
        //计算这个连接已经闲置多久
        long idleDurationNs = now - connection.idleAtNanos;
        //找到闲置最久的连接
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }
            //keep-alive时间>=5分钟||连接池内闲置连接>5,立即移除
      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
        //池内存在闲置连接,就等待 (保活时间-最长闲置时间,即到期时间)
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
        //有正在使用的连接,5分钟后再清理
        return keepAliveDurationNs;
      } else {
        //无连接,停止清理(下次put会再次启动)
        // No connections, idle or in use.
        cleanupRunning = false;
        return -1;
      }
    }
        //关闭连接,返回时间0,立即再次进行请求(cleanupRunnable的while (true)会立即执行)
    closeQuietly(longestIdleConnection.socket());
    // Cleanup again immediately.
    return 0;
  }

接着我们看下RealConnection的创建和连接的建立:

streamAllocation.newStream—>findHealthyConnection—>findConnection

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      boolean connectionRetryEnabled) throws IOException {
    Route selectedRoute;
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (codec != null) throw new IllegalStateException("codec != null");
      if (canceled) throw new IOException("Canceled");
            //①StreamAllocation的connection如果可以复用则复用
      // Attempt to use an already-allocated connection.
      RealConnection allocatedConnection = this.connection;
      if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
        return allocatedConnection;
      }
            
      //②如果connection不能复用,则从连接池中获取RealConnection对象,获取成功则返回
      // Attempt to get a connection from the pool.
      Internal.instance.get(connectionPool, address, this, null);
      if (connection != null) {
        return connection;
      }

      selectedRoute = route;
    }

    // If we need a route, make one. This is a blocking operation.
    if (selectedRoute == null) {
      selectedRoute = routeSelector.next();
    }

        RealConnection result;
    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      // Now that we have an IP address, make another attempt at getting a connection from the pool.
      // This could match due to connection coalescing.
      Internal.instance.get(connectionPool, address, this, selectedRoute);
      if (connection != null) {
        route = selectedRoute;
        return connection;
      }

      // Create a connection and assign it to this allocation immediately. This makes it possible
      // for an asynchronous cancel() to interrupt the handshake we're about to do.
      route = selectedRoute;
      refusedStreamCount = 0;
      //③如果连接池里没有,则new一个RealConnection对象
      result = new RealConnection(connectionPool, selectedRoute);
      acquire(result);
    }
 
    // Do TCP + TLS handshakes. This is a blocking operation.
    //④调用RealConnection的connect()方法发起请求
    result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      // ⑤将RealConnection对象存进连接池中,以便下次复用
      Internal.instance.put(connectionPool, result);

      // If another multiplexed connection to the same address was created concurrently, then
      // release this connection and acquire that one.
      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    closeQuietly(socket);
        //⑥返回RealConnection对象
    return result;
  }

findConnection:

①StreamAllocation的connection如果可以复用则复用

②如果connection不能复用,则从连接池中获取RealConnection对象,获取成功则返回

③如果连接池里没有,则new一个RealConnection对象

④调用RealConnection的connect()方法发起请求

⑤将RealConnection对象存进连接池中,以便下次复用

⑥返回RealConnection对象

//RealConnection.java
public void connect(
      int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) {
         //...
       //进行Socket连接
       connectSocket(connectTimeout, readTimeout);
       //...
  }


private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();

    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket()
        : new Socket(proxy);

    rawSocket.setSoTimeout(readTimeout);
    try {
      //建立socket连接  最终调用Java里的套接字Socket里的connect()方法。
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
      ce.initCause(e);
      throw ce;
    }

小结:

ConnectInterceptor拦截器从拦截器链中获取StreamAllocation对象,这个对象在第一个拦截器中创建,在ConnectInterceptor中才用到。

执行StreamAllocation对象的newStream方法创建HttpCodec对象,用来编码HTTP request和解码HTTP response。

newStream方法里面通过findConnection方法返回了一个RealConnection对象。

StreamAllocation对象的connect方法拿到上面返回的RealConnection对象,这个RealConnection对象是用来进行实际的网络IO传输的。

五、CallServerInterceptor(请求服务器拦截器)
@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    //①获取拦截器链中的HttpCodec、StreamAllocation、RealConnection对象
    HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();

    realChain.eventListener().requestHeadersStart(realChain.call());
    //②调用httpCodec.writeRequestHeaders(request)将请求头写入缓存(见下面Http2Codec的writeRequestHeaders方法)
    httpCodec.writeRequestHeaders(request);
    realChain.eventListener().requestHeadersEnd(realChain.call(), request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      //③判断是否有请求体,如果有,请求头通过携带特殊字段 Expect:100-continue来询问服务器是否愿意接受请求体。(一般用于上传大容量请求体或者需要验证)
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        //调用真正发送给服务器
        httpCodec.flushRequest();
        realChain.eventListener().responseHeadersStart(realChain.call());
        //服务端返回100,表示愿意接受请求体responseBuilder为null(见下面Http2Codec的readResponseHeaders方法)
        responseBuilder = httpCodec.readResponseHeaders(true);
      }

      if (responseBuilder == null) {
        //服务器愿意会响应100(responseBuilder 即为nul)。这时候才能够继续发送剩余请求数据。
        // Write the request body if the "Expect: 100-continue" expectation was met.
        realChain.eventListener().requestBodyStart(realChain.call());
        long contentLength = request.body().contentLength();
        CountingSink requestBodyOut =
            new CountingSink(httpCodec.createRequestBody(request, contentLength));
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
                //写入请求体
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
        realChain.eventListener()
            .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
      } else if (!connection.isMultiplexed()) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
        // from being reused. Otherwise we're still obligated to transmit the request body to
        // leave the connection in a consistent state.
        //服务器不愿意接受请求体,调用noNewStreams关闭相关socket
        streamAllocation.noNewStreams();
      }
    }
        //④结束请求
    httpCodec.finishRequest();
  
  /**代码走到这里的responseBuilder情况为:
    * 1.post请求,请求头包含Expect,服务端允许接受请求体,并且已经发出了请求体,responseBuilder为null
    * 2.post请求,请求头包含Expect,服务端不允许接受请求体,responseBuilder不为null
    * 3.post请求,没有请求体,responseBuilder为null
    *   3.get请求,responseBuilder为null
    */
  
    if (responseBuilder == null) {
      realChain.eventListener().responseHeadersStart(realChain.call());
    //根据服务器返回的数据构建 responseBuilder对象  (传入的expectContinue为false,不会return null)
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

    //⑤构建Response对象
    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
        //第一次查询服务器是否支持接受请求体的,而不是真正的请求对应的结果响应。所以接着:
    int code = response.code();
    if (code == 100) {
      // server sent a 100-continue even though we did not request one.
      // try again to read the actual response
      //如果响应了100,这代表了请求Expect: 100-continue成功响应,需要马上再次读取一份响应头,这才是真正的请求对应的响应头。
      responseBuilder = httpCodec.readResponseHeaders(false);
            //⑤构建Response对象
      response = responseBuilder
              .request(request)
              .handshake(streamAllocation.connection().handshake())
              .sentRequestAtMillis(sentRequestMillis)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build();

      code = response.code();
    }

    realChain.eventListener()
            .responseHeadersEnd(realChain.call(), response);

    if (forWebSocket && code == 101) {
      //WebSocket请求
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      //读取响应体数据
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }
        //客户端或者服务端不希望长连接,那么就关闭socket
    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }
        //如果服务器返回204/205(表示没有响应体),但是解析Content-Lenght>0(表示响应体字节长度),出现冲突,抛出协议异常
    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }
        //⑥返回response
    return response;
  }

writeRequestHeaders和readResponseHeaders(以Http2Codec为例)

//Http2Codec.java

 private Http2Stream stream;

@Override public void writeRequestHeaders(Request request) throws IOException {
  if (stream != null) return;
  boolean hasRequestBody = request.body() != null;
  List<Header> requestHeaders = http2HeadersList(request);
  stream = connection.newStream(requestHeaders, hasRequestBody);
  stream.readTimeout().timeout(chain.readTimeoutMillis(), TimeUnit.MILLISECONDS);
  stream.writeTimeout().timeout(chain.writeTimeoutMillis(), TimeUnit.MILLISECONDS);
}

@Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
    List<Header> headers = stream.takeResponseHeaders();
    Response.Builder responseBuilder = readHttp2HeadersList(headers);
    if (expectContinue && Internal.instance.code(responseBuilder) == HTTP_CONTINUE) {
      //注意传过来的expectContinue
      //服务端返回100,表示愿意接受请求体 responseBuilder为null
      return null;
    }
    return responseBuilder;
  }

小结:CallServerInterceptor完成HTTP协议报文的封装和解析。

①获取拦截器链中的HttpCodec、StreamAllocation、RealConnection对象

②调用httpCodec.writeRequestHeaders(request)将请求头写入缓存

③判断是否有请求体,如果有,请求头通过携带特殊字段 Expect:100-continue来询问服务器是否愿意接受请求体。(一般用于上传大容量请求体或者需要验证)

④通过httpCodec.finishRequest()结束请求

⑤通过responseBuilder构建Response

⑥返回Response

上一篇下一篇

猜你喜欢

热点阅读