探索Android开源框架 - 1. OkHttp源码解析
2021-09-14 本文已影响0人
今阳说
使用
1. 添加依赖
implementation 'com.squareup.okhttp3:okhttp:3.14.9'
2. 常用请求方法
1. 同步GET请求
- 执行请求的操作是阻塞式的,直到http响应返回
1. 创建OkHttpClient对象
- 直接创建
val client = OkHttpClient()
- 通过Builder模式创建
val client = OkHttpClient.Builder()
.build()
2. 创建Request对象
val request = Request.Builder()
.url("https://www.baidu.com")
.get()
.build()
3. 将request封装成call对象
val call = client.newCall(request)
4. 调用call.execute发送同步请求
// Response is Closeable: `use` guarantees the connection is released on every
// path, including the non-successful branch where the body is never read.
call.execute().use { response ->
    if (response.isSuccessful) {
        log(response.body()?.string())
    } else {
        // Same text the original produced via IOException(...).message.
        log("Unexpected code $response")
    }
}
- 注意:需要在子线程调用,发送请求后,当前线程就会进入阻塞状态,直到收到响应
lifecycleScope.launch {
withContext(Dispatchers.IO) {
getSync()
}
}
- 别忘了添加网络请求权限
<uses-permission android:name="android.permission.INTERNET" />
- 如果是非https请求,可能会报错:java.net.UnknownServiceException: CLEARTEXT communication to。。。
- CLEARTEXT,就是明文的意思,在Android P系统的设备上,如果应用使用的是非加密的明文流量的http网络请求,则会导致该应用无法进行网络请求,
https则不会受影响,同样地,如果应用嵌套了webView,webView也只能使用https请求;
- 解决该异常需要改为https请求,或者在 AndroidManifest.xml文件的application标签中加入android:usesCleartextTraffic="true"
2. 异步get请求
- 执行请求的操作是非阻塞式的,执行结果通过接口回调方式告知调用者
- 前三步是一样的,第四步调用异步方法 call.enqueue
val client = OkHttpClient()
val request = Request.Builder()
.url("https://www.baidu.com")
.get()
.build()
val call = client.newCall(request)
//调用异步方法enqueue
call.enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
log("onFailure:${e.message}")
runOnUiThread { tv.text = e.message }
}
override fun onResponse(call: Call, response: Response) {
val result = response.body()?.string()
log("onResponse:${result}")
runOnUiThread { tv.text = "onResponse${result}" }
}
})
- 注意:回调方法onResponse,onFailure是在 子线程/工作线程 中执行的, 所以onResponse中使用了runOnUiThread来更新UI;
3. 异步POST请求提交键值对
- 多了一步创建FormBody,为POST请求的参数
val client = OkHttpClient()
//创建FormBody
val formBody = FormBody.Builder()
.add("k", "wanAndroid")
.build()
val request = Request.Builder()
.url("https://www.wanandroid.com/article/query/0/json")
.post(formBody)
.build()
val call = client.newCall(request)
call.enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
log("onFailure:${e.message}")
runOnUiThread { tv.text = e.message }
}
override fun onResponse(call: Call, response: Response) {
val result = response.body()?.string()
log("onResponse:${result}")
runOnUiThread { tv.text = "onResponse${result}" }
}
})
4. Post方式提交流(上传文件)
/**
 * Asynchronously POSTs the contents of `ljy.txt` (looked up in the external
 * cache directory) as a markdown request body and logs the outcome.
 */
