移动互联网android网络开发OkHttp

okhttp源码学习笔记(二)-- 连接与连接管理

2017-03-12  本文已影响646人  蕉下孤客

本篇文章为okhttp源码学习笔记系列的第二篇文章,本篇文章的主要内容为okhttp中的连接与连接的管理,因此需要重点介绍连接的概念。客户端通过HTTP协议与服务器进行通信,首先需要建立连接,okhttp并没有使用URLConnection, 而是对socket直接进行封装,在socket之上建立了connection的概念,代表这物理连接。同时,一对请求与响应对应着输出和输入流, okhttp中同样使用流的逻辑概念,建立在connection的物理连接之上进行数据通信,(只不过不知道为什么okhttp中流的类名为HttpCodec)。在HTTP1.1以及之前的协议中,一个连接只能同时支持单个流,而SPDY和HTTP2.0协议则可以同时支持多个流,本文先考虑HTTP1.1协议的情况。connection负责与远程服务器建立连接, HttpCodec负责请求的写入与响应的读出,但是除此之外还存在在一个连接上建立新的流,取消一个请求对应的流,释放完成任务的流等操作,为了使HttpCodec不至于过于复杂,okhttp中引入了StreamAllocation负责管理一个连接上的流,同时在connection中也通过一个StreamAllocation的引用的列表来管理一个连接的流,从而使得连接与流之间解耦。另外,熟悉HTTP协议的同学肯定知道建立连接是昂贵的,因此在请求任务完成以后(即流结束以后)不会立即关闭连接,使得连接可以复用,okhttp中通过ConnecitonPool来完成连接的管理和复用。

因此本文会首先从两个拦截器开始引入StreamAllocation的概念,其次会依次介绍Connection和的一个实现类Connection,最后再通过分析ConnectionPool来分析连接的管理机制,期间还有涉及Rout, RoutSelector等一些为建立连接或建立流有关的辅助类。

0. 从两个拦截器开始

在上一篇文章中我们了解到okhttp中的网络请求过程其实就是一系列拦截器的处理过程,那么我们就可以以这些拦截器为主线看一下okhttp的实现中都做了哪些事情,首先我们再次贴出RealCall#getResponseWithInterceptor方法:

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    return chain.proceed(originalRequest);
  }

从方法中可以看出okhttp内部定义了五个拦截器,分别负责重试(包括失败重连,添加认证头部信息,重定向等),request和response的转换(主要是将Request和Response对象转换成Http协议定义的报文形式), 缓存以及连接处理,最后一个与其他稍有不同,它负责流的处理, 将请求数据写入到socket的输出流,并从输入流中获取响应数据,这个在第三篇文章中做分析。
本篇文章的重点是连接以及连接管理,可以看出与连接有关的时RetryAndFollowUpInterceptor和ConnectionInterceptor,那么我们就从这两个拦截器开始分析。
在前面一篇文章中我们了解到拦截器会对外提供一个方法,即intercept(chain)方法,在该方法中处理请求,并调用chain.proceed()方法获取响应,处理后返回响应结果,那么我们首先来看RetryAndFollowUpInterceptor#intercept()方法:

@Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();

    streamAllocation = new StreamAllocation(
        client.connectionPool(), createAddress(request.url()), callStackTrace);

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      ...
      Response response = null;
      boolean releaseConnection = true;
      try {
        response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } catch (... e) {
        ...
      } finally {
        // We're throwing an unchecked exception. Release any resources.
        if (releaseConnection) {
          streamAllocation.streamFailed(null);
          streamAllocation.release();
        }
      }
      ...

这里的代码只是前一半,并且做了部分简化,由于本篇文章分析的是连接与连接管理,因此这里我们只关注与连接相关的部分。这里我们看到在该方法中,首先创建了StreamAllocation对象,该对象是连接与流的桥梁,okhttp处理一个请求时,它负责为该请求找到一个合适的连接(找到是指从连接池中复用连接或者创建新连接), 并在该连接上创建流对象,通过该流对象完成数据通信。这里的流对象负责数据通信,即向socket的输出流中写请求数据,并从socket的输出流中读取响应数据,而StreamAllocation则负责管理流,包括为请求查找连接,并在该连接上建立流对象,将连接封装的socket对象中的输入输出流传递到okhttp流对象中,使他去完成数据通信任务,这是一种功能或责任的分离,这一点有点像之前分析的Request对象和Call对象的关系,另外一个连接也是通过持有一个关于StreamAllocation的集合来管理一个连接上的多个流(只有在SPDY和HTTP2上存在一个连接上多个流,HTTP1.1及1.0则是在一个连接上同时只有一个流对象)。
从代码中我们看到,创建的streamAllocation对象传递到proceed()方法中,之前对于拦截器链的分析中我们知道,在proceed()方法中递归地创建新的chain对象,并添加proceed()方法中传递进来的对象,提供给后面的拦截器使用,这包括StreamAllocation, Connection, HttpCodec三个主要的类的对象,其中最后一个就是所谓的流对象(不太清楚okhttp为何如此命名)。
RetryAndFollowUpInterceptor#intercept()方法中其他的逻辑我们在之后的文章中再做分析,这里我们主要是需要了解StreamAllocation对象,以及继续熟悉okhttp中拦截器链的执行机制,下面我们再来分析Connection#intercept()方法:

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }

