Android

ConnectInterceptor

2019-10-24  本文已影响0人  大佬的上半生

ConnectInterceptor负责网络连接过滤器

object ConnectInterceptor : Interceptor {

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val request = realChain.request()
    val transmitter = realChain.transmitter()

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    val doExtensiveHealthChecks = request.method != "GET"
    val exchange = transmitter.newExchange(chain, doExtensiveHealthChecks)

    return realChain.proceed(request, transmitter, exchange)
  }
}
拦截
intercept(chain: Interceptor.Chain)
分析transmitter()

创建连接对象,释放连接资源,取消连接超时等

 val transmitter = realChain.transmitter()

class Transmitter(
  private val client: OkHttpClient,
  private val call: Call
) {
//连接池
  private val connectionPool: RealConnectionPool = client.connectionPool.delegate
//监听对象
  private val eventListener: EventListener = client.eventListenerFactory.create(call)
  private val timeout = object : AsyncTimeout() {
    override fun timedOut() {
      cancel()
    }
  }.apply {
    timeout(client.callTimeoutMillis.toLong(), MILLISECONDS)
  }

  private var callStackTrace: Any? = null

  private var request: Request? = null
  private var exchangeFinder: ExchangeFinder? = null

  // Guarded by connectionPool.
  var connection: RealConnection? = null
  private var exchange: Exchange? = null
  private var exchangeRequestDone = false
  private var exchangeResponseDone = false
  private var canceled = false
  private var timeoutEarlyExit = false
  private var noMoreExchanges = false

  val isCanceled: Boolean
    get() {
      synchronized(connectionPool) {
        return canceled
      }
    }

  fun timeout(): Timeout = timeout

  fun timeoutEnter() {
    timeout.enter()
  }

  //在呼叫完全完成之前停止应用超时。 这用于WebSockets
    和双工调用,其中超时仅适用于初始设置。
  fun timeoutEarlyExit() {
    check(!timeoutEarlyExit)
    timeoutEarlyExit = true
    timeout.exit()
  }

  private fun <E : IOException?> timeoutExit(cause: E): E {
    if (timeoutEarlyExit) return cause
    if (!timeout.exit()) return cause

    val e = InterruptedIOException("timeout")
    if (cause != null) e.initCause(cause)
    @Suppress("UNCHECKED_CAST") // E is either IOException or IOException?
    return e as E
  }

  fun callStart() {
    this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
    eventListener.callStart(call)
  }

  //装备一个请求,有的话 则不需要新建立
  fun prepareToConnect(request: Request) {
    if (this.request != null) {
      if (this.request!!.url.canReuseConnectionFor(request.url) && exchangeFinder!!.hasRouteToTry()) {
        return // Already ready.
      }
      check(exchange == null)

      if (exchangeFinder != null) {
        maybeReleaseConnection(null, true)
        exchangeFinder = null
      }
    }

    this.request = request
    this.exchangeFinder = ExchangeFinder(
        this, connectionPool, createAddress(request.url), call, eventListener)
  }
  //创建Address
  private fun createAddress(url: HttpUrl): Address {
    var sslSocketFactory: SSLSocketFactory? = null
    var hostnameVerifier: HostnameVerifier? = null
    var certificatePinner: CertificatePinner? = null
//判断是否后Https
    if (url.isHttps) {
      sslSocketFactory = client.sslSocketFactory
      hostnameVerifier = client.hostnameVerifier
      certificatePinner = client.certificatePinner
    }

    return Address(url.host, url.port, client.dns, client.socketFactory,
        sslSocketFactory, hostnameVerifier, certificatePinner, client.proxyAuthenticator,
        client.proxy, client.protocols, client.connectionSpecs, client.proxySelector)
  }

  //获取一个新的请求对,request,response
  internal fun newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean): Exchange {
    synchronized(connectionPool) {
      check(!noMoreExchanges) { "released" }
      check(exchange == null) {
        "cannot make a new request because the previous response is still open: " +
            "please call response.close()"
      }
    }
  //查找连接
    val codec = exchangeFinder!!.find(client, chain, doExtensiveHealthChecks)
    val result = Exchange(this, call, eventListener, exchangeFinder!!, codec)

    synchronized(connectionPool) {
      this.exchange = result
      this.exchangeRequestDone = false
      this.exchangeResponseDone = false
      return result
    }
  }

  fun acquireConnectionNoEvents(connection: RealConnection) {
    assert(Thread.holdsLock(connectionPool))

    check(this.connection == null)
    this.connection = connection
    connection.transmitters.add(TransmitterReference(this, callStackTrace))
  }

  //从连接的分配列表中删除发射机。 返回调用者应该关闭的套接字
  fun releaseConnectionNoEvents(): Socket? {
    assert(Thread.holdsLock(connectionPool))

    val index = connection!!.transmitters.indexOfFirst { it.get() == this@Transmitter }
    check(index != -1)

    val released = this.connection
    released!!.transmitters.removeAt(index)
    this.connection = null

    if (released.transmitters.isEmpty()) {
      released.idleAtNanos = System.nanoTime()
      if (connectionPool.connectionBecameIdle(released)) {
        return released.socket()
      }
    }

    return null
  }

  fun exchangeDoneDueToException() {
    synchronized(connectionPool) {
      check(!noMoreExchanges)
      exchange = null
    }
  }

  //释放随请求或响应而持有的资源
  internal fun <E : IOException?> exchangeMessageDone(
    exchange: Exchange,
    requestDone: Boolean,
    responseDone: Boolean,
    e: E
  ): E {
    var result = e
    var exchangeDone = false
    synchronized(connectionPool) {
      if (exchange != this.exchange) {
        return result // This exchange was detached violently!
      }
      var changed = false
      if (requestDone) {
        if (!exchangeRequestDone) changed = true
        this.exchangeRequestDone = true
      }
      if (responseDone) {
        if (!exchangeResponseDone) changed = true
        this.exchangeResponseDone = true
      }
      if (exchangeRequestDone && exchangeResponseDone && changed) {
        exchangeDone = true
        this.exchange!!.connection()!!.successCount++
        this.exchange = null
      }
    }
    if (exchangeDone) {
      result = maybeReleaseConnection(result, false)
    }
    return result
  }

  fun noMoreExchanges(e: IOException?): IOException? {
    synchronized(connectionPool) {
      noMoreExchanges = true
    }
    return maybeReleaseConnection(e, false)
  }

  //释放连接
  private fun <E : IOException?> maybeReleaseConnection(e: E, force: Boolean): E {
    var result = e
    val socket: Socket?
    var releasedConnection: Connection?
    val callEnd: Boolean
    synchronized(connectionPool) {
      check(!force || exchange == null) { "cannot release connection while it is in use" }
      releasedConnection = this.connection
      socket = if (this.connection != null && exchange == null && (force || noMoreExchanges)) {
        releaseConnectionNoEvents()
      } else {
        null
      }
      if (this.connection != null) releasedConnection = null
      callEnd = noMoreExchanges && exchange == null
    }
    socket?.closeQuietly()

    if (releasedConnection != null) {
      eventListener.connectionReleased(call, releasedConnection!!)
    }

    if (callEnd) {
      val callFailed = result != null
      result = timeoutExit(result)
      if (callFailed) {
        eventListener.callFailed(call, result!!)
      } else {
        eventListener.callEnd(call)
      }
    }
    return result
  }
  //取消重试
  fun canRetry(): Boolean {
    return exchangeFinder!!.hasStreamFailure() && exchangeFinder!!.hasRouteToTry()
  }

  fun hasExchange(): Boolean {
    synchronized(connectionPool) {
      return exchange != null
    }
  }

  //取消连接
  fun cancel() {
    val exchangeToCancel: Exchange?
    val connectionToCancel: RealConnection?
    synchronized(connectionPool) {
      canceled = true
      exchangeToCancel = exchange
      connectionToCancel = exchangeFinder?.connectingConnection() ?: connection
    }
    exchangeToCancel?.cancel() ?: connectionToCancel?.cancel()
  }

  internal class TransmitterReference(
    referent: Transmitter,
    //
    val callStackTrace: Any?
  ) : WeakReference<Transmitter>(referent)
}
建立连接newExchange(chain, doExtensiveHealthChecks)
  //获取一个新的请求对,request,reponse
  internal fun newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean): Exchange {
    synchronized(connectionPool) {
      check(!noMoreExchanges) { "released" }
      check(exchange == null) {
        "cannot make a new request because the previous response is still open: " +
            "please call response.close()"
      }
    }

    val codec = exchangeFinder!!.find(client, chain, doExtensiveHealthChecks)
    val result = Exchange(this, call, eventListener, exchangeFinder!!, codec)

    synchronized(connectionPool) {
      this.exchange = result
      this.exchangeRequestDone = false
      this.exchangeResponseDone = false
      return result
    }
  }

