OkHttp讲解(四)-链接池

2021-08-22  本文已影响0人  涛涛123759

Android知识总结

一、 Interceptor 关联类分析

1.1、 StreamAllocation 的成员变量

简介
StreamAllocation是用来协调Connections、Streams和Calls这三个实体的。

HTTP通信 执行 网络请求Call 需要在 连接Connection 上建立一个新的 流Stream,我们将 StreamAllocation 称之 流 的桥梁,它负责为一次 请求 寻找 连接 并建立 流,从而完成远程通信。

  public final Address address;//地址
  private Route route; //路由
  private final ConnectionPool connectionPool;  //连接池
  private final Object callStackTrace; //日志

  // State guarded by connectionPool.
  private final RouteSelector routeSelector; //路由选择器
  private int refusedStreamCount;  //拒绝的次数
  private RealConnection connection;  //连接
  private boolean released;  //是否已经被释放
  private boolean canceled  //是否被取消了

1.2、 RealConnection

  • okhttp是底层实现框架,与httpURLconnection是同一级别的。OKHttp底层建立网络连接的关键就是RealConnection类。RealConnection类底层封装socket,是真正的创建连接者。分析这个类之后就明白了OKHttp与httpURLconnection的本质不同点。
  • RealConnection是Connection的实现类,代表着链接socket的链路,如果拥有了一个RealConnection就代表了我们已经跟服务器有了一条通信链路,而且通过RealConnection代表是连接socket链路,RealConnection对象意味着我们已经跟服务端有了一条通信链路了,在这个类里面实现的三次握手。
  • 在OKHttp里面,记录一次连接的是RealConnection,这个负责连接,在这个类里面用socket来连接,用HandShake来处理握手。
  //链接的线程池
  private final ConnectionPool connectionPool;
  private final Route route;
  //下面这些字段,通过connect()方法开始初始化,并且绝对不会再次赋值
  /** The low-level TCP socket. */
  private Socket rawSocket; //底层socket
  private Socket socket;  //应用层socket
  //握手
  private Handshake handshake;
   //协议
  private Protocol protocol;
   // http2的链接
  private Http2Connection http2Connection;
  //通过source和sink,大家可以猜到是与服务器交互的输入输出流
  private BufferedSource source;
  private BufferedSink sink;
  //下面这个字段是 属于表示链接状态的字段,并且有connectPool统一管理
  //如果noNewStreams被设为true,则noNewStreams一直为true,不会被改变,并且表示这个链接不会再创新的stream流
  public boolean noNewStreams;
  //成功的次数
  public int successCount;
  //此链接可以承载最大并发流的限制,如果不超过限制,可以随意增加
  public int allocationLimit = 1;

RealConnection的connect方法,connect()里面进行了三次握手

public void connect(。。。) {
     //如果协议不等于null,抛出一个异常
    if (protocol != null) throw new IllegalStateException("already connected");

   。。 省略部分代码。。。。

    while (true) {//一个while循环
         //如果是https请求并且使用了http代理服务器
        if (route.requiresTunnel()) {
          connectTunnel(...);
        } else {//
            //直接打开socket链接
          connectSocket(connectTimeout, readTimeout);
        }
        //建立协议
        establishProtocol(connectionSpecSelector);
        break;//跳出while循环
        。。省略部分代码。。。
  }

 //当前route的请求是https并且使用了Proxy.Type.HTTP代理
 public boolean requiresTunnel() {
    return address.sslSocketFactory != null && proxy.type() == Proxy.Type.HTTP;
  }

普通连接的建立过程为建立TCP连接,建立TCP连接的过程为

 private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();

    //根据代理类型来选择socket类型,是代理还是直连
    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket()
        : new Socket(proxy);

    rawSocket.setSoTimeout(readTimeout);
    try {
    //连接socket
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      throw new ConnectException("Failed to connect to " + route.socketAddress());
    }
    source = Okio.buffer(Okio.source(rawSocket));//从socket中获取source 对象。
    sink = Okio.buffer(Okio.sink(rawSocket));//从socket中获取sink 对象。
  }

