Android OkHttp 源码阅读笔记(一)

2023-12-04  本文已影响0人  BlueSocks

OkHttp 源码阅读笔记(一)

OkHttp 的大名不用多说了,本篇文章是对 OkHttp 的源码分析文章的第一篇。我后续分析源码的路径也都是针对 Http 1.1 来分析,略过 Http 2WebSocket 逻辑,基于的 OkHttp 版本是 4.11.0

OkHttp 的简单使用

val client: OkHttpClient = OkHttpClient
    .Builder()
    .build()

val request = Request.Builder()
    .get()
    .url("https://www.google.com")
    .build()

val call = client.newCall(request)

/**
 * 异步请求方式
 */
call.enqueue(responseCallback = object : Callback {
    override fun onFailure(call: Call, e: IOException) {
        
    }

    override fun onResponse(call: Call, response: Response) {
        
    }
})

/**
 * 同步请求方式
 */
//val response = call.execute()

首先通过创建一个 OkHttpClient 实例(通常在一般项目中都是只创建一个 OkHttp 实例),创建一个 Request 实例,Request 实例中包含我们 Http 请求的所有参数,通过 OkHttpClient#newCall() 方法传入创建的 Request 实例,会返回一个 Call 实例,通过 Call 实例就可以发送一次 Http 请求,请求的方式有两种,调用 execute() 方法是同步请求,调用 enqueue() 方法是同步请求。

同步请求

在介绍同步请求前,先看看 OkHttpClient#newCall() 创建 Call 的方法:

/** Prepares the [request] to be executed at some point in the future. */
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

Call 的实现类是 RealCall,我们看看 RealCall 执行同步调用的方法是 execute() 我们来看看它的实现:

override fun execute(): Response {
  // 判断是否已经执行过了,如果已经执行过了就报错
  check(executed.compareAndSet(false, true)) { "Already Executed" }
  
  // 超时任务
  timeout.enter()
  callStart()
  try {
    // 将本次同步请求添加到 Dispatcher 中
    client.dispatcher.executed(this)
    // 真正执行请求
    return getResponseWithInterceptorChain()
  } finally {
    // 从 Dispatcher 中移除本次同步请求
    client.dispatcher.finished(this)
  }
}

我们看到上面还调用了 callStart() 方法,这里简单看一下:

private fun callStart() {
  this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
  eventListener.callStart(this)
}

这个 eventListener 可以在创建 OkHttp 实例的时候自定义它,通过它可以监听到请求的每个关键节点,比如这个 callStart() 方法就表示请求开始的回调,后面还有很多的其他关键节点的回调,比如创建 SocketDNS 查询、TLS 握手等等,我们可以在 OkHttp 很多处的源码看见它,后面的源码分析我也都会再说起它。

这里简单看看 Dispatcher#execute() 方法 和 Dispatcher#finished() 方法,来看看 Dispatcher 如何记录同步请求任务。

/** Used by [Call.execute] to signal it is in-flight. */
@Synchronized internal fun executed(call: RealCall) {
  // 添加到同步队列中
  runningSyncCalls.add(call)
}

/** Used by [Call.execute] to signal completion. */
internal fun finished(call: RealCall) {
  finished(runningSyncCalls, call)
}

private fun <T> finished(calls: Deque<T>, call: T) {
  val idleCallback: Runnable?
  synchronized(this) {
    // 请队列中移除
    if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
    idleCallback = this.idleCallback
  }
  
  // 后面分析 `promoteAndExecute()` 方法
  val isRunning = promoteAndExecute()

  if (!isRunning && idleCallback != null) {
    // 当没有新的任务时,执行一次 `idleCallback`,表示当前 `OkHttp` 空闲了,和 Android 中的 `IdleHandler` 类似。  
    idleCallback.run()
  }
}

异步请求

趁热打铁,我们接着看看 RealCall#enqueue() 方法执行的异步调用:

override fun enqueue(responseCallback: Callback) {
  check(executed.compareAndSet(false, true)) { "Already Executed" }

  callStart()
  // 向 `Dispatcher` 中添加异步任务
  client.dispatcher.enqueue(AsyncCall(responseCallback))
}

异步任务的实现类是 AsyncCall,它是 RealCall 中的一个内部类,我们看看 Dispatcher#enqueue 方法:

internal fun enqueue(call: AsyncCall) {
  synchronized(this) {
    // 添加到等待队列
    readyAsyncCalls.add(call)

    // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
    // the same host.
    if (!call.call.forWebSocket) {
      // 如果不是 WebSocket,会更新当前的域名正在请求的数量
      val existingCall = findExistingCallWithHost(call.host)
      if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
    }
  }
  // 检查等待中的任务是否可以执行
  promoteAndExecute()
}

首先把任务添加到等待队列中;然后获取当前域名正在请求的任务的数量(这个数量我后面会分析),把这个数量更新在 RealCall 中;调用 promoteAndExecute() 执行等待队列中的可以执行的任务。
看看 promoteAndExecute() 方法的源码:

private fun promoteAndExecute(): Boolean {
  this.assertThreadDoesntHoldLock()

  val executableCalls = mutableListOf<AsyncCall>()
  val isRunning: Boolean
  synchronized(this) {
    val i = readyAsyncCalls.iterator()
    // 遍历等待的任务
    while (i.hasNext()) {
      val asyncCall = i.next()
      
      // 如果当前执行中的任务大于了,最大的限制任务,退出遍历
      if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
      // 如果当前任务的单个域名执行执行的个数大于了最大的限制,跳过当前 asyncCall
      if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

      i.remove()
      asyncCall.callsPerHost.incrementAndGet()
      executableCalls.add(asyncCall)
      
      // 移动当前任务到正在执行的任务中
      runningAsyncCalls.add(asyncCall)
    }
    isRunning = runningCallsCount() > 0
  }

  // Avoid resubmitting if we can't logically progress
  // particularly because RealCall handles a RejectedExecutionException
  // by executing on the same thread.
  if (executorService.isShutdown) {
      // ...
  } else {
    // 在线程池上执行这些任务
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }
  }

  return isRunning
}

Dispatcher 默认限制同时最多执行 64 个任务(可自定义),同一个域名最多有 5 个任务同时执行(可自定义)。如果没有达到限制,就会把等待中的任务移动到正在执行的任务中,最后把这些任务在 executorService 上执行(上面的限制只是对异步任务有效,同步任务不受影响),等待中的任务需要等待其他任务执行完成后调用 promoteAndExecute() 方法来检测等待中的任务是否达到执行的要求(还有其他的逻辑也会调用 promoteAndExecute() 方法去判断),可以执行的话移动到正在执行中的任务。

我们看看 executorService 的创建:

@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
  get() {
    if (executorServiceOrNull == null) {
      executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
          SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
    }
    return executorServiceOrNull!!
  }

线程池是一个无限大小的缓存线程池,线程的回收时间是 60s。

我们再来看看 AsyncCall#executeOn() 方法:

fun executeOn(executorService: ExecutorService) {
  client.dispatcher.assertThreadDoesntHoldLock()

  var success = false
  try {
    // 线程池上执行任务,入口函数为 run()
    executorService.execute(this)
    success = true
  } catch (e: RejectedExecutionException) {
    failRejected(e)
  } finally {
    if (!success) {
      client.dispatcher.finished(this) // This call is no longer running!
    }
  }
}

  override fun run() {
    threadName("OkHttp ${redactedUrl()}") {
      var signalledCallback = false
      timeout.enter()
      try {
        // 真正执行
        val response = getResponseWithInterceptorChain()
        signalledCallback = true
        // 回调成功
        responseCallback.onResponse(this@RealCall, response)
      } catch (e: IOException) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
        } else {
          // 回调失败
          responseCallback.onFailure(this@RealCall, e)
        }
      } catch (t: Throwable) {
        cancel()
        if (!signalledCallback) {
          val canceledException = IOException("canceled due to $t")
          canceledException.addSuppressed(t)
          // 回调失败
          responseCallback.onFailure(this@RealCall, canceledException)
        }
        throw t
      } finally {
        // 将任务从 Dispatcher 中移除
        client.dispatcher.finished(this)
      }
    }
  }
}

上面的代码也都是比较简单,就不多说了,然后他的真正的 Http 请求也是通过 getResponseWithInterceptorChain() 方法来完成。

拦截器链的执行

我们上面讲同步调用和异步调用中都讲到了最终的请求都是通过 getResponseWithInterceptorChain() 方法来完成。