find()获取HttpCoder

fun find(
    client: OkHttpClient,
    chain: Interceptor.Chain,
    doExtensiveHealthChecks: Boolean
  ): ExchangeCodec {
  //获取参数
    val connectTimeout = chain.connectTimeoutMillis()
    val readTimeout = chain.readTimeoutMillis()
    val writeTimeout = chain.writeTimeoutMillis()
    val pingIntervalMillis = client.pingIntervalMillis
    val connectionRetryEnabled = client.retryOnConnectionFailure

    try {
  //获取一个健康的连接
      val resultConnection = findHealthyConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled,
          doExtensiveHealthChecks = doExtensiveHealthChecks
      )
  //返回结果
      return resultConnection.newCodec(client, chain)
    } catch (e: RouteException) {
      trackFailure()
      throw e
    } catch (e: IOException) {
      trackFailure()
      throw RouteException(e)
    }
  }

获取一个健康的连接

 private fun findHealthyConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    doExtensiveHealthChecks: Boolean
  ): RealConnection {
//循环任务,知道获取到位置
    while (true) {
      val candidate = findConnection(
          connectTimeout = connectTimeout,
          readTimeout = readTimeout,
          writeTimeout = writeTimeout,
          pingIntervalMillis = pingIntervalMillis,
          connectionRetryEnabled = connectionRetryEnabled
      )

      //如果这是一个全新的连接,我们可以跳过广泛的健康检查。
      synchronized(connectionPool) {
        if (candidate.successCount == 0) {
          return candidate
        }
      }

      //执行(可能很慢)检查以确认池连接仍然良好。 如果它
     // 不是,把它从池中取出然后重新开始。
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        candidate.noNewExchanges()
        continue
      }

      return candidate
    }
  }