这里没有对代码做简化,可以看出ConnectionInterceptor的代码很简单,我们暂且忽略doExtensiveHealthChecks,这个跟具体的HTTP协议的规则有关, 这里最关键的是streamAllocation的方法,即newStream()方法,该方法负责寻找连接,并在该连接上建立新的流对象,并返回,而connection()方法只不过是分会找到的连接而已,这里看到三个对象在这时都已经分别创建出来,此时传递到InterceptorChain上,CallServerInterceptor就可以使用这些对象完成数据通信了,这一点在下一篇文章中分析流对象时再做分析,这里我们只是关注连接,那么我们就要StreamAllocation开始。

1. 连接与流的桥梁

首先我们明白HTTP通信执行网络请求需要在连接之上建立一个新的流执行该任务, 我们这里将StreamAllocation称之为连接与流的桥梁, 它负责为一次请求寻找连接并建立流,从而完成远程通信,所以StreamAllocation与请求,连接,流都相关,因此我们首先熟悉一下这三个概念,对于它们三个, StreamAllocation类之前的注释已经给出了解释,这里我们首先看一下它的注释:

/**
 * This class coordinates the relationship between three entities:
 *
 * <ul>
 *     <li><strong>Connections:</strong> physical socket connections to remote servers. These are
 *         potentially slow to establish so it is necessary to be able to cancel a connection
 *         currently being connected.
 *     <li><strong>Streams:</strong> logical HTTP request/response pairs that are layered on
 *         connections. Each connection has its own allocation limit, which defines how many
 *         concurrent streams that connection can carry. HTTP/1.x connections can carry 1 stream
 *         at a time, HTTP/2 typically carry multiple.
 *     <li><strong>Calls:</strong> a logical sequence of streams, typically an initial request and
 *         its follow up requests. We prefer to keep all streams of a single call on the same
 *         connection for better behavior and locality.
 * </ul>
 ...
 **/

注释说明的很清楚,Connection时建立在Socket之上的物理通信信道,而Stream则是代表逻辑的HTTP请求/响应对, 至于Call,是对一次请求任务或是说请求过程的封装,在第一篇文章中我们已经做出介绍,而一个Call可能会涉及多个流(如请求重定向,auth认证等情况), 而Okhttp使用同一个连接完成这一系列的流上的请求任务,这一点的实现我们将在介绍RetryInterceptor的部分中说明。

下面我们来思考StreamAllocation所要解决的问题,简单来讲就是在合适的连接之上建立一个新的流,这个问题划分为两步就是寻找连接和新建流。那么,StreamAllocation的数据结构中应该包含Stream(okhttp中将接口的名字定义为HttpCodec), Connction, 其次为了寻找合适的连接,应该包含一个URL地址, 连接池ConnectionPool, 而方法则应该包括我们前面提到的newStream()方法, 而findConnection则是为之服务的方法,其次在完成请求任务之后应该有finish方法用来关闭流对象,还有终止和取消等方法, 以及释放资源的方法。下面我们从StreamAllocation中寻找对应的属性和方法, 首先来看它的属性域:

public final class StreamAllocation {
  public final Address address;
  private Route route;
  private final ConnectionPool connectionPool;
  private final Object callStackTrace;

  // State guarded by connectionPool.
  private final RouteSelector routeSelector;
  private int refusedStreamCount;
  private RealConnection connection;
  private boolean released;
  private boolean canceled;
  private HttpCodec codec;

  public StreamAllocation(ConnectionPool connectionPool, Address address, Object callStackTrace) {
    this.connectionPool = connectionPool;
    this.address = address;
    this.routeSelector = new RouteSelector(address, routeDatabase());
    this.callStackTrace = callStackTrace;
  }
...

这里我们看并没有使用URL代表一个地址,而是使用Address对象, 如果查看其代码可以看到它封装了一个URL, 其中还包括dns, 代理等信息,可以更好地满足HTTP协议中规定的细节。使用Address对象也可以直接ConnectionPool中查找对应满足条件的连接,同时Address对象可以用于在RoutSelector查询一个合适的路径Rout对象,该Route对象可以用于建立连接, 然后我们忽略标志位和统计信息,剩下的则是Connection和HttpCodec对象,这里我们先忽略调用栈callStackTrace, 不考虑。以上就是它所有的属性域,那么下面我们再来看它最重要的方法,即新建流的方法:

public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
    int connectTimeout = client.connectTimeoutMillis();
    int readTimeout = client.readTimeoutMillis();
    int writeTimeout = client.writeTimeoutMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);

      HttpCodec resultCodec;
      if (resultConnection.http2Connection != null) {
        resultCodec = new Http2Codec(client, this, resultConnection.http2Connection);
      } else {
        resultConnection.socket().setSoTimeout(readTimeout);
        resultConnection.source.timeout().timeout(readTimeout, MILLISECONDS);
        resultConnection.sink.timeout().timeout(writeTimeout, MILLISECONDS);
        resultCodec = new Http1Codec(
            client, this, resultConnection.source, resultConnection.sink);
      }

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }

其流程很明确,我们先不考虑HTTP2.0的情况, 流程就是找到合适的良好连接,然后实例化HttpCodec对象,并设置对应的属性,返回该流对象即可,该流对象依赖连接的输入流和输出流,从而可以在流之上进行请求的写入和响应的读出。下面来开findConnection的代码:

 /**
  * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
  * until a healthy connection is found.
  */
 private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
     int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
     throws IOException {
   while (true) {
     RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
         connectionRetryEnabled);

     // If this is a brand new connection, we can skip the extensive health checks.
     synchronized (connectionPool) {
       //successCount记录该连接上执行流任务的次数,为零说明是新建立的连接
       if (candidate.successCount == 0) {
         return candidate;
       }
     }

     // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
     // isn't, take it out of the pool and start again.
     if (!candidate.isHealthy(doExtensiveHealthChecks)) {  // 判断连接是否可用
       noNewStreams(); //在该方法中会设置该Allocation对应的Connection对象的noNewStream标志位,标识这在该连接不再使用,在回收的线程中会将其回收
       continue;
     }

     return candidate;
   }
 }

 /**
  * Returns a connection to host a new stream. This prefers the existing connection if it exists,
  * then the pool, finally building a new connection.
  */
 private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
     boolean connectionRetryEnabled) throws IOException {
   Route selectedRoute;
   synchronized (connectionPool) {

   //一系列条件判断
   ...

     RealConnection allocatedConnection = this.connection;
     if (allocatedConnection != null && !allocatedConnection.noNewStreams) {//noNewStream是一个标识为,标识该连接不可用
       return allocatedConnection;
     }

     // Attempt to get a connection from the pool.
     //可以在OkhttpClient中查看到该方法, 其实就是调用connnctionPool.get(address, streamAllocation);

     RealConnection pooledConnection = Internal.instance.get(connectionPool, address, this);  
     if (pooledConnection != null) {
       this.connection = pooledConnection;
       return pooledConnection;
     }

     selectedRoute = route;
   }

   if (selectedRoute == null) {
     selectedRoute = routeSelector.next(); //选择下一个路线Rout
     synchronized (connectionPool) {
       route = selectedRoute;
       refusedStreamCount = 0;
     }
   }
   RealConnection newConnection = new RealConnection(selectedRoute);

   synchronized (connectionPool) {
     acquire(newConnection);   //1. 将该StreamAllocation对象,即this 添加到Connection对象的StreamAllocation引用列表中,标识在建立新的流使用到了该连接
     Internal.instance.put(connectionPool, newConnection);  //2. 将新建的连接加入到连接池, 与get方法类型,也是在OkHttpClient调用的pool.put()方法
     this.connection = newConnection;
     if (canceled) throw new IOException("Canceled");
   }

   newConnection.connect(connectTimeout, readTimeout, writeTimeout, address.connectionSpecs(),
       connectionRetryEnabled);   //3. 连接方法,将在介绍Connection的部分介绍
   routeDatabase().connected(newConnection.route());  //4.新建的连接一定可用,所以将该连接移除黑名单

   return newConnection;
 }

 ...
 /**
  * Use this allocation to hold {@code connection}. Each call to this must be paired with a call to
  * {@link #release} on the same connection.
  */
 public void acquire(RealConnection connection) {
   assert (Thread.holdsLock(connectionPool));
   connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
 }

这两个方法完成了寻找合适的连接的功能,这里我们可以将其分成两种类型:

  1. 从连接池中复用连接,这一点我们在连接管理部分中会有介绍,其实就是从维护的队列中找到合适的连接并返回,查找的依据就是Address对象;
  2. 新建连接,新建连接需要首先查找一个合适的路径,然后在该路径上实例化一个新的connection对象,在建立一个新的连接以后需要执行一系列的步骤,在代码中已经用注释的方式分四步标出。
    在或许到连接以后还需要执行检查过程,通常来说以上两种类型选择的连接都需要执行检查,只是这里新的连接一定可用,所以会跳过检查,其实检查的过程就是查看该连接的socket是否被关闭以及是否可以正常使用,有兴趣的可以自行查看代码。
    在建立新的连接以后需要处理一些事情,代码中的中文注释分了四步将其标出,其中需要说明第一步,acquire方法,connection中维护这一张在一个连接上的流的链表,该链表保存的是StreamAllocation的引用 Connction的该链表为空时说明该连接已经可以回收了,这部分在连接管理部分会有说明。其实在调用ConnectionPool.get()方法时,传入StreamAllocation对象也是为了调用StreamAllocation#acquire方法,将该StreamAllocation对象的引用添加到连接对应的链表中,用于管理一个连接上的流。

