Okhttp源码解读第一篇——整体架构
OkHttp概述
OkHttp 是适用于 Android 和 Java 应用程序的高效HTTP客户端。最早的版本是基于 HttpURLConnection 和 HttpClient 的封装,之后 Android 6.0 版移除了对 Apache HTTP 客户端的支持,OkHttp 也移除了 HttpClient,然后移除 HttpURLConnection 的底层支持,连接底层到应用全部自己实现,成为业界公认最佳方案,HttpURLConnection 的底层实现也改为了 OkHttp 的实现方式。
优势:
-
支持HTTP/2, HTTP/2通过使用多路复用技术在一个单独的TCP连接上支持并发, 通过在一个连接上一次性发送多个请求来发送或接收数据
-
如果HTTP/2不可用, 连接池复用技术也可以极大减少延时
-
支持GZIP, 可以压缩下载体积
-
响应缓存可以直接避免重复请求
-
会从很多常用的连接问题中自动恢复
-
如果您的服务器配置了多个IP地址, 当第一个IP连接失败的时候, OkHttp会自动尝试下一个IP
-
OkHttp还处理了代理服务器问题和SSL握手失败问题
使用
使用 OkHttp 同步请求
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url(BASE_URL + "/date")
.build();
Call call = client.newCall(request);
Response response = call.execute();
使用 OkHttp 异步请求
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url(BASE_URL + "/date")
.build();
Call call = client.newCall(request);
call.enqueue(new Callback() {
public void onResponse(Call call, Response response)
throws IOException {
// ...
}
public void onFailure(Call call, IOException e) {
fail();
}
});
源码走读
异步请求
从 call.enqueue
开始,查看方法实现:
fun enqueue(responseCallback: Callback)
这是一个抽象方法,所以需要回到Call call = client.newCall(request)
,查看newCall
的实现,
/** Prepares the [request] to be executed at some point in the future. */
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
那么真正调用enqueue
的是RealCall
,查看 RealCall
里enqueue
的实现:
override fun enqueue(responseCallback: Callback) {
check(executed.compareAndSet(false, true)) { "Already Executed" }
callStart()
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
callStart()
是一个监听器,监听请求的开始,重点在这里:client.dispatcher.enqueue(AsyncCall(responseCallback))
。点进dispatcher
:
/**
* Policy on when async requests are executed.
*
* Each dispatcher uses an [ExecutorService] to run calls internally. If you supply your own
* executor, it should be able to run [the configured maximum][maxRequests] number of calls
* concurrently.
*/
class Dispatcher constructor() {}
通过注释,可以知道这是一个使用 ExecutorService 的线程调度器。
/**
* The maximum number of requests to execute concurrently. Above this requests queue in memory,
* waiting for the running calls to complete.
*
* If more than [maxRequests] requests are in flight when this is invoked, those requests will
* remain in flight.
*/
@get:Synchronized var maxRequests = 64
set(maxRequests) {
require(maxRequests >= 1) { "max < 1: $maxRequests" }
synchronized(this) {
field = maxRequests
}
promoteAndExecute()
}
可以 Set 的同时执行的最大请求数,默认同时执行64个请求。
/**
* The maximum number of requests for each host to execute concurrently. This limits requests by
* the URL's host name. Note that concurrent requests to a single IP address may still exceed this
* limit: multiple hostnames may share an IP address or be routed through the same HTTP proxy.
*
* If more than [maxRequestsPerHost] requests are in flight when this is invoked, those requests
* will remain in flight.
*
* WebSocket connections to hosts **do not** count against this limit.
*/
@get:Synchronized var maxRequestsPerHost = 5
set(maxRequestsPerHost) {
require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
synchronized(this) {
field = maxRequestsPerHost
}
promoteAndExecute()
}
每个主机能同时执行的最大请求数,默认是5个。
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
内部使用一个核心线程数为0、容量无限大、60秒超时的线程池。
查看enqueue
:
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
promoteAndExecute()
}
readyAsyncCalls.add(call)
把准备执行但未执行的 AsyncCall 放入队列,然后记录主机连接数,最后调用promoteAndExecute()
:
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)
}
return isRunning
}
遍历队列,挑选符合条件可以被执行的 Call ,把他们放在正在执行的队列,然后调用asyncCall.executeOn(executorService)
执行:
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
executorService.execute(this)
这里既然传入了this
,那么这个类肯定实现了 Runnable 接口:
inner class AsyncCall(
private val responseCallback: Callback
) : Runnable
我们查看 AsyncCall 的 run
方法,找出怎么执行的:
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
}
val response = getResponseWithInterceptorChain()
这里直接返回的是 Response,说明getResponseWithInterceptorChain
里会发起请求和返回,核心也就在这里。
同步请求
下面看下同步请求的执行:
override fun execute(): Response {
check(executed.compareAndSet(false, true)) { "Already Executed" }
timeout.enter()
callStart()
try {
client.dispatcher.executed(this)
return getResponseWithInterceptorChain()
} finally {
client.dispatcher.finished(this)
}
}
很简单,直接调用getResponseWithInterceptorChain()
。
整体架构分析到此,下一篇详细分析getResponseWithInterceptorChain()
。