开源库

OkHttp源码流程分析(request篇)

2020-12-30  本文已影响0人  A邱凌

OkHttp现在几乎已经占据了所有网络请求 了解其内部原理可以更好的进行扩展、封装和优化

我们今天分析一下OkHttp源码 因为流程比较多 所以分为两篇(请求和响应)来分析 Okio本文暂时不涉及 后面可能会更新一篇Okio的文章 版本基于4.5.0-RC1

请求方式

fun load() {
      //1.创建请求(包含url,method,headers,body)
      val request = Request
              .Builder()
              .url("")
              .build()
       //2.创建OkHttpClient (包含分发器、拦截器、DNS等)
       val okHttpClient = OkHttpClient.Builder().build()
       //3.创建Call(用于调用请求)
       val newCall = okHttpClient.newCall(request)
       //4.通过异步请求数据
       newCall.enqueue(object :Callback{
           override fun onFailure(call: Call, e: IOException) {}
           override fun onResponse(call: Call, response: Response) {}
       })
       //4.通过同步请求数据
       val response =  newCall.execute()
}

我们会按照顺序来分析一下请求的流程

前面1,2,3步很多文章已经分析过很多遍了 也比较简单 同学们可以自己看一下 我们就不再赘述 我们直接看第四步进入今天的主要流程

Okhttp请求分为同步方式和异步方式 不过最终都是殊途同归 我们以异步的方式分析一下请求流程

enqueue()

话不多说 先看一眼代码

RealCall.enqueue()->
Dispatcher.enqueue()->
Dispatcher.promoteAndExecute()->
RealCall.executeOn()

override fun enqueue(responseCallback: Callback) {
    synchronized(this) {
        //检查是否已经开始运行
      check(!executed) { "Already Executed" }
      executed = true
    }
    callStart()
    //封装AsyncCall对象 并放入队列中等待执行
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }
  
internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
        //放入等待队列
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    //推进执行
    promoteAndExecute()
  }
  
//将readyAsyncCalls中合格的请求过渡升级到runningAsyncCalls中
private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.//运行最大64
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.每个主机最大5

        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

上面的流程主要是将我们的异步任务放入队列中 并且将可以运行的任务开启运行

RealCall.executeOn()

fun executeOn(executorService: ExecutorService) {
      client.dispatcher.assertThreadDoesntHoldLock()

      var success = false
      try {
            //将当前Runnable放到线程池中运行
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          client.dispatcher.finished(this) // This call is no longer running!
        }
      }
    }
 
override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
        //终于到这篇文章的重头戏了
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
          client.dispatcher.finished(this)
        }
      }
    }
  }

我们知道OkHttp采用了拦截链模式 看一下何为拦截器模式 借用一下大佬的图(懒得画图了😁)

3631399-0626631d246373a4.png

我们可以看到 请求链会以链的形式调用下去 直到到链尾或者return response

getResponseWithInterceptorChain()

 @Throws(IOException::class)
  internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    //加入我们自己的拦截器
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
        //如果不是websocket的话 加入网络拦截器
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)
    
    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
        //开始拦截链调用
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }
  

@Throws(IOException::class)
  override fun proceed(request: Request): Response {
    check(index < interceptors.size)

    calls++

    if (exchange != null) {
      check(exchange.finder.sameHostAndPort(request.url)) {
        "network interceptor ${interceptors[index - 1]} must retain the same host and port"
      }
      check(calls == 1) {
        "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
      }
    }

    // Call the next interceptor in the chain.
    //循环拿取下一个拦截器
    val next = copy(index = index + 1, request = request)
    val interceptor = interceptors[index]

    @Suppress("USELESS_ELVIS")
    //这里会调用拦截器的方法  接下来我们会分析各个拦截器的作用
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    if (exchange != null) {
      check(index + 1 >= interceptors.size || next.calls == 1) {
        "network interceptor $interceptor must call proceed() exactly once"
      }
    }

    check(response.body != null) { "interceptor $interceptor returned a response with no body" }

    return response
  }

我们看到拦截器的链路模式 其实就是遍历调用各拦截器 对Request进行作用 直到return response

我们来逐个分析一下各个拦截器的作用 本文只分析处理Request,response的处理会在下文讲解

RetryAndFollowUpInterceptor

作用:处理错误 重定向 所以在请求的过程中 没有做太多事情 下文会分析如何处理错误 重定向