Okio.source(rawSocket)Okio.sink(rawSocket)的原码

  public static Source source(Socket socket) throws IOException {
    if (socket == null) throw new IllegalArgumentException("socket == null");
    AsyncTimeout timeout = timeout(socket);
    Source source = source(socket.getInputStream(), timeout);
    return timeout.source(source);
  }

  public static Sink sink(Socket socket) throws IOException {
    if (socket == null) throw new IllegalArgumentException("socket == null");
    AsyncTimeout timeout = timeout(socket);
    Sink sink = sink(socket.getOutputStream(), timeout);
    return timeout.sink(sink);
  }

建立隧道连接的过程

  private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout)
      throws IOException {
      //1、创建隧道请求对象
    Request tunnelRequest = createTunnelRequest();
    HttpUrl url = tunnelRequest.url();
    int attemptedConnections = 0;
    int maxAttempts = 21;
    //一个while循环
    while (true) {
       //尝试连接词说超过最大次数
      if (++attemptedConnections > maxAttempts) {
        throw new ProtocolException("Too many tunnel connections attempted: " + maxAttempts);
      }
      //2、打开socket链接
      connectSocket(connectTimeout, readTimeout);
     //3、请求开启隧道并返回tunnelRequest 
      tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url);

     //4、成功开启了隧道,跳出while循环
      if (tunnelRequest == null) break; /

      //隧道未开启成功,关闭相关资源,继续while循环    
      //当然,循环次数超限后抛异常,退出wiile循环
      closeQuietly(rawSocket);
      rawSocket = null;
      sink = null;
      source = null;
    }
  }
  //隧道请求是一个常规的HTTP请求,只是请求的内容有点特殊。最初创建的隧道请求如
  private Request createTunnelRequest() {
    return new Request.Builder()
        .url(route.address().url())
        .header("Host", Util.hostHeader(route.address().url(), true))
        .header("Proxy-Connection", "Keep-Alive") // For HTTP/1.0 proxies like Squid.
        .header("User-Agent", Version.userAgent())
        .build();
  }

二、创建链接流程

ConnectInterceptor里面创建链接,并把创建的链接放如链接池中。具体过程如下:

@Override 
public Response intercept(Chain chain) throws IOException {
   RealInterceptorChain realChain = (RealInterceptorChain) chain;
   Request request = realChain.request();
    //获取可复用流
   StreamAllocation streamAllocation = realChain.streamAllocation();
   boolean doExtensiveHealthChecks = !request.method().equals("GET");
    //创建输出流
   HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
   //根据HTTP/1.x(keep-alive)和HTTP/2(流复用)的复用机制,发起连接
   RealConnection connection = streamAllocation.connection();
   return realChain.proceed(request, streamAllocation, httpCodec, connection);
 }

通过ConnectInterceptor 中的HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);方法调用StreamAllocation#newStream方法

  public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
    int connectTimeout = client.connectTimeoutMillis();
    int readTimeout = client.readTimeoutMillis();
    int writeTimeout = client.writeTimeoutMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      //获取一个健康的连接
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
      //实例化HttpCodec,如果是HTTP/2则是Http2Codec否则是Http1Codec
      HttpCodec resultCodec = resultConnection.newCodec(client, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }

然后调用StreamAllocation#findHealthyConnection方法

    private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
                                                 int writeTimeout, int pingIntervalMillis,
                                                 boolean connectionRetryEnabled,
                                                 boolean doExtensiveHealthChecks) throws IOException {
        while (true) {
            //todo 找到一个连接
            RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
                    pingIntervalMillis, connectionRetryEnabled);

            //todo 如果这个连接是新建立的,那肯定是健康的,直接返回
            synchronized (connectionPool) {
                if (candidate.successCount == 0) {
                    return candidate;
                }
            }

            //todo 如果不是新创建的,需要检查是否健康
            if (!candidate.isHealthy(doExtensiveHealthChecks)) {
                //todo 不健康 关闭连接,释放Socket,从连接池移除
                // 继续下次寻找连接操作
                noNewStreams();
                continue;
            }
            return candidate;
        }
    }