private fun postFile() {
    val client = OkHttpClient()

    // Request body streamed straight from the file on disk.
    val markdownType = MediaType.parse("text/x-markdown; charset=utf-8")
    val uploadBody = RequestBody.create(markdownType, File(externalCacheDir, "ljy.txt"))

    val uploadRequest = Request.Builder()
        .url("https://api.github.com/markdown/raw")
        .post(uploadBody)
        .build()

    val resultLogger = object : Callback {
        override fun onFailure(call: Call, e: IOException) {
            log("onFailure:${e.message}")
        }

        override fun onResponse(call: Call, response: Response) {
            val payload = response.body()?.string()
            log("onResponse:${payload}")
        }
    }
    client.newCall(uploadRequest).enqueue(resultLogger)
}
- 需要在AndroidManifest.xml中添加读写权限, 和运行时权限申请(注:Android 4.4及以上访问externalCacheDir等应用专属目录本身不需要存储权限,只有访问公共外部存储目录时才需要)
<uses-permission android:name="android.permission.READ_EXTERNAL_STORAGE"/>
<uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE"/>
// Request the storage permissions when ANY of them is missing. The original
// joined the two "not granted" checks with &&, so a single missing permission
// fell through to the "already granted" branch.
val storagePermissions = arrayOf(
    Manifest.permission.WRITE_EXTERNAL_STORAGE,
    Manifest.permission.READ_EXTERNAL_STORAGE
)
val allGranted = storagePermissions.all { permission ->
    ActivityCompat.checkSelfPermission(this, permission) == PackageManager.PERMISSION_GRANTED
}
if (!allGranted) {
    this@MainActivity.requestPermissions(storagePermissions, 10001)
} else {
...
}
override fun onRequestPermissionsResult(requestCode: Int, permissions: Array<out String>, grantResults: IntArray
) {
super.onRequestPermissionsResult(requestCode, permissions, grantResults)
if (requestCode == 10001) {
...
}
}
5. 异步下载文件
/**
 * Asynchronously downloads an image and saves it as `ljy.jpg` in the external
 * cache directory.
 *
 * The response, its byte stream and the file output stream are all closed via
 * `use`, even when copying fails part-way. Non-2xx responses and responses
 * without a body are logged as failures instead of being written to disk.
 */
private fun downloadFile() {
    val client = OkHttpClient()
    val url = "https://pic3.zhimg.com/v2-dc32dcddfd7e78e56cc4b6f689a24979_xl.jpg"
    val request = Request.Builder()
        .url(url)
        .build()
    client.newCall(request).enqueue(object : Callback {
        override fun onFailure(call: Call, e: IOException) {
            log("onFailure:${e.message}")
        }

        override fun onResponse(call: Call, response: Response) {
            // `use` is inline, so the early returns below leave onResponse
            // after the response has been closed.
            response.use { resp ->
                if (!resp.isSuccessful) {
                    log("onFailure:Unexpected code $resp")
                    return
                }
                val body = resp.body()
                if (body == null) {
                    log("onFailure:empty response body")
                    return
                }
                body.byteStream().use { input ->
                    FileOutputStream(File(externalCacheDir, "ljy.jpg")).use { output ->
                        // Buffered copy until end of stream; replaces the manual read loop.
                        input.copyTo(output, 2048)
                    }
                }
                log("文件下载成功")
            }
        }
    })
}
6. Post提交表单
- 有时会上传文件同时还需要传其他类型的字段
/**
 * Asynchronously POSTs a multipart form that carries two text fields
 * ("name", "age") plus the image file `ljy.jpg` from the external cache
 * directory, and logs the outcome.
 */