此外还需要说明的两点,一是关于Internal, 这是一个抽象类,该类只有一个实现类,时HttpClient的一个匿名内部类,该类的一系列方法都是一个功能,就是将okhttp3中一些包访问权限的方法对外提供一个public的访问方法,至于为什么这么实现,目前还不太清楚,估计是在Okhttp的框架中方便使用包权限的方法, 如ConnectionPool的put和get方法,在Okhttp的框架代码中通过Internal代理访问, 而在外部使用时无法访问到这些方法,至于还有没有其他考虑有清楚的同学欢迎赐教。
需要说明的第二点是RouteDatabase是一个黑名单,记录着不可用的路线,避免在同一个坑里栽倒两次,connect()方法就是将该路线移除黑名单,这里为了不影响StreamAllocation分析的连贯性,本文将RoutSelector和RouteDatabase等与路线选择相关的代码放到附录部分,这里我们暂且先了解它的功能而不关注它的实现。

至此就分析完了建立新流的过程,那么剩下的就是释放资源的逻辑,由于这一部分很多地方都与连接的复用管理,流的操作以及RetryAndFollowUpInterceptor中的逻辑有关,所以在这里暂且略过,后续部分再做分析,我们此处重点是了解如何查询连接,如何新建连接以及如何新建流对象。

2. 连接

下面开始分析连接对象,在okhttp中定义了Connection的接口,而该接口在okhttp只有一个实现类,即RealConnetion,下面我们重点分析该类。该类的主要功能就是封装Socket并对外提供输入输出流,那么它的内部结构也很容易联想到,它应该持有一个Socket, 并提供一个输入流一个输入流,在功能方法中对外提供connect()方法建立连接。这里由于okhttp是支持Https和HTTP2.0,如果不考虑这两种情况,RealConnection的代码将会比较简单,下面首先来看它的属性域:

public final class RealConnection extends Http2Connection.Listener implements Connection {
  private final Route route;

  /** The low-level TCP socket. */
  private Socket rawSocket;

  /**
   * The application layer socket. Either an {@link SSLSocket} layered over {@link #rawSocket}, or
   * {@link #rawSocket} itself if this connection does not use SSL.
   */
  public Socket socket;
...
  private Protocol protocol;
...
  public int successCount;
  public BufferedSource source;
  public BufferedSink sink;
  public int allocationLimit;
  public final List<Reference<StreamAllocation>> allocations = new ArrayList<>();
  public boolean noNewStreams;
  public long idleAtNanos = Long.MAX_VALUE;

  public RealConnection(Route route) {
    this.route = route;
  }
...

RealConnection通过一个Route路线(或者说路由或路径)来建立连接,它封装一个Socket, 由于考虑到Https的情况,socket有可能时SSLSocket或者RawSocket, 所以这里有两个socket域, 一个在底层,一个在上层,由于我们不考虑Https的情况,那么两个就是等价的,我们只需要明白内部建立rawSocket, 对外提供socket就可以了(注释中也已经明白解释)。另外除了输入流和输出流之外还有连接所使用到的协议,在连接方法中会用到,最后剩下的就是跟连接管理部分相关的统计信息,allocationLimit是分配流的数量上限,对应HTTP1.1来说它就是1, allocations在StreamAllocation部分我们已经熟悉,它是用来统计在一个连接上建立了哪些流,通过StreamAllocation的acquire方法和release方法可以将一个allocation对象添加到链表或者移除链表,(不太清楚这两个方法放在connection中是不是更合理一些), noNewStream之前说过可以简单理解为它标识该连接已经不可用,idleAtNanos记录该连接处于空闲状态的时间,这些将会在第三部分连接管理中做介绍,这里暂且略过不考虑。

下面就开始看它的连接方法, connect()方法

public void connect(int connectTimeout, int readTimeout, int writeTimeout,
      List<ConnectionSpec> connectionSpecs, boolean connectionRetryEnabled) {
    if (protocol != null) throw new IllegalStateException("already connected");
    ...
    while (protocol == null) {
      try {
          buildConnection(connectTimeout, readTimeout, writeTimeout, connectionSpecSelector);
      } catch (IOException e) {
        ...
    }
  }

这里对代码做了最大的简化,主要去掉了异常处理的部分以及Https需要考虑的部分,从代码中可以看出,建立连接是通过判断protocol是否为空来确定是否已经建立 连接的, 下面就继续看buildeConnection()方法:

 private void buildConnection(int connectTimeout, int readTimeout, int writeTimeout,
      ConnectionSpecSelector connectionSpecSelector) throws IOException {
    connectSocket(connectTimeout, readTimeout);
    establishProtocol(readTimeout, writeTimeout, connectionSpecSelector);
  }

  private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();

    //根据是否需要代理建立不同的Socket
    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket()
        : new Socket(proxy);

    rawSocket.setSoTimeout(readTimeout);
    try {
      //内部就是调用socket.connect(InetAddress, timeout)方法建立连接
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      ...
    }

    //获取输入输出流
    source = Okio.buffer(Okio.source(rawSocket));
    sink = Okio.buffer(Okio.sink(rawSocket));
  }

