OkHttp 原理解析 (三)
ConnectInterceptor
public final class ConnectInterceptor implements Interceptor {
public final OkHttpClient client;
public ConnectInterceptor(OkHttpClient client) {
this.client = client;
}
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
Transmitter transmitter = realChain.transmitter();
boolean doExtensiveHealthChecks = !request.method().equals("GET");
Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
return realChain.proceed(request, transmitter, exchange);
}
}
之前我们在RetryAndFollowUpInterceptor
已经 prepareToConnect()
做过准备了,然后就是在 BridgeIntercepter
中添加一些请求头和相应头,接着是CacheIntercepter
看是否可以直接使用缓存,如果有缓存的话也不会走到这里,如果没有缓存就需要 ConnectIntercepter
借用 Transmitter
来桥接应用层和网络层,通过 ExchangeFinder
中的 finHealthyConnection()
从 connectionPool
中找到一个可用的连接,这个连接可能是复用的,并 connect()
,从而得到 输入/输出 流 (source/sink) ,返回一个 Exchange
给 CallServerIntercepter
, 通过这个 Exchange
就可以添加请求头和请求体,并读取响应头和响应体,来交给上面的 Intercepter,层层向上传递。
// ExchangeFinder.java
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
RealConnection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
if (transmitter.isCanceled()) throw new IOException("Canceled");
hasStreamFailure = false; // This is a fresh attempt.
// 尝试复用已分配 Connection
releasedConnection = transmitter.connection;
toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
? transmitter.releaseConnectionNoEvents()
: null;
if (transmitter.connection != null) {
// 得到了已分配的connection
result = transmitter.connection;
releasedConnection = null;
}
if (result == null) {
// 尝试获取已回收的connection
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
foundPooledConnection = true;
result = transmitter.connection;
} else if (nextRouteToTry != null) {
selectedRoute = nextRouteToTry;
nextRouteToTry = null;
} else if (retryCurrentRoute()) {
selectedRoute = transmitter.connection.route();
}
}
}
closeQuietly(toClose);
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
if (result != null) {
// 从connectionPool中找到了就返回
return result;
}
// 如果需要路由选择器,就创建。这是一个阻塞操作
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}
List<Route> routes = null;
synchronized (connectionPool) {
if (transmitter.isCanceled()) throw new IOException("Canceled");
if (newRouteSelection) {
// 根据 IP addresses 集合, 再次尝试从 connectionPool中获取connection。这里与上次的区别是 routes不为空
routes = routeSelection.getAll();
if (connectionPool.transmitterAcquirePooledConnection(
address, transmitter, routes, false)) {
foundPooledConnection = true;
result = transmitter.connection;
}
}
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
// 这里就创建一个 Connection并指派
result = new RealConnection(connectionPool, selectedRoute);
connectingConnection = result;
}
}
// 得到了connection,返回
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
// 进行 TCP + TLS handshakes. 一个阻塞操作
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
connectionPool.routeDatabase.connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
connectingConnection = null;
// 将 connection进行合并,只有在多个connection 复用一个 host的时候
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
// We lost the race! Close the connection we created and return the pooled connection.
result.noNewExchanges = true;
socket = result.socket();
result = transmitter.connection;
} else {
connectionPool.put(result);
transmitter.acquireConnectionNoEvents(result);
}
}
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
}
以上代码主要做的事情有:
- StreamAllocation的connection如果可以复用则复用;
- 如果connection不能复用,则从连接池中获取RealConnection对象,获取成功则返回;
- 如果连接池里没有,则new一个RealConnection对象;
- 调用RealConnection的connect()方法发起请求;
- 将RealConnection对象存进连接池中,以便下次复用;
- 返回RealConnection对象。
RealConnection
// Connection 接口
Route route(); //返回一个路由
Socket socket(); //返回一个socket
Handshake handshake(); //如果是一个https,则返回一个TLS握手协议
Protocol protocol(); //返回一个协议类型 比如 http1.1 等或者自定义类型
RealConnection是Connection的实现类,代表着链接socket的链路,如果拥有了一个RealConnection就代表了我们已经跟服务器有了一条通信链路。
// RealConnection 成员变量
private final ConnectionPool connectionPool;
private final Route route;
//下面这些字段,通过connect()方法初始化赋值,且不会再次赋值
private Socket rawSocket; //底层 TCP socket
private Socket socket; //应用层socket
private Handshake handshake; //握手
private Protocol protocol; //协议
private Http2Connection http2Connection; // http2的链接
// 通过source和sink,与服务器交互的输入输出流
private BufferedSource source;
private BufferedSink sink;
// 下面这个字段是表示链接状态的字段,并且有connectPool统一管理
// 如果noNewStreams被设为true,则noNewStreams一直为true,不会被改变,
// 并且这个链接不会再创建新的stream流
public boolean noNewStreams;
//成功的次数
public int successCount;
//此链接可以承载最大并发流的限制,如果不超过限制,可以随意增加
public int allocationLimit = 1;
由上面的我们可以得出一些结论:
- source和sink,以流的形式对服务器进行交互
- 除了route 字段,部分的字段都是在connect()方法里面赋值的,并且不会改变
- noNewStream 可以简单理解为该连接不可用。
- allocationLimit是分配流的数量上限,一个connection最大只能支持一个1并发
首先是connect()
方法
public void connect(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
EventListener eventListener) {
if (protocol != null) throw new IllegalStateException("already connected");
// 创建一个 Selector 来选择 connectionSpec 也就是线路
RouteException routeException = null;
List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
...
// 尝试连接
while (true) {
try {
// 如果要求隧道模式,建立通道连接,通常不会使用这种
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
if (rawSocket == null) {
// We were unable to connect the tunnel but properly closed down our resources.
break;
}
} else {
// socket 连接
connectSocket(connectTimeout, readTimeout, call, eventListener);
}
// 建立 https 连接
establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
break;
} catch (IOException e) {
...
}
}
if (route.requiresTunnel() && rawSocket == null) {
ProtocolException exception = new ProtocolException("Too many tunnel connections attempted: "
+ MAX_TUNNEL_ATTEMPTS);
throw new RouteException(exception);
}
if (http2Connection != null) {
synchronized (connectionPool) {
allocationLimit = http2Connection.maxConcurrentStreams();
}
}
}
socket 连接
private void connectSocket(int connectTimeout, int readTimeout, Call call,
EventListener eventListener) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
// 根据代理类型来选择socket是代理还是直连类型
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
eventListener.connectStart(call, route.socketAddress(), proxy);
rawSocket.setSoTimeout(readTimeout);
try {
// 为支持不同的平台,实际是 socket.connect(address, connectTimeout)
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
try {
// 得到输入/输出流
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
} catch (NullPointerException npe) {
if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
throw new IOException(npe);
}
}
}
隧道连接
private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout)
throws IOException {
// 创建隧道请求
Request tunnelRequest = createTunnelRequest();
HttpUrl url = tunnelRequest.url();
int attemptedConnections = 0;
int maxAttempts = 21;
while (true) {
if (++attemptedConnections > maxAttempts) {
throw new ProtocolException("Too many tunnel connections attempted: " + maxAttempts);
}
// 建立Socket连接
connectSocket(connectTimeout, readTimeout);
// 建立隧道
tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url);
if (tunnelRequest == null) break; // Tunnel successfully created.
closeQuietly(rawSocket);
rawSocket = null;
sink = null;
source = null;
}
}
它们调用connectSocket
中参数 Call
是不一样的。
connectSocket中的代理连接建立的过程
- 没有设置代理的情况下,则直接与HTTP服务器建立TCP连接
- 设置了SOCKS代理的情况下,创建Socket时,为其传入proxy,连接时还是以HTTP服务器为目标。
- 设置了HTTP代理时,如果是HTTP请求,则与HTTP代理服务器建立TCP连接。HTTP代理服务器解析HTTP请求/响应的内容,并根据其中的信息来完成数据的转发。
- 设置了HTTP代理时,如果是 HTTPS/HTTP2请求,与HTTP服务器建立通过HTTP代理的隧道连接。HTTP代理不再解析传输的数据,仅仅完成数据转发的功能。此时HTTP代理的功能退化为如同SOCKS代理类似。
- 设置了代理类时,HTTP的服务器的域名解析会交给代理服务器执行。如果是HTTP代理,会对HTTP代理的域名做域名解析。
establishProtocol 建立连接过程:
- 建立 TLS 连接
- 用SSLSocketFactory基于原始的TCP Socket,创建一个SSLSocket, 配置SSLSocket。
- configureTlsExtensions 配置 TLS扩展
- 进行TLS握手
- 获取证书信息。
- 对证书进行验证。
- 完成HTTP/2的ALPN扩展
- 基于前面获取到SSLSocket创建于执行的IO的BufferedSource和BufferedSink等,并保存握手信息以及所选择的协议。
- 如果是HTTP 2.0,通过Http2Connection.Builder 建立一个 Http2Connection,通过 http2Connection.start() 和服务器建立连接。
ConnectionPool
管理http和http/2的链接,以减少请求的网络延迟。同一个address将共享同一个connection。实现了连接复用的功能。
public final class ConnectionPool {
final RealConnectionPool delegate;
}
当前版本将具体的实现委托给了 RealConnectionPool
public final class RealConnectionPool {
// 后台线程用来清理过期连接,在每一个连接池中最多又一个线程。
// 这个 executor 允许自己被GC 清理
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp ConnectionPool", true));
// 清理任务
private final Runnable cleanupRunnable = () -> {
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (RealConnectionPool.this) {
try {
RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
};
// 过期连接队列
private final Deque<RealConnection> connections = new ArrayDeque<>();
// 路由数据库,用来记录不可用的route
final RouteDatabase routeDatabase = new RouteDatabase();
}
默认情况下,这个连接池最多维持5个连接,且每个链接最多活5分钟。
从 ConnectionPool 获取Connection
// RealConectionPool.java
boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
@Nullable List<Route> routes, boolean requireMultiplexed) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (requireMultiplexed && !connection.isMultiplexed()) continue;
if (!connection.isEligible(address, routes)) continue;
transmitter.acquireConnectionNoEvents(connection);
return true;
}
return false;
}
然后把这个connection 设置到 Transmitter 中去
// 此方法有两处调用,一个是 findConnection,另一个是 connectionPool.transmitterAcquirePooledConnection()
// 后一个方法也会在 findConnection处被调用
void acquireConnectionNoEvents(RealConnection connection) {
assert (Thread.holdsLock(connectionPool));
if (this.connection != null) throw new IllegalStateException();
this.connection = connection;
connection.transmitters.add(new TransmitterReference(this, callStackTrace));
}
从代码可以看出来,这个connection 必须 isMultiplexed、 isEligible, 才可以
至于添加 connection ,就是异步触发清理任务,然后将连接添加到队列中。
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
至于这个清理任务,代码就是上面的 cleanupRunnable
- 调用
cleanup
方法 - 等待
connectionBecameIdle()
触发notifyAll()
而这个connectionBecameIdle()
是在Transmitter
的releaseConnectionNoEvents()
->maybeReleaseConnection()
->exchangeMessageDone()
->Exchange.bodyComplete
->complete
->close
这个 close 属于ForwardingSource
,它的 delegate, 即为codec.openResponseBodySource(response)
我们现在看一下 cleanup 做了什么
- 统计空连接数量
- 查找最长空闲时间的连接,以及它的空闲时长
- 如果超过了最大连接数或者最大空闲时长,就 remove 掉这个连接
- 否则返回一个等待时长,也就是cleanup 的返回值 waitNanos
然后阻塞相应的时间,如果有了废弃连接就清理,否则,接着等待
cleanup中还有一个方法 pruneAndGetAllocationCount()
,它是用来追踪泄露连接的,返回还存活于 connection 的 transmitter 的数量。所谓泄漏,就是还在追踪这个connection 但是程序已经废弃掉他们了。
Transmitter
是OkHttp的应用程序和网络层之间的桥梁。 此类公开了高级应用程序层原语:连接,请求,响应和流。
它持有okhttpclient对象以及RealCall对象。
它支持异步取消,如果是一个 HTTP/2, 取消的是这个流而不是共享的这个连接,但是如果是在进行TLS握手,就会取消整个连接。
ExchangeFinder
它尝试去是为一些可能的变化去找到一条可用的连接,策略如下:
- 如果当前 call 已经有了一个连接,能够满足请求,就用相同的连接,做一些初始化修改。
- 如果连接池中的一个连接满足这个请求。
- 如果没有现存的连接,就创建一个路由列表,并创建一个新连接。如果失败了,就迭代的尝试列表中可用的路由。