okhttp源码之分发器分析

2019-12-09  本文已影响0人  RenHaiRenWjc

定义

Square 贡献的一个处理网络请求的开源项目,从android 4.4 开始 httpURLConnection 底层实现采用 okhttp

优点

支持HTTP/2并允许对同一主机的所有请求共享一个套接字
通过连接池,减少了请求延迟
默认通过GZip压缩数据,帮我们压缩数据,请求速度更快、使用流量更少
响应缓存,避免了重复请求的网络
请求失败自动重试主机的其他ip,自动重定向

使用流程

okhttp使用流程图

从OkHttp使用流程图来看:
在使用okhttp发起一次请求时,最少存在在OkHttpClientRequestCall三个角色,结合代码来:

OkHttpClient client = new OkHttpClient.Builder().cache(cache).build();
Request request = new Request.Builder().url(ENDPOINT).build();
Call mCall = client.newCall(request);
Response response = mCall.execute();

其中OkHttpClientRequest的创建可以使用它为我们提供的Builder(建造者模式)。而Call则是把Request交给OkHttpClient之后返回的一个已准备好执行的请求。(建造者模式:将一个复杂的构建与其表示相分离,使得同样的构建过程可以创建不同的表示。实例化 OKHttpClient 和 Request 的时候,因为有太多的属性需要设置,而且开发者的需求组合千变万化,使用建造者模式可以)
同时 OkHttp 在设计时采用的门面模式,将整个系统的复杂性给隐藏起来,将子系统接口通过一个客户端 OkHttpClient 统一暴露出来。OkHttpClient中全是一些配置,比如拦截器配置等。
Call本身是一个接口,我们获得的实现为:RealCall,如下:

 fun newRealCall(client: OkHttpClient, originalRequest: Request, forWebSocket: Boolean): RealCall {
      // Safely publish the Call instance to the EventListener.
      return RealCall(client, originalRequest, forWebSocket).apply {
        transmitter = Transmitter(client, this)
      }
    }

Callexecute代表了同步请求,而enqueue则代表异步请求。两者唯一区别在于一个会直接发起网络请求,而另一个使用OkHttp内置的线程池来进行,这就涉及到OkHttp的任务分发器。

分发器---Dispatcher

是来调配请求任务的,内部会包含一个线程池。可以在创建OkHttpClient时,传递我们自己定义的线程池来创建分发器。
主要成员有:

var maxRequests = 64     // 异步请求时,最大请求数
var maxRequestsPerHost = 5    // The maximum number of requests for each host to execute concurrently.
var idleCallback: Runnable? = null     // A callback to be invoked each time the dispatcher becomes idle
var executorServiceOrNull: ExecutorService? = null   // 异步请求线程池
val readyAsyncCalls = ArrayDeque<AsyncCall>()  // 异步请求等待执行队列
val runningAsyncCalls = ArrayDeque<AsyncCall>()   // 异步请求正在执行队列
val runningSyncCalls = ArrayDeque<RealCall>()  // 同步请求正在执行队列

同步请求

  @Synchronized internal fun executed(call: RealCall) {
    runningSyncCalls.add(call)
  }

因为同步请求不需要线程池,也不存在任何限制,所以分发器仅做一下记录。

异步请求

  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)  // 加入等待请求队列
      .....
    }
    promoteAndExecute()
  }

private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()
    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator() // 迭代器
      while (i.hasNext()) {
        val asyncCall = i.next()
        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.超过运行中最大的请求数
        // 超过同一域名请求;如果不超过则放入执行列表中
        if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.
        i.remove() // 从等待中的请求列表删除
        asyncCall.callsPerHost().incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)  // 提交给线程池
    }
    return isRunning
  }

从上面看到,请求都是先加入等待请求队列,然后再判断最大请求数、同一域名最大数来看是否加入执行请求列表中。
提交到线程池后,那下一步做什么呢?看下executeOn() 所在的 AsyncCall