  private void establishProtocol(int readTimeout, int writeTimeout,
      ConnectionSpecSelector connectionSpecSelector) throws IOException {
    if (route.address().sslSocketFactory() != null) {
      ...
    } else {
      protocol = Protocol.HTTP_1_1;
      socket = rawSocket;
    }

    if (protocol == Protocol.HTTP_2) {
      ...
    } else {
      //HTTP1.1及以下版本中, 每个连接只允许有一个流
      this.allocationLimit = 1;
    }
  }

这里同样对代码做了简化,并在重要的地方做了注释, 流程也很清楚,就是建立socket连接,并获取输入输出流,然后设置正确的协议,此时connect()方法就可以跳出while循环,完成连接。
至此,就完成了连接过程,所以如果除去HTTPs和HTTP2的部分,RealConnection的代码很简单,不过对于HTTPS和HTTP2的处理还是挺多,后续还会继续学习。下面再看另外一个概念,流。

3. 连接管理

对于连接的管理主要是分析ConnectionPool,以连接池的形式管理连接的复用, okhttp中尽可能对于相同地址的远程通信复用同一个连接,这样就节省了连接的代价。那么我们现在明白了ConnectionPool所要解决的问题就可以去思考它应当如何实现,它应当具备的功能可以简单分为三个方法,get, put, cleanup,即获取添加和清理的方法。其中最重要也是最复杂的是清理,即在连接池放满的情况下,如何选择需要淘汰的连接。这里需要考虑的指标包括连接的数量,连接的空闲时长等,那么下面我们来看ConnectionPool的代码:

/**
 * Manages reuse of HTTP and HTTP/2 connections for reduced network latency. HTTP requests that
 * share the same {@link Address} may share a {@link Connection}. This class implements the policy
 * of which connections to keep open for future use.
 */
public final class ConnectionPool {
  /**
   * Background threads are used to cleanup expired connections. There will be at most a single
   * thread running per connection pool. The thread pool executor permits the pool itself to be
   * garbage collected.
   */
  private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

  boolean cleanupRunning;
  private final Runnable cleanupRunnable = new Runnable() {
    ...
  };

  /** The maximum number of idle connections for each address. */
  private final int maxIdleConnections;
  private final long keepAliveDurationNs;

  private final Deque<RealConnection> connections = new ArrayDeque<>();
  final RouteDatabase routeDatabase = new RouteDatabase();


  /**
   * Create a new connection pool with tuning parameters appropriate for a single-user application.
   * The tuning parameters in this pool are subject to change in future OkHttp releases. Currently
   * this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity.
   */
  public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);
  }

  public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
    }
  }

首先来看它的属性域,最主要的就是connections,可见在ConnectionPool内部以队列的方式存储连接,而routeDatabase是一个黑名单,用来记录不可用的route,但是看代码目测ConnectionPool中并没有使用它,可能后续还有其他用处,此处不做分析。剩下的就是与清理相关的,从名字则可以看出他们的用途,最开始的三个分别是执行清理任务的线程池,标志位以及清理任务,而maxIdleConnections和keepAliveDurationNs则是清理中淘汰连接的指标,这里需要说明的是maxIdleConnections是值每个地址上最大的空闲连接数(如注释所说),看来okhttp只是限制了与同一个远程服务器的空闲连接数量,对整体的空闲连接数量并没有做限制,但是从代码来看并不是如此,该值时标识该连接池中整体的空闲连接的最大数量,当一个连接数量超过这个值时则会触发清理,这里留有疑问,不知注释是何意, 另外还有一个疑问是okhttp并没有限制一个连接池中连接的最大数量,而只是限制了连接池中的最大空闲连接数量,由于不懂HTTP协议,水平也有限,不太清楚这里需不需要限制,有了解的欢迎在评论中告知,不胜感激。
最后我们可以从默认的构造器中看出okhttp允许每个地址同时可以有五个连接,每个连接空闲时间最多为五分钟。

下面我们首先来看较为简单一些的get和put方法

/** Returns a recycled connection to {@code address}, or null if no such connection exists. */
  RealConnection get(Address address, StreamAllocation streamAllocation) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.allocations.size() < connection.allocationLimit
          && address.equals(connection.route().address)
          && !connection.noNewStreams) {
        streamAllocation.acquire(connection);
        return connection;
      }
    }
    return null;
  }

