连接池程序员Java学习笔记

Mybatis数据库连接池解析

2016-12-03  本文已影响1147人  一只小哈

一、为什么要学习DB的连接池?
最近线上,在QPS突增突降的时候总会报异常,看日志是因为同时创建连接数过多导致,但是用的是公司自己的orm框架。并没有对内开源所以并不清楚是什么问题,但是这种问题开源框架会不会有呢?
通常我们常用的DB连接池主要有:mybatis原生、C3p0、dbcp以及阿里爸爸的druid。这几种都是比较专业的连接池,对比了一下,Mybatis原生连接池实现的最为简单,所以从github上clone下来,简要的看了一下。

Mybatis的代码目录很整洁,看上去 也很清晰。

Paste_Image.png

我们也可以很容易的直奔主题,直接看看datasource的实现。下面就是全部的代码:

Paste_Image.png

反正我第一眼看是懵逼的,竟然这么点代码就搞定了....

反观数据库连接池,主要有两种Pooled和Unpooled。字面上也能理解,一种是用数据库连接池的一种是不用连接池的。先看看Unpooled的:

public class UnpooledDataSource implements DataSource {    private ClassLoader driverClassLoader;  private Properties driverProperties;  private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<String, Driver>();  private String driver;  private String url;  private String username;  private String password;  private Boolean autoCommit;  private Integer defaultTransactionIsolationLevel;  static {    Enumeration<Driver> drivers = DriverManager.getDrivers();    while (drivers.hasMoreElements()) {      Driver driver = drivers.nextElement();      registeredDrivers.put(driver.getClass().getName(), driver);    }  }  public UnpooledDataSource() {  }public class UnpooledDataSource implements DataSource {
  
  private ClassLoader driverClassLoader;
  private Properties driverProperties;
  private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<String, Driver>();

  private String driver;
  private String url;
  private String username;
  private String password;

  private Boolean autoCommit;
  private Integer defaultTransactionIsolationLevel;

  static {
    Enumeration<Driver> drivers = DriverManager.getDrivers();
    while (drivers.hasMoreElements()) {
      Driver driver = drivers.nextElement();
      registeredDrivers.put(driver.getClass().getName(), driver);
    }
  }

  public UnpooledDataSource() {
  }

UnpooledDataSource 继承自DataSource ,同时里面定义了一些我们常用的链接数据库的配置。
主要看getConnection()

  private Connection doGetConnection(String username, String password) throws SQLException {
    Properties props = new Properties();
    if (driverProperties != null) {
      props.putAll(driverProperties);
    }
    if (username != null) {
      props.setProperty("user", username);
    }
    if (password != null) {
      props.setProperty("password", password);
    }
    return doGetConnection(props);
  }

