OkHttp 源码解析(Kotlin版)
前言
OkHttp 是一款非常优秀的网络请求框架,随着Kotlin语言的不断完善,OkHttp 自 4.0 版本开始使用Kotlin编写,对于巩固Kotlin语法知识及实践,研读OkHttp 4.0及以上版本的源码是一个不错的选择。
首先列出一些前置知识点:
1xx:信息,请求收到,继续处理
2xx:成功,行为被成功地接受、理解和采纳
3xx:重定向,为了完成请求,必须进一步执行的动作
4xx:客户端错误,请求包含语法错误或者请求无法实现
5xx:服务器错误,服务器不能实现一种明显无效的请求
OkHttp的基本使用
- 首先添加依赖库(去官网找最新的或想要的版本)
- Http 请求有多种类型,常用的分为 Get 和 Post,而 POST 又分为 Form 和 Multiple 等,下面我们以Get请求为例:
// 1.创建OkHttpClient 对象,
// var client = OkHttpClient();//方式一
//方式二:
val client = OkHttpClient.Builder().readTimeout(5, TimeUnit.SECONDS).build()
//2.创建请求对象并添加请求参数信息
val request = Request.Builder().url("").build()
//3.构建进行请求操作的call对象
val call = client.newCall(request)
//同步请求 Call (RealCall)—>execute() 返回response
// val response = client.newCall(request).execute()
//异步请求 Call (RealCall)—>enqueue()
call.enqueue(
object : Callback {
override fun onFailure(call: Call, e: IOException) {
println(e.stackTrace.toString())
}
@Throws(IOException::class)
override fun onResponse(call: Call, response: Response) {
println(response.body.toString())
}
})
- 首先使用OkHttpClint的构造
OkHttpClient()
或者Build模式构建一个OkHttpClint的对象实例; - 使用构建者模式构建一个Request对象,通过OkHttpClient和Request对象,构建出Call对象;
- 执行call的
enqueue()
或者execute()
。
注意:在实际开发中建议将OkHttpClint对象的创建封装成单列, 因为每个 OkHttpClient 对象都管理自己独有的线程池和连接池,复用连接池和线程池能够减少延迟、节省内存。
OkHttp 源码分析
一. OkHttpClient
constructor() : this(Builder())
//这里是默认的参数设置
class Builder constructor() {
internal var dispatcher: Dispatcher = Dispatcher()//调度器,通过双端队列保存Calls(同步&异步Call)
internal var connectionPool: ConnectionPool = ConnectionPool()//链接池
internal val interceptors: MutableList<Interceptor> = mutableListOf()//拦截器
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()//网络拦截器
internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()//一个Call的状态监听器
internal var retryOnConnectionFailure = true
internal var authenticator: Authenticator = Authenticator.NONE
internal var followRedirects = true
internal var followSslRedirects = true
internal var cookieJar: CookieJar = CookieJar.NO_COOKIES//默认没有Cookie
internal var cache: Cache? = null
internal var dns: Dns = Dns.SYSTEM//域名解析系统 domain name -> ip address
internal var proxy: Proxy? = null
internal var proxySelector: ProxySelector? = null//使用默认的代理选择器
internal var proxyAuthenticator: Authenticator = Authenticator.NONE
internal var socketFactory: SocketFactory = SocketFactory.getDefault()//默认的Socket 工厂生产Socket
internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
internal var x509TrustManagerOrNull: X509TrustManager? = null
internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS//OKHttp连接(Connection)配置
internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
internal var certificateChainCleaner: CertificateChainCleaner? = null
internal var callTimeout = 0
internal var connectTimeout = 10_000
internal var readTimeout = 10_000
internal var writeTimeout = 10_000
internal var pingInterval = 0//和WebSocket有关,为了保持长连接,我们必须间隔一段时间发送一个ping指令进行保活
internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
internal var routeDatabase: RouteDatabase? = null
connectionSpecs: OKHttp连接(Connection)配置
companion object {
internal val DEFAULT_PROTOCOLS = immutableListOf(HTTP_2, HTTP_1_1)
internal val DEFAULT_CONNECTION_SPECS = immutableListOf(
ConnectionSpec.MODERN_TLS, ConnectionSpec.CLEARTEXT)
}
/**
* A modern TLS configuration that works on most client platforms and can connect to most servers.
* This is OkHttp's default configuration.
*/
//针对TLS的, 是OkHttp 的默认配置
@JvmField
val MODERN_TLS = Builder(true)
.cipherSuites(*APPROVED_CIPHER_SUITES)
.tlsVersions(TlsVersion.TLS_1_3, TlsVersion.TLS_1_2)
.supportsTlsExtensions(true)
.build()
/** URL的未经加密,未经身份验证的连接 */
@JvmField
val CLEARTEXT = Builder(false).build()
二. 同步请求流程分析
以下代码为同步请求流程中的核心代码,按照调用次序呈现。
// 1.
val response = client.newCall(request).execute()
// 2.
/** Prepares the [request] to be executed at some point in the future. */
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
//RealCall的execute()
// 3. 真正执行请求的是在call的实现类 RealCall的execute()中
override fun execute(): Response {
//标记请求执行状态:一个请求只能执行一次
synchronized(this) {
check(!executed) { "Already Executed" }
executed = true
}
timeout.enter()
callStart()
try {
// 4. 通知dispatcher已经进入执行状态
client.dispatcher.executed(this)
// 5. 通过连接器的链式调用进行请求处理并返回最终响应结果
return getResponseWithInterceptorChain()
} finally {
// 6. 通知dispatcher自己已执行完毕
client.dispatcher.finished(this)
}
}
//4 . dispatcher.executed()
/** Used by `Call#execute` to signal it is in-flight. */
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}
//Dispatcher中维护的ArrayDeque
/** 准备执行的异步请求队列. */
private val readyAsyncCalls = ArrayDeque<AsyncCall>()
/** 正在执行的异步请求队列 */
private val runningAsyncCalls = ArrayDeque<AsyncCall>()
/** 正在执行的同步请求队列 */
private val runningSyncCalls = ArrayDeque<RealCall>()
//5 .RealCall的getResponseWithInterceptorChain()
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors//用户在构建OkHttpClient是配置的连接器
interceptors += RetryAndFollowUpInterceptor(client)//负责请求失败后的重试和重定向
interceptors += BridgeInterceptor(client.cookieJar)//对请求和响应的参数进行必要的处理
interceptors += CacheInterceptor(client.cache)//读取缓存数据返回、更新缓存
interceptors += ConnectInterceptor//负责跟服务器的链接操作
if (!forWebSocket) {
//创建OkHttpClient时设置的networkInterceptor
interceptors += client.networkInterceptors
}
//向服务器发送请求数据,读取响应数据
interceptors += CallServerInterceptor(forWebSocket)
//将请求对象及OkHttpClient的一些配置封装在RealInterceptorChain中
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
//开启链式调用
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
//7. RealInterceptorChain的proceed()
@Throws(IOException::class)
override fun proceed(request: Request): Response {
check(index < interceptors.size)
calls++
if (exchange != null) {
check(exchange.finder.sameHostAndPort(request.url)) {
"network interceptor ${interceptors[index - 1]} must retain the same host and port"
}
check(calls == 1) {
"network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
}
}
// Call the next interceptor in the chain. 实例化下一个拦截器
val next = copy(index = index + 1, request = request)
//获取当前拦截器
val interceptor = interceptors[index]
//调用当前拦截器的intercept(),并将下一个拦截器的RealIterceptorChain对象传递下去,最后返回响应结果
@Suppress("USELESS_ELVIS")
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
if (exchange != null) {
check(index + 1 >= interceptors.size || next.calls == 1) {
"network interceptor $interceptor must call proceed() exactly once"
}
}
check(response.body != null) { "interceptor $interceptor returned a response with no body" }
return response
}
三. 异步请求流程分析
//1. 异步请求 Call (RealCall)—>enqueue()
client.newCall(request).enqueue(
object : Callback {
override fun onFailure(call: Call, e: IOException) {
println(e.stackTrace.toString())
}
@Throws(IOException::class)
override fun onResponse(call: Call, response: Response) {
println(response.body.toString())
}
})
//2. RealCall 的enqueue()
override fun enqueue(responseCallback: Callback) {
synchronized(this) {
check(!executed) { "Already Executed" }
executed = true
}
callStart()
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
//3. dispatcher 的enqueue()
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
//将请求添加到等待执行的异步请求队列中
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
promoteAndExecute()
}
/**
* Promotes eligible calls from [readyAsyncCalls] to [runningAsyncCalls] and runs them on the
* executor service. Must not be called with synchronization because executing calls can call
* into user code.
*
* @return true if the dispatcher is currently running calls.
*/
//4. 不断从readyAsyncCalls中取出要执行的请求放到runningAsyncCalls中,并将readyAsyncCalls中的移除
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
// 如果其中的runningAsynCalls不满,且call占用的host小于最大数量,则将call加入到runningAsyncCalls中执行,
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove()
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
// 利用线程池执行call
asyncCall.executeOn(executorService)
}
return isRunning
}
异步请求的dispatcher.enqueue(AsyncCall)中传入是call 是一个AsyncCall,接下来看AsyncCall的实现.它是RealCall的内部类,实际是一个Runnable。
internal inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
@Volatile var callsPerHost = AtomicInteger(0)
private set
fun reuseCallsPerHostFrom(other: AsyncCall) {
this.callsPerHost = other.callsPerHost
}
val host: String
get() = originalRequest.url.host
val request: Request
get() = originalRequest
val call: RealCall
get() = this@RealCall
/**
* Attempt to enqueue this async call on [executorService]. This will attempt to clean up
* if the executor has been shut down by reporting the call as failed.
*/
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
//在线程迟中执行
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
//最终进行拦截器的链式调用来处理请求并返回最终的响应结果
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
}
-
通过源码看到Dispatcher维护了三个ArrayDeque,一个保存了正在执行的同步任务;一个保存异步正在执行的请求,另一个是异步等待执行的请求,异步右两个ArrayDeque是因为Dispatcher默认支持最大的并发请求是64个,单个Host最多执行5个并发请求,如果超过,则Call会先被放入到readyAsyncCall中,当出现空闲的线程时,再将readyAsyncCall中的线程移入到runningAsynCalls中,执行请求。
-
通过拦截器链处理,得到响应结果后执行finally中的代码
dispatcher.finished(this)
,
现在来看下这个方法,走到这,一个请求流程就结束了。
/** Used by [AsyncCall.run] to signal completion. */
//异步请求时调用
internal fun finished(call: AsyncCall) {
call.callsPerHost.decrementAndGet()
finished(runningAsyncCalls, call)
}
/** Used by [Call.execute] to signal completion. */
//同步请求时调用
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
//最终都调用这个方法
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
//将当前call从其队列中移除
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
val isRunning = promoteAndExecute()
if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}
总结:网络请求是从OkHttpClient().newCall(request)
开始的,通过创建的OkHttpClient对象和Request对象,构建出一个RealCall对象来执行网络请求,同步请求是在RealCall
的execute()
方法中,异步请求是在enqueue()
中,在这两个方法中都用OkHttpClient
对象的dispatcher
执行对应的请求方法。对于同步请求,dispatcher
的execute()
就是将请求加入到runningSyncCalls
这个双端队列中;对于异步请求,dispatcher
进行请求的分发执行。在dispatcher
将请求分发后调用getResponseWithInterceptorChain()
方法,在这里,==依次==将client.interceptorsRetryAndFollowUpInterceptor、BridgeInterceptor、CacheInterceptor、ConnectInterceptor、client.networkInterceptors和CallServerInterceptor
添加到一个集合中,并创建出一个拦截器链RealInterceptorChain
,通过RealInterceptorChain.proceed()
使每一个拦截器执行完毕之后会调用下一个拦截器或者不调用并返回结果。显然,我们最终拿到的响应就是这个链条执行之后返回的结果。
-
整体的请求流程图如下:
image
四.OkHttp内置拦截器源码分析
1. RetryAndFollowUpInterceptor
这个拦截器负责重试和重定向,当一个请求由于各种原因失败了,如果是路由或者连接异常,则尝试恢复,否则,根据响应码(ResponseCode),followup方法会对Request进行再处理以得到新的Request,然后沿着拦截器链继续新的Request;当尝试次数超过最大次数就抛出异常。代码逻辑相对比较简单,这里就不贴出来了。
2. BridgeInterceptor
负责将用户请求转换为网络请求,也就是根据 Request 信息组建请求 Header 以及设置响应数据,包括设置 Cookie 以及gzip。源码就不贴出来了。
3. CacheInterceptor
负责根据请求的信息和缓存的响应的信息来判断是否存在可用的缓存,读取缓存直接返回、否则就继续使用责任链模式来从服务器中获取响应。当获取到响应的时候,更新缓存。
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
//从缓存中获取
val cacheCandidate = cache?.get(chain.request())
val now = System.currentTimeMillis()
//缓存策略,决定使用缓存还是从网络获取
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
val networkRequest = strategy.networkRequest
val cacheResponse = strategy.cacheResponse
//根据缓存策略,更新统计指标:请求次数、使用网络请求次数、使用缓存次数
cache?.trackResponse(strategy)
//若缓存不可用,关闭
if (cacheCandidate != null && cacheResponse == null) {
// The cache candidate wasn't applicable. Close it.
cacheCandidate.body?.closeQuietly()
}
// If we're forbidden from using the network and the cache is insufficient, fail.
//如果既无网络请求可用,又没有缓存,则返回504错误
if (networkRequest == null && cacheResponse == null) {
return Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(HTTP_GATEWAY_TIMEOUT)
.message("Unsatisfiable Request (only-if-cached)")
.body(EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
}
// If we don't need the network, we're done.
//缓存可用,则返回缓存中数据
if (networkRequest == null) {
return cacheResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build()
}
var networkResponse: Response? = null
try {
//进行网络请求,返回请求结果
networkResponse = chain.proceed(networkRequest)
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
cacheCandidate.body?.closeQuietly()
}
}
// If we have a cache response too, then we're doing a conditional get.
//HTTP_NOT_MODIFIED缓存有效,合并网络请求和缓存
if (cacheResponse != null) {
if (networkResponse?.code == HTTP_NOT_MODIFIED) {
val response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers, networkResponse.headers))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis)
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
networkResponse.body!!.close()
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache!!.trackConditionalCacheHit()
cache.update(cacheResponse, response)//更新缓存
return response
} else {
cacheResponse.body?.closeQuietly()
}
}
val response = networkResponse!!.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build()
//允许缓存且请求结果不为空,则写入缓存
if (cache != null) {
if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
val cacheRequest = cache.put(response)
return cacheWritingResponse(cacheRequest, response)
}
if (HttpMethod.invalidatesCache(networkRequest.method)) {
try {
cache.remove(networkRequest)
} catch (_: IOException) {
// The cache cannot be written.
}
}
}
return response
}
// CacheStrategy中的核心方法 computeCandidate()
/** Returns a strategy to use assuming the request can use the network. */
private fun computeCandidate(): CacheStrategy {
// No cached response. 没有缓存,直接进行网络请求
if (cacheResponse == null) {
return CacheStrategy(request, null)
}
// Drop the cached response if it's missing a required handshake. 是https请求,但是没有握手,进行网络请求
if (request.isHttps && cacheResponse.handshake == null) {
return CacheStrategy(request, null)
}
// If this response shouldn't have been stored, it should never be used as a response source.
// This check should be redundant as long as the persistence store is well-behaved and the
// rules are constant.
//不能进行缓存
if (!isCacheable(cacheResponse, request)) {
return CacheStrategy(request, null)
}
val requestCaching = request.cacheControl
//请求头nocache或者请求头包含If-Modified-Since或者If-None-Match(意味着本地缓存过期,需要服务器验证本地缓存是不是还能继续使用)
if (requestCaching.noCache || hasConditions(request)) {
return CacheStrategy(request, null)
}
val responseCaching = cacheResponse.cacheControl
val ageMillis = cacheResponseAge()
var freshMillis = computeFreshnessLifetime()
if (requestCaching.maxAgeSeconds != -1) {
freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong()))
}
var minFreshMillis: Long = 0
if (requestCaching.minFreshSeconds != -1) {
minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds.toLong())
}
var maxStaleMillis: Long = 0
if (!responseCaching.mustRevalidate && requestCaching.maxStaleSeconds != -1) {
maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds.toLong())
}
//缓存过期了,但仍然可用,给相应头中添加了Warning,使用缓存
if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
val builder = cacheResponse.newBuilder()
if (ageMillis + minFreshMillis >= freshMillis) {
builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"")
}
val oneDayMillis = 24 * 60 * 60 * 1000L
if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"")
}
return CacheStrategy(null, builder.build())//使用缓存
}
// Find a condition to add to the request. If the condition is satisfied, the response body
// will not be transmitted.
val conditionName: String
val conditionValue: String?
//流程走到这,说明缓存已经过期了
//添加请求头:If-Modified-Since或者If-None-Match
//etag与If-None-Match配合使用
//lastModified与If-Modified-Since配合使用
//前者和后者的值是相同的
//区别在于前者是响应头,后者是请求头。
//后者用于服务器进行资源比对,看看是资源是否改变了。
// 如果没有,则本地的资源虽过期还是可以用的
when {
etag != null -> {
conditionName = "If-None-Match"
conditionValue = etag
}
lastModified != null -> {
conditionName = "If-Modified-Since"
conditionValue = lastModifiedString
}
servedDate != null -> {
conditionName = "If-Modified-Since"
conditionValue = servedDateString
}
else -> return CacheStrategy(request, null) // No condition! Make a regular request.
}
val conditionalRequestHeaders = request.headers.newBuilder()
conditionalRequestHeaders.addLenient(conditionName, conditionValue!!)
val conditionalRequest = request.newBuilder()
.headers(conditionalRequestHeaders.build())
.build()
return CacheStrategy(conditionalRequest, cacheResponse)
}
从上述CatchStragety.computeCandidate()方法可知,缓存策略如下:
1. 没有缓存,直接网络请求;
2. 如果是Https,没有进行握手,则进行网络请求;
3. 设置了不可缓存,则进行网络请求;
4. 请求头nocache或者请求头包含If-Modified-Since或者If-None-Match,则需要服务器验证本地缓存是不是还能继续使用,进行网络请求;
5. 可以缓存,并且缓存过期过期了但是还可以使用,这时给响应头添加Warning后,使用缓存;
6. 缓存已经过期,添加请求头:If-Modified-Since或者If-None-Match,进行网络请求;
整个CatcheIncepter的执行依靠CatchStragety的缓存策略,代码中添加了注释,这里整理下流程如下:
1. 如果网络不可用并且无可用的有效缓存,则返回504错误;
2. 如果禁止了网络请求,则直接使用缓存;
3. 如果没有缓存且网络请求可用,则进行网络请求;
4. 如果此时有缓存,并且网络请求返回HTTP_NOT_MODIFIED(304),说明缓存还是有效的,则合并网络响应和缓存结果。同时更新缓存;
5. 如果没有缓存,则将请求回来的结果写入新的缓存中;
6. 返回响应数据。
可以看到,缓存的获取、添加、更新等操作都是在Catche中初始化了一个DiskLruCache来完成的,具体方法如下:
//获取缓存
internal fun get(request: Request): Response? {
val key = key(request.url)
val snapshot: DiskLruCache.Snapshot = try {
cache[key] ?: return null
} catch (_: IOException) {
return null // Give up because the cache cannot be read.
}
val entry: Entry = try {
Entry(snapshot.getSource(ENTRY_METADATA))
} catch (_: IOException) {
snapshot.closeQuietly()
return null
}
val response = entry.response(snapshot)
if (!entry.matches(request, response)) {
response.body?.closeQuietly()
return null
}
return response
}
//添加缓存
internal fun put(response: Response): CacheRequest? {
val requestMethod = response.request.method
if (HttpMethod.invalidatesCache(response.request.method)) {
try {
remove(response.request)
} catch (_: IOException) {
// The cache cannot be written.
}
return null
}
if (requestMethod != "GET") {
// Don't cache non-GET responses. We're technically allowed to cache HEAD requests and some
// POST requests, but the complexity of doing so is high and the benefit is low.
return null
}
if (response.hasVaryAll()) {
return null
}
val entry = Entry(response)
var editor: DiskLruCache.Editor? = null
try {
editor = cache.edit(key(response.request.url)) ?: return null
entry.writeTo(editor)
return RealCacheRequest(editor)
} catch (_: IOException) {
abortQuietly(editor)
return null
}
}
//更新缓存
internal fun update(cached: Response, network: Response) {
val entry = Entry(network)
val snapshot = (cached.body as CacheResponseBody).snapshot
var editor: DiskLruCache.Editor? = null
try {
editor = snapshot.edit() ?: return // edit() returns null if snapshot is not current.
entry.writeTo(editor)
editor.commit()
} catch (_: IOException) {
abortQuietly(editor)
}
}
4. ConnectInterceptor
这个拦截器打开与目标服务器的链接并进入下一个拦截器。
通过RealCall
的initExchange(chain)
创建一个Exchange
对象,并调用 Chain.proceed()
方法。
initExchange()
方法中会先通过 ExchangeFinder
尝试去 RealConnectionPool
中寻找已存在的连接,未找到则会重新创建一个RealConnection
并开始连接,然后将其存入RealConnectionPool
,现在已经准备好了RealConnection
对象,然后通过请求协议创建不同的ExchangeCodec
并返回,返回的ExchangeCodec
正是创建Exchange对象的一个参数。
- 下面说一下在建立连接过程中涉及到的几个重要类:
Route:
是连接到服务器的具体路由。其中包含了 IP 地址、端口、代理等参数。
由于存在代理或者 DNS 可能返回多个 IP 地址的情况,所以同一个接口地址可能会对应多个 route
。
在创建 Connection 时将会使用 Route 而不是直接用 IP 地址。
RouteSelector:
路由选择器,其中存储了所有可用的 route
,在准备连接时时会通过 RouteSelector.next()
方法获取下一个 Route
。
值得注意的是,RouteSelector
中包含了一个 routeDatabase
对象,其中存放着连接失败的Route
,RouteSelector
会将其中存储的上次连接失败的route
放在最后,以此提高连接速度。
RealConnection:
RealConnection
实现了 Connection
接口,其中使用 Socket
建立HTTP/HTTPS
连接,并且获取 I/O 流,同一个 Connection
可能会承载多个 HTTP 的请求与响应。
RealConnectionPool:
这是用来存储 RealConnection
的池子,内部使用一个双端队列来进行存储。
在 OkHttp 中,一个连接(RealConnection)用完后不会立马被关闭并释放掉,而且是会存储到连接池(RealConnectionPool)中。
除了缓存连接外,缓存池还负责定期清理过期的连接,在 RealConnection 中会维护一个用来描述该连接空闲时间的字段,每添加一个新的连接到连接池中时都会进行一次检测,遍历所有的连接,找出当前未被使用且空闲时间最长的那个连接,如果该连接空闲时长超出阈值,或者连接池已满,将会关闭该连接。
ExchangeCodec:
ExchangeCodec
负责对Request
编码及解码 Response
,也就是写入请求及读取响应,我们的请求及响应数据都通过它来读写。其实现类有两个:Http1ExchangeCodec
及 Http2ExchangeCodec
,分别对应两种协议版本。
Exchange:
功能类似 ExchangeCodec
,但它是对应的是单个请求,其在 ExchangeCodec
基础上担负了一些连接管理及事件分发的作用。
具体而言,Exchange
与 Request
一一对应,新建一个请求时就会创建一个 Exchange
,该 Exchange
负责将这个请求发送出去并读取到响应数据,而发送与接收数据使用的是 ExchangeCodec
。
override fun intercept(chain: Interceptor.Chain): Response {
val realChain = chain as RealInterceptorChain
val exchange = realChain.call.initExchange(chain)
val connectedChain = realChain.copy(exchange = exchange)
return connectedChain.proceed(realChain.request)
}
//RealCall 中的initExchange()初始化Exchange对象
/** Finds a new or pooled connection to carry a forthcoming request and response. */
internal fun initExchange(chain: RealInterceptorChain): Exchange {
synchronized(connectionPool) {
check(!noMoreExchanges) { "released" }
check(exchange == null)
}
val codec = exchangeFinder!!.find(client, chain)
val result = Exchange(this, eventListener, exchangeFinder!!, codec)
this.interceptorScopedExchange = result
synchronized(connectionPool) {
this.exchange = result
this.exchangeRequestDone = false
this.exchangeResponseDone = false
return result
}
}
//找到可用的resultConnection后根据协议创建ExchangeCodec并返回
fun find(
client: OkHttpClient,
chain: RealInterceptorChain
): ExchangeCodec {
try {
val resultConnection = findHealthyConnection(
connectTimeout = chain.connectTimeoutMillis,
readTimeout = chain.readTimeoutMillis,
writeTimeout = chain.writeTimeoutMillis,
pingIntervalMillis = client.pingIntervalMillis,
connectionRetryEnabled = client.retryOnConnectionFailure,
doExtensiveHealthChecks = chain.request.method != "GET"
)
return resultConnection.newCodec(client, chain)
} catch (e: RouteException) {
trackFailure(e.lastConnectException)
throw e
} catch (e: IOException) {
trackFailure(e)
throw RouteException(e)
}
}
//ExchangeFinder的findConnection方法中找已经存在的可用的链接
/**
* Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection.
*/
@Throws(IOException::class)
private fun findConnection(
connectTimeout: Int,
readTimeout: Int,
writeTimeout: Int,
pingIntervalMillis: Int,
connectionRetryEnabled: Boolean
): RealConnection {
var foundPooledConnection = false
var result: RealConnection? = null
var selectedRoute: Route? = null
var releasedConnection: RealConnection?
val toClose: Socket?
synchronized(connectionPool) {
…………
// Attempt to get a connection from the pool.
//从connectPool中找可用的链接并返回
if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
foundPooledConnection = true
result = call.connection
} else if (nextRouteToTry != null) {
selectedRoute = nextRouteToTry
nextRouteToTry = null
}
}
}
toClose?.closeQuietly()
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection!!)
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result!!)
}
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
return result!!
}
…………
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
//创建一个新的RealConnection
result = RealConnection(connectionPool, selectedRoute!!)
connectingConnection = result
var socket: Socket? = null
synchronized(connectionPool) {
connectingConnection = null
// Last attempt at connection coalescing, which only occurs if we attempted multiple
// concurrent connections to the same host.
if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
// We lost the race! Close the connection we created and return the pooled connection.
result!!.noNewExchanges = true
socket = result!!.socket()
result = call.connection
// It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
// that case we will retry the route we just successfully connected with.
nextRouteToTry = selectedRoute
} else {
connectionPool.put(result!!)//将新创建的RealConnection添加到connectPool中
call.acquireConnectionNoEvents(result!!)
}
}
socket?.closeQuietly()
eventListener.connectionAcquired(call, result!!)
return result!!
}
5. CallServerInterceptor
这是OkHttp 的连接器链中的最后一个拦截器,负责利用exchange把Request中的数据发送给服务端,并获取到数据写入到Response中。
到这里,OkHttp框架的核心逻辑已经梳理完了,回顾一下整体的架构实现,用到的设计模式有:Builder模式(OKHttpClient的构建)、工厂方法模式(Call接口提供了内部接口Factory、责任链模式(拦截器链)、享元模式(在Dispatcher的线程池)、策略模式(CacheInterceptor中数据选择等。
参考资源
- OkHttp 4.6源码
- okhttp源码解析
- OkHttp 源码分析
- 感谢您阅读这篇文章,若有不正确的地方,欢迎指正!