mybatis 连接池探究

2018-05-24  本文已影响1138人  上重楼

起因是目前再看 《Java并发编程的艺术》 并发基础那章实现了一个简单的连接池,虽然只是为了展示等待/通知,但是我对平时常用的mybatis的 连接池是怎么实现的感到很好奇,所以就有了这次的记录。框架平时用的多,但是源码还真的没怎么看,这一篇也算是为阅读源码起一个头。

mybatis版本是:3.4.0
连接池核心类:org.apache.ibatis.datasource.pooled.PooledDataSource
测试代码是:
简单粗暴的抛出Exception 是为了移除异常处理,关注主流程

    public static void main(String[] args) throws Exception {
        //构建一个连接池 有很多重载的构造函数
        PooledDataSource pool = new PooledDataSource(
                "com.mysql.jdbc.Driver",
                "jdbc:mysql://127.0.0.1:3308/db_name",
                "user_name",
                "password");
         
         //从连接池取出一个连接,这个是重头戏
        Connection connection = pool.getConnection();
        Statement statement = connection.createStatement();
        ResultSet resultSet = statement.executeQuery("SELECT * FROM checking_in_log LIMIT 1");
        if (resultSet.next()) {
            String id = resultSet.getString("id");
            System.out.println(id);
        }
        resultSet.close();
        statement.close();
        connection.close();
    }

首先跟着测试代码走一遍:

public PooledDataSource(String driver, String url, String username, String password) {
    dataSource = new UnpooledDataSource(driver, url, username, password);
    expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
  }

第一行代码就是初始化这个PooledDataSource对象了,里面做了2件事,初始化一个UnpooledDataSource对象,还有把expectedConnectionTypeCode设置为("" + url + username + password).hashCode();

UnpooledDataSource和PoolState这2个对象是PooledDataSource最关键的2个对象了,可以说PooledDataSource是对UnpooledDataSource的使用上的封装,数据库连接参数,设置参数大多都是通过调用UnpooledDataSource进行的。

这是UnpooledDataSource的方法
getConnection
getConnection
setLoginTimeout
getLoginTimeout
setLogWriter
getLogWriter
getDriverClassLoader
setDriverClassLoader
getDriverProperties
setDriverProperties
getDriver
setDriver
getUrl
setUrl
getUsername
setUsername
getPassword
setPassword
isAutoCommit
setAutoCommit
getDefaultTransactionIsolationLevel
setDefaultTransactionIsolationLevel

而PoolState作为一个数据对象,连接池就保存在里面 这个对象里面的方法都标记了synchronized,而对象里面的统计字段基本都是被多线程竞争的,我觉得可以改为使用JUC的工具类atomicLong。毕竟竞争压力大,synchronized的偏向锁 轻量锁无法起到一个优化的作用。可能这个类在jdk1.5以前就创建了。

这是里面的字段
    //PooledDataSource是通过new PoolState(this)得出的PollState 
  protected PooledDataSource dataSource;
  //空闲连接池
  protected final List<PooledConnection> idleConnections = new ArrayList<PooledConnection>();
  //活动中的连接池
  protected final List<PooledConnection> activeConnections = new ArrayList<PooledConnection>();
  //请求总数
  protected long requestCount = 0;
 //累计请求时间
  protected long accumulatedRequestTime = 0;
  //累计获取时间(连接池被别的线程拿走执行的时间)
  protected long accumulatedCheckoutTime = 0;
  //超时执行连接数计数,当连接执行超过了poolMaximumCheckoutTime(20秒)而这时候有请求竞争这个连接的时候这个值加一,同时连接会rollback,然后给新的竞争线程去使用
  protected long claimedOverdueConnectionCount = 0;
  //这个是accumulatedCheckoutTime是一样的
  protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
  //累计等待时间,就是连接都被使用了,而又没办法抢别人超时执行的连接的时候就只能乖乖等待了
  protected long accumulatedWaitTime = 0;
  //每次等待连接就加一
  protected long hadToWaitCount = 0;
  //当获取到一个已经失效的连接的时候就加一
  protected long badConnectionCount = 0;
//取出连接所调用的方法
 @Override
  public Connection getConnection() throws SQLException {
    return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection();
  }

方法的返回类型是:java.sql.Connection
通过调用popConnection返回org.apache.ibatis.datasource.pooled.PooledConnection这个mybatis的封装类,然后调用它的getProxyConnection获得java.sql.Connection
PooledConnection实现了InvocationHandler接口,然后内部有2个Connection对象

