OkHttp 4.0 Kotlin源码分析 (三) Dispa

2020-03-08  本文已影响0人  知止乎尔

前言

随着OkHttp 4.0 版本的发布,从这一节起我们将以OkHttp 4.0 为分析素材。 官方介绍说一切都变了,一切又都没变,说人话就是从4.0开始Okhttp从java迁移到了Kotlin。 Okhttp的迁移也激发了更多android开发者向Kotlin转换的信心。

implementation("com.squareup.okhttp3:okhttp:4.4.0")

其他相关文章

OkHttp 4.0 Kotlin源码分析 (一) 同步和异步请求基本用法

OkHttp 4.0 Kotlin源码分析 (二) 基本的数据对象以及Call类分析

OkHttp 4.0 Kotlin源码分析 (三) Dispatcher分发器流程控制

Dispatcher 调度

前面章节介绍了,OKhttp的基本用法和同步异步请求流程的区别,其中涉及到一个很重要的概念叫做Dispatcher。 通过前面的文章我们可以知道Dispatch完成了对同步和异步请求的分发和调度。 那么这个Dispather 是怎么完成这些工作的呢?

同步和异步请求结构

先看张图,让我们逐步分析首先我们看下同步请求时候Dispatcher做了些什么?

Dispatcher同步请求的调度

我们知道在执行同步请求的时候,实际执行的的是RealCall中的execute方法其中有9、10、12三行非常关键的代码。。

  override fun execute(): Response {
    synchronized(this) {
      check(!executed) { "Already Executed" }
      executed = true
    }
    timeout.enter()
    callStart()
    try {
      client.dispatcher.executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher.finished(this)
    }
  }

同步请求非常简单,我们看到在同步请求中,我们知道getResponseWithInterceptorChain()是真正的网络请求执行单元并且使用了责任链模式实现的拦截器。 前后分别执行了Dispatcher的 executed 和 finish方法。 我们可以猜想,除了执行请求还需要做些什么呢?

没错还需要维护请求状态,因为我们知道我们的请求个数和请求的Host的连接数都是有限制的。毕竟我们可以在多个线程中同时执行多个同步请求,而且还有异步请求。

@get:Synchronized var maxRequests = 64
@get:Synchronized var maxRequestsPerHost = 5

那么在Dispatcher中是怎么维护这个请求状态的呢?
首先我们看下Dispatcher中有这么几个比较重要的双端队列ArrayDeque,分别是一个同步请求的队列和两个异步请求的队列,为什么异步请求有两个呢,其实,其中一个是正在执行的队列runningAsyncCalls,另一个是执行等待队列readyAsyncCalls

  /** Ready async calls in the order they'll be run. */
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private val runningSyncCalls = ArrayDeque<RealCall>()

Dispatcher中还有很多函数用于判断当前队列的数量情况

/** Returns a snapshot of the calls currently being executed. */
 //实际执行的同步和异步请求Call列表
  @Synchronized fun runningCalls(): List<Call{
    return Collections.unmodifiableList(runningSyncCalls + runningAsyncCalls.map { it.call })
  }
  // 异步请求等待队列的请求数
  @Synchronized fun queuedCallsCount(): Int = readyAsyncCalls.size
 // 获取正在执行的同步请求和异步请求数
  @Synchronized fun runningCallsCount(): Int = runningAsyncCalls.size + runningSyncCalls.size

那么继续看同步请求中,这个Dispatcher的executed和finish方法到底做了什么,先上代码:

/** Used by `Call#execute` to signal it is in-flight. */
  @Synchronized internal fun executed(call: RealCall) {
    //注意这里操作的是同步请求队列
    runningSyncCalls.add(call)
  }

/** Used by `Call#execute` to signal completion. */
  internal fun finished(call: RealCall) {
    //注意这里操作的是同步请求队列
    finished(runningSyncCalls, call)
  }

private fun <Tfinished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
      if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
      idleCallback = this.idleCallback
    }

    val isRunning = promoteAndExecute()

    if (!isRunning && idleCallback != null) {
      idleCallback.run()
    }
  }