这个拦截器在请求的过程中几乎没做什么事情 只做了一件事 就是创建ExchangeFinder 这个对象在后面的ConnectInterceptor用来生成exchange对象

fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean) {
    check(interceptorScopedExchange == null)
    check(exchange == null) {
      "cannot make a new request because the previous response is still open: " +
          "please call response.close()"
    }

    if (newExchangeFinder) {
      this.exchangeFinder = ExchangeFinder(
          connectionPool,
          createAddress(request.url),
          this,
          eventListener
      )
    }
  }

BridgeInterceptor

作用:添加必要的请求头信息、gzip处理等

@Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val userRequest = chain.request()
    val requestBuilder = userRequest.newBuilder()

    val body = userRequest.body
    if (body != null) {
      val contentType = body.contentType()
      if (contentType != null) {
            //添加contentType
        requestBuilder.header("Content-Type", contentType.toString())
      }

      val contentLength = body.contentLength()
      if (contentLength != -1L) {
        requestBuilder.header("Content-Length", contentLength.toString())
        requestBuilder.removeHeader("Transfer-Encoding")
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked")
        requestBuilder.removeHeader("Content-Length")
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", userRequest.url.toHostHeader())
    }
    //默认keep-Alive
    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive")
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    //传输流的压缩方式 默认gzip方式
    var transparentGzip = false
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true
      requestBuilder.header("Accept-Encoding", "gzip")
    }

    val cookies = cookieJar.loadForRequest(userRequest.url)
    //添加cookie
    if (cookies.isNotEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies))
    }

    //默认UA
    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", userAgent)
    }

    val networkResponse = chain.proceed(requestBuilder.build())
    ......handle response
  }

CacheInterceptor

okhttp默认的缓存机制会加快响应流程 我们看一下缓存策略 首先 我们要解释几个变量

//缓存策略类
class CacheStrategy{
    //如果我们需要请求网络 则networkRequest不为null 否则为null
    val networkRequest: Request?
    //请求的返回或者请求的响应 如果无法使用缓存(一般是过期或者无缓存 则为null)
    val cacheResponse: Response?
}

上面两个变量是缓存策略中比较重要的两个变量 我们会根据这两个变量来选择是否命中缓存
先看一下结论

networkRequest\cacheResponse cacheResponse is null cacheResponse is not null
networkRequest is null ① 返回HTTP_GATEWAY_TIMEOUT 504错误 ② 直接使用缓存
networkRequest is not null ③ 进行网络请求 并且缓存新response ④ 先请求 根据code(304) 判断是否需要重新request

再看一下intercept()方法 可以对照上面两个变量的解释和表格来观看

override fun intercept(chain: Interceptor.Chain): Response {
    //获取缓存 如果我们配置了缓存 那么会去查找是否存在cache 
    //这里需要注意的一点是 okhttp默认并不会配置缓存 只是规范了一套缓存策略 我们可以自己通过OkHttpClient.Builder 的 cache 方法设置
    val cacheCandidate = cache?.get(chain.request())

    val now = System.currentTimeMillis()

    //这里的策略 会自动判断是否使用缓存 是否存在缓存
    val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
    //这里就是上面我们解释过的两个变量
    val networkRequest = strategy.networkRequest
    val cacheResponse = strategy.cacheResponse

    cache?.trackResponse(strategy)

    //LruCache没有hit cache 并且网络缓存不可用 
    if (cacheCandidate != null && cacheResponse == null) {
      // The cache candidate wasn't applicable. Close it.
      //关闭cacheCandidate.body
      cacheCandidate.body?.closeQuietly()
    }

    //按照我们上面对缓存的解释 不允许使用网络请求 并且当前没有缓存 对应表格中①
    // If we're forbidden from using the network and the cache is insufficient, fail.
    if (networkRequest == null && cacheResponse == null) {
      return Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(HTTP_GATEWAY_TIMEOUT)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
    }

    //不允许使用网络 仅直接使用缓存 对应表格②
    // If we don't need the network, we're done.
    if (networkRequest == null) {
      return cacheResponse!!.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build()
    }

    var networkResponse: Response? = null
    try {
      //使用网络
      //对应表格③④
      networkResponse = chain.proceed(networkRequest)
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        cacheCandidate.body?.closeQuietly()
      }
    }
     ......缓存新 response
   }

我们看完上面的代码 发现所有的缓存策略都是根据networkRequestcacheResponse两个变量进行控制的 接下来我们看一下缓存策略的生成过程

