
2019-11-04  本文已影响0人  猫KK

3. CacheInterceptor


/**
 * Interceptor that serves a request from [cache] when the computed strategy allows it and
 * otherwise forwards the request down the chain, updating or populating the cache afterwards.
 *
 * NOTE(review): this listing is abridged. Several closing braces, builder calls and cleanup
 * statements are missing, so it does not compile as shown.
 */
class CacheInterceptor(internal val cache: Cache?) : Interceptor {

  override fun intercept(chain: Interceptor.Chain): Response {
    // Look up a cached candidate; cache is configured through OkHttpClient.Builder.
    val cacheCandidate = cache?.get(chain.request())
    val now = System.currentTimeMillis()
    // The strategy decides which of networkRequest / cacheResponse will be used (either may be null).
    val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
    val networkRequest = strategy.networkRequest
    val cacheResponse = strategy.cacheResponse

    if (cacheCandidate != null && cacheResponse == null) {
      // The cache candidate wasn't applicable. Close it.

    // If we're forbidden from using the network and the cache is insufficient, fail.
    // Both networkRequest and cacheResponse are null: return a failure response.
    if (networkRequest == null && cacheResponse == null) {
      return Response.Builder()
          .message("Unsatisfiable Request (only-if-cached)")

    // If we don't need the network, we're done.
    // networkRequest is null but cacheResponse is not: serve from the cache.
    if (networkRequest == null) {
      return cacheResponse!!.newBuilder()

    var networkResponse: Response? = null
    try {
      // Hand the request to the next interceptor in the chain.
      networkResponse = chain.proceed(networkRequest)
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {

    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
      if (networkResponse?.code == HTTP_NOT_MODIFIED) {
        // Not modified: build the response from the cached one with combined headers.
        val response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers, networkResponse.headers))


        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache.update(cacheResponse, response)
        return response
      } else {

    val response = networkResponse!!.newBuilder()

    if (cache != null) {
      if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        val cacheRequest = cache.put(response)
        return cacheWritingResponse(cacheRequest, response)

      if (HttpMethod.invalidatesCache(networkRequest.method)) {
        // NOTE(review): the body of this try block is not shown in this listing.
        try {
        } catch (_: IOException) {
          // The cache cannot be written.

    return response


    /**
     * Returns the strategy produced by [computeCandidate], or an empty strategy (no network
     * request and no cached response) when the candidate needs the network but the request is
     * marked only-if-cached.
     *
     * NOTE(review): abridged listing; closing braces are missing.
     */
    fun compute(): CacheStrategy {
      val candidate = computeCandidate()

      // We're forbidden from using the network and the cache is insufficient.
      if (candidate.networkRequest != null && request.cacheControl.onlyIfCached) {
        return CacheStrategy(null, null)

      return candidate


    /**
     * Builds the candidate strategy: a plain network request, the cached response alone, or a
     * conditional request paired with the cached response.
     *
     * NOTE(review): abridged listing; closing braces and some statements are missing.
     */
    private fun computeCandidate(): CacheStrategy {
      // No cached response: return a strategy with only the network request (cache is not used).
      if (cacheResponse == null) {
        return CacheStrategy(request, null)

      // For an HTTPS request whose cached response has no handshake, use the network request.
      if (request.isHttps && cacheResponse.handshake == null) {
        return CacheStrategy(request, null)

      // If this response shouldn't have been stored, it should never be used as a response source.
      // This check should be redundant as long as the persistence store is well-behaved and the
      // rules are constant.
      if (!isCacheable(cacheResponse, request)) {
        return CacheStrategy(request, null)

      val requestCaching = request.cacheControl
      // The request asks for no-cache or already carries conditions: use the network request.
      if (requestCaching.noCache || hasConditions(request)) {
        return CacheStrategy(request, null)
      // The decision below mainly relies on the max-age field, which denotes the cache expiry time.
      val responseCaching = cacheResponse.cacheControl

      val ageMillis = cacheResponseAge()
      var freshMillis = computeFreshnessLifetime()

      // A max-age on the request can only shorten the freshness lifetime.
      if (requestCaching.maxAgeSeconds != -1) {
        freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong()))

      var minFreshMillis: Long = 0
      if (requestCaching.minFreshSeconds != -1) {
        minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds.toLong())

      var maxStaleMillis: Long = 0
      // max-stale from the request is honored only if the response is not must-revalidate.
      if (!responseCaching.mustRevalidate && requestCaching.maxStaleSeconds != -1) {
        maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds.toLong())

      if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
        val builder = cacheResponse.newBuilder()
        if (ageMillis + minFreshMillis >= freshMillis) {
          builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"")
        val oneDayMillis = 24 * 60 * 60 * 1000L
        if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
          builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"")
        // The cache has not expired: return the cached response with a null networkRequest.
        return CacheStrategy(null, builder.build())

      // Find a condition to add to the request. If the condition is satisfied, the response body
      // will not be transmitted.
      val conditionName: String
      val conditionValue: String?
      when {
        etag != null -> {
          conditionName = "If-None-Match"
          conditionValue = etag

        lastModified != null -> {
          conditionName = "If-Modified-Since"
          conditionValue = lastModifiedString

        servedDate != null -> {
          conditionName = "If-Modified-Since"
          conditionValue = servedDateString

        else -> return CacheStrategy(request, null) // No condition! Make a regular request.

      val conditionalRequestHeaders = request.headers.newBuilder()
      conditionalRequestHeaders.addLenient(conditionName, conditionValue!!)

      // NOTE(review): the builder calls that finish conditionalRequest are not shown in this listing.
      val conditionalRequest = request.newBuilder()
      return CacheStrategy(conditionalRequest, cacheResponse)

通过上面的代码可以知道,是否使用缓存主要是根据请求头和缓存响应头中的缓存控制信息(如 Cache-Control 的 max-age、no-cache 等)来判断的;关于这些头信息的具体含义,可以参考这篇文章:《OKHTTP之缓存配置详解》

4. ConnectInterceptor


/**
 * Interceptor that asks the transmitter for a new Exchange and proceeds down the chain with it.
 *
 * NOTE(review): abridged listing; closing braces are missing.
 */
object ConnectInterceptor : Interceptor {

  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val request = realChain.request()
    val transmitter = realChain.transmitter()

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    val doExtensiveHealthChecks = request.method != "GET"
    // Obtain a socket connection and initialize the Exchange.
    val exchange = transmitter.newExchange(chain, doExtensiveHealthChecks)
    return realChain.proceed(request, transmitter, exchange)


/**
 * Creates a new Exchange for this call: validates state while holding the connectionPool lock,
 * finds a codec through exchangeFinder, then records the new exchange and resets the
 * request/response done flags.
 *
 * NOTE(review): abridged listing; closing braces are missing.
 */
internal fun newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean): Exchange {
    synchronized(connectionPool) {
      check(!noMoreExchanges) { "released" }
      // Verify that exchange is null; check() fails with the message below if it is not.
      check(exchange == null) {
        "cannot make a new request because the previous response is still open: " +
            "please call response.close()"
    val codec = exchangeFinder!!.find(client, chain, doExtensiveHealthChecks)
    val result = Exchange(this, call, eventListener, exchangeFinder!!, codec)

    synchronized(connectionPool) {
      this.exchange = result
      this.exchangeRequestDone = false
      this.exchangeResponseDone = false
      return result


  /**
   * Reads the timeouts and retry settings from [chain] and [client], finds a healthy connection
   * and returns the codec created by that connection.
   *
   * A RouteException is rethrown unchanged; any other IOException is wrapped in a RouteException.
   *
   * NOTE(review): abridged listing; closing parentheses and braces are missing.
   */
  fun find(
    client: OkHttpClient,
    chain: Interceptor.Chain,
    doExtensiveHealthChecks: Boolean
  ): ExchangeCodec {
    val connectTimeout = chain.connectTimeoutMillis()
    val readTimeout = chain.readTimeoutMillis()
    val writeTimeout = chain.writeTimeoutMillis()
    val pingIntervalMillis = client.pingIntervalMillis
    val connectionRetryEnabled = client.retryOnConnectionFailure

    try {
      val resultConnection = findHealthyConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled,
          doExtensiveHealthChecks = doExtensiveHealthChecks
      // Return a codec depending on whether the connection is HTTP/2 or HTTP/1.
      return resultConnection.newCodec(client, chain)
    } catch (e: RouteException) {
      throw e
    } catch (e: IOException) {
      throw RouteException(e)


  /**
   * Loops over [findConnection] until a usable connection is obtained. A candidate whose
   * successCount is 0 is returned immediately; other candidates are checked with isHealthy first.
   *
   * NOTE(review): abridged listing; the handling of an unhealthy candidate and the closing
   * braces are not shown.
   */
  private fun findHealthyConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    doExtensiveHealthChecks: Boolean
  ): RealConnection {
    while (true) {
      val candidate = findConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled

      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized(connectionPool) {
        if (candidate.successCount == 0) {
          return candidate

      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
      return candidate


  /**
   * Returns a connection to carry the exchange. It tries, in order, the connection already held
   * by the transmitter, a connection from the pool, and finally a newly created RealConnection.
   *
   * NOTE(review): abridged listing; many closing braces and some statements (including the
   * actual connect call) are missing.
   */
  private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
    var foundPooledConnection = false
    var result: RealConnection? = null
    var selectedRoute: Route? = null
    var releasedConnection: RealConnection?
    val toClose: Socket?
    synchronized(connectionPool) {
      if (transmitter.isCanceled) throw IOException("Canceled")
      hasStreamFailure = false // This is a fresh attempt.
      // Get the connection currently held by the transmitter.
      releasedConnection = transmitter.connection
      // If the transmitter has a connection that accepts no new exchanges, its socket is assigned
      // to toClose. NOTE(review): both branch bodies are elided in this listing.
      toClose = if (transmitter.connection != null && transmitter.connection!!.noNewExchanges) {
      } else {
      if (transmitter.connection != null) {
        // We had an already-allocated connection and it's good.
        result = transmitter.connection
        releasedConnection = null
      if (result == null) {
        // Try to acquire a connection from the pool; on success it is assigned to transmitter.connection.
        // For this walkthrough assume the pool has no connection, so result is still null.
        if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
          foundPooledConnection = true
          result = transmitter.connection
        } else if (nextRouteToTry != null) {
          selectedRoute = nextRouteToTry
          nextRouteToTry = null
        } else if (retryCurrentRoute()) {
          selectedRoute = transmitter.connection!!.route()
    if (releasedConnection != null) {
      eventListener.connectionReleased(call, releasedConnection!!)
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result!!)
    if (result != null) {
      // If we found an already-allocated or pooled connection, we're done.
      return result!!

    // No route chosen yet and the current selection is missing or exhausted: fetch the next selection.
    var newRouteSelection = false
    if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
      newRouteSelection = true
      routeSelection = routeSelector.next()

    var routes: List<Route>? = null
    synchronized(connectionPool) {
      if (transmitter.isCanceled) throw IOException("Canceled")

      if (newRouteSelection) {
        // Now that we have a set of IP addresses, make another attempt at getting a connection from
        routes = routeSelection!!.routes
        if (connectionPool.transmitterAcquirePooledConnection(
                address, transmitter, routes, false)) {
          foundPooledConnection = true
          result = transmitter.connection
      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection!!.next()

        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        result = RealConnection(connectionPool, selectedRoute!!)
        connectingConnection = result

    // If the connection was found in the pool, return it.
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result!!)
      return result!!

    // TCP connect + handshake.
    // NOTE(review): the connect call itself is not shown in this listing.

    var socket: Socket? = null
    synchronized(connectionPool) {
      connectingConnection = null
      // Look in the connection pool once more.
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
        // We lost the race! Close the connection we created and return the pooled connection.
        result!!.noNewExchanges = true
        socket = result!!.socket()
        result = transmitter.connection

        // It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
        // that case we will retry the route we just successfully connected with.
        nextRouteToTry = selectedRoute
      } else {

    eventListener.connectionAcquired(call, result!!)
    return result!!

来看 result!!.connect 是如何连接的

/**
 * Connects this connection in a loop: through a tunnel when the route requires one, otherwise
 * with a plain socket, and then establishes the protocol. On IOException the connection state
 * fields are reset to null and the failure is reported to [eventListener].
 *
 * NOTE(review): abridged listing; the loop exit, retry handling and closing braces are missing.
 */
fun connect(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    call: Call,
    eventListener: EventListener
  ) {

    while (true) {
      try {
        if (route.requiresTunnel()) {
          connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
          if (rawSocket == null) {
            // We were unable to connect the tunnel but properly closed down our resources.
        } else {
          connectSocket(connectTimeout, readTimeout, call, eventListener)
        establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
        eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)
      } catch (e: IOException) {
        // Drop all per-connection state before reporting the failure.
        socket = null
        rawSocket = null
        source = null
        sink = null
        handshake = null
        protocol = null
        http2Connection = null

        eventListener.connectFailed(call, route.socketAddress, route.proxy, null, e)

        // Remember the first failure as a RouteException.
        if (routeException == null) {
          routeException = RouteException(e)
        } else {

走到了connectSocket 方法,看方法名就觉得是创建一个socket连接

  /**
   * Creates the raw socket for the route, connects it to the route's socket address, and wraps
   * it in a buffered source and sink.
   *
   * NOTE(review): abridged listing; closing braces and parts of the exception handling are missing.
   */
  private fun connectSocket(
    connectTimeout: Int,
    readTimeout: Int,
    call: Call,
    eventListener: EventListener
  ) {
    val proxy = route.proxy
    val address = route.address
    // DIRECT and HTTP proxies use the address's socket factory; other proxy types use Socket(proxy).
    val rawSocket = when (proxy.type()) {
      Proxy.Type.DIRECT, Proxy.Type.HTTP -> address.socketFactory.createSocket()!!
      else -> Socket(proxy)
    this.rawSocket = rawSocket

    eventListener.connectStart(call, route.socketAddress, proxy)
    rawSocket.soTimeout = readTimeout
    try {
      Platform.get().connectSocket(rawSocket, route.socketAddress, connectTimeout)
    } catch (e: ConnectException) {
      // Rethrow as a ConnectException whose message names the target address.
      throw ConnectException("Failed to connect to ${route.socketAddress}").apply {

    // The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
    // More details:
    // https://github.com/square/okhttp/issues/3245
    // https://android-review.googlesource.com/#/c/271775/
    try {
      // Obtain the socket's input and output streams via Okio.
      source = rawSocket.source().buffer()
      sink = rawSocket.sink().buffer()
    } catch (npe: NullPointerException) {
      if (npe.message == NPE_THROW_WITH_NULL) {
        throw IOException(npe)

到这里,连接就完成了,所以 okhttp 就是基于 socket + okio 来实现的

上一篇 下一篇