//返回连接。 如果存在连接,则优先选择现有连接,
 //如果不存在从连接池,建立一个新的连接。
  @Throws(IOException::class)
  private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
    var foundPooledConnection = false
    var result: RealConnection? = null
    var selectedRoute: Route? = null
    var releasedConnection: RealConnection?
    val toClose: Socket?
    synchronized(connectionPool) {
      if (transmitter.isCanceled) throw IOException("Canceled")
      hasStreamFailure = false // This is a fresh attempt.
  //连接
      releasedConnection = transmitter.connection
//获取是否连接成功
      toClose = if (transmitter.connection != null && transmitter.connection!!.noNewExchanges) {
        transmitter.releaseConnectionNoEvents()
      } else {
        null
      }

      if (transmitter.connection != null) {
        // 已经分配了一个连接
        result = transmitter.connection
        releasedConnection = null
      }

      if (result == null) {
        // Attempt to get a connection from the pool.
        if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
          foundPooledConnection = true
          result = transmitter.connection
        } else if (nextRouteToTry != null) {
          selectedRoute = nextRouteToTry
          nextRouteToTry = null
        } else if (retryCurrentRoute()) {
          selectedRoute = transmitter.connection!!.route()
        }
      }
    }
    toClose?.closeQuietly()

    if (releasedConnection != null) {
      eventListener.connectionReleased(call, releasedConnection!!)
    }
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result!!)
    }
    if (result != null) {
      // 如果我们找到已经分配或池化的连接,我们就完成了。
      return result!!
    }

    //选择路由
    var newRouteSelection = false
    if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
      newRouteSelection = true
      routeSelection = routeSelector.next()
    }

    var routes: List<Route>? = null
    synchronized(connectionPool) {
      if (transmitter.isCanceled) throw IOException("Canceled")

      if (newRouteSelection) {
        // 拿到IP开始连接匹配
        routes = routeSelection!!.routes
        if (connectionPool.transmitterAcquirePooledConnection(
                address, transmitter, routes, false)) {
          foundPooledConnection = true
          result = transmitter.connection
        }
      }

      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection!!.next()
        }

  
//创建连接并立即将其分配给此分配。 这使它成为可能
        //  对于异步取消()来中断我们即将要做的握手。
        result = RealConnection(connectionPool, selectedRoute!!)
        connectingConnection = result
      }
    }

    // 如果我们第二次发现了汇集连接,我们就完成了。
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result!!)
      return result!!
    }

    // 做TCP + TLS握手。 这是一个阻止操作。
    result!!.connect(
        connectTimeout,
        readTimeout,
        writeTimeout,
        pingIntervalMillis,
        connectionRetryEnabled,
        call,
        eventListener
    )
    connectionPool.routeDatabase.connected(result!!.route())

    var socket: Socket? = null
    synchronized(connectionPool) {
      connectingConnection = null
     //上次尝试连接合并,只有在我们尝试多次时才会发生
        //与同一主机的并发连接。
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
        // 关闭创建的连接,返回连接池的连接
        result!!.noNewExchanges = true
        socket = result!!.socket()
        result = transmitter.connection

        // 我们可以获得立即不健康的合并连接。 在
          //在这种情况下,我们将重试我们刚刚成功连接的路线。
        nextRouteToTry = selectedRoute
      } else {
        connectionPool.put(result!!)
        transmitter.acquireConnectionNoEvents(result!!)
      }
    }
    socket?.closeQuietly()

    eventListener.connectionAcquired(call, result!!)
    return result!!
  }
上一篇下一篇

猜你喜欢

热点阅读