fun compute(): CacheStrategy {

        class Factory(
            private val nowMillis: Long,
            internal val request: Request,
            private val cacheResponse: Response? //这边的cacheResponse和我们上面讲的还不太一样 这边完全是Cache类中缓存的对象 如果我们之前请求过 并且缓存 则不为null
        )
        
       //根据request的cache-control 和response 的cache-control判断
      val candidate = computeCandidate()

      //使用网络请求 但是请求的request的cache-control是onlyIfCached 表示仅使用缓存
      //这就是个悖论了 所以直接返回504 对应表格①
      // We're forbidden from using the network and the cache is insufficient.
      if (candidate.networkRequest != null && request.cacheControl.onlyIfCached) {
        return CacheStrategy(null, null)
      }

      return candidate
    }
    
    
    private fun computeCandidate(): CacheStrategy {
      // No cached response.
      //这里的cacheResponse是缓存中命中的  所以如果为null 表示之前没有缓存
      if (cacheResponse == null) {
        return CacheStrategy(request, null)
      }

      // Drop the cached response if it's missing a required handshake.
      //如果缺少tls握手 直接请求网络
      if (request.isHttps && cacheResponse.handshake == null) {
        return CacheStrategy(request, null)
      }

      // If this response shouldn't have been stored, it should never be used as a response source.
      // This check should be redundant as long as the persistence store is well-behaved and the
      // rules are constant.
      //根据cacheResponse的code 判断是否允许cache 
      //判断expires是否过期 并且request和reponse的cache-control都是noStore 
      //这里就不往下追踪了 感兴趣的同学可以自己阅读一下
      if (!isCacheable(cacheResponse, request)) {
        return CacheStrategy(request, null)
      }

        //如果是nocache或者根据If-Modified-Since来判断
        //If-Modified-Since会重新请求服务器 然后获取last-modified-since 判断是否修改过
      val requestCaching = request.cacheControl
      if (requestCaching.noCache || hasConditions(request)) {
        return CacheStrategy(request, null)
      }

      val responseCaching = cacheResponse.cacheControl

      val ageMillis = cacheResponseAge()
      var freshMillis = computeFreshnessLifetime()

      if (requestCaching.maxAgeSeconds != -1) {
        freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong()))
      }

      var minFreshMillis: Long = 0
      if (requestCaching.minFreshSeconds != -1) {
        minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds.toLong())
      }

      var maxStaleMillis: Long = 0
      if (!responseCaching.mustRevalidate && requestCaching.maxStaleSeconds != -1) {
        maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds.toLong())
      }
        
      if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
        val builder = cacheResponse.newBuilder()
        if (ageMillis + minFreshMillis >= freshMillis) {
          builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"")
        }
        val oneDayMillis = 24 * 60 * 60 * 1000L
        if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
          builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"")
        }
        return CacheStrategy(null, builder.build())
      }

      // Find a condition to add to the request. If the condition is satisfied, the response body
      // will not be transmitted.
      val conditionName: String
      val conditionValue: String?
      when {
        etag != null -> {
          conditionName = "If-None-Match"
          conditionValue = etag
        }

        lastModified != null -> {
          conditionName = "If-Modified-Since"
          conditionValue = lastModifiedString
        }

        servedDate != null -> {
          conditionName = "If-Modified-Since"
          conditionValue = servedDateString
        }
         //这里会重新request 然后判断modified-time
        else -> return CacheStrategy(request, null) // No condition! Make a regular request.
      }

      val conditionalRequestHeaders = request.headers.newBuilder()
      conditionalRequestHeaders.addLenient(conditionName, conditionValue!!)

      val conditionalRequest = request.newBuilder()
          .headers(conditionalRequestHeaders.build())
          .build()
          //直接使用缓存
      return CacheStrategy(conditionalRequest, cacheResponse)
    }

我们可以根据上面的判断,确定缓存策略
大致是根据cache-control或者handshake是否过期来判断是否需要重新request
例如:
noStore,noCache,If-Modified-Since等等 所有的 cache-control可以参考这篇

如果在CacheInterceptor中hit cache的话 就不会再往下面的拦截器传递 而是直接原路返回 return response

ConnectInterceptor

作用:负责与服务器连接 这个拦截器的过程分析其实相当复杂
简单来说流程是从连接池中查找连接 如果不存在 就创建连接 并完成TCP,TLS握手
然后等待下一个CallServerInterceptor进行数据的交互