private Connection realConnection;
private Connection proxyConnection;

realConnection是真实的连接,在构造器传入
proxyConnection是在构造器里面通过反射代理创建的一个对象
private static final Class<?>[] IFACES = new Class<?>[] { Connection.class };
this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
通过源码发现,之所以有proxyConnection是为了拦截调用Connection.close()
在PooledConnection的invoke实现方法(InvocationHandler接口)可以看到

@Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
      dataSource.pushConnection(this);
      return null;
    } else {
      try {
        if (!Object.class.equals(method.getDeclaringClass())) {
          // issue #579 toString() should never fail
          // throw an SQLException instead of a Runtime
          checkConnection();
        }
        return method.invoke(realConnection, args);
      } catch (Throwable t) {
        throw ExceptionUtil.unwrapThrowable(t);
      }
    }
  }

如果是close方法,调用后并不会真正去执行Connection.close(),而是把连接放回连接池,至于其他方法都是通过realConnection去执行的。

接下来看看2个核心方法,拿和还
pushConnection
popConnection

//取出一个连接
 private PooledConnection popConnection(String username, String password) throws SQLException {
    //是否需要等待连接标记
    boolean countedWait = false;
    //返回对象
    PooledConnection conn = null;
    //方法开始的时间,用来计算请求连接耗费时间的
    long t = System.currentTimeMillis();
    //获取到坏连接的次数
    int localBadConnectionCount = 0;
    
    //如果没获取到连接就一直重试,有3种情况会退出循环,成功获取到连接
    //调用wait(timeout)的时候抛出//InterruptedException异常
   //localBadConnectionCount > (poolMaximumIdleConnections + 3)的时候抛出异常,这个意思是获取到失效连接次数已经大于最大空闲连接数+3次时,不再尝试面对现实
    while (conn == null) {
      //先对 PoolState这个对象加锁,前面也有说,这个对象在连接池是唯一的。
      synchronized (state) {
        //如果空闲连接列表不为空,从其中取出第一个连接,这是最理想的情况了,在压力不大的时候一般都这样
        if (!state.idleConnections.isEmpty()) {
          // Pool has available connection
          conn = state.idleConnections.remove(0);
          if (log.isDebugEnabled()) {
            log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
          }
        } else {
          //下面那行源码英文注释其实已经说明了问题,当活动连接还没达到最大活动连接限制的时候直接new一个新连接
          // Pool does not have available connection
          if (state.activeConnections.size() < poolMaximumActiveConnections) {
            // Can create new connection
            conn = new PooledConnection(dataSource.getConnection(), this);
            if (log.isDebugEnabled()) {
              log.debug("Created connection " + conn.getRealHashCode() + ".");
            }
          } else {
            //在不能创建新连接的时候就要进行下面的操作了,如果第1个连接的执行已经超时就干掉它取而代之,否则就乖乖等待
            // Cannot create new connection
            //获取最初创建的连接
            PooledConnection oldestActiveConnection = state.activeConnections.get(0);
            //获取连接的执行时间,就是连接被线程持有的时间
            long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
            //判断是否已经执行超时,默认是20秒
            if (longestCheckoutTime > poolMaximumCheckoutTime) {
              // Can claim overdue connection
              //这4个在前面已经解释了
              state.claimedOverdueConnectionCount++;
              state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
              state.accumulatedCheckoutTime += longestCheckoutTime;
              //就旧的连接从活动连接列表移除
              state.activeConnections.remove(oldestActiveConnection);
              //下面这个就很残暴了,如果连接不是自动提交的,调用它的回滚。如果人家正在执行就懵逼了,所以长时间执行的大sql poolMaximumCheckoutTime还是有必要设置一下
              if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                try {
                  oldestActiveConnection.getRealConnection().rollback();
                } catch (SQLException e) {
                  log.debug("Bad connection. Could not roll back");
                }  
              }
              //这里相当于把旧衣服脱掉,穿上新衣服。😀 真正的Connection对象还是没变的
              conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
              //把旧对象标记为失效
              oldestActiveConnection.invalidate();
              if (log.isDebugEnabled()) {
                log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
              }
            } else {
              // Must wait    需要等待的处理
              try {
                //累加一次等待计数,判断条件是避免循环等待的时候多次累计
                if (!countedWait) {
                  state.hadToWaitCount++;
                  countedWait = true;
                }
                if (log.isDebugEnabled()) {
                  log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
                }
                //下面几行就是并发编程的内容了,poolTimeToWait默认值是20秒
                long wt = System.currentTimeMillis();
                //wait 放弃掉自己的锁,然后开始等待被notify唤醒或者达到超时时间,这里要注意的是,如果已经到了超时时间,而别的线程还没有释放锁,照样是醒不来的
                state.wait(poolTimeToWait);
                state.accumulatedWaitTime += System.currentTimeMillis() - wt;
              } catch (InterruptedException e) {
                //中断异常,跳出
                break;
              }
            }
          }
        }

        //经历了前面的磨难,终于快到收官了 如果连接对象不为null且有效
        //conn.isValid() 方法做了:valid && realConnection != null && dataSource.pingConnection(this);
        if (conn != null) {
          if (conn.isValid()) {
            //又是这一步操作,简单粗暴/(ㄒoㄒ)/~~  
            if (!conn.getRealConnection().getAutoCommit()) {
              conn.getRealConnection().rollback();
            }
            //这一步在构造器那里有类似的,连接的hashCode
            conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
            //设置连接被获取的时间,这个在前面判断是否超时执行的时候用到了
            conn.setCheckoutTimestamp(System.currentTimeMillis());
            //最后更新时间
            conn.setLastUsedTimestamp(System.currentTimeMillis());
            //将这个穿了新衣服的连接加入到活动连接
            state.activeConnections.add(conn);
            state.requestCount++;
            state.accumulatedRequestTime += System.currentTimeMillis() - t;
          } else {
            //这个分支是最苦逼的,获取到连接,但是是一个失效的。
            if (log.isDebugEnabled()) {
              log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
            }
            state.badConnectionCount++;
            localBadConnectionCount++;
            //将连接抛弃掉,重新去获取
            conn = null;
              //下面这几行前面也有说,获取失效连接超过一定次数,抛出异常,放弃希望
            if (localBadConnectionCount > (poolMaximumIdleConnections + 3)) {
              if (log.isDebugEnabled()) {
                log.debug("PooledDataSource: Could not get a good connection to the database.");
              }
              throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
            }
          }
        }
      }

    }

    //这个循环体外的判断,看上去主要还是防止那个wait抛出中断异常的break
    if (conn == null) {
      if (log.isDebugEnabled()) {
        log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
      }
      throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
    }

    return conn;
  }

