okhttp源码之分发器分析
定义
Square 贡献的一个处理网络请求的开源项目,从android 4.4 开始 httpURLConnection 底层实现采用 okhttp
优点
支持HTTP/2并允许对同一主机的所有请求共享一个套接字
通过连接池,减少了请求延迟
默认通过GZip压缩数据,帮我们压缩数据,请求速度更快、使用流量更少
响应缓存,避免了重复请求的网络
请求失败自动重试主机的其他ip,自动重定向
使用流程
okhttp使用流程图从OkHttp使用流程图来看:
在使用okhttp发起一次请求时,最少存在在OkHttpClient
、Request
与Call
三个角色,结合代码来:
OkHttpClient client = new OkHttpClient.Builder().cache(cache).build();
Request request = new Request.Builder().url(ENDPOINT).build();
Call mCall = client.newCall(request);
Response response = mCall.execute();
其中OkHttpClient
和Request
的创建可以使用它为我们提供的Builder
(建造者模式)。而Call
则是把Request
交给OkHttpClient
之后返回的一个已准备好执行的请求。(建造者模式:将一个复杂的构建与其表示相分离,使得同样的构建过程可以创建不同的表示。实例化 OKHttpClient 和 Request 的时候,因为有太多的属性需要设置,而且开发者的需求组合千变万化,使用建造者模式可以)
同时 OkHttp 在设计时采用的门面模式,将整个系统的复杂性给隐藏起来,将子系统接口通过一个客户端 OkHttpClient 统一暴露出来。OkHttpClient
中全是一些配置,比如拦截器配置等。
而Call
本身是一个接口,我们获得的实现为:RealCall
,如下:
fun newRealCall(client: OkHttpClient, originalRequest: Request, forWebSocket: Boolean): RealCall {
// Safely publish the Call instance to the EventListener.
return RealCall(client, originalRequest, forWebSocket).apply {
transmitter = Transmitter(client, this)
}
}
Call
的execute
代表了同步请求,而enqueue
则代表异步请求。两者唯一区别在于一个会直接发起网络请求,而另一个使用OkHttp内置的线程池来进行,这就涉及到OkHttp的任务分发器。
分发器---Dispatcher
是来调配请求任务的,内部会包含一个线程池。可以在创建OkHttpClient
时,传递我们自己定义的线程池来创建分发器。
主要成员有:
var maxRequests = 64 // 异步请求时,最大请求数
var maxRequestsPerHost = 5 // The maximum number of requests for each host to execute concurrently.
var idleCallback: Runnable? = null // A callback to be invoked each time the dispatcher becomes idle
var executorServiceOrNull: ExecutorService? = null // 异步请求线程池
val readyAsyncCalls = ArrayDeque<AsyncCall>() // 异步请求等待执行队列
val runningAsyncCalls = ArrayDeque<AsyncCall>() // 异步请求正在执行队列
val runningSyncCalls = ArrayDeque<RealCall>() // 同步请求正在执行队列
同步请求
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}
因为同步请求不需要线程池,也不存在任何限制,所以分发器仅做一下记录。
异步请求
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call) // 加入等待请求队列
.....
}
promoteAndExecute()
}
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator() // 迭代器
while (i.hasNext()) {
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.超过运行中最大的请求数
// 超过同一域名请求;如果不超过则放入执行列表中
if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove() // 从等待中的请求列表删除
asyncCall.callsPerHost().incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService) // 提交给线程池
}
return isRunning
}
从上面看到,请求都是先加入等待请求队列,然后再判断最大请求数、同一域名最大数来看是否加入执行请求列表中。
提交到线程池后,那下一步做什么呢?看下executeOn() 所在的 AsyncCall
// 实现 Runnable ,线程启动时它时,会执行 run 方法
internal inner class AsyncCall( private val responseCallback: Callback ) : Runnable {
@Volatile
fun executeOn(executorService: ExecutorService) {//重定向到`execute`方法
var success = false
try {
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
.....
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
transmitter.timeoutEnter()
try {
val response = getResponseWithInterceptorChain() // 真正的执行请求,返回结果,这才是 okhttp 的核心:拦截器责任链
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
......
} catch (t: Throwable) {
......
responseCallback.onFailure(this@RealCall, canceledException)
throw t
} finally {
client.dispatcher.finished(this) // 请求完成
}
}
}
}
看到没,启动一个线程在 run 中执行请求,那 AsyncCall 是什么初始的呢,看回enqueue(call: AsyncCall)
,显然通过传参进来,继续追踪,看到了
RealCall
类中:
// 异步
override fun enqueue(responseCallback: Callback) {
synchronized(this) {
check(!executed) { "Already Executed" } // 检查是否重复执行
executed = true
}
transmitter.callStart()
client.dispatcher.enqueue(AsyncCall(responseCallback)) // 调用分发器
}
而 RealCall 是实现 Call 的,因此在 Call mCall = client.newCall(request);
已经初始了,就等线程执行 Runnable
而当执行请求列表执行完一个任务后,是怎么通知下一个等待请求的呢?
我们来看下 finished
看代码
internal fun finished(call: AsyncCall) {
call.callsPerHost().decrementAndGet()
finished(runningAsyncCalls, call)
}
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
// 从请求列表中移除
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
val isRunning = promoteAndExecute() // 异步任务结束后,重新调配请求,把等待中的请求放入执行请求列表中
if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}
从代码来看,当任务执行完后,调用finished来获取下一个请求
分发器线程池
分发器内部会包含一个线程池。当异步请求时,会将请求任务交给线程池来执行。
那分发器中默认的线程池是如何定义的呢?为什么要这么定义?
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(
0, Int.MAX_VALUE, // 核心数、最大线程、
60, TimeUnit.SECONDS, // 空闲线程闲置时间、闲置单位、
SynchronousQueue(), //线程等待队列,SynchronousQueue:一个不存储元素的阻塞队列
threadFactory("OkHttp Dispatcher", false)) //线程工厂
}
return executorServiceOrNull!!
}
首先核心线程为0,表示线程池不会一直为我们缓存线程,线程池中所有线程都是在60s内没有工作就会被回收。而最大线程Integer.MAX_VALUE
与等待队列SynchronousQueue
的组合能够得到最大的吞吐量。即当需要线程池执行任务时,如果不存在空闲线程不需要等待,马上新建线程执行任务!等待队列的不同指定了线程池的不同排队机制。
但是需要注意的时,我们都知道,进程的内存是存在限制的,而每一个线程都需要分配一定的内存。所以线程并不能无限个数。那么当设置最大线程数为Integer.MAX_VALUE
时,OkHttp同时还有最大请求任务执行个数: 64的限制。这样即解决了这个问题同时也能获得最大吞吐。
总结:
上面就是分发器的分发流程了,其实分发器只是调配请求任务的,真正执行请求的工作都是在getResponseWithInterceptorChain()
中
分发器的源码分析主要弄清楚下面三个问题即可
Q: 如何决定将请求放入ready还是running?
A: 如果当前正在请求数不小于64放入ready;如果小于64,但是已经存在同一域名主机的请求5个放入ready!
Q: 从running移动ready的条件是什么?
A: 每个请求执行完成就会从running移除,同时进行第一步相同逻辑的判断,决定是否移动!
Q: 分发器线程池的工作优点?
A:无等待,最大并发