private fun sendMultipart() {
    val client = OkHttpClient()
    val file = File(externalCacheDir, "ljy.jpg")
    val requestBody: RequestBody = MultipartBody.Builder()
        .setType(MultipartBody.FORM)
        .addFormDataPart("name", "ljy")
        .addFormDataPart("age", "18")
        .addFormDataPart(
            "image", "header.jpg",
            // The uploaded file is a JPEG, so declare it as such (was "image/png").
            RequestBody.create(MediaType.parse("image/jpeg"), file)
        )
        .build()
    val request: Request = Request.Builder()
        .header("Authorization", "Client-ID " + "...")
        .url("https://api.imgur.com/3/image")
        .post(requestBody)
        .build()
    client.newCall(request).enqueue(object : Callback {
        override fun onFailure(call: Call, e: IOException) {
            log("onFailure:${e.message}")
        }

        override fun onResponse(call: Call, response: Response) {
            log("onResponse:${response.body()?.string()}")
        }
    })
}
常用设置
1. 设置超时时间
val client = OkHttpClient.Builder()
.connectTimeout(30,TimeUnit.SECONDS)
.readTimeout(60,TimeUnit.SECONDS)
.writeTimeout(90,TimeUnit.SECONDS)
.build()
2. 设置缓存
//设置缓存路径和大小,及缓存拦截器
val client = OkHttpClient.Builder()
.addNetworkInterceptor(CacheInterceptor())
.cache(
Cache(
File(cacheDir, "httpCache2"),
100 * 1024 * 1024L
)
).build()
//缓存拦截器
/**
 * Cache interceptor.
 *
 * - Network available: proceeds normally, then rewrites the response's
 *   Cache-Control header to `max-age=30` so the same request can be answered
 *   from the cache for the next 30 seconds.
 * - Network unavailable: rebuilds the request with [CacheControl.FORCE_CACHE].
 *
 * Declared `inner` because it reads `this@OkHttpDemoActivity`; a plain nested
 * class has no reference to the enclosing activity.
 *
 * NOTE(review): network interceptors are added to the chain after OkHttp's own
 * cache interceptor, so when this class is registered with
 * addNetworkInterceptor the FORCE_CACHE branch presumably has no effect —
 * confirm, and consider registering it with addInterceptor as well.
 */
inner class CacheInterceptor : Interceptor {
    override fun intercept(chain: Interceptor.Chain): Response {
        val request: Request = chain.request()
        return if (NetUtil.isNetworkAvailable(this@OkHttpDemoActivity)) {
            val response = chain.proceed(request)
            // Header value for a 30-second max-age.
            val cacheControl = CacheControl.Builder()
                .maxAge(30, TimeUnit.SECONDS)
                .build()
                .toString()
            response.newBuilder()
                .removeHeader("Pragma")
                .removeHeader("Cache-Control")
                .header("Cache-Control", cacheControl)
                .build()
        } else {
            // Offline: ask for a cache-only answer to the same request.
            val cacheOnlyRequest = request.newBuilder()
                .cacheControl(CacheControl.FORCE_CACHE)
                .build()
            chain.proceed(cacheOnlyRequest)
        }
    }
}
- OkHttpClient.cache的入参Cache构造函数如下:
public Cache(File directory, long maxSize) {
this(directory, maxSize, FileSystem.SYSTEM);
}
Cache(File directory, long maxSize, FileSystem fileSystem) {
this.cache = DiskLruCache.create(fileSystem, directory, VERSION, ENTRY_COUNT, maxSize);
}
- 可以看到也是用的DiskLruCache;
3. 设置失败重试
val client = OkHttpClient.Builder()
.retryOnConnectionFailure(true)
.build()
4. 持久化cookie
//添加三方库依赖
implementation 'com.zhy:okhttputils:2.6.2'
//持久化cookie,保持session会话:
// Kotlin has no `new` keyword: constructors are invoked like ordinary functions.
val cookieJar = CookieJarImpl(PersistentCookieStore(CommonModule.getAppContext()))
val client = OkHttpClient.Builder()
    .cookieJar(cookieJar)
    .build()