  private Connection doGetConnection(Properties properties) throws SQLException {
    initializeDriver();
    Connection connection = DriverManager.getConnection(url, properties);
    configureConnection(connection);
    return connection;
  }

很明显,每调用一次都会创建一个连接,当并发量上来的时候那么数据库一定会来一个大爆炸,哈哈哈。
所以在使用mybatis一定要使用连接池,要不然你的数据库可就惨了。
再看看pooledDataSource:

private PooledConnection popConnection(String username, String password) throws SQLException {
    boolean countedWait = false;
    PooledConnection conn = null;
    long t = System.currentTimeMillis();
    int localBadConnectionCount = 0;

    while (conn == null) {
      synchronized (state) {//用Staye做同步,空闲的链接集合和活跃的链接集合都在state中
        if (!state.idleConnections.isEmpty()) {//先查看当前的空闲连接池是否为空
          // Pool has available connection
          conn = state.idleConnections.remove(0);//不为空我就从空闲的连接池取咯
          if (log.isDebugEnabled()) {
            log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
          }
        } else {
          // Pool does not have available connection
          if (state.activeConnections.size() < poolMaximumActiveConnections) {//现在没有空闲链接了,那看一下活跃队列,如果当前的链接结合还没满。
            // Can create new connection
            conn = new PooledConnection(dataSource.getConnection(), this);//创建链接!
            if (log.isDebugEnabled()) {
              log.debug("Created connection " + conn.getRealHashCode() + ".");
            }
          } else {//如果活跃连接满了,链接都很活跃,那证明并发很高了
            // Cannot create new connection
            PooledConnection oldestActiveConnection = state.activeConnections.get(0);
            long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
            if (longestCheckoutTime > poolMaximumCheckoutTime) {//既然现在满了,那就检查一下看看能不能有可以抢占的链接,先看一下这个连接被用了多久了,有米有超过我的限制。
              // Can claim overdue connection                //如果超限了,那么我此时应该把它抢占
              state.claimedOverdueConnectionCount++;
              state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
              state.accumulatedCheckoutTime += longestCheckoutTime;
              state.activeConnections.remove(oldestActiveConnection);//移除这个连接
              if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                try {
                  oldestActiveConnection.getRealConnection().rollback();
                } catch (SQLException e) {
                  log.debug("Bad connection. Could not roll back");
                }  
              }
              conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);//用原来老的Connection创建一个新的PooledConnection
              conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
              conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
              oldestActiveConnection.invalidate();
              if (log.isDebugEnabled()) {
                log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
              }
            } else { //如果此时我没有可以抢占的连接池,那我怎么办?显然不能反回,那我还是在这等一会儿吧。wait一会儿
              // Must wait
              try {
                if (!countedWait) {
                  state.hadToWaitCount++;
                  countedWait = true;
                }
                if (log.isDebugEnabled()) {
                  log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
                }
                long wt = System.currentTimeMillis();
                state.wait(poolTimeToWait); //wait唤醒之后继续while 重复上述步骤。
                state.accumulatedWaitTime += System.currentTimeMillis() - wt;
              } catch (InterruptedException e) {
                break;
              }
            }
          }
        }
        if (conn != null) { //如果我现在拿到了链接,不管是从哪拿的吧
          if (conn.isValid()) {//先判断一下我的链接是否还有效,有效的话我就初始化一下。那么此时默认下一次循环会退出while
            if (!conn.getRealConnection().getAutoCommit()) {
              conn.getRealConnection().rollback();
            }
            conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
            conn.setCheckoutTimestamp(System.currentTimeMillis());
            conn.setLastUsedTimestamp(System.currentTimeMillis());
            state.activeConnections.add(conn);
            state.requestCount++;
            state.accumulatedRequestTime += System.currentTimeMillis() - t;
          } else {
            if (log.isDebugEnabled()) {
              log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
            }
            state.badConnectionCount++;
            localBadConnectionCount++;
            conn = null;
            if (localBadConnectionCount > (poolMaximumIdleConnections + 3)) {
              if (log.isDebugEnabled()) {
                log.debug("PooledDataSource: Could not get a good connection to the database.");
              }
              throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
            }
          }
        }
      }

    }

    if (conn == null) {
      if (log.isDebugEnabled()) {
        log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
      }
      throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
    }

    return conn;
  }

上面的代码虽然是一大坨,但是条理十分清楚(具体步骤可以看代码注释)。

这里我们能看到的比较醒目的一个synchronized (state) ,这就表明这个方法是同步方法,但是这样也就降低了popConnection的性能,因为多线程需要竞争state的锁。

对于弹出的连接,mybatis需要检查连接的有效性,对于有效性的检测,Mybatis做的是十分保守的,具体的实现在isValid()方法中:

  /*
   * Method to see if the connection is usable
   *
   * @return True if the connection is usable
   */
  public boolean isValid() {
    return valid && realConnection != null && dataSource.pingConnection(this);
  }

   /*
   * Method to check to see if a connection is still usable
   *
   * @param conn - the connection to check
   * @return True if the connection is still usable
   */
  protected boolean pingConnection(PooledConnection conn) {
    boolean result = true;

    try {
      result = !conn.getRealConnection().isClosed();
    } catch (SQLException e) {
      if (log.isDebugEnabled()) {
        log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
      }
      result = false;
    }

    if (result) {
      if (poolPingEnabled) {
        if (poolPingConnectionsNotUsedFor >= 0 && conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor) {
          try {
            if (log.isDebugEnabled()) {
              log.debug("Testing connection " + conn.getRealHashCode() + " ...");
            }
            Connection realConn = conn.getRealConnection();
            Statement statement = realConn.createStatement();
            ResultSet rs = statement.executeQuery(poolPingQuery);
            rs.close();
            statement.close();
            if (!realConn.getAutoCommit()) {
              realConn.rollback();
            }
            result = true;
            if (log.isDebugEnabled()) {
              log.debug("Connection " + conn.getRealHashCode() + " is GOOD!");
            }
          } catch (Exception e) {
            log.warn("Execution of ping query '" + poolPingQuery + "' failed: " + e.getMessage());
            try {
              conn.getRealConnection().close();
            } catch (Exception e2) {
              //ignore
            }
            result = false;
            if (log.isDebugEnabled()) {
              log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
            }
          }
        }
      }
    }
    return result;
  }