然后调用StreamAllocation#findConnection找一个连接

    private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
                                          int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
        boolean foundPooledConnection = false;
        RealConnection result = null;
        Route selectedRoute = null;
        Connection releasedConnection;
        Socket toClose;
        synchronized (connectionPool) {
            if (released) throw new IllegalStateException("released");
            if (codec != null) throw new IllegalStateException("codec != null");
            if (canceled) throw new IOException("Canceled");
            releasedConnection = this.connection;
            toClose = releaseIfNoNewStreams();
            if (this.connection != null) {
                // We had an already-allocated connection and it's good.
                result = this.connection;
                releasedConnection = null;
            }
            if (!reportedAcquired) {
                // If the connection was never reported acquired, don't report it as released!
                releasedConnection = null;
            }

            if (result == null) {
                //todo 尝试从连接池获取连接,如果有可复用的连接,会给第三个参数 this的connection赋值
                //Attempt to get a connection from the pool.
                Internal.instance.get(connectionPool, address, this, null);
                if (connection != null) {
                    foundPooledConnection = true;
                    result = connection;
                } else {
                    selectedRoute = route;
                }
            }
        }
        closeQuietly(toClose);

        if (releasedConnection != null) {
            eventListener.connectionReleased(call, releasedConnection);
        }
        if (foundPooledConnection) {
            eventListener.connectionAcquired(call, result);
        }
        if (result != null) {
            // If we found an already-allocated or pooled connection, we're done.
            return result;
        }

        // If we need a route selection, make one. This is a blocking operation.
        //todo 创建一个路由 (dns解析的所有ip与代理的组合)
        boolean newRouteSelection = false;
        if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
            newRouteSelection = true;
            routeSelection = routeSelector.next();
        }

        synchronized (connectionPool) {
            if (canceled) throw new IOException("Canceled");

            if (newRouteSelection) {
                //todo 根据代理和不同的ip从连接池中找可复用的连接
                List<Route> routes = routeSelection.getAll();
                for (int i = 0, size = routes.size(); i < size; i++) {
                    Route route = routes.get(i);
                    Internal.instance.get(connectionPool, address, this, route);
                    if (connection != null) {
                        foundPooledConnection = true;
                        result = connection;
                        this.route = route;
                        break;
                    }
                }
            }
            //todo 还是没找到,必须新建一个连接了
            if (!foundPooledConnection) {
                if (selectedRoute == null) {
                    selectedRoute = routeSelection.next();
                }
                route = selectedRoute;
                refusedStreamCount = 0;
                result = new RealConnection(connectionPool, selectedRoute);
                acquire(result, false);
            }
        }
        if (foundPooledConnection) {
            eventListener.connectionAcquired(call, result);
            return result;
        }
        // Do TCP + TLS handshakes. This is a blocking operation.
        //todo 实际上就是创建socket连接,但是要注意的是如果存在http代理的情况
        result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
                connectionRetryEnabled, call, eventListener);
        routeDatabase().connected(result.route());

        Socket socket = null;
        synchronized (connectionPool) {
            reportedAcquired = true;

            // Pool the connection.
            //todo 将新创建的连接放到连接池中
            Internal.instance.put(connectionPool, result);
            if (result.isMultiplexed()) {
                socket = Internal.instance.deduplicate(connectionPool, address, this);
                result = connection;
            }
        }
        closeQuietly(socket);

        eventListener.connectionAcquired(call, result);
        return result;
    }

然后调用StreamAllocation#isHealthy判断是否健康链接

    public boolean isHealthy(boolean doExtensiveChecks) {
        //todo Socket关闭,肯定不健康
        if (socket.isClosed() || socket.isInputShutdown() || socket.isOutputShutdown()) {
            return false;
        }

        if (http2Connection != null) {
            return !http2Connection.isShutdown();
        }

        if (doExtensiveChecks) {
            try {
                int readTimeout = socket.getSoTimeout();
                try {
                    socket.setSoTimeout(1);
                    if (source.exhausted()) {
                        return false; // Stream is exhausted; socket is closed.
                    }
                    return true;
                } finally {
                    socket.setSoTimeout(readTimeout);
                }
            } catch (SocketTimeoutException ignored) {
                // Read timed out; socket is good.
            } catch (IOException e) {
                return false; // Couldn't read; socket is closed.
            }
        }
        return true;
    }

