Druiddruid 源码之旅

[druid 源码解析] 3 创建连接&销毁连接

2021-11-10  本文已影响0人  AndyWei123

回想我们上节的内容,有两个地方跳过了,一个是启动创建链接的线程,一个是启动销毁链接的线程,我们这次就来详细的探究一番这两个的用途。

创建连接线程

首先我们需要回头介绍之前没有讲解的三个 connection 数组的概念:

public void run() {
            initedLatch.countDown();

            long lastDiscardCount = 0;
            int errorCount = 0;
            for (; ; ) {
                // addLast
                try {
                    lock.lockInterruptibly();
                } catch (InterruptedException e2) {
                    break;
                }

                long discardCount = DruidDataSource.this.discardCount;
                boolean discardChanged = discardCount - lastDiscardCount > 0;
                lastDiscardCount = discardCount;

                try {
                    boolean emptyWait = true;
                    // 假如创建失败,不需要重新等待 emptyWait
                    if (createError != null
                        && poolingCount == 0
                        && !discardChanged) {
                        emptyWait = false;
                    }
                    // 假如 异步初始化也无需等待。
                    if (emptyWait
                        && asyncInit && createCount < initialSize) {
                        emptyWait = false;
                    }

                    if (emptyWait) {
                        // poolingCount:连接池中的空闲连接数量
                        // notEmptyWaitThreadCount:等待连接的线程数量
                        // 必须存在线程等待,才创建连接
                        if (poolingCount >= notEmptyWaitThreadCount //
                            && (!(keepAlive && activeCount + poolingCount < minIdle))
                            && !isFailContinuous()
                        ) {
                            empty.await();
                        }

                        // 防止创建超过maxActive数量的连接
                        if (activeCount + poolingCount >= maxActive) {
                            empty.await();
                            continue;
                        }
                    }

                } catch (InterruptedException e) {
              ......
                } finally {
                    lock.unlock();
                }

                PhysicalConnectionInfo connection = null;

                try {
                    // 创建物理链接 最终调用 Connection nativeConnection = driver.connect(url, info); 创建链接
                    connection = createPhysicalConnection();
                } catch (SQLException e) {
                  .....
                }

                boolean result = put(connection);
             
            }
        }

其主要流程如下:

  1. 假如现在没有等待获取链接的线程,就调用 empty.await(); 等待信号创建连接。
  2. 通过驱动程序创建物理链接.
  3. 将物理连接放入空闲连接数组中。
    什么时候会触发这个创建链接的信号呢。这里给出了一个实例:
    截屏2021-11-09 下午10.25.30.png
    其中 takeLast 方法判断假如连接池连接的数量为 0 就会调用这个信号触发,我们再来看看 put 方法。
private boolean put(DruidConnectionHolder holder, long createTaskId, boolean checkExists) {
        lock.lock();
        try {
            // 判断是否已经关闭
            if (this.closing || this.closed) {
                return false;
            }
            // 判断是否大于存活数量
            if (poolingCount >= maxActive) {
                if (createScheduler != null) {
                    // 调用清理任务,清除多余链接。
                    clearCreateTask(createTaskId);
                }
                return false;
            }

            if (checkExists) {
                for (int i = 0; i < poolingCount; i++) {
                    if (connections[i] == holder) {
                        return false;
                    }
                }
            }
            // 放到 connections 数组中
            connections[poolingCount] = holder;
            // 调用原子类增加 connection 统计数量
            incrementPoolingCount();
            // 统计连接池高峰
            if (poolingCount > poolingPeak) {
                poolingPeak = poolingCount;
                poolingPeakTime = System.currentTimeMillis();
            }
            // 通知获取连接的线程。
            notEmpty.signal();
            notEmptySignalCount++;

            if (createScheduler != null) {
                clearCreateTask(createTaskId);

                if (poolingCount + createTaskCount < notEmptyWaitThreadCount //
                    && activeCount + poolingCount + createTaskCount < maxActive) {
                    emptySignal();
                }
            }
        } finally {
            lock.unlock();
        }
        return true;
    }

