mybatis源码深度解析之dataSource

2020-08-31  本文已影响0人  不是你的bug

要执行sql语句就要有db连接,mybatis的db连接也从DataSource获取,mybatis实现了UNPOOLED和POOLED两类数据源。

一、DataSorce的创建和使用

在解析配置文件是时根据指定的配置生产对应的dataSorce,生成的dataSorce保存在configuration的environment中。配置文件被解析为cconfiguration,每个DataSourceFactory都通过其唯一的Cconfiguration来创建的,environment就是配置文件中的environments配置。

// src/main/java/org/apache/ibatis/session/Configuration.java
public class Configuration {
     protected Environment environment;
}

// src/main/java/org/apache/ibatis/mapping/Environment.java
public final class Environment {
    private final DataSource dataSource;
}

1.1 DataSorce实例生成

environments配置中的dataSource的type属性指定了生成其dataSource实例的工厂类DataSourceFactory,通过对应的DataSourceFactory来生产特定的DataSource。
DataSourceFactory接口只有两个方法:

// src/main/java/org/apache/ibatis/datasource/DataSourceFactory.java
public interface DataSourceFactory {
    // 设置DataSource的属性,配置文件中的属性便是通过该方法设置的
    void setProperties(Properties props);
    // 获取一个DataSource
    DataSource getDataSource();
}

在解析配置时根据的配置的type属性DataSourceFactory类型并生成其实例。

private DataSourceFactory dataSourceElement(XNode context) throws Exception {
  if (context != null) {
    String type = context.getStringAttribute("type");
    // 获取dataSource配置的属性
    Properties props = context.getChildrenAsProperties();
    // 创建DataSourceFactory
    DataSourceFactory factory = (DataSourceFactory) resolveClass(type).getDeclaredConstructor().newInstance();
    // 为DataSourceFactory的DataSource设置属性
    factory.setProperties(props);
    return factory;
  }
  throw new BuilderException("Environment declaration requires a DataSourceFactory.");
}

每个SqlSession都有一个用来执行sql的执行器Executor,Executor的事务管理器JdbcTransaction会持用Environment中的dataSource。

// src/main/java/org/apache/ibatis/session/defaults/DefaultSqlSessionFactory.java
public class DefaultSqlSessionFactory implements SqlSessionFactory {
   
  // 生成SqlSession
  private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
      final Environment environment = configuration.getEnvironment();
      final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
      // 生成事务管理器并将dataSource给到事务管理器
      tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
      // 生成sqlSession的事务管理器
      final Executor executor = configuration.newExecutor(tx, execType);
      return new DefaultSqlSession(configuration, executor, autoCommit);
    }
}

在事务管理器中定义了获取db连接的接口,mybatis提供的两个事务管理器JDBC和MANAGED,两个类型对该接口的实现方式都差不多,都是在getConnection是判断其connection属性是否为null,若不为null则通过其dataSource获取到一个并给到connection属性。

// src/main/java/org/apache/ibatis/transaction/Transaction.java
public interface Transaction {
    //获取db连接
    Connection getConnection() throws SQLException;
}

// src/main/java/org/apache/ibatis/transaction/jdbc/JdbcTransaction.java
public class JdbcTransaction implements Transaction {
    protected Connection connection;
    protected DataSource dataSource;
    
    // 连接存在直接返回连接
    @Override
    public Connection getConnection() throws SQLException {
      if (connection == null) {
        openConnection();
      }
      return connection;
    }
   
    // 从dataSource中获取连接
    protected void openConnection() throws SQLException {
        connection = dataSource.getConnection();
     // ...
    }
}

1.2 什么时候会获取连接

在真正要对db执行一台语句的时候会去从事务管理器中去获取连接,来生成执行sql的Statement。
sqlSesson中的语句是通过其Exector来执行的,在各个类型的Executor中,对于所以的语句只要不是从缓存可以获取的或是语句的Statement有缓存,都会先从事务管理其中获取到连接去生产执行语句的Statement。

在所有Executor的父类BaseExecutor中定义了获取连接:

// src/main/java/org/apache/ibatis/executor/BatchExecutor.java
protected Connection getConnection(Log statementLog) throws SQLException {
  Connection connection = transaction.getConnection();
  if (statementLog.isDebugEnabled()) {
    return ConnectionLogger.newInstance(connection, statementLog, queryStack);
  } else {
    return connection;
  }
}

1.3 什么时候释放连接

连接的释放依赖sqlSesson的关闭,也就是在关闭sqlSession时关闭连接。
连接是一个紧缺而重要的资源并且sqlSession也不是线程安全的,所以sqlSession在使用完成后要及时的关闭。mybatis官方甚至建议sqlSesion的生命周期应该是请求内的。