// 实现 Runnable ,线程启动时它时,会执行 run 方法
 internal inner class AsyncCall( private val responseCallback: Callback ) : Runnable {
    @Volatile
    
    fun executeOn(executorService: ExecutorService) {//重定向到`execute`方法
      var success = false
      try {
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
       .....
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          client.dispatcher.finished(this) // This call is no longer running!
        }
      }
    }
    override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        transmitter.timeoutEnter()
        try {
          val response = getResponseWithInterceptorChain() // 真正的执行请求,返回结果,这才是 okhttp 的核心:拦截器责任链
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          ......
        } catch (t: Throwable) {
          ......
            responseCallback.onFailure(this@RealCall, canceledException)
          throw t
        } finally {
          client.dispatcher.finished(this) // 请求完成
        }
      }
    }
  }

看到没,启动一个线程在 run 中执行请求,那 AsyncCall 是什么初始的呢,看回enqueue(call: AsyncCall),显然通过传参进来,继续追踪,看到了
RealCall类中:

  // 异步
  override fun enqueue(responseCallback: Callback) {
    synchronized(this) {
      check(!executed) { "Already Executed" } // 检查是否重复执行
      executed = true
    }
    transmitter.callStart()
    client.dispatcher.enqueue(AsyncCall(responseCallback)) // 调用分发器
  }

而 RealCall 是实现 Call 的,因此在 Call mCall = client.newCall(request);已经初始了,就等线程执行 Runnable

而当执行请求列表执行完一个任务后,是怎么通知下一个等待请求的呢?
我们来看下 finished看代码

  internal fun finished(call: AsyncCall) {
    call.callsPerHost().decrementAndGet()
    finished(runningAsyncCalls, call)
  }
  internal fun finished(call: RealCall) {
    finished(runningSyncCalls, call)
  }
  private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
      // 从请求列表中移除
      if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
      idleCallback = this.idleCallback
    }
    val isRunning = promoteAndExecute() // 异步任务结束后,重新调配请求,把等待中的请求放入执行请求列表中 
    if (!isRunning && idleCallback != null) {
      idleCallback.run()
    }
  }

从代码来看,当任务执行完后,调用finished来获取下一个请求

分发器线程池

分发器内部会包含一个线程池。当异步请求时,会将请求任务交给线程池来执行。
那分发器中默认的线程池是如何定义的呢?为什么要这么定义?

  @get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(
          0, Int.MAX_VALUE,  // 核心数、最大线程、
          60, TimeUnit.SECONDS,  // 空闲线程闲置时间、闲置单位、
          SynchronousQueue(),  //线程等待队列,SynchronousQueue:一个不存储元素的阻塞队列
          threadFactory("OkHttp Dispatcher", false)) //线程工厂
      }
      return executorServiceOrNull!!
    }

首先核心线程为0,表示线程池不会一直为我们缓存线程,线程池中所有线程都是在60s内没有工作就会被回收。而最大线程Integer.MAX_VALUE与等待队列SynchronousQueue的组合能够得到最大的吞吐量。即当需要线程池执行任务时,如果不存在空闲线程不需要等待,马上新建线程执行任务!等待队列的不同指定了线程池的不同排队机制。
但是需要注意的时,我们都知道,进程的内存是存在限制的,而每一个线程都需要分配一定的内存。所以线程并不能无限个数。那么当设置最大线程数为Integer.MAX_VALUE时,OkHttp同时还有最大请求任务执行个数: 64的限制。这样即解决了这个问题同时也能获得最大吞吐。

总结:
上面就是分发器的分发流程了,其实分发器只是调配请求任务的,真正执行请求的工作都是在getResponseWithInterceptorChain()
分发器的源码分析主要弄清楚下面三个问题即可
Q: 如何决定将请求放入ready还是running?
A: 如果当前正在请求数不小于64放入ready;如果小于64,但是已经存在同一域名主机的请求5个放入ready!

Q: 从running移动ready的条件是什么?
A: 每个请求执行完成就会从running移除,同时进行第一步相同逻辑的判断,决定是否移动!

Q: 分发器线程池的工作优点?
A:无等待,最大并发

上一篇下一篇

猜你喜欢

热点阅读