我们来看一下主要流程:

  1. 首先获取锁,然后检查是否存活,是否大于最大连接数量。
  2. 将该链接放入 connections 中,并将空闲数量+1。
  3. 通知获取链接的线程。(看到这里,大家有没有想到 ArrayBlockQueue,它也是通过两个 Condition 来控制生产者和消费者的阻塞和活动的)。
    接着我们来看销毁线程。
    public class DestroyTask implements Runnable {
        @Override
        public void run() {
            // 销毁超过最大空闲的链接
            shrink(true, keepAlive);
            // 回收超时的链接
            if (isRemoveAbandoned()) {
                removeAbandoned();
            }
        }

    }

销毁线程主要做两件事情,一个是销毁超过最大空闲连接数量的链接,一个是回收超时的链接。我们先来看一下第一个:

public void shrink(boolean checkTime, boolean keepAlive) {
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            return;
        }

        boolean needFill = false;
        int evictCount = 0;
        int keepAliveCount = 0;
        int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
        fatalErrorCountLastShrink = fatalErrorCount;

        try {
            if (!inited) {
                return;
            }
            // 需要检查的链接数量,必须要保留 minIdle 以上的连接数量
            final int checkCount = poolingCount - minIdle;
            final long currentTimeMillis = System.currentTimeMillis();
            // 遍历每一条链接
            for (int i = 0; i < poolingCount; ++i) {
                // 获取该链接
                DruidConnectionHolder connection = connections[i];

                if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) {
                    // keepAliveConnections: 销毁线程会检测线程,如果检测存活的线程放暂时放在这里
                    keepAliveConnections[keepAliveCount++] = connection;
                    continue;
                }

                if (checkTime) {
                    if (phyTimeoutMillis > 0) {
                        long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
                        if (phyConnectTimeMillis > phyTimeoutMillis) {
                            // evictConnections: 失效、过期的连接,会暂时放在这个数组里面
                            evictConnections[evictCount++] = connection;
                            continue;
                        }
                    }

                    long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;
                    // 配置参数错误
                    if (idleMillis < minEvictableIdleTimeMillis
                        && idleMillis < keepAliveBetweenTimeMillis
                    ) {
                        break;
                    }
                    // minEvictableIdleTimeMillis 最小丢弃空闲时间
                    if (idleMillis >= minEvictableIdleTimeMillis) {
                        if (checkTime && i < checkCount) {
                            evictConnections[evictCount++] = connection;
                            continue;
                            // maxEvictableIdleTimeMillis 最大丢弃空闲时间。
                        } else if (idleMillis > maxEvictableIdleTimeMillis) {
                            evictConnections[evictCount++] = connection;
                            continue;
                        }
                    }
                    // 假如没有超过丢弃最小时间,看是否需要放到 keepAliveConnections 中。
                    if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
                        keepAliveConnections[keepAliveCount++] = connection;
                    }
                } else {
                    if (i < checkCount) {
                        // 强制移除
                        evictConnections[evictCount++] = connection;
                    } else {
                        break;
                    }
                }
            }
            // 需要移除的数量:等于丢弃的数量和keepAlive的数量
            int removeCount = evictCount + keepAliveCount;
            // 清理 connections 中的需要移除的数量,这些都是连续的,我们可以从上面的循环中看出来
            if (removeCount > 0) {
                System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
                Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
                poolingCount -= removeCount;
            }
            keepAliveCheckCount += keepAliveCount;

            if (keepAlive && poolingCount + activeCount < minIdle) {
                needFill = true;
            }
        } finally {
            lock.unlock();
        }
        // 关闭 evictConnections 中的链接
        if (evictCount > 0) {
            for (int i = 0; i < evictCount; ++i) {
                DruidConnectionHolder item = evictConnections[i];
                Connection connection = item.getConnection();
                JdbcUtils.close(connection);
                destroyCountUpdater.incrementAndGet(this);
            }
            Arrays.fill(evictConnections, null);
        }
        // 检查 keepAliveCount 中的链接是否健康,是的话放回 connections 中,假如不是就 close
        if (keepAliveCount > 0) {
            // keep order
            for (int i = keepAliveCount - 1; i >= 0; --i) {
                DruidConnectionHolder holer = keepAliveConnections[i];
                Connection connection = holer.getConnection();
                holer.incrementKeepAliveCheckCount();

                boolean validate = false;
                try {
                    this.validateConnection(connection);
                    validate = true;
                } catch (Throwable error) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("keepAliveErr", error);
                    }
                    // skip
                }

                boolean discard = !validate;
                if (validate) {
                    holer.lastKeepTimeMillis = System.currentTimeMillis();
                    boolean putOk = put(holer, 0L, true);
                    if (!putOk) {
                        discard = true;
                    }
                }

                if (discard) {
                    try {
                        connection.close();
                    } catch (Exception e) {
                        // skip
                    }

                    lock.lock();
                    try {
                        discardCount++;

                        if (activeCount + poolingCount <= minIdle) {
                            emptySignal();
                        }
                    } finally {
                        lock.unlock();
                    }
                }
            }
            this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
            Arrays.fill(keepAliveConnections, null);
        }
        // 假如需要填满,就调用 emptySignal 唤醒创建线程。
        if (needFill) {
            lock.lock();
            try {
                int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
                for (int i = 0; i < fillCount; ++i) {
                    emptySignal();
                }
            } finally {
                lock.unlock();
            }
        } else if (onFatalError || fatalErrorIncrement > 0) {
            lock.lock();
            try {
                emptySignal();
            } finally {
                lock.unlock();
            }
        }
    }

