OkHttp 原理解析 (三)

2019-06-10  本文已影响0人  莫库施勒
OkHttp.jpg

ConnectInterceptor

public final class ConnectInterceptor implements Interceptor {
  public final OkHttpClient client;

  public ConnectInterceptor(OkHttpClient client) {
    this.client = client;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    Transmitter transmitter = realChain.transmitter();

    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);

    return realChain.proceed(request, transmitter, exchange);
  }
}

之前我们在RetryAndFollowUpInterceptor 已经 prepareToConnect() 做过准备了,然后就是在 BridgeIntercepter 中添加一些请求头和相应头,接着是CacheIntercepter 看是否可以直接使用缓存,如果有缓存的话也不会走到这里,如果没有缓存就需要 ConnectIntercepter 借用 Transmitter来桥接应用层和网络层,通过 ExchangeFinder 中的 finHealthyConnection()connectionPool 中找到一个可用的连接,这个连接可能是复用的,并 connect(),从而得到 输入/输出 流 (source/sink) ,返回一个 ExchangeCallServerIntercepter , 通过这个 Exchange 就可以添加请求头和请求体,并读取响应头和响应体,来交给上面的 Intercepter,层层向上传递。

// ExchangeFinder.java
  private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
    boolean foundPooledConnection = false;
    RealConnection result = null;
    Route selectedRoute = null;
    RealConnection releasedConnection;
    Socket toClose;
    synchronized (connectionPool) {
      if (transmitter.isCanceled()) throw new IOException("Canceled");
      hasStreamFailure = false; // This is a fresh attempt.

      // 尝试复用已分配 Connection
      releasedConnection = transmitter.connection;
      toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
          ? transmitter.releaseConnectionNoEvents()
          : null;

      if (transmitter.connection != null) {
        // 得到了已分配的connection
        result = transmitter.connection;
        releasedConnection = null;
      }

      if (result == null) {
        // 尝试获取已回收的connection
        if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
          foundPooledConnection = true;
          result = transmitter.connection;
        } else if (nextRouteToTry != null) {
          selectedRoute = nextRouteToTry;
          nextRouteToTry = null;
        } else if (retryCurrentRoute()) {
          selectedRoute = transmitter.connection.route();
        }
      }
    }
    closeQuietly(toClose);

    if (releasedConnection != null) {
      eventListener.connectionReleased(call, releasedConnection);
    }
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
    }
    if (result != null) {
      // 从connectionPool中找到了就返回
      return result;
    }

    // 如果需要路由选择器,就创建。这是一个阻塞操作
    boolean newRouteSelection = false;
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
      newRouteSelection = true;
      routeSelection = routeSelector.next();
    }

    List<Route> routes = null;
    synchronized (connectionPool) {
      if (transmitter.isCanceled()) throw new IOException("Canceled");

      if (newRouteSelection) {
        // 根据 IP addresses 集合, 再次尝试从 connectionPool中获取connection。这里与上次的区别是 routes不为空
        routes = routeSelection.getAll();
        if (connectionPool.transmitterAcquirePooledConnection(
            address, transmitter, routes, false)) {
          foundPooledConnection = true;
          result = transmitter.connection;
        }
      }

      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection.next();
        }

        // 这里就创建一个 Connection并指派
        result = new RealConnection(connectionPool, selectedRoute);
        connectingConnection = result;
      }
    }

    // 得到了connection,返回
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
      return result;
    }

    // 进行 TCP + TLS handshakes. 一个阻塞操作
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
        connectionRetryEnabled, call, eventListener);
    connectionPool.routeDatabase.connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      connectingConnection = null;
      // 将 connection进行合并,只有在多个connection 复用一个 host的时候
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
        // We lost the race! Close the connection we created and return the pooled connection.
        result.noNewExchanges = true;
        socket = result.socket();
        result = transmitter.connection;
      } else {
        connectionPool.put(result);
        transmitter.acquireConnectionNoEvents(result);
      }
    }
    closeQuietly(socket);

    eventListener.connectionAcquired(call, result);
    return result;
  }

以上代码主要做的事情有:

  1. StreamAllocation的connection如果可以复用则复用;
  2. 如果connection不能复用,则从连接池中获取RealConnection对象,获取成功则返回;
  3. 如果连接池里没有,则new一个RealConnection对象;
  4. 调用RealConnection的connect()方法发起请求;
  5. 将RealConnection对象存进连接池中,以便下次复用;
  6. 返回RealConnection对象。

RealConnection

 // Connection 接口
 Route route(); //返回一个路由
 Socket socket();  //返回一个socket
 Handshake handshake();  //如果是一个https,则返回一个TLS握手协议
 Protocol protocol(); //返回一个协议类型 比如 http1.1 等或者自定义类型 