获取一个connection时按照Address来匹配,而建立连接也是通过Address查询一个合适的Route来建立的,当匹配到连接时,会将新建立的流对应的StreamAllocation添加到connection.allocations中, 如果这里调用connection.allocations.add(StreamAllocation)或者Connection自定义的add方法会不会更清晰一些,不太明白为什么将该任务放在了StreamAllocation的acquire方法中,此处的streamAllocation的acquire方法其实也就是做了这件事情,用来管理一个连接上的流。

put方法

void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

put方法更为简单,就是异步触发清理任务,然后将连接添加到队列中。那么下面开始重点分析它的清理过程,首先来看清理任务的定义:

private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

逻辑也很简单,就是调用cleanup方法执行清理,并等待一段时间,持续清理,而等待的时间长度时有cleanup函数返回值指定的,那么我们继续来看cleanup函数

/**
  * Performs maintenance on this pool, evicting the connection that has been idle the longest if
  * either it has exceeded the keep alive limit or the idle connections limit.
  *
  * <p>Returns the duration in nanos to sleep until the next scheduled call to this method. Returns
  * -1 if no further cleanups are required.
  */
 long cleanup(long now) {
   int inUseConnectionCount = 0;
   int idleConnectionCount = 0;
   RealConnection longestIdleConnection = null;
   long longestIdleDurationNs = Long.MIN_VALUE;

   // Find either a connection to evict, or the time that the next eviction is due.
   synchronized (this) {
     for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
       RealConnection connection = i.next();

       // If the connection is in use, keep searching.
       if (pruneAndGetAllocationCount(connection, now) > 0) {
         inUseConnectionCount++;
         continue;
       }
       //统计空闲连接的数量
       idleConnectionCount++;

       // If the connection is ready to be evicted, we're done.
       long idleDurationNs = now - connection.idleAtNanos;
       if (idleDurationNs > longestIdleDurationNs) {//找出空闲时间最长的连接以及对应的空闲时间
         longestIdleDurationNs = idleDurationNs;
         longestIdleConnection = connection;
       }
     }


     if (longestIdleDurationNs >= this.keepAliveDurationNs
         || idleConnectionCount > this.maxIdleConnections) {
       // We've found a connection to evict. Remove it from the list, then close it below (outside
       // of the synchronized block).
       connections.remove(longestIdleConnection);  //在符合清理条件下,清理空闲时间最长的连接
     } else if (idleConnectionCount > 0) {
       // A connection will be ready to evict soon.
       return keepAliveDurationNs - longestIdleDurationNs;  //不符合清理条件,则返回下次需要执行清理的等待时间
     } else if (inUseConnectionCount > 0) {
       // All connections are in use. It'll be at least the keep alive duration 'til we run again.
       return keepAliveDurationNs;  //没有空闲的连接,则隔keepAliveDuration之后再次执行
     } else {
       // No connections, idle or in use.
       cleanupRunning = false;  //清理结束
       return -1;
     }
   }

   closeQuietly(longestIdleConnection.socket());  //关闭socket资源

   // Cleanup again immediately.
   return 0;  //这里是在清理一个空闲时间最长的连接以后会执行到这里,需要立即再次执行清理
 }

这里的首先统计空闲连接数量,然后查找最长空闲时间的连接以及对应空闲时长,然后判断是否超出最大空闲连接数量或者超过最大空闲时长,满足其一则执行清理最长空闲时长的连接,然后立即再次执行清理,否则会返回对应的等待时间,代码中中文注释已做说明。方法中用到了一个方法来查看一个连接上分配流的数量,这里不再贴出,有兴趣的可以自行查看。
接下来我们梳理一下清理的任务,清理任务是异步执行的,遵循两个指标,最大空闲连接数量和最大空闲时长,满足其一则清理空闲时长最大的那个连接,然后循环执行,要么等待一段时间,要么继续清理下一个连接,直到清理所有连接,清理任务才可以结束,下一次put方法调用时,如果已经停止的清理任务则会被再次触发开始。
ConnectionPool的主要方法就是这三个,其余的则是工具私有方法或者getter, setter方法,另外还有一个需要介绍一下我们在StreamAllocation中遇到的方法 connectionBecameIdle标识一个连接处于了空闲状态,即没有流任务,那么就需要调用该方法,有ConnectionPool来决定是否需要清理该连接:

 /**
   * Notify this pool that {@code connection} has become idle. Returns true if the connection has
   * been removed from the pool and should be closed.
   */
  boolean connectionBecameIdle(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (connection.noNewStreams || maxIdleConnections == 0) {
      connections.remove(connection);
      return true;
    } else {
      notifyAll(); // Awake the cleanup thread: we may have exceeded the idle connection limit.
      return false;
    }
  }

这里noNewStream标志位之前说过,它可以理解为该连接已经不可用,所以可以直接清理,而maxIdleConnections==0则标识不允许有空闲连接,也是可以直接清理的,否则唤醒清理任务的线程,执行清理方法。

至此则分析完了连接管理的逻辑,其实就是连接复用,主要包括get, put , cleanup三个方法,重点时清理任务的执行,我们可以在OkHttpClient中配置