看到这里已经很明确了,同步请求 在Dispatcher中操作了同步请求队列runningSyncCalls 分别执行了简单的添加和移除操作。 到此我们同步请求也就分析完成了。

如果不出意外,异步请求时候也会去操作对应的等待和执行队列,这猜想是正确的,相比于同步请求异步请求会削微复杂一点。我们接下了这个章节分析下异步请求到底做了哪些事情,到底怎么调度的?

Dispatcher异步请求的调度

在分析之前,削微透露一下,其实异步请求是将异步请求从准备队列中移除然后加入到执行队列中,并使用一个线程池去执行请求也就是AsyncCall
为什么线程池能够执行AsyncCall,我想我也不用多说了,其实它就是Runnable。

internal inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable 

说了这么多,我们看下enqueue的函数到底是怎么样的呢?

internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }

这个函数好像比同步请求并没复杂,就是加入到了异步请求队列而已吧。 是的!
这不还有一个重要的函数promoteAndExecute()这里面执行了异步请求的重要逻辑。

  /**
   * Promotes eligible calls from [readyAsyncCalls] to [runningAsyncCalls] and runs them on the
   * executor service. Must not be called with synchronization because executing calls can call
   * into user code.
   *
   * @return true if the dispatcher is currently running calls.
   */
  private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

前面的一堆逻辑就干了这些内容。判断正在执行的请求是不是超出了最大请求或最大Host数,没有的话就把等待队列中的AsyncCall对象移除,加入到执行队列中,然后呢就从执行队列中拿到线程池中执行。

asyncCall.executeOn(executorService)

  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
      }
      return executorServiceOrNull!!
    }

这里这个ThreadPoolExecutor很有意思, 我们看第一个参数,核心线程数是0,为啥呢?我们知道核心线程是不会被销毁的,这里设置为0,是想让线程池在不用的时候将线程销毁掉。

那这个线程执行了啥? 当然是asyncCall的run方法。那我们就看看这个RealCall.kt中的AsyncCallrun()方法干了什么?

override fun run() {
  threadName("OkHttp ${redactedUrl()}") {
    var signalledCallback = false
    timeout.enter()
    try {
      val response = getResponseWithInterceptorChain()
      signalledCallback = true
      responseCallback.onResponse(this@RealCall, response)
    } catch (e: IOException) {
      if (signalledCallback) {
        // Do not signal the callback twice!
        Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
      } else {
        responseCallback.onFailure(this@RealCall, e)
      }
    } catch (t: Throwable) {
      cancel()
      if (!signalledCallback) {
        val canceledException = IOException("canceled due to $t")
        canceledException.addSuppressed(t)
        responseCallback.onFailure(this@RealCall, canceledException)
      }
      throw t
    } finally {
      client.dispatcher.finished(this)
    }
  }
}

不出所料,这里执行了val response = getResponseWithInterceptorChain()
拦截器执行真正的网络请求,同时对返回结果进行了回调处理。而且我们注意到和同步请求一样这里还是调用了这个client.dispatcher.finished(this)

  /** Used by `AsyncCall#run` to signal completion. */
  internal fun finished(call: AsyncCall) {
    call.callsPerHost.decrementAndGet()
    finished(runningAsyncCalls, call)
  }

这个finish是参数重载的方法,和同步请求的finish是不同的。 但是执行内容我们可以预见是一致的。

好了到此我们同步请求和异步请求都分析完了。

总结

Dispatcher是Okhttp中很重要的调度单元,根据我们定义的请求限制,完成了对同步和异步请求的调度。主要做的工作是操作了三个队列,有等待队列和执行队列,并在合适的时机调用对应的getResponseWithInterceptorChain()执行请求。

那么getResponseWithInterceptorChain()是什么?它是怎么完成请求的?又有哪些巧妙的地方,为什么这个函数承载了整个Okhttp的设计精髓?我们将在下一篇中继续进行分析。

如果对你削微有点帮助,给个赞是对我最大的鼓励。

上一篇下一篇

猜你喜欢

热点阅读