OkHttp讲解(四)-链接池
一、 Interceptor 关联类分析
1.1、 StreamAllocation 的成员变量
简介
StreamAllocation是用来协调Connections、Streams和Calls这三个实体的。
- Connections:连接到远程服务器的物理套接字,这个套接字连接可能比较慢,所以它有一套取消机制。
- Streams:定义了逻辑上的HTTP请求/响应对,每个连接都定义了它们可以携带的最大并* 发流,HTTP/1.x每次只可以携带一个,HTTP/2每次可以携带多个。
Calls:定义了流的逻辑序列,这个序列通常是一个初始请求以及它的重定向请求,对于同一个连接,我们通常将所有流都放在一个调用中,以此来统一它们的行为。
HTTP通信 执行 网络请求Call 需要在 连接Connection 上建立一个新的 流Stream,我们将 StreamAllocation 称之 流 的桥梁,它负责为一次 请求 寻找 连接 并建立 流,从而完成远程通信。
public final Address address;//地址
private Route route; //路由
private final ConnectionPool connectionPool; //连接池
private final Object callStackTrace; //日志
// State guarded by connectionPool.
private final RouteSelector routeSelector; //路由选择器
private int refusedStreamCount; //拒绝的次数
private RealConnection connection; //连接
private boolean released; //是否已经被释放
private boolean canceled //是否被取消了
1.2、 RealConnection
- okhttp是底层实现框架,与httpURLconnection是同一级别的。OKHttp底层建立网络连接的关键就是RealConnection类。RealConnection类底层封装socket,是真正的创建连接者。分析这个类之后就明白了OKHttp与httpURLconnection的本质不同点。
- RealConnection是Connection的实现类,代表着链接socket的链路,如果拥有了一个RealConnection就代表了我们已经跟服务器有了一条通信链路,而且通过RealConnection代表是连接socket链路,RealConnection对象意味着我们已经跟服务端有了一条通信链路了,在这个类里面实现的三次握手。
- 在OKHttp里面,记录一次连接的是RealConnection,这个负责连接,在这个类里面用socket来连接,用HandShake来处理握手。
//链接的线程池
private final ConnectionPool connectionPool;
private final Route route;
//下面这些字段,通过connect()方法开始初始化,并且绝对不会再次赋值
/** The low-level TCP socket. */
private Socket rawSocket; //底层socket
private Socket socket; //应用层socket
//握手
private Handshake handshake;
//协议
private Protocol protocol;
// http2的链接
private Http2Connection http2Connection;
//通过source和sink,大家可以猜到是与服务器交互的输入输出流
private BufferedSource source;
private BufferedSink sink;
//下面这个字段是 属于表示链接状态的字段,并且有connectPool统一管理
//如果noNewStreams被设为true,则noNewStreams一直为true,不会被改变,并且表示这个链接不会再创新的stream流
public boolean noNewStreams;
//成功的次数
public int successCount;
//此链接可以承载最大并发流的限制,如果不超过限制,可以随意增加
public int allocationLimit = 1;
RealConnection的connect方法,connect()里面进行了三次握手
public void connect(。。。) {
//如果协议不等于null,抛出一个异常
if (protocol != null) throw new IllegalStateException("already connected");
。。 省略部分代码。。。。
while (true) {//一个while循环
//如果是https请求并且使用了http代理服务器
if (route.requiresTunnel()) {
connectTunnel(...);
} else {//
//直接打开socket链接
connectSocket(connectTimeout, readTimeout);
}
//建立协议
establishProtocol(connectionSpecSelector);
break;//跳出while循环
。。省略部分代码。。。
}
//当前route的请求是https并且使用了Proxy.Type.HTTP代理
public boolean requiresTunnel() {
return address.sslSocketFactory != null && proxy.type() == Proxy.Type.HTTP;
}
普通连接的建立过程为建立TCP连接,建立TCP连接的过程为
private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
//根据代理类型来选择socket类型,是代理还是直连
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
rawSocket.setSoTimeout(readTimeout);
try {
//连接socket
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
throw new ConnectException("Failed to connect to " + route.socketAddress());
}
source = Okio.buffer(Okio.source(rawSocket));//从socket中获取source 对象。
sink = Okio.buffer(Okio.sink(rawSocket));//从socket中获取sink 对象。
}
Okio.source(rawSocket)
和Okio.sink(rawSocket)
的原码
public static Source source(Socket socket) throws IOException {
if (socket == null) throw new IllegalArgumentException("socket == null");
AsyncTimeout timeout = timeout(socket);
Source source = source(socket.getInputStream(), timeout);
return timeout.source(source);
}
public static Sink sink(Socket socket) throws IOException {
if (socket == null) throw new IllegalArgumentException("socket == null");
AsyncTimeout timeout = timeout(socket);
Sink sink = sink(socket.getOutputStream(), timeout);
return timeout.sink(sink);
}
建立隧道连接的过程
private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout)
throws IOException {
//1、创建隧道请求对象
Request tunnelRequest = createTunnelRequest();
HttpUrl url = tunnelRequest.url();
int attemptedConnections = 0;
int maxAttempts = 21;
//一个while循环
while (true) {
//尝试连接词说超过最大次数
if (++attemptedConnections > maxAttempts) {
throw new ProtocolException("Too many tunnel connections attempted: " + maxAttempts);
}
//2、打开socket链接
connectSocket(connectTimeout, readTimeout);
//3、请求开启隧道并返回tunnelRequest
tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url);
//4、成功开启了隧道,跳出while循环
if (tunnelRequest == null) break; /
//隧道未开启成功,关闭相关资源,继续while循环
//当然,循环次数超限后抛异常,退出wiile循环
closeQuietly(rawSocket);
rawSocket = null;
sink = null;
source = null;
}
}
//隧道请求是一个常规的HTTP请求,只是请求的内容有点特殊。最初创建的隧道请求如
private Request createTunnelRequest() {
return new Request.Builder()
.url(route.address().url())
.header("Host", Util.hostHeader(route.address().url(), true))
.header("Proxy-Connection", "Keep-Alive") // For HTTP/1.0 proxies like Squid.
.header("User-Agent", Version.userAgent())
.build();
}
二、创建链接流程
ConnectInterceptor
里面创建链接,并把创建的链接放如链接池中。具体过程如下:
@Override
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
//获取可复用流
StreamAllocation streamAllocation = realChain.streamAllocation();
boolean doExtensiveHealthChecks = !request.method().equals("GET");
//创建输出流
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
//根据HTTP/1.x(keep-alive)和HTTP/2(流复用)的复用机制,发起连接
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
通过ConnectInterceptor
中的HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
方法调用StreamAllocation#newStream
方法
public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
int connectTimeout = client.connectTimeoutMillis();
int readTimeout = client.readTimeoutMillis();
int writeTimeout = client.writeTimeoutMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
//获取一个健康的连接
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
//实例化HttpCodec,如果是HTTP/2则是Http2Codec否则是Http1Codec
HttpCodec resultCodec = resultConnection.newCodec(client, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
然后调用StreamAllocation#findHealthyConnection
方法
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis,
boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
while (true) {
//todo 找到一个连接
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
//todo 如果这个连接是新建立的,那肯定是健康的,直接返回
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
//todo 如果不是新创建的,需要检查是否健康
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
//todo 不健康 关闭连接,释放Socket,从连接池移除
// 继续下次寻找连接操作
noNewStreams();
continue;
}
return candidate;
}
}
然后调用StreamAllocation#findConnection
找一个连接
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
Connection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
releasedConnection = this.connection;
toClose = releaseIfNoNewStreams();
if (this.connection != null) {
// We had an already-allocated connection and it's good.
result = this.connection;
releasedConnection = null;
}
if (!reportedAcquired) {
// If the connection was never reported acquired, don't report it as released!
releasedConnection = null;
}
if (result == null) {
//todo 尝试从连接池获取连接,如果有可复用的连接,会给第三个参数 this的connection赋值
//Attempt to get a connection from the pool.
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
closeQuietly(toClose);
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
return result;
}
// If we need a route selection, make one. This is a blocking operation.
//todo 创建一个路由 (dns解析的所有ip与代理的组合)
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
if (newRouteSelection) {
//todo 根据代理和不同的ip从连接池中找可复用的连接
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
Internal.instance.get(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}
//todo 还是没找到,必须新建一个连接了
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
acquire(result, false);
}
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
// Do TCP + TLS handshakes. This is a blocking operation.
//todo 实际上就是创建socket连接,但是要注意的是如果存在http代理的情况
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;
// Pool the connection.
//todo 将新创建的连接放到连接池中
Internal.instance.put(connectionPool, result);
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
}
然后调用StreamAllocation#isHealthy
判断是否健康链接
public boolean isHealthy(boolean doExtensiveChecks) {
//todo Socket关闭,肯定不健康
if (socket.isClosed() || socket.isInputShutdown() || socket.isOutputShutdown()) {
return false;
}
if (http2Connection != null) {
return !http2Connection.isShutdown();
}
if (doExtensiveChecks) {
try {
int readTimeout = socket.getSoTimeout();
try {
socket.setSoTimeout(1);
if (source.exhausted()) {
return false; // Stream is exhausted; socket is closed.
}
return true;
} finally {
socket.setSoTimeout(readTimeout);
}
} catch (SocketTimeoutException ignored) {
// Read timed out; socket is good.
} catch (IOException e) {
return false; // Couldn't read; socket is closed.
}
}
return true;
}
三、链接池的存取数据
根据图我们看ConnectionPool
里面代码:
- 1、创建
//这是一个用于清楚过期链接的线程池,每个线程池最多只能运行一个线程,并且这个线程池允许被垃圾回收
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
/** The maximum number of idle connections for each address. */
//每个address的最大空闲连接数。
private final int maxIdleConnections;
private final long keepAliveDurationNs;
//清理任务
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
//链接的双向队列
private final Deque<RealConnection> connections = new ArrayDeque<>();
//路由的数据库
final RouteDatabase routeDatabase = new RouteDatabase();
//清理任务正在执行的标志
boolean cleanupRunning;
//创建一个适用于单个应用程序的新连接池。
//该连接池的参数将在未来的okhttp中发生改变
//目前最多可容乃5个空闲的连接,存活期是5分钟
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}
public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.maxIdleConnections = maxIdleConnections;
this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
// Put a floor on the keep alive duration, otherwise cleanup will spin loop.
//保持活着的时间,否则清理将旋转循环
if (keepAliveDuration <= 0) {
throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
}
}
管理http和http/2的链接,以便减少网络请求延迟。同一个address将共享同一个connection。该类实现了复用连接的目标。
- 1、主要就是connections,可见ConnectionPool内部以队列方式存储连接;
- 2、routDatabase是一个黑名单,用来记录不可用的route,但是看代码貌似ConnectionPool并没有使用它。所以此处不做分析。
- 3、剩下的就是和清理有关了,所以executor是清理任务的线程池,cleanupRunning是清理任务的标志,cleanupRunnable是清理任务。
- 2、put 链接
/**
* todo 保存连接以复用。
* 本方法没上锁,只加了断言: 当前线程拥有this(pool)对象的锁。
* 表示使用这个方法必须要上锁,而且是上pool的对象锁。
* okhttp中使用到这个函数的地方确实也是这么做的
*/
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
//todo 如果清理任务未执行就启动它,再把新连接加入队列
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
- 2、get 链接
/**
* todo 获取可复用的连接
*/
@Nullable
RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
//todo 要拿到的连接与连接池中的连接 连接的配置(dns/代理/域名等等)一致 就可以复用
// 在使用了,所以 acquire 会创建弱引用放入集合记录
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection, true);
return connection;
}
}
return null;
}
执行RealConnection#isEligible
检查是否可以复用
public boolean isEligible(Address address, @Nullable Route route) {
// If this connection is not accepting new streams, we're done.
//todo 实际上就是在使用(对于http1.1)就不能复用
if (allocations.size() >= allocationLimit || noNewStreams) return false;
// If the non-host fields of the address don't overlap, we're done.
//todo 如果地址不同,不能复用。包括了配置的dns、代理、证书以及端口等等 (域名还没判断,所有下面马上判断域名)
if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;
// If the host exactly matches, we're done: this connection can carry the address.
//todo 都相同,那就可以复用了
if (address.url().host().equals(this.route().address().url().host())) {
return true; // This connection is a perfect match.
}
// At this point we don't have a hostname match. But we still be able to carry the
// request if
// our connection coalescing requirements are met. See also:
// https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
// https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/
// 1. This connection must be HTTP/2.
if (http2Connection == null) return false;
// 2. The routes must share an IP address. This requires us to have a DNS address for both
// hosts, which only happens after route planning. We can't coalesce connections that use a
// proxy, since proxies don't tell us the origin server's IP address.
if (route == null) return false;
if (route.proxy().type() != Proxy.Type.DIRECT) return false;
if (this.route.proxy().type() != Proxy.Type.DIRECT) return false;
if (!this.route.socketAddress().equals(route.socketAddress())) return false;
// 3. This connection's server certificate's must cover the new host.
if (route.address().hostnameVerifier() != OkHostnameVerifier.INSTANCE) return false;
if (!supportsUrl(address.url())) return false;
// 4. Certificate pinning must match the host.
try {
address.certificatePinner().check(address.url().host(), handshake().peerCertificates());
} catch (SSLPeerUnverifiedException e) {
return false;
}
return true; // The caller's address can be carried by this connection.
}
- 2、清理 链接池
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
//todo 检查连接是否正在被使用
//If the connection is in use, keep searching.
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
//todo 否则记录闲置连接数
idleConnectionCount++;
// If the connection is ready to be evicted, we're done.
//TODO 获得这个连接已经闲置多久
// 执行完遍历,获得闲置了最久的连接
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
//todo 超过了保活时间(5分钟) 或者池内数量超过了(5个) 马上移除,然后返回0,表示不等待,马上再次检查清理
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// We've found a connection to evict. Remove it from the list, then close it
// below (outside
// of the synchronized block).
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// A connection will be ready to evict soon.
//todo 池内存在闲置连接,就等待 保活时间(5分钟)-最长闲置时间 =还能闲置多久 再检查
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we
// run again.
//todo 有使用中的连接,就等 5分钟 再检查
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
//todo 都不满足,可能池内没任何连接,直接停止清理(put后会再次启动)
cleanupRunning = false;
return -1;
}
}
closeQuietly(longestIdleConnection.socket());
// Cleanup again immediately.
return 0;
}
ConnectionPool#pruneAndGetAllocationCount
检查连接是否正在被使用
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
//todo 这个连接被使用就会创建一个弱引用放入集合,这个集合不为空就表示这个连接正在被使用
// 实际上 http1.x 上也只能有一个正在使用的。
List<Reference<StreamAllocation>> references = connection.allocations;
for (int i = 0; i < references.size(); ) {
Reference<StreamAllocation> reference = references.get(i);
if (reference.get() != null) {
i++;
continue;
}
// We've discovered a leaked allocation. This is an application bug.
StreamAllocation.StreamAllocationReference streamAllocRef =
(StreamAllocation.StreamAllocationReference) reference;
String message = "A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?";
Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
references.remove(i);
connection.noNewStreams = true;
// If this was the last allocation, the connection is eligible for immediate eviction.
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}
return references.size();
}