HttpClient连接池管理

2021-04-21  本文已影响0人  晴天哥_王志

系列

开篇

整体流程

连接池处理流程

源码分析

// Abridged excerpt of the execution-chain element that obtains a connection
// from the connection manager and sends the request over it.
// NOTE(review): the listing is shortened for illustration -- userToken, config
// and connHolder are not declared in this excerpt and the catch bodies are
// elided, so it does not compile as shown.
public class MainClientExec implements ClientExecChain {

    private final HttpRequestExecutor requestExecutor;
    // Connection-pool management, provided by PoolingHttpClientConnectionManager
    private final HttpClientConnectionManager connManager;
    private final ConnectionReuseStrategy reuseStrategy;
    private final ConnectionKeepAliveStrategy keepAliveStrategy;

    // Leases a connection for the route, executes the request on it and
    // returns the response wrapped in an HttpResponseProxy.
    @Override
    public CloseableHttpResponse execute(
            final HttpRoute route,
            final HttpRequestWrapper request,
            final HttpClientContext context,
            final HttpExecutionAware execAware) {

        // Obtain a ConnectionRequest from connManager for the given route
        final ConnectionRequest connRequest = 
                         connManager.requestConnection(route, userToken);

        final HttpClientConnection managedConn;
        try {
            // Obtain managedConn from connRequest; a non-positive configured
            // connection-request timeout is passed on as 0
            final int timeout = config.getConnectionRequestTimeout();
            managedConn = connRequest.get(
                  timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
        } catch(final ExecutionException ex) {
            // handling elided in this excerpt
        }

        try {
            HttpResponse response;
            // NOTE(review): the loop's exit conditions are elided in this excerpt
            for (int execCount = 1;; execCount++) {
                // Send the request through HttpRequestExecutor over managedConn
                response = requestExecutor.execute(request, 
                        managedConn, context);
            }

            // check for entity, release connection if possible
            final HttpEntity entity = response.getEntity();

            // No entity, or a non-streaming one: release the connection right
            // away and return a proxy that carries no connection holder
            if (entity == null || !entity.isStreaming()) {
                connHolder.releaseConnection();
                return new HttpResponseProxy(response, null);
            }

            // Streaming entity: hand connHolder to the proxy instead of
            // releasing here (presumably released once the body is consumed
            // -- confirm against HttpResponseProxy)
            return new HttpResponseProxy(response, connHolder);
        } catch (final Error error) {
            // handling elided in this excerpt
        }
    }
}
// Abridged excerpt of the pooling connection manager: requestConnection()
// leases an entry from the CPool and exposes it through a ConnectionRequest.
// NOTE(review): shortened for illustration; parts of the original are omitted.
public class PoolingHttpClientConnectionManager
    implements HttpClientConnectionManager, 
               ConnPoolControl<HttpRoute>, Closeable {

    private final ConfigData configData;
    // Core object of the connection-pool management
    private final CPool pool;
    private final HttpClientConnectionOperator connectionOperator;
    private final AtomicBoolean isShutDown;

    // Starts a lease on the pool for the given route/state and returns a
    // ConnectionRequest whose get() waits for the leased connection.
    public ConnectionRequest requestConnection(
            final HttpRoute route,
            final Object state) {
        // Lease a CPoolEntry (the wrapper class around a connection) from the pool
        final Future<CPoolEntry> future = this.pool.lease(route, state, null);

        // Return a ConnectionRequest that internally obtains the
        // HttpClientConnection through leaseConnection
        return new ConnectionRequest() {

            // Cancels the pending lease by cancelling the underlying future
            @Override
            public boolean cancel() {
                return future.cancel(true);
            }

            // get() of the ConnectionRequest: calls leaseConnection, passing in
            // the future (which wraps the CPoolEntry, itself a wrapper of the
            // connection)
            @Override
            public HttpClientConnection get(
                    final long timeout,
                    final TimeUnit timeUnit) {

                final HttpClientConnection conn = 
                      leaseConnection(future, timeout, timeUnit);

                // other code omitted

                return conn;
            }
        };
    }

    // Waits on the future for at most the given timeout and wraps the
    // resulting pool entry in a CPoolProxy.
    protected HttpClientConnection leaseConnection(
            final Future<CPoolEntry> future,
            final long timeout,
            final TimeUnit timeUnit) {
        final CPoolEntry entry;
        try {
            entry = future.get(timeout, timeUnit);
            return CPoolProxy.newProxy(entry);
        } catch (final TimeoutException ex) {
            // handling elided in this excerpt
        }
    }

}
// Abridged excerpt: the per-route pool, showing only its bookkeeping fields.
abstract class RouteSpecificPool<T, C, E extends PoolEntry<T, C>> {
    // Route-level connection management data structures
    private final T route;
    // presumably the entries currently handed out for this route -- confirm
    private final Set<E> leased;
    // presumably the idle entries of this route that can be reused -- confirm
    private final LinkedList<E> available;
    // Future objects in the wait queue
    private final LinkedList<Future<E>> pending;
}

// Abridged excerpt of the generic connection pool: its bookkeeping fields and
// the lease() entry point that returns a Future for a pool entry.
// NOTE(review): shortened for illustration; the Future shown here implements
// only the timed get() and parts of its body are omitted.
public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
                      implements ConnPool<T, E>, ConnPoolControl<T> {

    private final Lock lock;
    private final Condition condition;
    private final ConnFactory<T, C> connFactory;
    // Route-level connection management data structure
    private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
    // Global-level connection management data structures
    private final Set<E> leased;
    private final LinkedList<E> available;
    // Futures in the global wait queue
    private final LinkedList<Future<E>> pending;

    // Returns a Future whose get() obtains a pool entry for the given route
    // and state. The callback parameter is not used in this excerpt.
    public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {

        return new Future<E>() {

            // cancelled and done are declared but not used in this excerpt
            private final AtomicBoolean cancelled = new AtomicBoolean(false);
            private final AtomicBoolean done = new AtomicBoolean(false);
            // Holds the entry once obtained; the assignment is elided in this
            // excerpt
            private final AtomicReference<E> entryRef = new AtomicReference<E>(null);

            @Override
            public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
                for (;;) {
                    synchronized (this) {
                        try {
                            // Return the entry if one has already been stored
                            final E entry = entryRef.get();
                            if (entry != null) {
                                return entry;
                            }

                            // Block until a CPoolEntry is obtained
                            // NOTE(review): what happens to leasedEntry
                            // afterwards is omitted in this excerpt
                            final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);

                        } catch (final IOException ex) {
                            // handling elided in this excerpt
                        }
                    }
                }
            }

        };
    }
}
// Abridged excerpt of the generic connection pool showing only
// getPoolEntryBlocking, the method that actually hands out a pool entry.
public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
                      implements ConnPool<T, E>, ConnPoolControl<T> {

    // Obtains a pool entry for the route while holding this.lock:
    //   1. reuse a free, non-expired entry of the route's pool if there is one;
    //   2. otherwise create a new connection when the per-route and total
    //      limits allow it;
    //   3. otherwise queue the future and wait on the condition, then retry.
    // Throws TimeoutException once a timed wait reports failure and the
    // deadline has passed, and ExecutionException if the future was cancelled
    // while waiting.
    private E getPoolEntryBlocking(
            final T route, final Object state,
            final long timeout, final TimeUnit timeUnit,
            final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {

        // A deadline exists only for a positive timeout; with deadline == null
        // the wait further down has no time limit
        Date deadline = null;
        if (timeout > 0) {
            deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));
        }
        // Take the lock so the following operations run serially
        this.lock.lock();
        try {
            E entry;
            for (;;) {
                // Every route has its own pool; fetch the pool of the given route
                final RouteSpecificPool<T, C, E> pool = getPool(route);
                // Keep taking free entries until none is left or a usable one
                // is found
                for (;;) {
                    entry = pool.getFree(state);
                    // No free entry: leave the loop
                    if (entry == null) {
                        break;
                    }
                    // Close entries that have expired
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    }
                    // Closed entries are dropped from the global available list
                    // and handed back to the route pool via free(entry, false);
                    // an open entry ends the search
                    if (entry.isClosed()) {
                        this.available.remove(entry);
                        pool.free(entry, false);
                    } else {
                        break;
                    }
                }

                // A usable entry was found: update the pool bookkeeping
                // (available -> leased) and return it
                if (entry != null) {
                    this.available.remove(entry);
                    this.leased.add(entry);
                    onReuse(entry);
                    return entry;
                }

                // Create a new connection if needed
                final int maxPerRoute = getMax(route);

                // The maximum number of connections per route is configurable;
                // if one more connection would exceed it, the route pool's
                // last-used entries are closed and removed to make room
                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
                if (excess > 0) {
                    for (int i = 0; i < excess; i++) {
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null) {
                            break;
                        }
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }

                // The route pool has not reached its upper limit
                if (pool.getAllocatedCount() < maxPerRoute) {
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                    // Check whether the whole pool still has capacity below its
                    // upper limit (maxTotal minus the leased entries)
                    if (freeCapacity > 0) {
                        final int totalAvailable = this.available.size();
                        // If the idle connections already fill the remaining
                        // capacity, close the last entry of the global
                        // available list and remove it from its own route pool
                        if (totalAvailable > freeCapacity - 1) {
                            if (!this.available.isEmpty()) {
                                final E lastUsed = this.available.removeLast();
                                lastUsed.close();
                                final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                                otherpool.remove(lastUsed);
                            }
                        }
                        // Create a connection for the route
                        final C conn = this.connFactory.create(route);
                        // Put the connection into the route's own "small pool"
                        entry = pool.add(conn);
                        // Register the entry in the global "big pool" as leased
                        this.leased.add(entry);
                        return entry;
                    }
                }
                // Reaching this point means no usable connection was obtained
                // from the route pool, and no new one could be created because
                // the route pool is at its maximum (or the freeCapacity check
                // above found no total capacity left)
                boolean success = false;
                try {
                    // Queue the future in the route pool
                    pool.queue(future);
                    // Queue the future in the global pool
                    this.pending.add(future);
                    // Wait on the condition, bounded by the deadline if any
                    if (deadline != null) {
                        success = this.condition.awaitUntil(deadline);
                    } else {
                        this.condition.await();
                        success = true;
                    }
                    if (future.isCancelled()) {
                        throw new ExecutionException(operationAborted());
                    }
                } finally {
                    // Always take the future out of both wait queues again
                    pool.unqueue(future);
                    this.pending.remove(future);
                }
                // check for spurious wakeup vs. timeout
                // Only a failed timed wait with the deadline in the past ends
                // the loop; any other wake-up retries from the top
                if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
                    break;
                }
            }
            throw new TimeoutException("Timeout waiting for connection");
        } finally {
            this.lock.unlock();
        }
    }
}

参考

上一篇 下一篇

猜你喜欢

热点阅读