Okhttp之网络连接相关三大类RealConnection、C
Okhttp的浅层架构分析
Okhttp的责任链模式和拦截器分析
Okhttp之RetryAndFollowUpInterceptor拦截器分析
Okhttp之BridgeInterceptor拦截器分析
Okhttp之CacheInterceptor拦截器分析
Okhttp之ConnectInterceptor拦截器分析
Okhttp之网络连接相关三大类RealConnection、ConnectionPool、StreamAllocation
Okhttp之CallServerInterceptor拦截器分析
浅析okio的架构和源码实现
随着okhttp的不断深入,已经开始接触到了难啃的内容,即核心连接通信逻辑,下面就来扒一扒它是怎么设计和构造的。开始之前先来看看http2.0多路复用的概念。
HTTP 1.1 默认启用长TCP连接,但所有的请求-响应都是按序进行的(这里的长连接可理解成半双工协议。即便是HTTP1.1引入了管道机制,也是如此)。复用同一个TCP连接期间,即便是通过管道同时发送了多个请求,服务端也是按请求的顺序依次给出响应的;而客户端在未收到之前所发出所有请求的响应之前,将会阻塞后面的请求(排队等待),这称为"队头堵塞"(Head-of-line blocking)。
HTTP2.0复用TCP连接则不同,虽然依然遵循请求-响应模式,但客户端发送多个请求和服务端给出多个响应的顺序不受限制,这样既避免了"队头堵塞",又能更快获取响应。在复用同一个TCP连接时,服务器同时(或先后)收到了A、B两个请求,先回应A请求,但由于处理过程非常耗时,于是就发送A请求已经处理好的部分, 接着回应B请求,完成后,再发送A请求剩下的部分。HTTP2.0长连接可以理解成全双工的协议。
HTTP2.0 使用 多路复用 的技术,多个 stream 可以共用一个 socket 连接。每个 tcp连接都是通过一个 socket 来完成的,socket 对应一个 host 和 port,如果有多个stream(即多个 Request) 都是连接在一个 host 和 port上,那么它们就可以共同使用同一个 socket ,这样做的好处就是 可以减少TCP的一个三次握手的时间。
在OKHttp里面,负责连接的是 RealConnection 。
一、RealConnection
RealConnection是Connection的实现类,代表着链接socket的链路,如果拥有了一个RealConnection就代表了我们已经跟服务器有了一条通信链路。与服务的三次握手也是在这里实现的。下面看看它的属性和构造函数。
public final class RealConnection extends Http2Connection.Listener implements Connection {
private static final String NPE_THROW_WITH_NULL = "throw with null exception";
private static final int MAX_TUNNEL_ATTEMPTS = 21;
private final ConnectionPool connectionPool;//连接池
private final Route route;//路由
// The fields below are initialized by connect() and never reassigned.
//下面这些字段,通过connect()方法开始初始化,并且绝对不会再次赋值
/** The low-level TCP socket. */
private Socket rawSocket; //底层socket
/**
* The application layer socket. Either an {@link SSLSocket} layered over {@link #rawSocket}, or
* {@link #rawSocket} itself if this connection does not use SSL.
*/
private Socket socket; //应用层socket
//握手
private Handshake handshake;
//协议
private Protocol protocol;
// http2的链接
private Http2Connection http2Connection;
//通过okio的source和sink,大家可以猜到是与服务器交互的输入输出流
private BufferedSource source;
private BufferedSink sink;
// The fields below track connection state and are guarded by connectionPool.
//下面这个字段是 属于表示链接状态的字段,并且有connectPool统一管理
/** If true, no new streams can be created on this connection. Once true this is always true. */
//如果noNewStreams被设为true,则noNewStreams一直为true,不会被改变,并且表示这个链接不会再创新的stream流
public boolean noNewStreams;
//成功的次数
public int successCount;
/**
* The maximum number of concurrent streams that can be carried by this connection. If {@code
* allocations.size() < allocationLimit} then new streams can be created on this connection.
*/
//此链接可以承载最大并发流的限制,如果不超过限制,可以随意增加
public int allocationLimit = 1;
/** Current streams carried by this connection. */
//此链接当前携带的流
public final List<Reference<StreamAllocation>> allocations = new ArrayList<>();
/** Nanotime timestamp when {@code allocations.size()} reached zero. */
public long idleAtNanos = Long.MAX_VALUE;
//构造方法,传入连接池和路由
public RealConnection(ConnectionPool connectionPool, Route route) {
this.connectionPool = connectionPool;
this.route = route;
}
...
}
下面看看核心方法connect():
public void connect( int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) {
if (protocol != null) throw new IllegalStateException("already connected");
// 线路的选择
RouteException routeException = null;
List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
if (route.address().sslSocketFactory() == null) {
if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication not enabled for client"));
}
String host = route.address().url().host();
if (!Platform.get().isCleartextTrafficPermitted(host)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication to " + host + " not permitted by network security policy"));
}
}
// 连接开始,注意这是个死循环,创建连接成功才会跳出
while (true) {
try {
// 如果要求隧道模式,建立通道连接,通常不是这种
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout);
} else {
// 一般都走这条逻辑了,实际上很简单就是socket的连接
connectSocket(connectTimeout, readTimeout);
}
// 建立协议,构造读写桥梁,很重要的方法
establishProtocol(connectionSpecSelector);
break;
} catch (IOException e) {
closeQuietly(socket);
closeQuietly(rawSocket);
socket = null;
rawSocket = null;
source = null;
sink = null;
handshake = null;
protocol = null;
http2Connection = null;
if (routeException == null) {
routeException = new RouteException(e);
} else {
routeException.addConnectException(e);
}
if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
throw routeException;
}
}
}
if (http2Connection != null) {
synchronized (connectionPool) {
allocationLimit = http2Connection.maxConcurrentStreams();
}
}
}
connectTunnel()隧道链接
/**
* Does all the work to build an HTTPS connection over a proxy tunnel. The catch here is that a
* proxy server can issue an auth challenge and then close the connection.
* 是否通过代理隧道建立HTTPS连接的所有工作。 这里的问题是代理服务器可以发出一个验证质询,然后关闭连接。
*/
private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout, Call call,
EventListener eventListener) throws IOException {
//1-构造一个 建立隧道连接 请求。
Request tunnelRequest = createTunnelRequest();
HttpUrl url = tunnelRequest.url();
for (int i = 0; i < MAX_TUNNEL_ATTEMPTS; i++) {
//2 与HTTP代理服务器建立TCP连接。
connectSocket(connectTimeout, readTimeout, call, eventListener);
//3 创建隧道。这主要是将 建立隧道连接 请求发送给HTTP代理服务器,并处理它的响应
tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url);
if (tunnelRequest == null) break; // Tunnel successfully created.
// The proxy decided to close the connection after an auth challenge. We need to create a new
// connection, but this time with the auth credentials.
closeQuietly(rawSocket);
rawSocket = null;
sink = null;
source = null;
eventListener.connectEnd(call, route.socketAddress(), route.proxy(), null);
//重复上面的第2和第3步,直到建立好了隧道连接。
}
}
最终还是要调用到connectSocket():
/**
* Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket.
* 完成在原始套接字上构建完整的HTTP或HTTPS连接所需的所有工作。
*/
private void connectSocket(int connectTimeout, int readTimeout, Call call,
EventListener eventListener) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
//如果是直连或者明文的HTTP代理模式则调用address.socketFactory().createSocket()
//是普通的创建new Socket(host, port, clientAddress, clientPort);否则用代理
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
//监听回调
eventListener.connectStart(call, route.socketAddress(), proxy);
//设置socket读数据超时时间
rawSocket.setSoTimeout(readTimeout);
try {
//建立Socket连接,实际调用的就是socket.connect(address, connectTimeout);
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
// The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
// More details:
// https://github.com/square/okhttp/issues/3245
// https://android-review.googlesource.com/#/c/271775/
try {
//okio 拿到输入流,最终的目的就是建立了管道流
source = Okio.buffer(Okio.source(rawSocket));
// 输出流
sink = Okio.buffer(Okio.sink(rawSocket));
} catch (NullPointerException npe) {
if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
throw new IOException(npe);
}
}
}
public class Platform {
public void connectSocket(Socket socket, InetSocketAddress address,
int connectTimeout) throws IOException {
//最终调用java的connect
socket.connect(address, connectTimeout);
}
}
总结连接的流程:
1、创建Socket,非SOCKS代理的情况下,通过SocketFactory创建;在SOCKS代理则传入proxy手动new一个出来。
2、为Socket设置超时
3、完成特定于平台的连接建立
4、创建用于I/O的source和sink
至于代理的相关逻辑,这里暂时就不深究了,后续会再单独去了解。
再看看establishProtocol()方法,这也是核心方法,正是在这里这里把读写操作架设好的:
private void establishProtocol(ConnectionSpecSelector connectionSpecSelector,
int pingIntervalMillis, Call call, EventListener eventListener) throws IOException {
// 一些协议的设置,主要方法我们看http2.0的startHttp2()方法
if (route.address().sslSocketFactory() == null) {
//如果当前协议包含了HTTP_2,OKhttp就会开启Http2.0模式,主要取决于服务器
if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) {
socket = rawSocket;
protocol = Protocol.H2_PRIOR_KNOWLEDGE;
startHttp2(pingIntervalMillis);
return;
}
//否则降级为1.1
socket = rawSocket;
protocol = Protocol.HTTP_1_1;
return;
}
eventListener.secureConnectStart(call);
//获取证书
connectTls(connectionSpecSelector);
eventListener.secureConnectEnd(call, handshake);
//在connectTls()方法里明确了是2.0模式
if (protocol == Protocol.HTTP_2) {
startHttp2(pingIntervalMillis);
}
}
//开启了http2Connection 线程,这是个流读写的线程,就是服务器客户端直接的通道交互
private void startHttp2(int pingIntervalMillis) throws IOException {
socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
http2Connection = new Http2Connection.Builder(true)
.socket(socket, route.address().url().host(), source, sink)
.listener(this)
.pingIntervalMillis(pingIntervalMillis)
.build();
http2Connection.start();
}
继续看http2Connection.start()方法:
/**
* Sends any initial frames and starts reading frames from the remote peer. This should be called
* after {@link Builder#build} for all new connections.
*/
public void start() throws IOException {
start(true);
}
/**
* @param sendConnectionPreface true to send connection preface frames. This should always be true
* except for in tests that don't check for a connection preface.
*/
void start(boolean sendConnectionPreface) throws IOException {
//这部分代码暂不深究
if (sendConnectionPreface) {
writer.connectionPreface();
writer.settings(okHttpSettings);
int windowSize = okHttpSettings.getInitialWindowSize();
if (windowSize != Settings.DEFAULT_INITIAL_WINDOW_SIZE) {
writer.windowUpdate(0, windowSize - Settings.DEFAULT_INITIAL_WINDOW_SIZE);
}
}
//实际是调用了这个readerRunnable封装的线程start()方法
new Thread(readerRunnable).start(); // Not a daemon thread.
}
看看这个readerRunnable
//前面分析过这个NamedRunnable就是继承了runnable的接口,run()方法里面有个execute()抽象方法
//所以最终还是执行了execute()
class ReaderRunnable extends NamedRunnable implements Http2Reader.Handler {
final Http2Reader reader;
ReaderRunnable(Http2Reader reader) {
super("OkHttp %s", hostname);
this.reader = reader;
}
@Override
protected void execute() {
ErrorCode connectionErrorCode = ErrorCode.INTERNAL_ERROR;
ErrorCode streamErrorCode = ErrorCode.INTERNAL_ERROR;
try {
reader.readConnectionPreface(this);
//重点是这个nextFrame()方法,循环执行,看起来应该是个读的操作
while (reader.nextFrame(false, this)) {
}
connectionErrorCode = ErrorCode.NO_ERROR;
streamErrorCode = ErrorCode.CANCEL;
} catch (IOException e) {
connectionErrorCode = ErrorCode.PROTOCOL_ERROR;
streamErrorCode = ErrorCode.PROTOCOL_ERROR;
} finally {
try {
close(connectionErrorCode, streamErrorCode);
} catch (IOException ignored) {
}
Util.closeQuietly(reader);
}
}
...
}
在此重新梳理一下:
从Realconnection调用connect()创建了socket连接之后(这里讨论走http2.0协议分支),创建了一个http2Connection 对象,启用了一个readerRunnable的线程,run()方法的主要工作是循环地执行reader.nextFrame()方法。
看看reader.nextFrame()干了啥:
public boolean nextFrame(boolean requireSettings, Handler handler) throws IOException {
//不停的在读数据帧,直到流关闭(发生IOException )返回false
//数据帧是个什么概念我也不是很清楚。。。立个flag
try {
source.require(9); // Frame header size
} catch (IOException e) {
return false; // This might be a normal socket close.
}
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | Length (24) |
// +---------------+---------------+---------------+
// | Type (8) | Flags (8) |
// +-+-+-----------+---------------+-------------------------------+
// |R| Stream Identifier (31) |
// +=+=============================================================+
// | Frame Payload (0...) ...
// +---------------------------------------------------------------+
int length = readMedium(source);
if (length < 0 || length > INITIAL_MAX_FRAME_SIZE) {
throw ioException("FRAME_SIZE_ERROR: %s", length);
}
byte type = (byte) (source.readByte() & 0xff);
if (requireSettings && type != TYPE_SETTINGS) {
throw ioException("Expected a SETTINGS frame but was %s", type);
}
byte flags = (byte) (source.readByte() & 0xff);
//streamId 很重要,用来追踪是哪次请求流的
//Map<Integer, Http2Stream> streams ,Http2Connection里维护的一个map,用来保存各个请求流
int streamId = (source.readInt() & 0x7fffffff); // Ignore reserved bit.
if (logger.isLoggable(FINE)) logger.fine(frameLog(true, streamId, length, type, flags));
//根据数据类型,data,header,priopity等等,虽然我不是很懂但是大概理解了
//这个循环读操作就是等着我们的request发过服务器后,服务器返回应答后我们在循环
//读取的一个动作,这个方法的理解,会在后面的CallServerInterceptor拦截器里让你
//醍醐灌顶,特别是那个readHeaders(handler, length, flags, streamId)方法
switch (type) {
case TYPE_DATA:
readData(handler, length, flags, streamId);
break;
case TYPE_HEADERS:
readHeaders(handler, length, flags, streamId);
break;
case TYPE_PRIORITY:
readPriority(handler, length, flags, streamId);
break;
case TYPE_RST_STREAM:
readRstStream(handler, length, flags, streamId);
break;
case TYPE_SETTINGS:
readSettings(handler, length, flags, streamId);
break;
case TYPE_PUSH_PROMISE:
readPushPromise(handler, length, flags, streamId);
break;
case TYPE_PING:
readPing(handler, length, flags, streamId);
break;
case TYPE_GOAWAY:
readGoAway(handler, length, flags, streamId);
break;
case TYPE_WINDOW_UPDATE:
readWindowUpdate(handler, length, flags, streamId);
break;
default:
// Implementations MUST discard frames that have unknown or unsupported types.
source.skip(length);
}
return true;
}
再来重点看看 readHeaders(handler, length, flags, streamId)方法,因为在后面的CallServerInterceptor拦截器会追踪到,提前了解一下,是怎么读取response的headers的:
private void readHeaders(Handler handler, int length, byte flags, int streamId)
throws IOException {
//streamId 很重要,用来追踪是哪次请求的流
if (streamId == 0) throw ioException("PROTOCOL_ERROR: TYPE_HEADERS streamId == 0");
boolean endStream = (flags & FLAG_END_STREAM) != 0;
short padding = (flags & FLAG_PADDED) != 0 ? (short) (source.readByte() & 0xff) : 0;
if ((flags & FLAG_PRIORITY) != 0) {
readPriority(handler, streamId);
length -= 5; // account for above read.
}
length = lengthWithoutPadding(length, flags, padding);
List<Header> headerBlock = readHeaderBlock(length, padding, flags, streamId);
//重点方法,继续追踪
handler.headers(endStream, streamId, -1, headerBlock);
}
@Override
public void headers(boolean inFinished, int streamId, int associatedStreamId,
List<Header> headerBlock) {
if (pushedStream(streamId)) {
pushHeadersLater(streamId, headerBlock, inFinished);
return;
}
Http2Stream stream;
synchronized (Http2Connection.this) {
//通过id拿到stream
stream = getStream(streamId);
if (stream == null) {
// If we're shutdown, don't bother with this stream.
if (shutdown) return;
// If the stream ID is less than the last created ID, assume it's already closed.
if (streamId <= lastGoodStreamId) return;
// If the stream ID is in the client's namespace, assume it's already closed.
if (streamId % 2 == nextStreamId % 2) return;
// Create a stream.
Headers headers = Util.toHeaders(headerBlock);
final Http2Stream newStream = new Http2Stream(streamId, Http2Connection.this,
false, inFinished, headers);
lastGoodStreamId = streamId;
streams.put(streamId, newStream);
listenerExecutor.execute(new NamedRunnable("OkHttp %s stream %d", hostname, streamId) {
@Override public void execute() {
try {
listener.onStream(newStream);
} catch (IOException e) {
Platform.get().log(INFO, "Http2Connection.Listener failure for " + hostname, e);
try {
newStream.close(ErrorCode.PROTOCOL_ERROR);
} catch (IOException ignored) {
}
}
}
});
return;
}
}
// Update an existing stream.
//更新一个存在的流信息,看下去这个方法
stream.receiveHeaders(headerBlock);
if (inFinished) stream.receiveFin();
}
/**
* Accept headers from the network and store them until the client calls {@link #takeHeaders}, or
* {@link FramingSource#read} them.
*/
void receiveHeaders(List<Header> headers) {
assert (!Thread.holdsLock(Http2Stream.this));
boolean open;
synchronized (this) {
hasResponseHeaders = true;
//headersQueue是一个双端队列,用它来接受保存返回的headers信息
headersQueue.add(Util.toHeaders(headers));
open = isOpen();
//配合wait方法,实际是唤醒takeHeaders()方法里的wait,告诉它读到了headers
notifyAll();
}
if (!open) {
connection.removeStream(id);
}
}
//取headers 这个也是stream的方法,stream是什么时候创建的呢,下篇讲CallServerInterceptor的时候会讲到
public synchronized Headers takeHeaders() throws IOException {
readTimeout.enter();
try {
//headersQueue为空的时候一直等待
while (headersQueue.isEmpty() && errorCode == null) {
waitForIo();
}
} finally {
readTimeout.exitAndThrowIfTimedOut();
}
//被唤醒后返回header
if (!headersQueue.isEmpty()) {
return headersQueue.removeFirst();
}
throw new StreamResetException(errorCode);
}
二、ConnectionPool
链接池,看名字就能联想到线程池之类的池设计,都是为了减少资源创建,提高资源复用率而设计的。连接池是用来管理http和http/2的链接复用,通过让同一个address将共享同一个connection,以便减少网络请求延迟。
成员变量和构造函数:
public final class ConnectionPool {
/**
* Background threads are used to cleanup expired connections. There will be at most a single
* thread running per connection pool. The thread pool executor permits the pool itself to be
* garbage collected.
*/
//这是一个用于清除过期链接的线程池,每个线程池最多只能运行一个线程,并且这个线程池允许被垃圾回收
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
/** The maximum number of idle connections for each address. */
//每个address的最大空闲连接数。
private final int maxIdleConnections;
private final long keepAliveDurationNs;
//清理过期链接任务
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
//清除方法
long waitNanos = cleanup(System.nanoTime());
//-1代表没有链接,直接退出
if (waitNanos == -1) return;
//等待
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
//这是object顶级父类的方法,大部分时间清理线程会在这里等待,这里会配合connectionBecameIdle()
//里的notifyAll方法被唤醒来使用,详见 https://www.jianshu.com/p/c518f9c07a80
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
//链接的双向队列,在这里存储可复用的链接
private final Deque<RealConnection> connections = new ArrayDeque<>();
//路由的数据库
final RouteDatabase routeDatabase = new RouteDatabase();
//清理任务正在执行的标志
boolean cleanupRunning;
/**
* Create a new connection pool with tuning parameters appropriate for a single-user application.
* The tuning parameters in this pool are subject to change in future OkHttp releases. Currently
* this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity.
*/
//创建一个适用于单个应用程序的新连接池。
//该连接池的参数将在未来的okhttp中发生改变
//目前最多可容乃5个空闲的连接,存活期是5分钟
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}
public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.maxIdleConnections = maxIdleConnections;
this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
// Put a floor on the keep alive duration, otherwise cleanup will spin loop.
//保持活着的时间,否则清理将旋转循环
if (keepAliveDuration <= 0) {
throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
}
}
...
}
先搞明白那个清除cleanup(long now)方法:
//对该池执行维护,如果它超出保持活动限制或空闲连接限制,则驱逐空闲时间最长的连接。
//返回纳秒级的睡眠持续时间,直到下次预定调用此方法为止。 如果不需要进一步清理,则返回-1。
long cleanup(long now) {
//在用的链接数
int inUseConnectionCount = 0;
//闲置的链接数
int idleConnectionCount = 0;
//闲置最久的链接
RealConnection longestIdleConnection = null;
//闲置最久的时间
long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
//遍历所有的连接,标记处不活跃的连接。
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// If the connection is in use, keep searching.
//1. 查询此连接内部的StreanAllocation的引用数量,大于0则跳过这个连接
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
// If the connection is ready to be evicted, we're done.
//2. 标记闲置最久的空闲连接
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
//遍历完后
//3. 如果空闲连接超过5个或者keepalive时间大于5分钟,则将该连接清理掉。
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
//longestIdleConnection此时必然不为空
//只有这个分支才会清理连接,清理后需要关闭链接,最终return 0
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// A connection will be ready to evict soon.
//这种情况是只是有空闲的链接但是还没有超过5个且闲置时间还没到5分钟
//4. 返回此连接的到期时间,供下次进行清理。
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
//5. 全部都是活跃连接,5分钟时候再进行清理。
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
//6. 没有任何连接,跳出循环。
cleanupRunning = false;
return -1;
}
}
//7. 关闭连接,返回时间0,立即再次进行清理。
closeQuietly(longestIdleConnection.socket());
// Cleanup again immediately.
return 0;
}
pruneAndGetAllocationCount()方法:
/**
* Prunes any leaked allocations and then returns the number of remaining live allocations on
* {@code connection}. Allocations are leaked if the connection is tracking them but the
* application code has abandoned them. Leak detection is imprecise and relies on garbage
* collection.
*/
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
List<Reference<StreamAllocation>> references = connection.allocations;
//遍历弱引用列表
for (int i = 0; i < references.size(); ) {
Reference<StreamAllocation> reference = references.get(i);
//若StreamAllocation被使用则接着循环
if (reference.get() != null) {
i++;
continue;
}
//到了这里表示这个connect没有被引用了,但是却还存在,说明没有关闭流操作而导致了内存泄露
// We've discovered a leaked allocation. This is an application bug.
StreamAllocation.StreamAllocationReference streamAllocRef =
(StreamAllocation.StreamAllocationReference) reference;
String message = "A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?";
Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
//若StreamAllocation未被使用则移除引用,这边注释为泄露
references.remove(i);
connection.noNewStreams = true;
// If this was the last allocation, the connection is eligible for immediate eviction.
//如果列表为空则说明此连接没有被引用了,则返回0,表示此连接是空闲连接
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}
//走到这说明链接被引用
return references.size();
}
再来看看清理的任务是什么时候执行的:
//加入一个链接到这个链接池
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
//cleanupRunning 这个变量是在上面的cleanup()方法里控制的,这个变量保证了同时间只会有一个清理任务在跑
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
总结清理闲置链接机制:
- 利用一个线程池在不断监控当前的闲置链接数量和链接闲置的时长,当数量和时长出现超载的情况的时候就会执行清除动作。
- 每当往连接池加入一个链接的时候,会根据当前是否有清理线程来决定是否开启一个新的清理线程,保证始终只有一个清理线程任务在跑。
下面ConnectionPool的实例化的过程,一个OkHttpClient只包含一个ConnectionPool,其实例化是在OkHttpClient的实例化过程中进行的。ConnectionPool各个方法的调用并没有直接对外暴露,而是通过OkHttpClient的Internal接口统一对外暴露。
Internal.instance = new Internal() {
...
@Override public boolean connectionBecameIdle(
ConnectionPool pool, RealConnection connection) {
return pool.connectionBecameIdle(connection);
}
@Override public RealConnection get(ConnectionPool pool, Address address,
StreamAllocation streamAllocation, Route route) {
return pool.get(address, streamAllocation, route);
}
@Override public Socket deduplicate(
ConnectionPool pool, Address address, StreamAllocation streamAllocation) {
return pool.deduplicate(address, streamAllocation);
}
@Override public void put(ConnectionPool pool, RealConnection connection) {
pool.put(connection);
}
@Override public RouteDatabase routeDatabase(ConnectionPool connectionPool) {
return connectionPool.routeDatabase;
}
...
}
再来看看ConnectionPool的其他方法,看明白了这些方法也就大概了解了它的工作流程,get()方法:
/**
* Returns a recycled connection to {@code address}, or null if no such connection exists. The route is null if the address has not yet been routed.
* 返回一个可复用链接,如果还没有被创建则为null
*/
RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
//断言,判断线程是不是被自己锁住了
assert (Thread.holdsLock(this));
// 遍历已有连接集合
for (RealConnection connection : connections) {
//如果connection和需求中的"地址"和"路由"匹配
if (connection.isEligible(address, route)) {
//复用这个连接
streamAllocation.acquire(connection);
//返回这个连接
return connection;
}
}
return null;
}
connectionBecameIdle()方法:
/**
* Notify this pool that {@code connection} has become idle. Returns true if the connection has
* been removed from the pool and should be closed.
*/
//手动把一个connection置为闲置状态,一般是外部主动调用
boolean connectionBecameIdle(RealConnection connection) {
assert (Thread.holdsLock(this));
//该连接已经不可用可直接移除
if (connection.noNewStreams || maxIdleConnections == 0) {
connections.remove(connection);
return true;
} else {
//唤醒cleanupRunnable 线程来清理他
notifyAll();
return false;
}
}
deduplicate()方法:
/**
* Replaces the connection held by {@code streamAllocation} with a shared connection if possible.
* This recovers when multiple multiplexed connections are created concurrently.
*/
//如果可能,用共享连接替换 {@code streamAllocation} 持有的连接。
// 当同时创建多个多路复用连接时会恢复。
//该方法主要是针对多路复用连接清除的场景。如果是当前连接是HTTP/2,那么所有指向该站点的请求都应该基于同一个TCP连接。
Socket deduplicate(Address address, StreamAllocation streamAllocation) {
assert (Thread.holdsLock(this));
//遍历当前的connection
for (RealConnection connection : connections) {
//connection 能让address复用&&connection支持多路复用(http2.0就能支持)&&
//connection 不等于streamAllocation持有的connection
if (connection.isEligible(address, null)
&& connection.isMultiplexed()
&& connection != streamAllocation.connection()) {
return streamAllocation.releaseAndAcquire(connection);
}
}
return null;
}
evictAll()方法:
/** Close and remove all idle connections in the pool. */
//关闭并移除连接池里的所有空闲链接
//这应该也是一个外部主动调用的方法
public void evictAll() {
List<RealConnection> evictedConnections = new ArrayList<>();
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
if (connection.allocations.isEmpty()) {
connection.noNewStreams = true;
evictedConnections.add(connection);
i.remove();
}
}
}
for (RealConnection connection : evictedConnections) {
closeQuietly(connection.socket());
}
}
总结:
ConnectionPool的主要职责就是维护了一个RealConnection的双端队列,并且维护了一个定时清理空闲和多余connection的线程池,并提供了一些相应的操作方法来维护连接池的稳定性和提供相应的功能。
三、StreamAllocation
流分配,Connection是建立在Socket之上的物理通信信道,而Stream则是代表逻辑的流,至于Call是对一次请求过程的封装。之前也说过一个Call可能会涉及多个流(比如重定向或者auth认证等情况)。所以我们想一下,如果StreamAllocation要想解决上述问题,需要两个步骤,一是寻找连接,二是获取流。所以StreamAllocation里面应该包含一个Stream;还应该包含连接Connection。如果想找到合适的链接,还需要一个连接池ConnectionPool属性。所以应该有一个获取流的方法在StreamAllocation里面,还应该有完成请求任务的之后的方法来关闭流对象,还有终止和取消等方法,以及释放资源的方法。
成员变量及构造函数:
public final class StreamAllocation {
public final Address address;//地址
private Route route; //路由
private final ConnectionPool connectionPool; //连接池
private final Object callStackTrace; //日志
// State guarded by connectionPool.
private final RouteSelector routeSelector; //路由选择器
private int refusedStreamCount; //拒绝的次数
private RealConnection connection; //连接
private boolean released; //是否已经被释放
private boolean canceled //是否被取消了
public StreamAllocation(ConnectionPool connectionPool, Address address, Object callStackTrace) {
this.connectionPool = connectionPool;
this.address = address;
this.routeSelector = new RouteSelector(address, routeDatabase());
this.callStackTrace = callStackTrace;
}
}
看到这些成员变量是不是很眼熟,就是之前讲过的链接以及连接池,路由这些,下面看看它的几个重要的方法,先看看在ConnectInterceptor里调用到的newStream()方法:
public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
//各种超时值获取
int connectTimeout = client.connectTimeoutMillis();
int readTimeout = client.readTimeoutMillis();
int writeTimeout = client.writeTimeoutMillis();
//链接失败是否重试
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
//获取一个健康可用的连接
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
//实例化HttpCodec,如果是HTTP/2则是Http2Codec否则是Http1Codec
HttpCodec resultCodec = resultConnection.newCodec(client, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
findHealthyConnection()方法
/**
* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
* until a healthy connection is found.
*/
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws IOException {
//循环获取,一直得到健康可用的connection为止
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);
//如果这是个全新的链接,可以跳过健康检测直接返回
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
//如果不是健康的链接,继续循环获取
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
//拿到了健康可用的,返回
return candidate;
}
}
继续看下findConnection()方法:
/**
* Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection.
*/
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
//1.获取存在的连接
// Attempt to use an already-allocated connection.
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
// 如果已经存在的连接满足要求,则使用已存在的连接
return allocatedConnection;
}
//2. 尝试从链接池中去取,这个get()方法我们在前面讲到过,实际调用的是connectionPool的get()方法
// 最终会调用到StreamAllocation里的acquire()方法,这个方法会给connection变量赋值
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
return connection;
}
selectedRoute = route;
}
// 线路的选择,多ip的支持
// If we need a route, make one. This is a blocking operation.
if (selectedRoute == null) {
//里面是个递归
selectedRoute = routeSelector.next();
}
RealConnection result;
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
// Now that we have an IP address, make another attempt at getting a connection from the pool.
// This could match due to connection coalescing.
//更换路由再次尝试
Internal.instance.get(connectionPool, address, this, selectedRoute);
if (connection != null) return connection;
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
// 以上都不符合,创建一个连接
result = new RealConnection(connectionPool, selectedRoute);
//给connection赋值一下
acquire(result);
}
//连接并握手
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
//更新本地数据库
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
// Pool the connection.
//把连接放到连接池中
Internal.instance.put(connectionPool, result);
//如果这个连接是多路复用
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
if (result.isMultiplexed()) {
//调用connectionPool的deduplicate方法去重。调用的其实是
//connectionPool里的方法,这个方法在上面有贴出来
//最终返回socket的方法是streamAllocation的releaseAndAcquire(),下面有贴出
//socket 不为空则代表有确实有重复的socket,下面会把他关掉达到复用不浪费资源的目的
//其实我在这里有个疑问,前面调用连接池的get方法如果没有拿到复用的connection,那不就说明没有
//重复的connection吗,何必要在这去重呢?
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
//如果是重复的socket则关闭socket,不是则socket为nul,什么也不做
closeQuietly(socket);
//返回整个连接
return result;
}
/**
* Use this allocation to hold {@code connection}. Each call to this must be paired with a call to
* {@link #release} on the same connection.
获得一个connection
*/
public void acquire(RealConnection connection) {
assert (Thread.holdsLock(connectionPool));
//此时connection 必须是空的,才能被赋值,如果不为空会报非法异常
if (this.connection != null) throw new IllegalStateException();
//七拐八绕最终在这里赋值
this.connection = connection;
//connection被StreamAllocation引用的集合+1
connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
}
//释放socket并且得到一个可复用的connection
public Socket releaseAndAcquire(RealConnection newConnection) {
assert (Thread.holdsLock(connectionPool));
//此时的成员connection的allocations必然只有一个,因为他是新建的
if (codec != null || connection.allocations.size() != 1) throw new IllegalStateException();
// Release the old connection.
Reference<StreamAllocation> onlyAllocation = connection.allocations.get(0);
//这个方法后面分析
Socket socket = deallocate(true, false, false);
// Acquire the new connection.
this.connection = newConnection;
//这个newConnection是传进来的复用的,关联自身StreamAllocation
newConnection.allocations.add(onlyAllocation);
return socket;
}
/**
* Releases resources held by this allocation. If sufficient resources are allocated, the
* connection will be detached or closed. Callers must be synchronized on the connection pool.
*
* <p>Returns a closeable that the caller should pass to {@link Util#closeQuietly} upon completion
* of the synchronized block. (We don't do I/O while synchronized on the connection pool.)
*/
private Socket deallocate(boolean noNewStreams, boolean released, boolean streamFinished) {
assert (Thread.holdsLock(connectionPool));
if (streamFinished) {
this.codec = null;
}
if (released) {
this.released = true;
}
Socket socket = null;
if (connection != null) {
if (noNewStreams) {
connection.noNewStreams = true;
}
if (this.codec == null && (this.released || connection.noNewStreams)) {
//清除此链接的StreamAllocation引用
release(connection);
if (connection.allocations.isEmpty()) {
connection.idleAtNanos = System.nanoTime();
//connectionBecameIdle(),把connection从链接池移除,此时拿到对应的socket
if (Internal.instance.connectionBecameIdle(connectionPool, connection)) {
socket = connection.socket();
}
}
connection = null;
}
}
return socket;
}
以上findHealthyConnection()方法的总结:
1、循环获取connection实例,直到获取到健康可用的,获取实例的时候先找是否有已经存在的连接,如果有已经存在的连接,并且可以使用(!noNewStreams)则直接返回。
2、没有现成的就根据已知的address在connectionPool里面找,如果有连接,则返回
3、更换路由,更换线路,在connectionPool里面再次查找,如果有则返回。
4、如果以上条件都不满足则直接new一个RealConnection出来
5、new出来的RealConnection通过acquire关联到connection.allocations上
6、做去重判断,如果有重复的socket则关闭
最终得到了一个可用的链接connection,并且在newStream()方法后还用这个链接新建了一个Http2Codec(http2.0) ,Http2Codec 的作用主要是对请求进行编码和对response进行解码,可以理解成对流的一些操作封装。
其他方法暂时没用到,不做一一讲解,下篇分析最后一个拦截器CallServerInterceptor,最终跟服务器产生通信的阶段,结合这个拦截器再来重新组织起来看看这篇文章讲到的知识点。