三、链接池的存取数据

根据图我们看ConnectionPool里面代码:

//这是一个用于清楚过期链接的线程池,每个线程池最多只能运行一个线程,并且这个线程池允许被垃圾回收
  private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

  /** The maximum number of idle connections for each address. */
  //每个address的最大空闲连接数。
  private final int maxIdleConnections;
  private final long keepAliveDurationNs;
  //清理任务
  private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };
  //链接的双向队列
  private final Deque<RealConnection> connections = new ArrayDeque<>();
  //路由的数据库
  final RouteDatabase routeDatabase = new RouteDatabase();
   //清理任务正在执行的标志
  boolean cleanupRunning;
//创建一个适用于单个应用程序的新连接池。
 //该连接池的参数将在未来的okhttp中发生改变
 //目前最多可容乃5个空闲的连接,存活期是5分钟
  public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);
  }

  public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
    //保持活着的时间,否则清理将旋转循环
    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
    }
  }

管理http和http/2的链接,以便减少网络请求延迟。同一个address将共享同一个connection。该类实现了复用连接的目标。

  • 1、主要就是connections,可见ConnectionPool内部以队列方式存储连接;
  • 2、routDatabase是一个黑名单,用来记录不可用的route,但是看代码貌似ConnectionPool并没有使用它。所以此处不做分析。
  • 3、剩下的就是和清理有关了,所以executor是清理任务的线程池,cleanupRunning是清理任务的标志,cleanupRunnable是清理任务。
    /**
     * todo 保存连接以复用。
     * 本方法没上锁,只加了断言: 当前线程拥有this(pool)对象的锁。
     * 表示使用这个方法必须要上锁,而且是上pool的对象锁。
     * okhttp中使用到这个函数的地方确实也是这么做的
     */
    void put(RealConnection connection) {
        assert (Thread.holdsLock(this));
        //todo 如果清理任务未执行就启动它,再把新连接加入队列
        if (!cleanupRunning) {
            cleanupRunning = true;
            executor.execute(cleanupRunnable);
        }
        connections.add(connection);
    }
    /**
     * todo 获取可复用的连接
     */
    @Nullable
    RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
        assert (Thread.holdsLock(this));
        for (RealConnection connection : connections) {
            //todo 要拿到的连接与连接池中的连接  连接的配置(dns/代理/域名等等)一致 就可以复用
            // 在使用了,所以 acquire 会创建弱引用放入集合记录
            if (connection.isEligible(address, route)) {
                streamAllocation.acquire(connection, true);
                return connection;
            }
        }
        return null;
    }