源码解析
Request
- Request.Builder()构造方法如下,method默认是GET
public Builder() {
this.method = "GET";
this.headers = new Headers.Builder();
}
public Request build() {
if (url == null) throw new IllegalStateException("url == null");
return new Request(this);
}
//Request构造方法
Request(Builder builder) {
this.url = builder.url;
this.method = builder.method;
this.headers = builder.headers.build();
this.body = builder.body;
this.tags = Util.immutableMap(builder.tags);
}
- Request.Builder的post方法如下:
public Builder post(RequestBody body) {
return method("POST", body);
}
public Builder method(String method, @Nullable RequestBody body) {
if (method == null) throw new NullPointerException("method == null");
if (method.length() == 0) throw new IllegalArgumentException("method.length() == 0");
if (body != null && !HttpMethod.permitsRequestBody(method)) {
throw new IllegalArgumentException("method " + method + " must not have a request body.");
}
if (body == null && HttpMethod.requiresRequestBody(method)) {
throw new IllegalArgumentException("method " + method + " must have a request body.");
}
this.method = method;
this.body = body;
return this;
}
OkHttpClient
- OkHttpClient构造方法实现如下:
public OkHttpClient() {
this(new Builder());
}
// builder的构造方法中提供了默认值:
public Builder() {
dispatcher = new Dispatcher();
protocols = DEFAULT_PROTOCOLS;
connectionSpecs = DEFAULT_CONNECTION_SPECS;
eventListenerFactory = EventListener.factory(EventListener.NONE);
proxySelector = ProxySelector.getDefault();
if (proxySelector == null) {
proxySelector = new NullProxySelector();
}
cookieJar = CookieJar.NO_COOKIES;
socketFactory = SocketFactory.getDefault();
hostnameVerifier = OkHostnameVerifier.INSTANCE;
certificatePinner = CertificatePinner.DEFAULT;
proxyAuthenticator = Authenticator.NONE;
authenticator = Authenticator.NONE;
connectionPool = new ConnectionPool();
dns = Dns.SYSTEM;
followSslRedirects = true;
followRedirects = true;
retryOnConnectionFailure = true;
callTimeout = 0;
connectTimeout = 10_000;
readTimeout = 10_000;
writeTimeout = 10_000;
pingInterval = 0;
}
public OkHttpClient build() {
return new OkHttpClient(this);
}
- OkHttpClient.newCall
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
//其内部调用的RealCall.newRealCall:
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.transmitter = new Transmitter(client, call);
return call;
}
//RealCall构造方法如下:
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
}
Call
- Call.execute同步请求方法源码
// Call的方法要看RealCall中的实现,dispatcher主要负责保存和移除同步请求
@Override public Response execute() throws IOException {
//判断executed,确保同一个HTTP请求只执行一次
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
transmitter.timeoutEnter();
transmitter.callStart();
try {
//调用dispatcher的executed将请求加入到同步请求队列中
client.dispatcher().executed(this);
//通过拦截器链获取response
return getResponseWithInterceptorChain();
} finally {
//回收同步请求
client.dispatcher().finished(this);
}
}
- Call.enqueue 异步请求方法源码
//RealCall中的实现:
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
//确保call只执行一次
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
transmitter.callStart();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
- 可以看到他们都调用了dispatcher的方法
Dispatcher任务调度
- 用于控制并发的请求,主要维护了以下变量
/**
最大并发请求数
*/
private int maxRequests = 64;
/**
每个主机最大请求数
*/
private int maxRequestsPerHost = 5;
/**
消费者线程池
*/
private ExecutorService executorService;
/**
将要运行的异步请求队列
*/
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/**
正在运行的异步请求队列
*/
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/**
正在运行的同步请求队列
*/
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
- dispatcher的executed方法如下:
synchronized void executed(RealCall call) {
//将请求加入到同步请求队列中
runningSyncCalls.add(call);
}
- dispatcher().finished用于回收同步请求,实现如下:
void finished(RealCall call) {
finished(runningSyncCalls, call);
}
private <T> void finished(Deque<T> calls, T call) {
Runnable idleCallback;
synchronized (this) {
//移除同步请求
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
idleCallback = this.idleCallback;
}
boolean isRunning = promoteAndExecute();
if (!isRunning && idleCallback != null) {
idleCallback.run();
}
}
- dispatcher的enqueue方法如下:
void enqueue(AsyncCall call) {
synchronized (this) {
//将请求加入到准备好的异步请求队列中
readyAsyncCalls.add(call);
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.get().forWebSocket) {
//通过host查找已经存在的Call
AsyncCall existingCall = findExistingCallWithHost(call.host());
//如果存在则复用callsPerHost
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
}
}
promoteAndExecute();
}
- 其入参AsyncCall是RealCall的内部类,构造函数入参就是我们传入的callback,并在execute方法中调用callback,而在NamedRunnable的run中调用了execute方法
final class AsyncCall extends NamedRunnable {
...
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
transmitter.noMoreExchanges(ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}
@Override protected void execute() {
boolean signalledCallback = false;
transmitter.timeoutEnter();
try {
Response response = getResponseWithInterceptorChain();
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} catch (Throwable t) {
cancel();
if (!signalledCallback) {
IOException canceledException = new IOException("canceled due to " + t);
canceledException.addSuppressed(t);
responseCallback.onFailure(RealCall.this, canceledException);
}
throw t;
} finally {
client.dispatcher().finished(this);
}
}
}
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
- 上面AsyncCall的execute中,在最后的finally中也调用了finished用于回收异步请求
void finished(AsyncCall call) {
call.callsPerHost().decrementAndGet();
finished(runningAsyncCalls, call);
}
- finished和enqueue中都调用了promoteAndExecute方法,其实现如下
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
//遍历准备好的异步请求队列,放到可执行的list和正在运行的队列中:
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
asyncCall.callsPerHost().incrementAndGet();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
//统计正在运行的同步和异步请求数量,判断是否仍有请求在执行
isRunning = runningCallsCount() > 0;
}
//遍历可执行的AsyncCall list,调用executeOn方法传入线程池执行
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}
return isRunning;
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
public synchronized int runningCallsCount() {
return runningAsyncCalls.size() + runningSyncCalls.size();
}
异步请求的调用顺序:
- 使用者调用Call.enqueue(Callback);
- Call.enqueue中调用了client.dispatcher().enqueue(new AsyncCall(responseCallback));
- dispatcher().enqueue调用promoteAndExecute;
- promoteAndExecute中会遍历readyAsyncCalls,把未超出maxRequests和maxRequestsPerHost限制的请求放到executableCalls和runningAsyncCalls中,并调用runningCallsCount统计正在运行的同步和异步请求数量,然后遍历executableCalls,调用 asyncCall.executeOn(executorService());
- asyncCall.executeOn中调用executorService.execute(this),其中this为runnable类型的asyncCall,最后会调用其run方法;
- NamedRunnable的run方法中调用了execute方法,asyncCall中实现了execute方法;
- asyncCall.execute中调用了 Response response = getResponseWithInterceptorChain(),并调用callback,最终调用dispatcher().finished;
- dispatcher().finished中又调用了promoteAndExecute方法,直到队列中的请求都执行完毕;
拦截器链
- 拦截器是okhttp中一个强大的机制,可以实现网络监听,请求及响应重写,请求失败重试等功能;
- 上面的同步请求异步请求源码中都有调用getResponseWithInterceptorChain方法,其代码如下
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
//创建一系列拦截器,并放入list中
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
//1. 重试和失败重定向拦截器
interceptors.add(new RetryAndFollowUpInterceptor(client));
//2. 桥接适配拦截器(如补充请求头,编码方式,压缩方式)
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//3. 缓存拦截器
interceptors.add(new CacheInterceptor(client.internalCache()));
//4. 连接拦截器
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
//5. 网络io流拦截器
interceptors.add(new CallServerInterceptor(forWebSocket));
//创建拦截器链chain,并执行chain.proceed方法
Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
boolean calledNoMoreExchanges = false;
try {
Response response = chain.proceed(originalRequest);
if (transmitter.isCanceled()) {
closeQuietly(response);
throw new IOException("Canceled");
}
return response;
} catch (IOException e) {
calledNoMoreExchanges = true;
throw transmitter.noMoreExchanges(e);
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null);
}
}
}
- 上方法创建一系列拦截器,并放入list中,再创建拦截器链RealInterceptorChain,并执行chain.proceed方法
- proceed方法实现如下:
@Override public Response proceed(Request request) throws IOException {
return proceed(request, transmitter, exchange);
}
public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
throws IOException {
...
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
...
return response;
}
- 其核心代码就是上面几行,创建下一个拦截器链,调用interceptors.get(index)取得当前拦截器,并执行interceptor.intercept方法得到response返回;
- getResponseWithInterceptorChain中传入的index为0,在没有添加自定义应用拦截器(client.interceptors()为空)的情况下,当前拦截器就是RetryAndFollowUpInterceptor,那么我们来看看他的intercept方法是如何实现的
RetryAndFollowUpInterceptor
- RetryAndFollowUpInterceptor的intercept方法代码如下
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
//还记得interceptor.intercept(next)么,所以这里的realChain是下一个拦截器链
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Transmitter transmitter = realChain.transmitter();
int followUpCount = 0;
Response priorResponse = null;
while (true) {
transmitter.prepareToConnect(request);
if (transmitter.isCanceled()) {
throw new IOException("Canceled");
}
Response response;
boolean success = false;
try {
//调用下一个拦截器链的proceed方法
response = realChain.proceed(request, transmitter, null);
success = true;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), transmitter, false, request)) {
throw e.getFirstConnectException();
}
continue;
//当发生IOException或者RouteException时会执行recover方法
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, transmitter, requestSendStarted, request)) throw e;
continue;
} finally {
// The network call threw an exception. Release any resources.
if (!success) {
transmitter.exchangeDoneDueToException();
}
}
// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}
Exchange exchange = Internal.instance.exchange(response);
Route route = exchange != null ? exchange.connection().route() : null;
Request followUp = followUpRequest(response, route);
if (followUp == null) {
if (exchange != null && exchange.isDuplex()) {
transmitter.timeoutEarlyExit();
}
return response;
}
RequestBody followUpBody = followUp.body();
if (followUpBody != null && followUpBody.isOneShot()) {
return response;
}
closeQuietly(response.body());
if (transmitter.hasExchange()) {
exchange.detachWithViolence();
}
//follow-up(重定向、授权等后续请求)次数判断,超过MAX_FOLLOW_UPS则抛出ProtocolException
if (++followUpCount > MAX_FOLLOW_UPS) {
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
request = followUp;
priorResponse = response;
}
}
- recover方法代码如下
private boolean recover(IOException e, Transmitter transmitter,
boolean requestSendStarted, Request userRequest) {
// The application layer has forbidden retries.
if (!client.retryOnConnectionFailure()) return false;
// We can't send the request body again.
if (requestSendStarted && requestIsOneShot(e, userRequest)) return false;
// This exception is fatal.
if (!isRecoverable(e, requestSendStarted)) return false;
// No more routes to attempt.
if (!transmitter.canRetry()) return false;
// For failure recovery, use the same route selector with a new connection.
return true;
}
- RetryAndFollowUpInterceptor的intercept方法中调用下一个拦截器链的proceed方法获取response,
并在while (true) 循环中根据异常结果或响应结果判断是否要进行重新请求,如当发生IOException或者RouteException时会执行recover方法,
并且通过++followUpCount > MAX_FOLLOW_UPS限制follow-up(重定向、授权等后续请求)的最大次数,超出则抛出ProtocolException;
- 由RealInterceptorChain.proceed可知会继续调用下一个拦截器的intercept方法,由getResponseWithInterceptorChain中顺序可知下一个拦截器就是BridgeInterceptor
- 那么来继续看一下BridgeInterceptor的intercept方法
BridgeInterceptor
- BridgeInterceptor的intercept方法如下
@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
//补充RequestBody的请求头
RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}
//调用下一个拦截器链的proceed方法
Response networkResponse = chain.proceed(requestBuilder.build());
//补充响应头
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
//Response.body的输入流转为GzipSource,以解压的方式读取流数据
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
- BridgeInterceptor的intercept中,先各种判断对RequestBody的请求头进行补充,将其转化为能够进行网络访问的请求,然后调用下一个拦截器链的proceed方法获取response,再对response的响应头进行处理,如通过cookieJar保存cookie,gzip解压,将请求回来的响应response转化为用户可用的response;
- 调用下一个拦截器链的proceed,又会调用下一个拦截器的intercept方法,下一个拦截器为CacheInterceptor
CacheInterceptor
- CacheInterceptor的intercept方法如下
@Override public Response intercept(Chain chain) throws IOException {
//尝试获取缓存的Response
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
long now = System.currentTimeMillis();
//CacheStrategy缓存策略,维护了networkRequest 和 cacheResponse
//根据时间获取缓存策略,其内部会结合时间等条件返回对应的缓存策略
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
if (cache != null) {
cache.trackResponse(strategy);
}
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}
//如果禁止网络访问又没有缓存,则直接new一个失败的Response
// If we're forbidden from using the network and the cache is insufficient, fail.
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
//如果不需要网络访问,则直接返回缓存的response
// If we don't need the network, we're done.
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
//需要网络访问,则调用下一个拦截器链的proceed获取response
Response networkResponse = null;
try {
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
//如果我们本地有缓存的Response
// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
//服务器返回304,则直接返回本地缓存的response
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
//如果有缓存,则对缓存进行更新
if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}
//如果请求方法会使缓存失效(如POST、PUT、PATCH、DELETE、MOVE),则移除对应缓存
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}
return response;
}
- CacheInterceptor的intercept中对用不用缓存和对缓存是否更新进行了各种判断,如果用网络请求也会调用下一个拦截器链的proceed方法获取response,
- 那么下一个拦截器就是ConnectInterceptor
ConnectInterceptor
- ConnectInterceptor的intercept方法如下, 正式开启okhttp的网络请求
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
Transmitter transmitter = realChain.transmitter();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
return realChain.proceed(request, transmitter, exchange);
}
- 上面调用transmitter.newExchange获取Exchange,并调用下一个拦截器链的proceed传给下一个拦截器,获取response,newExchange方法如下
Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
...
ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);
...
}
- 上面调用了exchangeFinder.find获取ExchangeCodec, 其中通过findHealthyConnection得到RealConnection,再return RealConnection.newCodec
public ExchangeCodec find(OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
...
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
return resultConnection.newCodec(client, chain);
...
}
- findHealthyConnection又调用了findConnection方法, findConnection方法代码如下, 其中通过连接池或 new RealConnection获取RealConnection,并调用了其connect方法
- 源码很长,下面只是列出了关键步骤
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
...
//先从连接池中取
if (result == null) {
// Attempt to get a connection from the pool.
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
foundPooledConnection = true;
result = transmitter.connection;
}
...
}
...
//连接池中有就不用继续搞了
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
return result;
}
...
//连接池没有就new一个,并调用connect方法
result = new RealConnection(connectionPool, selectedRoute);
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
//连接成功后,在routeDatabase中把该路由标记为可用(从失败路由记录中移除);真正放入连接池的connectionPool.put(result)在下面省略的代码中
connectionPool.routeDatabase.connected(result.route());
...
return result;
}
CallServerInterceptor
- 最后来看看CallServerInterceptor的intercept
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Exchange exchange = realChain.exchange();
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
//向socket中写入请求头信息
exchange.writeRequestHeaders(request);
boolean responseHeadersStarted = false;
Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
exchange.flushRequest();
responseHeadersStarted = true;
exchange.responseHeadersStart();
responseBuilder = exchange.readResponseHeaders(true);
}
if (responseBuilder == null) {
if (request.body().isDuplex()) {
// Prepare a duplex body so that the application can send a request body later.
exchange.flushRequest();
BufferedSink bufferedRequestBody = Okio.buffer(
exchange.createRequestBody(request, true));
//写入body信息
request.body().writeTo(bufferedRequestBody);
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
BufferedSink bufferedRequestBody = Okio.buffer(
exchange.createRequestBody(request, false));
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
}
} else {
exchange.noRequestBody();
if (!exchange.connection().isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
exchange.noNewExchangesOnConnection();
}
}
} else {
exchange.noRequestBody();
}
//请求结束
if (request.body() == null || !request.body().isDuplex()) {
exchange.finishRequest();
}
if (!responseHeadersStarted) {
exchange.responseHeadersStart();
}
//读取响应头
if (responseBuilder == null) {
responseBuilder = exchange.readResponseHeaders(false);
}
Response response = responseBuilder
.request(request)
.handshake(exchange.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
if (code == 100) {
// server sent a 100-continue even though we did not request one.
// try again to read the actual response
response = exchange.readResponseHeaders(false)
.request(request)
.handshake(exchange.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
code = response.code();
}
exchange.responseHeadersEnd(response);
//读取响应body
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(exchange.openResponseBody(response))
.build();
}
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
exchange.noNewExchangesOnConnection();
}
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}