二、unpooled数据源

mybatis提供了unpooled、POOLED和JNDI三种类型的数据源:

unpooled比较直接简单,每次获取连接时都新建一条连接,关闭sqlSesion时直接将其关闭。

// 获取连接
private Connection doGetConnection(Properties properties) throws SQLException {
  initializeDriver();
  Connection connection = DriverManager.getConnection(url, properties);
  configureConnection(connection);
  return connection;
}

三、POOLED数据源

pooled数据源是池化的连接。

3.1 POOLEN连接池配置项

poolMaximumActiveConnections

最大活动连接数,即被某个线程占用在使用的连接,默认为10。

poolMaximumIdleConnections

最大空间连接数,即未被任何线程使用的连接,默认为5。

poolMaximumCheckoutTime

活跃连接被收回给其它线程使用前的等待时间,即活跃连接超过了该时间则有可能被给到其它线程使用,单位为毫秒,默认为20000。

poolTimeToWait

当需要连接到空闲连接为空同时活跃连接也到达了最大并且做‘老’的活跃连接也未超时时,获取连接的动作要等待poolTimeToWait时间后再重新尝试获取,单位为毫秒,默认为20000。

poolMaximumLocalBadConnectionTolerance

获取到连接可能不能用了,这就是一个环连接,但获取到一个环连接时会尝试从新获取,但一个线程一次获取连接时获取到的坏连接次数不能超过poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance,默认为3。

poolPingEnabled

检测连接是否可用时,是否开启ping测试,这里ping测试是指通过使用该连接执行一个sql看是否可用成功执行。若执行成功则连接可用,若执行抛出任何异常连接都不可用。默认为“NO PING QUERY SET”,执行该语句一定会报错,所以该配置必须配。

poolPingQuery

执行ping测试时执行的sql,默认为false。

poolPingConnectionsNotUsedFor
poolPingConnectionsNotUsedFor执行ping测试频率,默认为0即每次测试连接连接性的同时都进行ping测试。

3.2 POOLED数据源简介

POOLED数据源的连接池又分为空闲连接池和活动连接池,初始时这两个池都是空的。获取连接的基本步骤如下:

  1. 先从空闲连接池中获取,若空闲连接池中有空闲的连接则返回。
  2. 若空闲连接池中没有连接但活动连接池没满,则新建一个新的连接并放到活跃连接池中然后放回。
  3. 若空闲连接池中为空且活跃池也满了,则尝试从活跃连接池检出第一个连接,当然该连接可能当前并没有过期(没有超过poolMaximumCheckoutTime),那么就等待poolTimeToWait时间后重复到第一步。
1598447976325-0e237a93bd2c040b.png

当关闭连接时,对POOLED的连接并不会释放,而是放回到空闲链接池中。

我们可以看到POOLED数据源的的瓶颈主要还是在于活跃连接池的大小,活跃连接池的数量决定了当前最多有多少连接可以在同一时段内访问db,对于db访问密集的应用来说该数量甚至决定了单个应用能同时支持的并发数。

同时db是在用过后才会放到空闲连接池中,并不会预先创建,这样避免了持有过多无用的连接,但这要求我们连接要避免慢查询并且一定要及时关闭连接。
上面只是连接池的基本逻辑,详细实现如下。

3.3 PoolState

PoolState记录了池的状态,主要有以下属性:

protected final List<PooledConnection> idleConnections = new ArrayList<>();
protected final List<PooledConnection> activeConnections = new ArrayList<>();

3.5 PooledConnection

从POOLED的数据源中获取到的连接都是Connection的动态代理实例,动态代理类就是PooledConnection,但与普通的动态代理不同的是每个Connection的动态代理实例都有一个唯一的动态代理类PooledConnection。

为什么要动态代理?
线程从POOLED中获取到的连接后,可以对链接执行任意的操作包括关闭链接的操作,但我们池化链接要是让外部随意就把链接关了,那么我们池中的链接可能慢慢的就都没了,
所以不能让连接被随便的关闭,对于关闭的链接应该放到空闲链接池中去给后续其它线程使用。
所以使用动态代理,PooledConnection代理类对于close方法并不会执行colse而是将链接返回到空闲链接池中。

同时PooledConnection还记录了连接的一些其它属性:

3.5.1 代理执行

在创建PooledConnection是为其Connection创建一个动态代理实例,返回给外部的也正是这个动态代理的实例:

public PooledConnection(Connection connection, PooledDataSource dataSource) {
  this.hashCode = connection.hashCode();
  this.realConnection = connection;
  this.dataSource = dataSource;
  this.createdTimestamp = System.currentTimeMillis();
  this.lastUsedTimestamp = System.currentTimeMillis();
  this.valid = true;
  // 动态代理实例
  this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
}