长长的popConnection后面还有一个短短的pushConnection 添加链接到

 protected void pushConnection(PooledConnection conn) throws SQLException {
    //同样是先加锁
    synchronized (state) {
      //先把要链接从活动连接里面移除
      state.activeConnections.remove(conn);
      //要添加的这个连接还必须是一个有效连接
      if (conn.isValid()) {
        //判断空闲连接列表还有空位,且这个连接是从这个池创建的
        if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
          //和获取连接有点类似,累加执行时间
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          //不为自动提交的时候回滚一下
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          //换个新衣服
          PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
          //将连接对象加到空闲列表
          state.idleConnections.add(newConn);
          //使用旧连接对象的创建时间和修改时间 
          newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
          newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
          //将旧对象标记为失效
          conn.invalidate();
          if (log.isDebugEnabled()) {
            log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
          }
          //通知所有对state对象执行wait的线程可以继续了(让可能的在等待获取连接的wait对象动一下)
          state.notifyAll();
        } else {
          //如果空闲列表满了,或者这个连接对象不属于这个池创建的 真正的关闭这个连接 然后标记为失效
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
         //执行真实连接对象的close,返回给用户的连接都是ProxyConnection那个动态代理的对象,close只是返回连接到池。
          conn.getRealConnection().close();
          if (log.isDebugEnabled()) {
            log.debug("Closed connection " + conn.getRealHashCode() + ".");
          }
          //标记为失效
          conn.invalidate();
        }
      } else {  
        //无效的连接对象直接就不处理,只累计一下计数器
        if (log.isDebugEnabled()) {
          log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
        }
        state.badConnectionCount++;
      }
    }
  }

大概就是这些了,有一些比较蛋疼的地方,比如PoolState的
protected final List<PooledConnection> idleConnections = new ArrayList<PooledConnection>();
protected final List<PooledConnection> activeConnections = new ArrayList<PooledConnection>();
空闲列表和活动列表,这2个列表是一直在变化的,所以用ArrayList性能就有点问题了。而源码里面更多只是对头和尾进行操作,这里用LinkedList可能会更好?

上一篇下一篇

猜你喜欢

热点阅读