我们分析一下源码 拦截器里的代码真的很少 不过不要被表象欺骗了😌 我第一次看OkHttp源码时 看到这里直接就跳过了 然后分析了CallServerInterceptor源码之后 发现没有获取连接过程

 override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    //获取exchange对象 exchange是我们用来和服务端交互的对象封装 看一下initExchange方法
    val exchange = realChain.call.initExchange(chain)
    val connectedChain = realChain.copy(exchange = exchange)
    return connectedChain.proceed(realChain.request)
  }

initExchange()中主要会调用ExchangeFinder#find()然后根据下面的调用链
ExchangeFinder#find()->
ExchangeFinder#findHealthyConnection->
ExchangeFinder#findConnection
然后我们看一下findConnection()这个方法内部就实现了connection的查找或创建
前方高能 下面代码会又臭又长😉

private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
    var foundPooledConnection = false
    var result: RealConnection? = null
    var selectedRoute: Route? = null
    var releasedConnection: RealConnection?
    val toClose: Socket?
    synchronized(connectionPool) {
      if (call.isCanceled()) throw IOException("Canceled")

      val callConnection = call.connection // changes within this overall method
      releasedConnection = callConnection
      //如果url不一致或者callConnection为null 就断开链接
      toClose = if (callConnection != null && (callConnection.noNewExchanges ||
              !sameHostAndPort(callConnection.route().address.url))) {
        call.releaseConnectionNoEvents()
      } else {
        null
      }

      if (call.connection != null) {
        // We had an already-allocated connection and it's good.
        //经过判断上面验证 如果不为null 发现当前connection可用 那么就会直接复用connection
        result = call.connection
        releasedConnection = null
      }

      if (result == null) {
        // The connection hasn't had any problems for this call.
        refusedStreamCount = 0
        connectionShutdownCount = 0
        otherFailureCount = 0

        // Attempt to get a connection from the pool.
        //第一次试从连接池获取
        if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
          foundPooledConnection = true
          result = call.connection
        } else if (nextRouteToTry != null) {
          selectedRoute = nextRouteToTry
          nextRouteToTry = null
        }
      }
    }
    toClose?.closeQuietly()

    if (releasedConnection != null) {
      eventListener.connectionReleased(call, releasedConnection!!)
    }
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result!!)
    }
    if (result != null) {
      // If we found an already-allocated or pooled connection, we're done.
      return result!!
    }

    // If we need a route selection, make one. This is a blocking operation.
    // 查看是否有新的路由信息
    var newRouteSelection = false
    if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
      var localRouteSelector = routeSelector
      if (localRouteSelector == null) {
        localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
        this.routeSelector = localRouteSelector
      }
      newRouteSelection = true
      routeSelection = localRouteSelector.next()
    }

    var routes: List<Route>? = null
    synchronized(connectionPool) {
      if (call.isCanceled()) throw IOException("Canceled")

      if (newRouteSelection) {
        // Now that we have a set of IP addresses, make another attempt at getting a connection from
        // the pool. This could match due to connection coalescing.
        routes = routeSelection!!.routes
        // 如果有新的路由 继续从连接池中查找试试
        if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
          foundPooledConnection = true
          result = call.connection
        }
      }

      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection!!.next()
        }

        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        result = RealConnection(connectionPool, selectedRoute!!)
        connectingConnection = result
      }
    }

    // If we found a pooled connection on the 2nd time around, we're done.
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result!!)
      return result!!
    }

    // Do TCP + TLS handshakes. This is a blocking operation.
    //现在发现可能连接池中没有我们要的connection
    //进行TCP和TLS连接
    result!!.connect(
        connectTimeout,
        readTimeout,
        writeTimeout,
        pingIntervalMillis,
        connectionRetryEnabled,
        call,
        eventListener
    )
    call.client.routeDatabase.connected(result!!.route())

    var socket: Socket? = null
    synchronized(connectionPool) {
      connectingConnection = null
      // Last attempt at connection coalescing, which only occurs if we attempted multiple
      // concurrent connections to the same host.
      //最后一次尝试从连接池中获取 如果能获取到 就使用连接池中 否则使用连接的connection 并且放入连接池中
      if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
        // We lost the race! Close the connection we created and return the pooled connection.
        result!!.noNewExchanges = true
        socket = result!!.socket()
        result = call.connection

        // It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
        // that case we will retry the route we just successfully connected with.
        nextRouteToTry = selectedRoute
      } else {
        //放入连接池中
        connectionPool.put(result!!)
        call.acquireConnectionNoEvents(result!!)
      }
    }
    socket?.closeQuietly()

    eventListener.connectionAcquired(call, result!!)
    return result!!
  }