在执行时若是执行的是close方法会建连接返回到连接池中,若是是Object的方法则直接执行,若执行的是非close和非Object中的方法,则会先验证valid是否为true,也就是连接是否有效:

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  String methodName = method.getName();
  // close方法将连接返回到空闲池
  if (CLOSE.equals(methodName)) {
    dataSource.pushConnection(this);
    return null;
  }
  try {
    // 非Object中继承的发放先验证vaild
    if (!Object.class.equals(method.getDeclaringClass())) {
      checkConnection();
    }
    return method.invoke(realConnection, args);
  } catch (Throwable t) {
    throw ExceptionUtil.unwrapThrowable(t);
  }
}

private void checkConnection() throws SQLException {
  if (!valid) {
    throw new SQLException("Error accessing PooledConnection. Connection is invalid.");
  }
}
3.5.2 判断链接是否可用

在池中的链接会被一直持有者,也就是一直不会主动关闭,但难免会误关闭或遇到一下特殊情况而“被”关闭导致的链接不可用的情况,例如mysql的链接在wait_timeout(默认8小时)内没有与db发生交互那么mysql就会将链接置为过期,需要重新链接。所以判断一个PooledConnection是真实可用的十分重要。
判断PooledConnection是可用的首先valid必须是true并且realConnection不为null以及realConnection是可以‘ping’通的。

public boolean isValid() {
  return valid && realConnection != null && dataSource.pingConnection(this);
}

3.6 PooledDataSource

POOLED数据源的连接池的DataSource,实现了DataSource接口。用于获取和管理池化的连接。POOLED数据源的几个配置最终就是影射到它的几个字段上,并根据这几个配置来管理池化连接。此外还有一个PoolState字段保存了连接池状态。

public class PooledDataSource implements DataSource {
    // 连接池状态
    private final PoolState state = new PoolState(this);
    // 在任意时间可存在的活动(正在使用)连接数量
    protected int poolMaximumActiveConnections = 10;
    // 任意时间可能存在的空闲连接数
    protected int poolMaximumIdleConnections = 5;
    // 一个线程获取连接时能容忍的连线获取到坏连接数
    protected int poolMaximumLocalBadConnectionTolerance = 3;
    // 从active池中强制检出的最大时间
    protected int poolMaximumCheckoutTime = 20000;
    // 没有可用连接时再次尝试获取连接的等待时间
    protected int poolTimeToWait = 20000;
    // 是否进行ping检测
    protected boolean poolPingEnabled;
    // ping的频率,默认为0
    protected int poolPingConnectionsNotUsedFor;
    // ping检测语句
    protected String poolPingQuery = "NO PING QUERY SET";
}

每个SqlSessionFactory只有一个连接池即只会有一个PooledDataSource,但从中获取连接的线程有多个,为了保证获取连接是的线程安全,在PooledDataSource中获取连接和将连接返回到空闲池时的临界区使用synchronized对state加锁来同步访问。

// 获取连接
private PooledConnection popConnection(String userName, String password) throws SQLException {
    // ...
    while (conn == null) {
        synchronized (state) {
            // ...
        }
    }
}
// 连接放回空闲池
protected void pushConnection(PooledConnection conn) throws SQLException {
    synchronized (state) {
        // ...
    }
}
// 关闭所有连接
public void forceCloseAll() {
  synchronized (state) {
  }
}

3.6.1 pingConnection

PooledDataSource提供了ping检测连接可用性。
pingConnection的ping探测不是我们常见的ping命令的探测,毕竟可ping通一个机器和这个机器的db链接可用可以说是是两回事。
PooledDataSource的pingConnection先校验连接是否已经关闭了若已经关闭了那么这个连接自然也就不能用了。
若连接未关闭并开启了ping测试即poolPingEnabled配置为true,那么就使用连接执行poolPingQuery指定的语句,语句执行成功则任务链接正常,但若执行时抛出任何异常都认为链接不可用。
poolPingQuery的默认配置为“NO PING QUERY SET”,这个语句是不符合sql语法的,执行这个语句会抛出MySQLSyntaxErrorException异常,在执行语句时抛出异常会认为链接不可用,所以一定要记得配置poolPingQuery。
通过执行语句来做检测的‘成本’是很高的,所以配置poolPingQuery应该尽量简单并且没有很大的查询返回。同时pingConnection是ping检测只有在连接从PooledDataSource中检出的时间大于poolPingConnectionsNotUsedFor的时间是才进行ping, poolPingConnectionsNotUsedFor默认为0,建议将poolPingConnectionsNotUsedFor设置为和数据库链接超时一样避免不必要的检测。
在进行ping检测是若检测到链接不可用了,还会主动将链接close,当再次但检测这个链接时直接判断closed而返回不用做sql执行检测。