RealConnection是Connection的实现类,代表着链接socket的链路,如果拥有了一个RealConnection就代表了我们已经跟服务器有了一条通信链路。

 // RealConnection 成员变量
  private final ConnectionPool connectionPool;
  private final Route route;

  //下面这些字段,通过connect()方法初始化赋值,且不会再次赋值

  private Socket rawSocket; //底层 TCP socket

  private Socket socket;  //应用层socket

  private Handshake handshake;  //握手
 
  private Protocol protocol;  //协议
  
  private Http2Connection http2Connection; // http2的链接

  // 通过source和sink,与服务器交互的输入输出流
  private BufferedSource source;
  private BufferedSink sink;

  // 下面这个字段是表示链接状态的字段,并且有connectPool统一管理

  // 如果noNewStreams被设为true,则noNewStreams一直为true,不会被改变,
  // 并且这个链接不会再创建新的stream流
  public boolean noNewStreams;
  
  //成功的次数
  public int successCount;

  //此链接可以承载最大并发流的限制,如果不超过限制,可以随意增加
  public int allocationLimit = 1;

由上面的我们可以得出一些结论:

首先是connect() 方法

  public void connect(int connectTimeout, int readTimeout, int writeTimeout,
      int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
      EventListener eventListener) {
    if (protocol != null) throw new IllegalStateException("already connected");
    // 创建一个 Selector 来选择 connectionSpec 也就是线路
    RouteException routeException = null;
    List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
    ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
    ...
    // 尝试连接
    while (true) {
      try {
        // 如果要求隧道模式,建立通道连接,通常不会使用这种
        if (route.requiresTunnel()) {
          connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
          if (rawSocket == null) {
            // We were unable to connect the tunnel but properly closed down our resources.
            break;
          }
        } else {
          // socket 连接
          connectSocket(connectTimeout, readTimeout, call, eventListener);
        }
        // 建立 https 连接
        establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
        eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
        break;
      } catch (IOException e) {
        ...
      }
    }
    if (route.requiresTunnel() && rawSocket == null) {
      ProtocolException exception = new ProtocolException("Too many tunnel connections attempted: "
          + MAX_TUNNEL_ATTEMPTS);
      throw new RouteException(exception);
    }

    if (http2Connection != null) {
      synchronized (connectionPool) {
        allocationLimit = http2Connection.maxConcurrentStreams();
      }
    }
  }

socket 连接

  private void connectSocket(int connectTimeout, int readTimeout, Call call,
      EventListener eventListener) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();
     // 根据代理类型来选择socket是代理还是直连类型
    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket()
        : new Socket(proxy);

    eventListener.connectStart(call, route.socketAddress(), proxy);
    rawSocket.setSoTimeout(readTimeout);
    try {
      // 为支持不同的平台,实际是 socket.connect(address, connectTimeout)
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
      ce.initCause(e);
      throw ce;
    }

    try {
      // 得到输入/输出流
      source = Okio.buffer(Okio.source(rawSocket));
      sink = Okio.buffer(Okio.sink(rawSocket));
    } catch (NullPointerException npe) {
      if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
        throw new IOException(npe);
      }
    }
  }

隧道连接

  private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout)
      throws IOException {
    // 创建隧道请求
    Request tunnelRequest = createTunnelRequest();
    HttpUrl url = tunnelRequest.url();
    int attemptedConnections = 0;
    int maxAttempts = 21;
    while (true) {
      if (++attemptedConnections > maxAttempts) {
        throw new ProtocolException("Too many tunnel connections attempted: " + maxAttempts);
      }
      // 建立Socket连接
      connectSocket(connectTimeout, readTimeout);
      // 建立隧道
      tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url);

      if (tunnelRequest == null) break; // Tunnel successfully created.

      closeQuietly(rawSocket);
      rawSocket = null;
      sink = null;
      source = null;
    }
  }

它们调用connectSocket 中参数 Call 是不一样的。

connectSocket中的代理连接建立的过程

  1. 没有设置代理的情况下,则直接与HTTP服务器建立TCP连接
  2. 设置了SOCKS代理的情况下,创建Socket时,为其传入proxy,连接时还是以HTTP服务器为目标。
  3. 设置了HTTP代理时,如果是HTTP请求,则与HTTP代理服务器建立TCP连接。HTTP代理服务器解析HTTP请求/响应的内容,并根据其中的信息来完成数据的转发。
  4. 设置了HTTP代理时,如果是 HTTPS/HTTP2请求,与HTTP服务器建立通过HTTP代理的隧道连接。HTTP代理不再解析传输的数据,仅仅完成数据转发的功能。此时HTTP代理的功能退化为如同SOCKS代理类似。
  5. 设置了代理类时,HTTP的服务器的域名解析会交给代理服务器执行。如果是HTTP代理,会对HTTP代理的域名做域名解析。