上面的实现很简单,其实就是在获取配置中的poolPinQuery语句。一般我们用Select now | | select 1。
但是这种策略会严重降低sql的查询效率,如果开启此功能,每一次Query相当于两次Query。
而其他的数据库连接池,则是有单独的线程去check,像C3P0。所以在线程池这里,Mybatis原生做的并不是很好。

连接取出来之后,在Query之后需要将连接放回连接池中,那么这又是怎么进行的呢?

其实仅仅是调用了Connection的Close,但是这个Connection不是jdbc的Connection,而是PooledConnection。它包装了realConnection(Db的Connection)和一个proxyConnection(Connection的代理)。

lass PooledConnection implements InvocationHandler {

  private static final String CLOSE = "close";
  private static final Class<?>[] IFACES = new Class<?>[] { Connection.class };

  private int hashCode = 0;
  private PooledDataSource dataSource;
  private Connection realConnection;
  private Connection proxyConnection;
  private long checkoutTimestamp;
  private long createdTimestamp;
  private long lastUsedTimestamp;
  private int connectionTypeCode;
  private boolean valid;


  /*
   * Required for InvocationHandler implementation.
   *
   * @param proxy  - not used
   * @param method - the method to be executed
   * @param args   - the parameters to be passed to the method
   * @see java.lang.reflect.InvocationHandler#invoke(Object, java.lang.reflect.Method, Object[])
   */
  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
      dataSource.pushConnection(this);
      return null;
    } else {
      try {
        if (!Object.class.equals(method.getDeclaringClass())) {
          // issue #579 toString() should never fail
          // throw an SQLException instead of a Runtime
          checkConnection();
        }
        return method.invoke(realConnection, args);
      } catch (Throwable t) {
        throw ExceptionUtil.unwrapThrowable(t);
      }
    }
  }

首先,它实现了InvocationHandler ,那么我们基本上知道了它是一个JDK原生代理的实现。接下来的invoke方法里面,对Close方法做了处理。处理的方式是将Connection push到连接池中。

protected void pushConnection(PooledConnection conn) throws SQLException {

    synchronized (state) { //先同步
      state.activeConnections.remove(conn);//链接从活跃连接池移除
      if (conn.isValid()) {//再次判断链接的有效性
        //如果空闲连接池不满,那么将链接返回连接池,如果满了,那么Close掉链接
        if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
          state.idleConnections.add(newConn);
          newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
          newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
          conn.invalidate();
          if (log.isDebugEnabled()) {
            log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
          }
          state.notifyAll();//之前等待获取连接的pop方法就被唤醒了,重新去争抢线程
        } else {
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          conn.getRealConnection().close();
          if (log.isDebugEnabled()) {
            log.debug("Closed connection " + conn.getRealHashCode() + ".");
          }
          conn.invalidate();
        }
      } else {
        if (log.isDebugEnabled()) {
          log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
        }
        state.badConnectionCount++;
      }
    }
  }

Push的逻辑也比较简单。连接在取出,使用,最后到返回连接池。Mybatis做的都十分的简单清晰,但是性能确实可能受到影响,猜测可能开源的优秀连接池太多Mybatis不想重新造轮子了吧。

上一篇下一篇

猜你喜欢

热点阅读