其主要步骤如下:

  1. 首先通过计算得到需要消除的链接数量,然后遍历空闲连接,从 0 开始(最老的连接,先销毁)。
  2. 将连接分类,看是需要放到 keepAliveConnections 中,还是 evictConnections , 两者必选其中一个,具体分类逻辑可以看上面代码,有具体的注释。
  3. 将需要丢弃的连接从空闲连接数组中剔除。
  4. evictConnections 数组中的连接关闭。
  5. 检查 keepAliveCount 中的链接是否健康,是的话放回 connections 中,假如不是就 close 。
    接下来我们再看一下 removeAbandoned 的逻辑。
 public int removeAbandoned() {
        int removeCount = 0;

        long currrentNanos = System.nanoTime();

        List<DruidPooledConnection> abandonedList = new ArrayList<DruidPooledConnection>();

        activeConnectionLock.lock();
        try {
            // 遍历所有活动的链接,
            Iterator<DruidPooledConnection> iter = activeConnections.keySet().iterator();

            for (; iter.hasNext(); ) {
                DruidPooledConnection pooledConnection = iter.next();
                // 检查连接是否还在运行,还在就跳到下一个
                if (pooledConnection.isRunning()) {
                    continue;
                }

                long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000);
                // 假如超时,就放到丢弃队列
                if (timeMillis >= removeAbandonedTimeoutMillis) {
                    iter.remove();
                    pooledConnection.setTraceEnable(false);
                    abandonedList.add(pooledConnection);
                }
            }
        } finally {
            activeConnectionLock.unlock();
        }

        if (abandonedList.size() > 0) {
            for (DruidPooledConnection pooledConnection : abandonedList) {
                final ReentrantLock lock = pooledConnection.lock;
                lock.lock();
                try {
                    if (pooledConnection.isDisable()) {
                        continue;
                    }
                } finally {
                    lock.unlock();
                }
                // 关闭链接
                JdbcUtils.close(pooledConnection);
                pooledConnection.abandond();
                removeAbandonedCount++;
                removeCount++;
                // 打印日志
               ..........
        }

        return removeCount;
    }

这里的逻辑比较简单,首先是遍历所有活动的链接,假如连接在运行就跳到下一个,假如不在运行,检查是否超时,假如是就将连接放入丢弃队列中,然后遍历丢弃队列,将连接关闭。

上一篇下一篇

猜你喜欢

热点阅读