@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
  // Build a full stack of interceptors.
  // 自定义普通拦截器
  val interceptors = mutableListOf<Interceptor>()
  interceptors += client.interceptors
  // 系统定义的拦截器
  interceptors += RetryAndFollowUpInterceptor(client)
  interceptors += BridgeInterceptor(client.cookieJar)
  interceptors += CacheInterceptor(client.cache)
  interceptors += ConnectInterceptor
  // 不是 WebSocket 的情况下
  if (!forWebSocket) {
    // 自定义网络拦截器
    interceptors += client.networkInterceptors
  }
  // 真正的通过 Socket 和 Server 来交换信息。
  interceptors += CallServerInterceptor(forWebSocket)

  // 构建拦截器链
  val chain = RealInterceptorChain(
    call = this,
    interceptors = interceptors,
    index = 0,
    exchange = null,
    request = originalRequest,
    connectTimeoutMillis = client.connectTimeoutMillis,
    readTimeoutMillis = client.readTimeoutMillis,
    writeTimeoutMillis = client.writeTimeoutMillis
  )

  var calledNoMoreExchanges = false
  try {
    // 开始执行拦截器链,最后一个拦截器执完成后就获取到了结果
    val response = chain.proceed(originalRequest)
    if (isCanceled()) {
      response.closeQuietly()
      throw IOException("Canceled")
    }
    return response
  } catch (e: IOException) {
    calledNoMoreExchanges = true
    throw noMoreExchanges(e) as Throwable
  } finally {
    if (!calledNoMoreExchanges) {
      noMoreExchanges(null)
    }
  }
}

拦截器链排在最前面的是自定义的普通拦截器,然后是 4 个普通的系统拦截器,再然后是自定义的网络拦截器(非 WebSocket,拦截器执行时,可以保证对应的 Socket 已经创建完成,而且在它之中必须调用一次 RealInterceptorChain#procced() 方法,也只能调用一次),最后是系统的 CallServerInterceptor 拦截器。我这里简单介绍一下系统拦截器的功能,具体的实现代码,后面的文章介绍。

所有的 Interceptor 都会被放入到 RealInterceptorChain,然后通过其 proceed() 方法开始执行,其中 index 就是下一个要执行的 Interceptorindex。 这里还有一个比较重要的参数就是 Exchange,如果它为空就表示还没有创建网络连接,反之就是已经创建了网络连接,我们上面也说到 ConnectionInterceptor,在 ConnnectionInterceport 执行后,Exchange 对象就不为空了,我们来看看 RealInterceptorChain#proceed() 方法的源码:

@Throws(IOException::class)
override fun proceed(request: Request): Response {
  check(index < interceptors.size)
  // 调用次数记录
  calls++

  // exchange 不为空表示为网络拦截器
  if (exchange != null) {
    // 网络拦截器不能修改请求的域名
    check(exchange.finder.routePlanner.sameHostAndPort(request.url)) {
      "network interceptor ${interceptors[index - 1]} must retain the same host and port"
    }
    // 网络拦截器必须请求一次 proceed() 方法,也只能请求一次。
    check(calls == 1) {
      "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
    }
  }

  // Call the next interceptor in the chain.
  // 复制下一次要请求需要的 RealInterceptorChain 对象,这里的 index + 1 了,也就表示下一次的 Chain 执行的 Interceptor 为当前 `Interceptor` 的下一个。
  val next = copy(index = index + 1, request = request)
  val interceptor = interceptors[index]

  @Suppress("USELESS_ELVIS")
  // 执行拦截器的拦截方法,得到最后的结果然后返回。
  val response = interceptor.intercept(next) ?: throw NullPointerException(
      "interceptor $interceptor returned null")

  if (exchange != null) {
    check(index + 1 >= interceptors.size || next.calls == 1) {
      "network interceptor $interceptor must call proceed() exactly once"
    }
  }

  return response
}

这里要简单说明一下在网络拦截器中不能修改域名,这很好理解,因为这个时候网络连接已经建立,也就是域名已经解析完成,如果你修改了网络域名,也就是说上面的网络连接对应的域名和你修改后的不一样,当然上面的网络连接是不可用的,所以 OkHttp 禁止在网络拦截器中修改域名;这里还限制了网络拦截器必须调用一次 proceed() 方法。

这里会构建一个新的 RealInterceptorChain 对象,和原来的对象相比,index 值加 1,reqeust 使用外部传入的 reqeust。然后调用当前需要执行的 Interceptor,然后调用它的 intercept() 方法,把上面新创建的 RealInterceptorChain 对象传入进去。

最后

本篇文章介绍了 OkHttp 同步网络请求和异步网络请求,还介绍了异步网络请求时 Dispatcher 如何调度请求任务,还介绍了 RealInterceptorChain 拦截器链如何工作的。后面的文章还会介绍网络链接如何创建,默认的拦截器具体的工作原理。

上一篇下一篇

猜你喜欢

热点阅读