OkHttp3源码解析-线程池
因为最近的一个项目用到了Retrofit2,所以也就用到了OkHttp3 ,因为Retrofit2 只支持OKHttp3了,所以除了研究Retrofit2以外还研究了OKhttp3的原理,然后在这做了记录。
1、OKHttp3的基本使用
这里已GET请求为例,
异步请求
Request.Builder requestBuilder = new Request.Builder().url("http://www.baidu.com");
Request request = requestBuilder.build();
Call mcall= mOkHttpClient.newCall(request);
mcall.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
response.body();
}
});
}
可以看出最后通过调用Call 的enqueue方法执行网络请求,这就是执行入口,所以源码分析从这里开始。
2、源码分析
Call 对象生成
@Override public Call newCall(Request request) {
return new RealCall(this, request);
}
最后生成了RealCall,所以Call 的enqueue方法,最后就会执行RealCall的enqueue方法。
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
//执行请求的地方
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
这里就要继续看client.dispatcher()以及enqueue方法了。client就是全局设置的OkHttpClient单例,而dispatcher()则获取请求分发器,返回的是Dispatcher类的实例。因为Dispatcher是final的,所以不能修改。然后追踪到Dispatcher的enqueue方法。
//最大同时请求数
private int maxRequests = 64;
//每个IP最多请求数
private int maxRequestsPerHost = 5;
synchronized void enqueue(AsyncCall call) {
//从判断条件可以看出,OKHttp3同时最多有64个请求执行,而每一个host最多有5个请求同时执行。其他请求都放在等待队列中。
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
executorService方法生成线程池实例,不知道为什么要直接方法同步,而没有使用双重校验。应该是在这性能基本没影响。
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
我上一篇博客说过ThreadPoolExecutor线程池,可以看出这个线程池,没有常存的核心线程,最多线程数为Integer.MAX_VALUE,线程空闲时存活时间为60秒,而SynchronousQueue是不保存任务的,所以只要把任务添加进去就会执行。而 SynchronousQueue的特性如下:
原博客引用地址SynchronousQueue:同步Queue,属于线程安全的BlockingQueue的一种,此队列设计的理念类似于"单工模式",对于每个put/offer操作,必须等待一个take/poll操作,类似于我们的现实生活中的"火把传递":一个火把传递地他人,需要2个人"触手可及"才行. 因为这种策略,最终导致队列中并没有一个真正的元素;这是一种pipleline思路的基于queue的"操作传递".
void put(E o):向队列提交一个元素,阻塞直到其他线程take或者poll此元素.
boolean offer(E o):向队列中提交一个元素,如果此时有其他线程正在被take阻塞(即其他线程已准备接收)或者"碰巧"有poll操作,那么将返回true,否则返回false。
E take():获取并删除一个元素,阻塞直到有其他线程offer/put.
boolean poll():获取并删除一个元素,如果此时有其他线程正在被put阻塞(即其他线程提交元素正等待被接收)或者"碰巧"有offer操作,那么将返回true,否则返回false.
既然是线程池执行线程,那我们也就知道会执行Runnable的run方法,那么最后会执行AsyncCall的run方法
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
private AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl().toString());
this.responseCallback = responseCallback;
}
.....
省略
.....
@Override protected void execute() {
boolean signalledCallback = false;
try {
//执行网络请求,获取请求结果
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
//异常回调
responseCallback.onFailure(RealCall.this, e);
}
} finally {
//最后一定会执行
client.dispatcher().finished(this);
}
}
}
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
//为线程设置名字
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
//实现请求任务的地方
protected abstract void execute();
}
所以OKHttp不是在线程池中维护线程的个数,线程是一直在Dispatcher中直接控制。线程池中的请求都是运行中的请求。这也就是说线程的重用不是线程池控制的,那么线程的重用是在什么地方呢?如何知道线程池怎么重用的,想到这就应该有个基本的思路:就是上个请求结束的地方肯定是那个请求开始的地方,也就是线程重用的地方。所以找到线程重用的地方。
该方法为AsyncCall的execute方法,
@Override protected void execute() {
.....
省略
.....
finally{
//最后一定执行
client.dispatcher().finished(this);
}
}
前面已经分析client.dispatcher()返回的就是Dispatcher实例,所以看Dispatcher的finished的方法。
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
//call为要结束的请求,calls为runningSyncCalls,所以把当前请求移除运行中队列。
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
//因为promoteCalls为true,所以执行promoteCalls方法。
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
最后找到了promoteCalls方法。
private void promoteCalls() {
//当前运行线程数大于最大允许线程数,不继续执行。
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
//等待允许请求队列为空,不继续执行。
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
//遍历所有等待的请求任务,找到相同host同时请求数小于maxRequestsPerHost的请求,执行该请求任务。
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
//该任务必须是已经请求的host不多于maxRequestsPerHost的
if (runningCallsForHost(call) < maxRequestsPerHost) {
//remove方法,将该请求任务从等待队列中去除,同时执行该任务。
i.remove();
runningAsyncCalls.add(call);
//执行请求任务
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
这里就做到了线程的重用。