establishProtocol 建立连接过程:

  1. 建立 TLS 连接
    1. 用SSLSocketFactory基于原始的TCP Socket,创建一个SSLSocket, 配置SSLSocket。
    2. configureTlsExtensions 配置 TLS扩展
    3. 进行TLS握手
    4. 获取证书信息。
    5. 对证书进行验证。
    6. 完成HTTP/2的ALPN扩展
    7. 基于前面获取到SSLSocket创建于执行的IO的BufferedSource和BufferedSink等,并保存握手信息以及所选择的协议。
  2. 如果是HTTP 2.0,通过Http2Connection.Builder 建立一个 Http2Connection,通过 http2Connection.start() 和服务器建立连接。

ConnectionPool

管理http和http/2的链接,以减少请求的网络延迟。同一个address将共享同一个connection。实现了连接复用的功能。

public final class ConnectionPool {
  final RealConnectionPool delegate;
}

当前版本将具体的实现委托给了 RealConnectionPool

public final class RealConnectionPool {
  // 后台线程用来清理过期连接,在每一个连接池中最多又一个线程。
  // 这个 executor 允许自己被GC 清理
  private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<>(), Util.threadFactory("OkHttp ConnectionPool", true));
  // 清理任务
  private final Runnable cleanupRunnable = () -> {
    while (true) {
      long waitNanos = cleanup(System.nanoTime());
      if (waitNanos == -1) return;
      if (waitNanos > 0) {
        long waitMillis = waitNanos / 1000000L;
        waitNanos -= (waitMillis * 1000000L);
        synchronized (RealConnectionPool.this) {
          try {
            RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
          } catch (InterruptedException ignored) {
          }
        }
      }
    }
  };
  // 过期连接队列
  private final Deque<RealConnection> connections = new ArrayDeque<>();
  // 路由数据库,用来记录不可用的route
  final RouteDatabase routeDatabase = new RouteDatabase();
}

默认情况下,这个连接池最多维持5个连接,且每个链接最多活5分钟。
从 ConnectionPool 获取Connection

  // RealConectionPool.java
  boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
      @Nullable List<Route> routes, boolean requireMultiplexed) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (requireMultiplexed && !connection.isMultiplexed()) continue;
      if (!connection.isEligible(address, routes)) continue;
      transmitter.acquireConnectionNoEvents(connection);
      return true;
    }
    return false;
  }

然后把这个connection 设置到 Transmitter 中去

 // 此方法有两处调用,一个是 findConnection,另一个是 connectionPool.transmitterAcquirePooledConnection()
 // 后一个方法也会在 findConnection处被调用
  void acquireConnectionNoEvents(RealConnection connection) {
    assert (Thread.holdsLock(connectionPool));

    if (this.connection != null) throw new IllegalStateException();
    this.connection = connection;
    connection.transmitters.add(new TransmitterReference(this, callStackTrace));
  }

从代码可以看出来,这个connection 必须 isMultiplexed、 isEligible, 才可以
至于添加 connection ,就是异步触发清理任务,然后将连接添加到队列中。

 void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

至于这个清理任务,代码就是上面的 cleanupRunnable

  1. 调用cleanup方法
  2. 等待 connectionBecameIdle() 触发 notifyAll()
    而这个 connectionBecameIdle() 是在 TransmitterreleaseConnectionNoEvents() -> maybeReleaseConnection() -> exchangeMessageDone() -> Exchange.bodyComplete -> complete -> close
    这个 close 属于 ForwardingSource,它的 delegate, 即为 codec.openResponseBodySource(response)

我们现在看一下 cleanup 做了什么

  1. 统计空连接数量
  2. 查找最长空闲时间的连接,以及它的空闲时长
  3. 如果超过了最大连接数或者最大空闲时长,就 remove 掉这个连接
  4. 否则返回一个等待时长,也就是cleanup 的返回值 waitNanos
    然后阻塞相应的时间,如果有了废弃连接就清理,否则,接着等待

cleanup中还有一个方法 pruneAndGetAllocationCount(),它是用来追踪泄露连接的,返回还存活于 connection 的 transmitter 的数量。所谓泄漏,就是还在追踪这个connection 但是程序已经废弃掉他们了。

Transmitter

是OkHttp的应用程序和网络层之间的桥梁。 此类公开了高级应用程序层原语:连接,请求,响应和流。
它持有okhttpclient对象以及RealCall对象。
它支持异步取消,如果是一个 HTTP/2, 取消的是这个流而不是共享的这个连接,但是如果是在进行TLS握手,就会取消整个连接。

ExchangeFinder

它尝试去是为一些可能的变化去找到一条可用的连接,策略如下:

  1. 如果当前 call 已经有了一个连接,能够满足请求,就用相同的连接,做一些初始化修改。
  2. 如果连接池中的一个连接满足这个请求。
  3. 如果没有现存的连接,就创建一个路由列表,并创建一个新连接。如果失败了,就迭代的尝试列表中可用的路由。
上一篇下一篇

猜你喜欢

热点阅读