后记

关于okhttp的连接和连接管理,逻辑还是比较容易理解,但是StreamAllocation的概念在刚接触时还是比较令人费解,但是为了逻辑顺序本篇文章还是从StreamAllocation开始分析,进而引入了连接和连接复用管理部分,读者可在理解了连接和连接复用管理部分的代码以后在回头再去读StreamAllocation的代码或许更容易理解一些。在最初的计划中是将连接与流一起分析,正是因为StreamAlloc

附录

在前面的分析中,我们了解到Connection需要一个Route路径对象来建立连接,在这一部分我们主要分析Route和RouteSelector,来学习okhttp中是如何通过Address对象选择合理的路径对象的,本部分可能比较鸡肋,对整个网络请求的流程并没有太大影响,因此有兴趣的同学可以继续阅读,没有兴趣可以略过,并不影响其他部分的分析。

首先我们先来看一下Address类,这个类用来表示需要连接远程主机的地址,它内部包含url, proxy, dns等信息,还有一些关于Https会用到的验证信息,这里我们不再分析其源码, 代码很简单,只是封装了一些属性域而已。但是它的注释还是可以了解一下,可以更好地把握这个类的作用:

/**
 * A specification for a connection to an origin server. For simple connections, this is the
 * server's hostname and port. If an explicit proxy is requested (or {@linkplain Proxy#NO_PROXY no
 * proxy} is explicitly requested), this also includes that proxy information. For secure
 * connections the address also includes the SSL socket factory, hostname verifier, and certificate
 * pinner.
 *
 * <p>HTTP requests that share the same {@code Address} may also share the same {@link Connection}.
 */

通过阅读类的注释,我们就可以发现,其实它就是对于连接对应的远程连接的地址说明,一般情况下会是主机名和端口号,也就是url, 在有代理的情况下还会包含代理信息,而对于Https会包含一些其他的验证信息等。

下面我们再来看Route对象,我们还是来看它的注释:

/**
 * The concrete route used by a connection to reach an abstract origin server. When creating a
 * connection the client has many options:
 *
 * <ul>
 *     <li><strong>HTTP proxy:</strong> a proxy server may be explicitly configured for the client.
 *         Otherwise the {@linkplain java.net.ProxySelector proxy selector} is used. It may return
 *         multiple proxies to attempt.
 *     <li><strong>IP address:</strong> whether connecting directly to an origin server or a proxy,
 *         opening a socket requires an IP address. The DNS server may return multiple IP addresses
 *         to attempt.
 * </ul>
 *
 * <p>Each route is a specific selection of these options.
 */

从注释中我们可以看出,首先Route对象可以用于建立连接,而选择一个合适的Route,有很多种选项,第一个就是Proxy代理,代理可以明确指定,也可以由ProxySelector中返回多个可用的代理,第二个选项就是IP地址,在Java中表示就是InetAddress对象,dns会返回一个服务器对应的多个IP地址,这两个选项在RouteSelector中马上就会看到,下面我们来分析RouteSelector是如何选择Route对象的。

首先来看它的属性域:

/**
 * Selects routes to connect to an origin server. Each connection requires a choice of proxy server,
 * IP address, and TLS mode. Connections may also be recycled.
 */
public final class RouteSelector {
  private final Address address;
  private final RouteDatabase routeDatabase;

  /* The most recently attempted route. */
  private Proxy lastProxy;
  private InetSocketAddress lastInetSocketAddress;

  /* State for negotiating the next proxy to use. */
  private List<Proxy> proxies = Collections.emptyList();
  private int nextProxyIndex;

  /* State for negotiating the next socket address to use. */
  private List<InetSocketAddress> inetSocketAddresses = Collections.emptyList();
  private int nextInetSocketAddressIndex;

  /* State for negotiating failed routes */
  private final List<Route> postponedRoutes = new ArrayList<>();
  ...
}

注释说的也很清楚,连接远程服务器,每一个连接都需要选择一个代理和IP地址,这里我们忽略与HTTPs有关的东西,下面的属性域也就很容易理解,address代表地址, routeDatabase是一个黑名单,存储那些不可用的Route对象,接下来的六个属性则是与Proxy和InetSocketAddress相关了,包括最近使用的,可以选择的,以及下一个可选择的索引值,最后postponedRoutes表示那些加入到黑名单中的且符合地址条件(即满足条件但是之前有过不可用记录的Route对象)所有Route对象集合,当没有可选择余地时会选择使用它们,总比没有要好。

首先我们从构造器开始:

  public RouteSelector(Address address, RouteDatabase routeDatabase) {
    this.address = address;
    this.routeDatabase = routeDatabase;

    resetNextProxy(address.url(), address.proxy());
  }

这里我们看到调用了resetNextProxy()方法,其实可以将其理解为一个初始化或者重置Proxy方法,我们来看起代码:

/** Prepares the proxy servers to try. */
  private void resetNextProxy(HttpUrl url, Proxy proxy) {
    if (proxy != null) {
      // If the user specifies a proxy, try that and only that.
      proxies = Collections.singletonList(proxy);
    } else {
      // Try each of the ProxySelector choices until one connection succeeds.
      List<Proxy> proxiesOrNull = address.proxySelector().select(url.uri());
      proxies = proxiesOrNull != null && !proxiesOrNull.isEmpty()
          ? Util.immutableList(proxiesOrNull)
          : Util.immutableList(Proxy.NO_PROXY);
    }
    nextProxyIndex = 0;
  }

逻辑流程也容易理解,优先使用在address对象中的指定的proxy对象,该方法由构造器调用,传递进来的,如果该Proxy对象为空,则从address.proxySelector中选择一个Proxy, 如果还没有则使用Proxy.NO_PROXY,并将索引初始化为0,接下来我们一并看一下resetInetSocketAddress()方法:

  /** Prepares the socket addresses to attempt for the current proxy or host. */
  private void resetNextInetSocketAddress(Proxy proxy) throws IOException {
    // Clear the addresses. Necessary if getAllByName() below throws!
    inetSocketAddresses = new ArrayList<>();

    String socketHost;
    int socketPort;
    if (proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.SOCKS) {
      socketHost = address.url().host();
      socketPort = address.url().port();
    } else {
      SocketAddress proxyAddress = proxy.address();
      ...
      InetSocketAddress proxySocketAddress = (InetSocketAddress) proxyAddress;
      socketHost = getHostString(proxySocketAddress);
      socketPort = proxySocketAddress.getPort();
    }
    ...
    if (proxy.type() == Proxy.Type.SOCKS) {
      inetSocketAddresses.add(InetSocketAddress.createUnresolved(socketHost, socketPort));
    } else {
      // Try each address for best behavior in mixed IPv4/IPv6 environments.
      List<InetAddress> addresses = address.dns().lookup(socketHost);
      for (int i = 0, size = addresses.size(); i < size; i++) {
        InetAddress inetAddress = addresses.get(i);
        inetSocketAddresses.add(new InetSocketAddress(inetAddress, socketPort));
      }
    }

    nextInetSocketAddressIndex = 0;
  }

这里省略了部分条件判断,逻辑流程同意很清楚,首先是获取到主机地址和端口号,然后创建InetSocketAddress或者通过address中的dns查询所有有可能的InetSocketAddress地址。在有了以上的了解以后我们就可以看RouteSelector中最重要的方法,即选择一个Route对象,next()方法:

  public Route next() throws IOException {
    // Compute the next route to attempt.
    if (!hasNextInetSocketAddress()) {
      if (!hasNextProxy()) {
        if (!hasNextPostponed()) {
          throw new NoSuchElementException();
        }
        return nextPostponed();
      }
      lastProxy = nextProxy();
    }
    lastInetSocketAddress = nextInetSocketAddress();

    Route route = new Route(address, lastProxy, lastInetSocketAddress);
    if (routeDatabase.shouldPostpone(route)) {
      postponedRoutes.add(route);
      // We will only recurse in order to skip previously failed routes. They will be tried last.
      return next();
    }

    return route;
  }

这段代码第一次看可能有些迷惑,不过仔细去分析其实逻辑很清晰,首先我们假定还有可用的InetSocketAddress,那么通过nextInetSocketAddress()直接获取并创建Route对象,然后检查是否在黑名单中,在黑名单则加入候选队列,其实就是最后迫于无奈才会使用。如果没有了InetSocketAddress,则选择下一个可用的代理Proxy,在nextProxy中会调用resetNextInetSocketAddress()方法重置InetSocketAddress的队列,继续查询下个可用的InetSocketAddress对象,如果没有可用的Proxy,则从候选队列中postpone中选择,即迫于无奈的选择,如果候选队列也为空那就无能为力了,只能抛异常。这样就分析完了整个选择路径的过程,过程的逻辑很清晰也很容易理解,另外该类中还有一些其他的方法,如用于更新黑名单等,有兴趣的可以自行查阅。

分析到这里,整个连接的过程就分析结束了,可以总结为在网络请求过程中,首先创建一个StreamAllocation的对象,然后调用其newStream()方法,查找一个可用连接,要么复用连接,要么新建连接,复用连接则根据address从连接池中查找,新建连接则是根据address查找一个Route对象建立连接,建立连接以后会将该连接添加到连接池中,同时连接池的清理任务停止的情况下,添加新的连接进去会触发开启清理任务。这是建立连接和管理连接的整个过程,当拥有连接以后,StreamAllocation就会在连接上建立一个流对象,该流持有connection的输入输出流,也就是socket的输入输出流,通过它们最终完成数据通信的过程,所以下一节中将会重点分析流对象Http1Codec,以及数据通信的过程。

上一篇下一篇

猜你喜欢

热点阅读