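Dissecting the OkHttp Framework
After AsyncTask, I'm now turning the scalpel on OkHttp. Reading source code can get addictive, because what's inside feels like the bricks and water that the computing world is built from: once you have those, you can build anything, and with no fixed moves you can beat any move. I'm rambling again, so let's begin.
First, the OkHttp dependency is: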
compile 'com.squareup.okhttp3:okhttp:3.8.1'
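All of the analysis below is based on the source of this version.
Next comes the simplest possible usage: OkHttp's synchronous and asynchronous network requests: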
OkHttpClient client = new OkHttpClient();
//Synchronous network request
public String Synch(String url) throws IOException {
Request request = new Request.Builder().url(url).build();
Response response = client.newCall(request).execute();
return response.body().string();
}
//Asynchronous network request
public void Async(String url) throws IOException {
Request request = new Request.Builder().url(url).build();
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {}
@Override
public void onResponse(Call call, Response response) throws IOException {
Log.e("response",response.body().string());
}
});
}
OkHttpClient
We start by creating an OkHttpClient. Its source looks like this:
public OkHttpClient() {
this(new Builder());
}
public static final class Builder {
Dispatcher dispatcher;
@Nullable Proxy proxy;
List<Protocol> protocols;
List<ConnectionSpec> connectionSpecs;
final List<Interceptor> interceptors = new ArrayList<>();
final List<Interceptor> networkInterceptors = new ArrayList<>();
EventListener.Factory eventListenerFactory;
ProxySelector proxySelector;
CookieJar cookieJar;
@Nullable Cache cache;
@Nullable InternalCache internalCache;
SocketFactory socketFactory;
@Nullable SSLSocketFactory sslSocketFactory;
@Nullable CertificateChainCleaner certificateChainCleaner;
HostnameVerifier hostnameVerifier;
CertificatePinner certificatePinner;
Authenticator proxyAuthenticator;
Authenticator authenticator;
ConnectionPool connectionPool;
Dns dns;
boolean followSslRedirects;
boolean followRedirects;
boolean retryOnConnectionFailure;
int connectTimeout;
int readTimeout;
int writeTimeout;
int pingInterval;
public Builder() {
dispatcher = new Dispatcher();
protocols = DEFAULT_PROTOCOLS;
connectionSpecs = DEFAULT_CONNECTION_SPECS;
eventListenerFactory = EventListener.factory(EventListener.NONE);
proxySelector = ProxySelector.getDefault();
cookieJar = CookieJar.NO_COOKIES;
socketFactory = SocketFactory.getDefault();
hostnameVerifier = OkHostnameVerifier.INSTANCE;
certificatePinner = CertificatePinner.DEFAULT;
proxyAuthenticator = Authenticator.NONE;
authenticator = Authenticator.NONE;
connectionPool = new ConnectionPool();
dns = Dns.SYSTEM;
followSslRedirects = true;
followRedirects = true;
retryOnConnectionFailure = true;
connectTimeout = 10_000;
readTimeout = 10_000;
writeTimeout = 10_000;
pingInterval = 0;
}
}
OkHttpClient(Builder builder) {
this.dispatcher = builder.dispatcher;
this.proxy = builder.proxy;
this.protocols = builder.protocols;
this.connectionSpecs = builder.connectionSpecs;
this.interceptors = Util.immutableList(builder.interceptors);
this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
this.eventListenerFactory = builder.eventListenerFactory;
this.proxySelector = builder.proxySelector;
this.cookieJar = builder.cookieJar;
this.cache = builder.cache;
this.internalCache = builder.internalCache;
this.socketFactory = builder.socketFactory;
boolean isTLS = false;
for (ConnectionSpec spec : connectionSpecs) {
isTLS = isTLS || spec.isTls();
}
if (builder.sslSocketFactory != null || !isTLS) {
this.sslSocketFactory = builder.sslSocketFactory;
this.certificateChainCleaner = builder.certificateChainCleaner;
} else {
X509TrustManager trustManager = systemDefaultTrustManager();
this.sslSocketFactory = systemDefaultSslSocketFactory(trustManager);
this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
}
this.hostnameVerifier = builder.hostnameVerifier;
this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
certificateChainCleaner);
this.proxyAuthenticator = builder.proxyAuthenticator;
this.authenticator = builder.authenticator;
this.connectionPool = builder.connectionPool;
this.dns = builder.dns;
this.followSslRedirects = builder.followSslRedirects;
this.followRedirects = builder.followRedirects;
this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
this.connectTimeout = builder.connectTimeout;
this.readTimeout = builder.readTimeout;
this.writeTimeout = builder.writeTimeout;
this.pingInterval = builder.pingInterval;
}
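As you can see, constructing an OkHttpClient is mainly about initializing its many parameters: the Builder sets the default values, and the constructor then copies them into the OkHttpClient. Alternatively, you can create the OkHttpClient explicitly through the builder pattern, like this: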
OkHttpClient okHttpClient=new OkHttpClient.Builder().build();
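Note: it is strongly recommended to use OkHttpClient as a single application-wide instance, because each OkHttpClient has its own connection pool and thread pool. Reusing them reduces latency and saves memory.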
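To make the builder-with-defaults idea and the "one shared instance" advice concrete, here is a minimal sketch that does not depend on OkHttp at all. ClientConfig and SharedClient are hypothetical names I made up for illustration; only the default values are taken from the Builder() constructor shown above.

```java
// Hypothetical stand-in for OkHttpClient: an immutable config object built from a Builder.
final class ClientConfig {
    final int connectTimeoutMillis;
    final int readTimeoutMillis;
    final boolean followRedirects;

    // Like OkHttpClient(Builder), the constructor only copies values out of the builder.
    private ClientConfig(Builder builder) {
        this.connectTimeoutMillis = builder.connectTimeoutMillis;
        this.readTimeoutMillis = builder.readTimeoutMillis;
        this.followRedirects = builder.followRedirects;
    }

    static final class Builder {
        // Defaults mirror the values set in the Builder() constructor above.
        int connectTimeoutMillis = 10_000;
        int readTimeoutMillis = 10_000;
        boolean followRedirects = true;

        Builder connectTimeoutMillis(int millis) {
            this.connectTimeoutMillis = millis;
            return this;
        }

        Builder followRedirects(boolean follow) {
            this.followRedirects = follow;
            return this;
        }

        ClientConfig build() {
            return new ClientConfig(this);
        }
    }
}

// One shared instance for the whole application (initialization-on-demand holder idiom).
final class SharedClient {
    private SharedClient() {}

    private static final class Holder {
        static final ClientConfig INSTANCE = new ClientConfig.Builder().build();
    }

    static ClientConfig get() {
        return Holder.INSTANCE;
    }
}
```

With a real OkHttpClient the same holder idea would apply: build the client once and hand out the same instance everywhere, so the connection pool and dispatcher threads are shared.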
Request
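Every HTTP request contains a URL, a method (GET, POST, or another), and a set of HTTP headers. A request may also carry a body: a data stream of a specific content type. The line
Request request = new Request.Builder().url(url).build();
simply initializes these things, just as with OkHttpClient. It is simple enough that I won't list and walk through the source as I did above.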
Synchronous network requests
Request request = new Request.Builder().url(url).build();
Response response = client.newCall(request).execute();
response.body().string();
Let's follow the synchronous request first. The OkHttpClient's newCall() is called; its code is:
@Override public Call newCall(Request request) {
return new RealCall(this, request, false /* for web socket */);
}
Then RealCall's execute() method is called. Its source is:
@Override
public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.dispatcher().finished(this);
}
}
First, executed is set to true, which enforces the rule that each Call object can be used only once. Then captureCallStackTrace() is called; its source is:
private void captureCallStackTrace() {
Object callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()");
retryAndFollowUpInterceptor.setCallStackTrace(callStackTrace);
}
public Object getStackTraceForCloseable(String closer) {
if (logger.isLoggable(Level.FINE)) {
return new Throwable(closer); // These are expensive to allocate.
}
return null;
}
public final class RetryAndFollowUpInterceptor implements Interceptor {
public void setCallStackTrace(Object callStackTrace) {
this.callStackTrace = callStackTrace;
}
}
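As you can see, captureCallStackTrace() does very little: it just hands an object to retryAndFollowUpInterceptor.
Its purpose is what the name suggests: it is a stack trace that captures where this request was made. As getStackTraceForCloseable() shows, the Throwable is only created when FINE-level logging is enabled; otherwise the object is null.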
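The first two steps of execute() (the one-shot guard and the optional stack-trace capture) can be sketched in a few lines. This is only a sketch under my own assumptions: OneShotCall is a hypothetical class, it uses java.util.logging directly instead of OkHttp's Platform, and it returns a dummy string instead of a real Response.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for RealCall: it may run once, and it can remember where it was started.
final class OneShotCall {
    private static final Logger logger = Logger.getLogger(OneShotCall.class.getName());

    private boolean executed;
    private Throwable callStackTrace; // stays null unless FINE logging is enabled

    String execute() {
        synchronized (this) {
            // A second execute() on the same call is a programming error.
            if (executed) throw new IllegalStateException("Already Executed");
            executed = true;
        }
        // Allocating a Throwable is expensive, so only do it when someone will read it.
        if (logger.isLoggable(Level.FINE)) {
            callStackTrace = new Throwable("response.body().close()");
        }
        return "response"; // placeholder for the real network result
    }

    Throwable callStackTrace() {
        return callStackTrace;
    }
}
```

Calling execute() twice on the same OneShotCall throws IllegalStateException on the second call, which is the same behavior the executed flag gives RealCall.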
Next is client.dispatcher().executed(this); the corresponding source is:
public final class Dispatcher {
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
}
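This just puts the call into a double-ended queue (Deque) and does nothing else.
Now comes the important part. The whole network request gets its data through the single line Response result = getResponseWithInterceptorChain();. The Response carries all the data of the request. So what does getResponseWithInterceptorChain() do? Its source is: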
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
//Add the application-level interceptors defined by the developer
interceptors.addAll(client.interceptors());
//This interceptor handles retrying failed requests and following redirects
interceptors.add(retryAndFollowUpInterceptor);
//This interceptor adds request headers and other information to the request
//and post-processes the returned Response (some of the raw information may not be needed)
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//This interceptor checks whether a cached response exists, reads the cache, updates the cache, and so on
interceptors.add(new CacheInterceptor(client.internalCache()));
//This interceptor establishes the connection between the client and the server
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
//Add the network-level interceptors defined by the developer
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
//A chain that wraps the request
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
//Start the chain, which hands the request to the first interceptor
return chain.proceed(originalRequest);
}
The code above keeps adding interceptors, then creates a RealInterceptorChain and calls proceed() on it. Let's look at the RealInterceptorChain class first:
public final class RealInterceptorChain implements Interceptor.Chain {
private final List<Interceptor> interceptors;
private final StreamAllocation streamAllocation;
private final HttpCodec httpCodec;
private final RealConnection connection;
private final int index;
private final Request request;
private int calls;
public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
HttpCodec httpCodec, RealConnection connection, int index, Request request) {
this.interceptors = interceptors;
this.connection = connection;
this.streamAllocation = streamAllocation;
this.httpCodec = httpCodec;
this.index = index;
this.request = request;
}
@Override public Connection connection() {
return connection;
}
public StreamAllocation streamAllocation() {
return streamAllocation;
}
public HttpCodec httpStream() {
return httpCodec;
}
@Override public Request request() {
return request;
}
@Override public Response proceed(Request request) throws IOException {
return proceed(request, streamAllocation, httpCodec, connection);
}
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
// If we already have a stream, confirm that the incoming request will use it.
if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.httpCodec != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpCodec, connection, index + 1, request);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
return response;
}
}
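In proceed(), httpCodec is null because null was passed in when the first RealInterceptorChain was created, so the checks that depend on it are skipped. Then interceptor.intercept(next) is called, which runs the intercept() method of one of the interceptors added earlier. Each interceptor's intercept() in turn calls proceed() on the chain it was given, and that proceed() calls the next interceptor's intercept(), and so on down the list one by one. The last interceptor returns the result, which is then passed back up layer by layer.
The key code is shown below: every interceptor contains a call that moves on to the next interceptor, step by step.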
public interface Interceptor {
Response intercept(Chain chain) throws IOException;
interface Chain {
Request request();
Response proceed(Request request) throws IOException;
@Nullable Connection connection();
}
}
RealInterceptorChain implements Interceptor.Chain{
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpCodec, connection, index + 1, request);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
}
}
public final class RetryAndFollowUpInterceptor implements Interceptor {
@Override
public Response intercept(Chain chain) throws IOException {
Response response = ((RealInterceptorChain) chain).proceed(request,streamAllocation,null,null);
}
}
public final class BridgeInterceptor implements Interceptor {
@Override
public Response intercept(Chain chain) throws IOException {
Request.Builder requestBuilder = userRequest.newBuilder();
Response networkResponse = chain.proceed(requestBuilder.build());
}
}
public final class CacheInterceptor implements Interceptor {
@Override
public Response intercept(Chain chain) throws IOException {
Response networkResponse = null;
networkResponse = chain.proceed(networkRequest);
}
}
Borrowing someone else's diagram, the flow looks like this:
[Figure: flowchart of the interceptor chain (流程图.PNG)]
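Together the interceptors form an interceptor chain, and OkHttp's interceptor chain uses the chain-of-responsibility pattern. The advantage is that sending a request is separated from handling it, and handlers can be added dynamically in the middle to process the request, short-circuit it, and so on.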
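The index + 1 trick in proceed() is easier to see in a stripped-down model. The following is a minimal sketch, not OkHttp code: MiniInterceptor, MiniChain, and ChainDemo are hypothetical names, and requests and responses are plain strings so that the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, string-based stand-ins for Interceptor and RealInterceptorChain.
interface MiniInterceptor {
    String intercept(MiniChain chain);
}

final class MiniChain {
    private final List<MiniInterceptor> interceptors;
    private final int index;
    private final String request;

    MiniChain(List<MiniInterceptor> interceptors, int index, String request) {
        this.interceptors = interceptors;
        this.index = index;
        this.request = request;
    }

    String request() {
        return request;
    }

    // Hands the request to the interceptor at `index`, giving it a chain that points at index + 1.
    String proceed(String request) {
        if (index >= interceptors.size()) throw new AssertionError();
        MiniChain next = new MiniChain(interceptors, index + 1, request);
        return interceptors.get(index).intercept(next);
    }
}

final class ChainDemo {
    static String run(String request) {
        List<MiniInterceptor> interceptors = new ArrayList<>();
        // Adds a header on the way down and wraps the response on the way back up.
        interceptors.add(chain -> "[" + chain.proceed(chain.request() + "+header") + "]");
        // Short-circuits: answers from a pretend cache without calling proceed().
        interceptors.add(chain -> chain.request().startsWith("cached")
                ? "cache-hit"
                : chain.proceed(chain.request()));
        // Last interceptor: plays the role of CallServerInterceptor and never calls proceed().
        interceptors.add(chain -> "response(" + chain.request() + ")");
        return new MiniChain(interceptors, 0, request).proceed(request);
    }

    public static void main(String[] args) {
        System.out.println(run("GET /a"));    // [response(GET /a+header)]
        System.out.println(run("cached /b")); // [cache-hit]
    }
}
```

The second call shows the short-circuit case: the middle interceptor answers by itself, so the last interceptor is never reached, yet the first interceptor still gets to wrap the result on the way back up.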
Finally, the source behind client.dispatcher().finished(this); is:
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
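In the end, calls.remove(call) removes the call from the queue. On the synchronous path promoteCalls() is not triggered: the finished(RealCall) overload passes false for promoteCalls, and in this example runningAsyncCalls and readyAsyncCalls are both empty anyway.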
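In fact, the real core of everything above is the interceptors' chain-of-responsibility pattern, which is well worth learning from.
Asynchronous network requests
Much of this is the same as the synchronous case, so I'll only cover the key code: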
client.dispatcher().enqueue(new AsyncCall(responseCallback));
The source of the dispatcher() method is:
public Dispatcher dispatcher() {
return dispatcher;
}
Nothing to say there. Next is the important part, enqueue():
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
private int maxRequestsPerHost = 5;
private int maxRequests = 64;
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
private int runningCallsForHost(AsyncCall call) {
int result = 0;
for (AsyncCall c : runningAsyncCalls) {
if (c.host().equals(call.host())) result++;
}
return result;
}
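When the number of running asynchronous calls is below maxRequests (64) and the number of running calls to the same host is below maxRequestsPerHost (5), the call is added to the runningAsyncCalls queue and handed to the executorService thread pool to run. Otherwise it is added to the readyAsyncCalls waiting queue. So what kind of thread pool is executorService? See the source below: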
private ExecutorService executorService;
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
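This is a thread pool with a maximum size of Integer.MAX_VALUE and no core threads: it creates as many threads as are needed, and an idle thread is kept alive for at most 60 seconds. It is designed without an upper limit on threads because I/O tasks block a lot while using little CPU, and this way a new task never waits in a queue behind tasks that are stuck blocking. The actual concurrency is still capped by the Dispatcher's maxRequests.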
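To see what "creates as many threads as are needed" means in practice, here is a small experiment using only the JDK. It is a sketch under my own assumptions: DispatcherPoolDemo is a hypothetical class, the default thread factory replaces Util.threadFactory, and a CountDownLatch stands in for a call blocked on network I/O.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class DispatcherPoolDemo {
    // Same constructor arguments as Dispatcher.executorService(), minus the custom thread factory.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    // Submits `tasks` blocking jobs and reports how many threads the pool created for them.
    static int threadsUsedFor(int tasks) {
        ThreadPoolExecutor pool = newPool();
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // simulate a call blocked on network I/O
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // A SynchronousQueue holds nothing, so every busy task needed its own thread.
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadsUsedFor(3)); // 3
    }
}
```

Because the SynchronousQueue has no capacity, a task is either handed straight to an idle thread or gets a brand-new one. That is why the pool itself never queues, and why the queuing in OkHttp happens in readyAsyncCalls instead.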
Next, the AsyncCall that we pass into executorService. Its source is:
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
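AsyncCall extends NamedRunnable, which implements Runnable. Just as in my earlier analysis of AsyncTask, the thread pool first runs NamedRunnable's run() method, which in turn calls AsyncCall's execute() method. The network request itself works just like the synchronous one: Response response = getResponseWithInterceptorChain(); obtains the response through the chain of responsibility, and then the callback interface is invoked to deliver the data returned by the server.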
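The run()/execute() split is a small template-method pattern, and it can be reproduced in isolation. In this sketch NamedTask and NamedTaskDemo are hypothetical names of mine; the logic of run() follows the NamedRunnable source above, and the task runs on the calling thread just to keep the example short.

```java
// Hypothetical re-creation of the NamedRunnable idea: rename the thread while the task runs.
abstract class NamedTask implements Runnable {
    private final String name;

    NamedTask(String name) {
        this.name = name;
    }

    @Override public final void run() {
        Thread current = Thread.currentThread();
        String oldName = current.getName();
        current.setName(name);
        try {
            execute(); // subclasses put the real work here
        } finally {
            current.setName(oldName); // always restore, even if execute() throws
        }
    }

    protected abstract void execute();
}

final class NamedTaskDemo {
    // Runs a task on the calling thread and returns the thread name seen inside execute().
    static String nameSeenInside(String taskName) {
        String[] seen = new String[1];
        new NamedTask(taskName) {
            @Override protected void execute() {
                seen[0] = Thread.currentThread().getName();
            }
        }.run();
        return seen[0];
    }
}
```

The practical benefit is in debugging: while a call is running, thread dumps and log lines show a name such as "OkHttp ..." with the request URL instead of an anonymous pool thread name.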
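Finally, client.dispatcher().finished(this); is executed again. It first removes the call with calls.remove(call). Then, when the asynchronous waiting queue readyAsyncCalls holds pending calls and the limits allow it, the code in promoteCalls() runs: it takes a call out of readyAsyncCalls, puts it into runningAsyncCalls, and executes it.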
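The interplay of enqueue(), finished(), and promoteCalls() can be modeled in a few dozen lines. This is a simplified sketch, not the real Dispatcher: MiniDispatcher is a hypothetical class, a call is represented only by its host name, and instead of running anything on a thread pool it records the order in which calls would be started.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of Dispatcher's two async queues; a call is just its host name.
final class MiniDispatcher {
    private final int maxRequests;
    private final int maxRequestsPerHost;
    private final Deque<String> ready = new ArrayDeque<>();
    private final Deque<String> running = new ArrayDeque<>();
    final List<String> started = new ArrayList<>(); // order in which calls were started

    MiniDispatcher(int maxRequests, int maxRequestsPerHost) {
        this.maxRequests = maxRequests;
        this.maxRequestsPerHost = maxRequestsPerHost;
    }

    synchronized void enqueue(String host) {
        if (running.size() < maxRequests && runningCallsForHost(host) < maxRequestsPerHost) {
            start(host);
        } else {
            ready.add(host); // over a limit: wait until another call finishes
        }
    }

    synchronized void finished(String host) {
        if (!running.remove(host)) throw new AssertionError("Call wasn't in-flight!");
        promoteCalls();
    }

    private void promoteCalls() {
        if (running.size() >= maxRequests) return; // already at capacity
        for (Iterator<String> i = ready.iterator(); i.hasNext(); ) {
            String host = i.next();
            if (runningCallsForHost(host) < maxRequestsPerHost) {
                i.remove();
                start(host);
            }
            if (running.size() >= maxRequests) return; // reached capacity
        }
    }

    private void start(String host) {
        running.add(host);
        started.add(host); // the real Dispatcher calls executorService().execute(call) here
    }

    private int runningCallsForHost(String host) {
        int result = 0;
        for (String h : running) {
            if (h.equals(host)) result++;
        }
        return result;
    }

    synchronized int runningCount() { return running.size(); }
    synchronized int readyCount() { return ready.size(); }
}
```

For example, with new MiniDispatcher(2, 1), enqueueing hosts "a", "a", "b", "c" starts "a" and "b" and leaves the second "a" and "c" waiting. Calling finished("a") then promotes the waiting "a", which fills the pool again, so "c" keeps waiting until another call finishes.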