上面的注释写的也比较多 流程其实也比较清晰了 我们接下来分析一下 如何从连接池中查找以及如何建立连接

//从连接池中获取
fun callAcquirePooledConnection(
    address: Address,
    call: RealCall,
    routes: List<Route>?,
    requireMultiplexed: Boolean
  ): Boolean {
    this.assertThreadHoldsLock()

    for (connection in connections) {
        //判断connection是否支持多路复用
      if (requireMultiplexed && !connection.isMultiplexed) continue
      //判断connection的host是否匹配
      if (!connection.isEligible(address, routes)) continue
      call.acquireConnectionNoEvents(connection)
      return true
    }
    return false
  }

还有一个TCP和TLS握手流程

fun connect(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    call: Call,
    eventListener: EventListener
  ) {
    check(protocol == null) { "already connected" }

    var routeException: RouteException? = null
    val connectionSpecs = route.address.connectionSpecs
    val connectionSpecSelector = ConnectionSpecSelector(connectionSpecs)
    ......

    while (true) {
      try {
          //注释的意思是 如果是通过HTTP代理HTTPS 那么需要连接Tunnel  
          //这里我是在是没看懂什么意思 告辞😞 有知道的大佬可以留言告诉我一下
        if (route.requiresTunnel()) {
          connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
          if (rawSocket == null) {
            // We were unable to connect the tunnel but properly closed down our resources.
            break
          }
        } else {
          //先建立socket连接 包括代理的配置
          connectSocket(connectTimeout, readTimeout, call, eventListener)
        }
        //如果是Http2协议还会创建Http2连接 或者 TLS握手 
        //TLS流程大家可以参考一下我之前写的一篇文章
        establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
        eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)
        break
      } catch (e: IOException) {
        socket?.closeQuietly()
        rawSocket?.closeQuietly()
        socket = null
        rawSocket = null
        source = null
        sink = null
        handshake = null
        protocol = null
        http2Connection = null
        allocationLimit = 1

        eventListener.connectFailed(call, route.socketAddress, route.proxy, null, e)

        if (routeException == null) {
          routeException = RouteException(e)
        } else {
          routeException.addConnectException(e)
        }

        if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
          throw routeException
        }
      }
    }

    if (route.requiresTunnel() && rawSocket == null) {
      throw RouteException(ProtocolException(
          "Too many tunnel connections attempted: $MAX_TUNNEL_ATTEMPTS"))
    }

    idleAtNs = System.nanoTime()
  }

TLS握手流程有疑问的同学 可以看一下我之前写的一篇文章呐 小飞机✈️觉得还可以可以点个赞哦

现在connection就获取完成了 接下来就是与服务器的交互啦
还有创建了新connection之后会putConnectionPool中 其中还有一个clean操作 同学们可以自己看一下代码呐

最后还有一个小点需要说明一下 因为关系到我们下一个拦截器的阅读
在我们最上面的说到 我们会通过ExchangeFinder#find来生成ExchangeCodec对象

首先我们解释一下ExchangeCodec作用okhttp会使用ExchangeCodec封装了与服务器的IO操作
ExchangeCodec的实现类分别对应协议是Http1ExchangeCodecHttp2ExchangeCodec

看一下find方法实现

fun find(
    client: OkHttpClient,
    chain: RealInterceptorChain
  ): ExchangeCodec {
    try {
      val resultConnection = findHealthyConnection(
          connectTimeout = chain.connectTimeoutMillis,
          readTimeout = chain.readTimeoutMillis,
          writeTimeout = chain.writeTimeoutMillis,
          pingIntervalMillis = client.pingIntervalMillis,
          connectionRetryEnabled = client.retryOnConnectionFailure,
          doExtensiveHealthChecks = chain.request.method != "GET"
      )
      //newCodec方法会对应不同的HTTP协议生成ExchangeCodeC对象
      return resultConnection.newCodec(client, chain)
    } catch (e: RouteException) {
      trackFailure(e.lastConnectException)
      throw e
    } catch (e: IOException) {
      trackFailure(e)
      throw RouteException(e)
    }
  }

Http2

在阅读CallServerInterceptor之前 我们有必要看一下Http2.0相关知识 因为在CallServerInterceptor中会根据不同的Http协议 使用不同的传输方式 我们看一下Http2.0发展的几个阶段

