线程应用实例-手写连接池(超时等待模式)

2020-07-03  本文已影响0人  菠萝丶丶

我们经常在使用数据库连接池时会遇到如网络不好连接池报等待超时异常,总是感觉别人写的框架很牛逼,在看了上一节的等待通知机制之后,我们也可以自己动手写一个连接池,并且如果超出一定的时间我们可以对用户提示异常。

以下是对该连接池的具体实现代码:

public class ConnectionPoolTest {
    static ConnectionPool pool = new ConnectionPool(10);
    // 保证所有ConnectionRunner能够同时开始
    static CountDownLatch start = new CountDownLatch(1);
    // main线程将会等待所有ConnectionRunner结束后才能继续执行
    static CountDownLatch end;
    // 超时等待时间,可以修改等待时间进行观察
    static long timeoutMills = 2000;

    public static void main(String[] args) throws InterruptedException {
        // 线程数量,可以修改线程数量进行观察
        int threadCount = 280;
        // 单个线程获取连接次数
        end = new CountDownLatch(threadCount);
        AtomicInteger got = new AtomicInteger();
        AtomicInteger notGot = new AtomicInteger();
        for (int i = 0; i < threadCount; i++) {
            Thread thread = new Thread(() -> {
                try {
                    start.await();
                } catch (Exception ex) {
                }
                try {
                    Connection connection = pool.fetchConnection(timeoutMills);
                    connection.use();
                    pool.releaseConnection(connection);
                    got.getAndIncrement();
                } catch (Exception ex) {
                    System.err.println(ex.getMessage());
                    notGot.getAndIncrement();
                }
                end.countDown();
            }, "ConnectionRunnerThread-" + i);
            thread.start();
        }
        start.countDown();
        end.await();
        System.out.println("total invoke: " + threadCount);
        System.out.println("got connection: " + got);
        System.out.println("not got connection " + notGot);
    }

}

class ConnectionPool {
    private LinkedList<Connection> pool = new LinkedList<>();

    public ConnectionPool(int initialSize) {
        if (initialSize <= 0) {
            throw new IllegalArgumentException("initialSize can not less or equals to zero");
        }
        for (int i = 0; i < initialSize; i++) {
            pool.addLast(new Connection());
        }
    }

    public Connection fetchConnection(long mills) throws InterruptedException, TimeoutException {
        synchronized (pool) {
            long wakeUp = System.currentTimeMillis() + mills;
            long sleepMills = mills;
            do {
                if (pool.isEmpty()) {
                    pool.wait(sleepMills);
                }
                sleepMills = wakeUp - System.currentTimeMillis();
                if (sleepMills <= 0) {
                    throw new TimeoutException("time out, wait for " + mills + " millisecond");
                }
            } while (pool.isEmpty());
            System.out.println(pool);
            // 只要从while循环中返回,则一定是pool不为空
            return pool.removeFirst();
        }
    }

    public void releaseConnection(Connection connection) {
        if (connection != null) {
            synchronized (pool) {
                // 归还连接之后通知等待在pool上的所有线程,让其他消费者能够感知连接池中有可用的连接
                pool.addLast(connection);
                pool.notifyAll();
            }
        }
    }
}

class Connection {
    // 模拟数据库连接所要执行的操作消耗的时间(50-99毫秒)
    public void use() {
        try {
            Thread.sleep((long)new Random().nextInt(50) + 50);
        } catch (InterruptedException e) {
        }
    }

}

程序分析:

连接池ConnectionPool

初始化:

在创建连接池对象时必须指定连接池容量,然后初始化该连接池内指定数量的连接对象,将其添加到连接池队列中。

获取连接:

在调用fetchConnection方法时需要先获取队列pool的锁,然后计算出如果等待超时的时刻,同时进入循环开始判断队列是否为空,如果不为空则开始等待给定的时间,直到被释放连接的线程唤醒,或者等待超时。如果是被唤醒的情况,剩余的休眠时间大于0,并且此时如果队列不为空,跳出循环并且返回队列中的连接对象。如果是超时的情况,则剩余的休眠时间小于或等于0,此时抛出等待超时异常。

释放连接:

调用releaseConnection并且先获取pool对象的锁,添加到队列尾部再唤醒等待在pool对象上的获取连接的线程

流程图如下所示:


连接池示例程序流程图
上一篇下一篇

猜你喜欢

热点阅读