protected boolean pingConnection(PooledConnection conn) {
    boolean result = true;
    // 已经closed了那么必然是不可用了
    try {
        result = !conn.getRealConnection().isClosed();
    } catch (SQLException e) {
        return false;
    }

    // 启用了ping且拼间隔时间大于等于0,同时返回使用的时间大于ping间隔时间则进行检测
    if (result && poolPingEnabled
            && poolPingConnectionsNotUsedFor >= 0
            && conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor) {
        try {
            Connection realConn = conn.getRealConnection();
            try (Statement statement = realConn.createStatement()) {
                statement.executeQuery(poolPingQuery).close();
            }
            if (!realConn.getAutoCommit()) {
                realConn.rollback();
            }
            result = true;
        } catch (Exception e) {
            try {
                // 对不可用的连接主动关闭
                conn.getRealConnection().close();
            } catch (Exception e2) {
            }
            result = false;
        }
    }
    return result;
}

什么时候进行链接的有效性检测?
只有以下两个地方会进行检测:

  1. 从PooledDataSource获取到链接时进行检测确保放回的链接是真的可用的。
private PooledConnection popConnection(String username, String password) throws SQLException {
    // 获取连接
    ...
    // 获取到连接对连接进行检测
    if (conn != null) {
        if (conn.isValid()) {
            // ...
        }
    }
}
  1. 还有就是在连接用完放回到空闲链接池的时候进行检测确保放回的连接是可用的。
protected void pushConnection(PooledConnection conn) throws SQLException {
    synchronized (state) {
        // 从活跃连接池中移除
        state.activeConnections.remove(conn);
        // 连接是可用的则返回空闲链接池
        if (conn.isValid()) {
            // 返回空闲连接池
        }
    }
}
3.6.2 获取链接

PooledDataSource实现了DataSource接口,其中的getConnection两个方法通过调用popConnection来获取连接:

@Override
public Connection getConnection() throws SQLException {
  return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection();
}

@Override
public Connection getConnection(String username, String password) throws SQLException {
  return popConnection(username, password).getProxyConnection();
}

popConnection获取连接时经过以下步骤:

1、若空闲连接池不为空则从空闲链接池只能够获取

if (!state.idleConnections.isEmpty()) {
  conn = state.idleConnections.remove(0);
}

2、如果空闲连接池为空但活跃连接池未满则新建一个连接

if (state.activeConnections.size() < poolMaximumActiveConnections) {
    conn = new PooledConnection(dataSource.getConnection(), this);
    } else
}

3、若活跃连接池已满则尝试从活跃连接中检出第一个,若第一个连接已经过期则将其从活跃连接池中移除并使用其realConnection新建一个PoolenConnection,若连接未设置自动提交则执行以下回滚,避免新连接给到其它线程使用是误提交了非自己的操作,将旧的连接置为无效,这样若其它线程还使用就的连接进行操作就会抛出异常。

PooledConnection oldestActiveConnection = state.activeConnections.get(0);
long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
if (longestCheckoutTime > poolMaximumCheckoutTime) {
    state.activeConnections.remove(oldestActiveConnection);
    if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
  try {
    oldestActiveConnection.getRealConnection().rollback();
  } catch (SQLException e) {
  }
  // 使用原来的连接建一个新的PooledConnection
  conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
  // 原来的连接置为无效
  oldestActiveConnection.invalidate();

}

4、 活跃链接池中的第一个连接可能并没有过期,那么就要等待poolTimeToWait时间后从新进行第一步

try {
  state.wait(poolTimeToWait);
} catch (InterruptedException e) {
  break;
}
``


##### 3.5.7 释放连接
先将连接从活跃连接池中移除。再校验连接是否是有效的若无效了则不会添加到空闲池中。
若连有效且空闲连接池未满,则使用被关闭的连接的realConnection新建一个连接并添加到空闲连接池中,并将原来置为无效,防止再被原来持有该连接的线程使用到。

若空闲连接池已满则直接将连接关闭,当然关闭前还是会对非自动提交的连接主动进行回滚。

```java
protected void pushConnection(PooledConnection conn) throws SQLException {

    synchronized (state) {
      // 将连接从活跃连接中移除
      state.activeConnections.remove(conn);
      if (conn.isValid()) {
        // 有效的连接才返回连接池
        if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            // 非自动提交的主动回滚一下
            conn.getRealConnection().rollback();
          }
          // 使用原来的连接新建一个
          PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
          state.idleConnections.add(newConn);
          newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
          newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
          // 原来的连接置为无效
          conn.invalidate();
          state.notifyAll();
        }  
  }
上一篇 下一篇

猜你喜欢

热点阅读