CallServerInterceptor

作用:负责与服务器进行数据交互

在了解和服务器交互的流程之前 我想先介绍一下okio 这是Square公司开发的一款对java输入输出流的封装框架
JAVA输入输出流真的是非常的复杂 子类繁多 而okio主要分为两个接口SinkSource 分别对应输出和输入相关

接下来我们看一下实现代码 流程也比较简单 就是发送请求+获取响应

 @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.exchange!!
    val request = realChain.request
    val requestBody = request.body
    val sentRequestMillis = System.currentTimeMillis()
    //将header写入socket
    exchange.writeRequestHeaders(request)

    var invokeStartEvent = true
    var responseBuilder: Response.Builder? = null
    if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
        exchange.flushRequest()
        responseBuilder = exchange.readResponseHeaders(expectContinue = true)
        exchange.responseHeadersStart()
        invokeStartEvent = false
      }
      if (responseBuilder == null) {
        //如果支持复用 传输request body
        if (requestBody.isDuplex()) {
          // Prepare a duplex body so that the application can send a request body later.
          exchange.flushRequest()
          val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
          requestBody.writeTo(bufferedRequestBody)
        } else {
          // Write the request body if the "Expect: 100-continue" expectation was met.
          val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
          requestBody.writeTo(bufferedRequestBody)
          bufferedRequestBody.close()
        }
      } else {
        exchange.noRequestBody()
        if (!exchange.connection.isMultiplexed) {
          // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
          // from being reused. Otherwise we're still obligated to transmit the request body to
          // leave the connection in a consistent state.
          exchange.noNewExchangesOnConnection()
        }
      }
    } else {
      exchange.noRequestBody()
    }
    //request完成
    if (requestBody == null || !requestBody.isDuplex()) {
      exchange.finishRequest()
    }
    //获取reponse header
    if (responseBuilder == null) {
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
        invokeStartEvent = false
      }
    }
    var response = responseBuilder
        .request(request)
        .handshake(exchange.connection.handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build()
    var code = response.code
    if (code == 100) {
      // Server sent a 100-continue even though we did not request one. Try again to read the actual
      // response status.
      //100表示继续获取
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
      }
      response = responseBuilder
          .request(request)
          .handshake(exchange.connection.handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
      code = response.code
    }

    exchange.responseHeadersEnd(response)

    response = if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response.newBuilder()
          .body(EMPTY_RESPONSE)
          .build()
    } else {
      //获取response body
      response.newBuilder()
          .body(exchange.openResponseBody(response))
          .build()
    }
    if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
        "close".equals(response.header("Connection"), ignoreCase = true)) {
      exchange.noNewExchangesOnConnection()
    }
    if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
      throw ProtocolException(
          "HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
    }
    return response
  }

上面的代码 流程也比较简单 就是request+response
我们分析一下分析一下写入Request流程

ExchangeCodec#writeRequestHeaders(request)

传输头部在HTTP1.x和HTTP2.0有点区别,HTTP1.x就直接将header通过写入Sink Buffer,而HTTP2.0会先创建http2Connection.newStream()对象

@Synchronized @Throws(IOException::class)
  fun headers(
    outFinished: Boolean,
    streamId: Int,
    headerBlock: List<Header>
  ) {
    if (closed) throw IOException("closed")
    //Hpack压缩算法 将压缩后数据存入hpackBuffer
    hpackWriter.writeHeaders(headerBlock)

    val byteCount = hpackBuffer.size
    val length = minOf(maxFrameSize.toLong(), byteCount)
    var flags = if (byteCount == length) FLAG_END_HEADERS else 0
    if (outFinished) flags = flags or FLAG_END_STREAM
    //HTTP2.0特性 帧传输
    frameHeader(
        streamId = streamId,
        length = length.toInt(),
        type = TYPE_HEADERS,
        flags = flags
    )
    sink.write(hpackBuffer, length)

    if (byteCount > length) writeContinuationFrames(streamId, byteCount - length)
  }

这里有两个可以HTTP2.0特性可以关注一下

总结

OkHttp的Request流程分析就到此结束了 接下来会接着分析一下Response的处理 觉得有收获的同学们欢迎点赞留言啊

参考

OKio - 重新定义了“短小精悍”的IO框架
浅析HTTPS握手流程

上一篇下一篇

猜你喜欢

热点阅读