执行RealConnection#isEligible检查是否可以复用

    public boolean isEligible(Address address, @Nullable Route route) {
        // If this connection is not accepting new streams, we're done.
        //todo 实际上就是在使用(对于http1.1)就不能复用
        if (allocations.size() >= allocationLimit || noNewStreams) return false;
        // If the non-host fields of the address don't overlap, we're done.
        //todo 如果地址不同,不能复用。包括了配置的dns、代理、证书以及端口等等 (域名还没判断,所有下面马上判断域名)
        if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;

        // If the host exactly matches, we're done: this connection can carry the address.
        //todo 都相同,那就可以复用了
        if (address.url().host().equals(this.route().address().url().host())) {
            return true; // This connection is a perfect match.
        }

        // At this point we don't have a hostname match. But we still be able to carry the
      // request if
        // our connection coalescing requirements are met. See also:
        // https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
        // https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/

        // 1. This connection must be HTTP/2.
        if (http2Connection == null) return false;

        // 2. The routes must share an IP address. This requires us to have a DNS address for both
        // hosts, which only happens after route planning. We can't coalesce connections that use a
        // proxy, since proxies don't tell us the origin server's IP address.
        if (route == null) return false;
        if (route.proxy().type() != Proxy.Type.DIRECT) return false;
        if (this.route.proxy().type() != Proxy.Type.DIRECT) return false;
        if (!this.route.socketAddress().equals(route.socketAddress())) return false;

        // 3. This connection's server certificate's must cover the new host.
        if (route.address().hostnameVerifier() != OkHostnameVerifier.INSTANCE) return false;
        if (!supportsUrl(address.url())) return false;

        // 4. Certificate pinning must match the host.
        try {
            address.certificatePinner().check(address.url().host(), handshake().peerCertificates());
        } catch (SSLPeerUnverifiedException e) {
            return false;
        }
        return true; // The caller's address can be carried by this connection.
    }
    long cleanup(long now) {
        int inUseConnectionCount = 0;
        int idleConnectionCount = 0;
        RealConnection longestIdleConnection = null;
        long longestIdleDurationNs = Long.MIN_VALUE;

        // Find either a connection to evict, or the time that the next eviction is due.
        synchronized (this) {
            for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
                RealConnection connection = i.next();

                //todo 检查连接是否正在被使用
                //If the connection is in use, keep searching.
                if (pruneAndGetAllocationCount(connection, now) > 0) {
                    inUseConnectionCount++;
                    continue;
                }
                //todo 否则记录闲置连接数
                idleConnectionCount++;

                // If the connection is ready to be evicted, we're done.
                //TODO 获得这个连接已经闲置多久
                // 执行完遍历,获得闲置了最久的连接
                long idleDurationNs = now - connection.idleAtNanos;
                if (idleDurationNs > longestIdleDurationNs) {
                    longestIdleDurationNs = idleDurationNs;
                    longestIdleConnection = connection;
                }
            }
            //todo 超过了保活时间(5分钟) 或者池内数量超过了(5个) 马上移除,然后返回0,表示不等待,马上再次检查清理
            if (longestIdleDurationNs >= this.keepAliveDurationNs
                    || idleConnectionCount > this.maxIdleConnections) {
                // We've found a connection to evict. Remove it from the list, then close it
                // below (outside
                // of the synchronized block).
                connections.remove(longestIdleConnection);
            } else if (idleConnectionCount > 0) {
                // A connection will be ready to evict soon.
                //todo 池内存在闲置连接,就等待 保活时间(5分钟)-最长闲置时间 =还能闲置多久 再检查
                return keepAliveDurationNs - longestIdleDurationNs;
            } else if (inUseConnectionCount > 0) {
                // All connections are in use. It'll be at least the keep alive duration 'til we
                // run again.
                //todo 有使用中的连接,就等 5分钟 再检查
                return keepAliveDurationNs;
            } else {
                // No connections, idle or in use.
                //todo 都不满足,可能池内没任何连接,直接停止清理(put后会再次启动)
                cleanupRunning = false;
                return -1;
            }
        }
        closeQuietly(longestIdleConnection.socket());
        // Cleanup again immediately.
        return 0;
    }

ConnectionPool#pruneAndGetAllocationCount检查连接是否正在被使用

    private int pruneAndGetAllocationCount(RealConnection connection, long now) {
        //todo 这个连接被使用就会创建一个弱引用放入集合,这个集合不为空就表示这个连接正在被使用
        // 实际上 http1.x 上也只能有一个正在使用的。
        List<Reference<StreamAllocation>> references = connection.allocations;
        for (int i = 0; i < references.size(); ) {
            Reference<StreamAllocation> reference = references.get(i);
            if (reference.get() != null) {
                i++;
                continue;
            }

            // We've discovered a leaked allocation. This is an application bug.
            StreamAllocation.StreamAllocationReference streamAllocRef =
                    (StreamAllocation.StreamAllocationReference) reference;
            String message = "A connection to " + connection.route().address().url()
                    + " was leaked. Did you forget to close a response body?";
            Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);

            references.remove(i);
            connection.noNewStreams = true;


            // If this was the last allocation, the connection is eligible for immediate eviction.
            if (references.isEmpty()) {
                connection.idleAtNanos = now - keepAliveDurationNs;
                return 0;
            }
        }
        return references.size();
    }
上一篇下一篇

猜你喜欢

热点阅读