DataSource
DataSource
在数据持久层中,数据源是一个非常重要的组件,其性能直接关系到整个数据持久层的性能。在实践中比较常见的第三方数据源组件有 Apache Commons DBCP、C3P0、Druid 等,MyBatis 不仅可以集成第三方数据源组件,还提供了自己的数据源实现。
UML 类图
MyBatis 提供了两个 javax.sql.DataSource 接口实现,分别是 PooledDataSource 和 UnpooledDataSource。MyBatis 使用不同的 DataSourceFactory 接口实现创建不同类型的 DataSource,这是工厂方法模式的一个典型应用。
image.png
DataSource
javax.sql.DataSource 主要是用来获取数据库连接 Connection 的。
/**
* Contract for obtaining {@link Connection} objects to the data source that an
* implementation represents, either with the implementation's own credentials
* or with credentials supplied by the caller.
*/
public interface DataSource extends CommonDataSource, Wrapper {
/**
* <p>Attempts to establish a connection with the data source that
* this {@code DataSource} object represents.
*
* @return a connection to the data source
* @exception SQLException if a database access error occurs
* @throws java.sql.SQLTimeoutException when the driver has determined that the
* timeout value specified by the {@code setLoginTimeout} method
* has been exceeded and has at least tried to cancel the
* current database connection attempt
*/
Connection getConnection() throws SQLException;
/**
* <p>Attempts to establish a connection with the data source that
* this {@code DataSource} object represents, on behalf of the given user.
*
* @param username the database user on whose behalf the connection is
* being made
* @param password the user's password
* @return a connection to the data source
* @exception SQLException if a database access error occurs
* @throws java.sql.SQLTimeoutException when the driver has determined that the
* timeout value specified by the {@code setLoginTimeout} method
* has been exceeded and has at least tried to cancel the
* current database connection attempt
* @since 1.4
*/
Connection getConnection(String username, String password)
throws SQLException;
}
DataSourceFactory
DataSourceFactory 是一个接口,数据源工厂类,主要定义了获取DataSource 的规范。
/**
* Factory contract for configuring and obtaining a {@link DataSource}.
*/
public interface DataSourceFactory {
/**
* Sets the properties of the data source.
*
* @param props the properties to apply to the data source
*/
void setProperties(Properties props);
/**
* Returns the data source.
*
* @return the data source
*/
DataSource getDataSource();
}
UnpooledDataSourceFactory
非连接池数据源工厂,通过构造方法直接返回 UnpooledDataSource 对象,并设置相关连接属性。
/**
 * {@link DataSourceFactory} backed by an {@link UnpooledDataSource}. The data source is created
 * in the constructor and configured afterwards through {@link #setProperties(Properties)}.
 */
public class UnpooledDataSourceFactory implements DataSourceFactory {

  private static final String DRIVER_PROPERTY_PREFIX = "driver.";
  private static final int DRIVER_PROPERTY_PREFIX_LENGTH = DRIVER_PROPERTY_PREFIX.length();

  protected DataSource dataSource;

  /**
   * Creates the factory with a fresh {@link UnpooledDataSource}.
   */
  public UnpooledDataSourceFactory() {
    this.dataSource = new UnpooledDataSource();
  }

  /**
   * Applies the given properties (for example driver, url, username) to the data source.
   *
   * <p>Keys starting with {@code "driver."} are collected, with the prefix stripped, and set as
   * the {@code driverProperties} value. Every other key must match a setter on the data source;
   * its value is converted to the setter's type and applied.
   *
   * @param properties the properties configured for the data source
   * @throws DataSourceException if a key neither starts with {@code "driver."} nor matches a setter
   */
  @Override
  public void setProperties(Properties properties) {
    final Properties driverSettings = new Properties();
    // Reflection wrapper used to look up and invoke setters on the data source.
    final MetaObject target = SystemMetaObject.forObject(dataSource);

    for (Object rawKey : properties.keySet()) {
      final String name = (String) rawKey;

      if (name.startsWith(DRIVER_PROPERTY_PREFIX)) {
        // "driver."-prefixed entries are kept aside and applied in one go after the loop.
        final String driverValue = properties.getProperty(name);
        driverSettings.setProperty(name.substring(DRIVER_PROPERTY_PREFIX_LENGTH), driverValue);
        continue;
      }

      if (!target.hasSetter(name)) {
        throw new DataSourceException("Unknown DataSource property: " + name);
      }

      final String rawValue = (String) properties.get(name);
      target.setValue(name, convertValue(target, name, rawValue));
    }

    if (!driverSettings.isEmpty()) {
      target.setValue("driverProperties", driverSettings);
    }
  }

  /**
   * Returns the data source held by this factory.
   *
   * @return the data source
   */
  @Override
  public DataSource getDataSource() {
    return dataSource;
  }

  /**
   * Converts a string value to the type expected by the named setter. Only int, long and boolean
   * (primitive or boxed) are converted; any other type receives the string unchanged.
   *
   * @param metaDataSource reflection wrapper of the data source
   * @param propertyName name of the property whose setter type is looked up
   * @param value the string value to convert
   * @return the converted value, or {@code value} itself when no conversion applies
   */
  private Object convertValue(MetaObject metaDataSource, String propertyName, String value) {
    final Class<?> setterType = metaDataSource.getSetterType(propertyName);
    if (setterType == Integer.class || setterType == int.class) {
      return Integer.valueOf(value);
    }
    if (setterType == Long.class || setterType == long.class) {
      return Long.valueOf(value);
    }
    if (setterType == Boolean.class || setterType == boolean.class) {
      return Boolean.valueOf(value);
    }
    return value;
  }
}
PooledDataSourceFactory
PooledDataSourceFactory 继承了 UnpooledDataSourceFactory,构造方法直接返回 PooledDataSource 对象。
/**
* Factory that reuses the behavior of {@link UnpooledDataSourceFactory} but
* holds a {@link PooledDataSource} instead.
*/
public class PooledDataSourceFactory extends UnpooledDataSourceFactory {
/**
* Assigns a new {@link PooledDataSource} to the inherited {@code dataSource} field.
*/
public PooledDataSourceFactory() {
this.dataSource = new PooledDataSource();
}
}
JndiDataSourceFactory
JndiDataSourceFactory 依赖 JNDI 服务,从容器中获取用户配置的 DataSource。
JNDI 是 Java Naming and Directory Interface ,Java 命名和目录接口,提供与外界的一个访问关系,只要相关应用、设备能提供服务,那么我们就可以通过JNDI来连接处理。
参考:https://www.cnblogs.com/wlzjdm/p/7856356.html
/**
 * {@link DataSourceFactory} that looks up an already configured {@link DataSource} through JNDI.
 */
public class JndiDataSourceFactory implements DataSourceFactory {

  public static final String INITIAL_CONTEXT = "initial_context";
  public static final String DATA_SOURCE = "data_source";
  public static final String ENV_PREFIX = "env.";

  private DataSource dataSource;

  /**
   * Resolves the data source from JNDI.
   *
   * <p>Entries prefixed with {@code "env."} are passed to the {@link InitialContext} as its
   * environment. If {@code data_source} is present, it is looked up either inside the context
   * named by {@code initial_context} (when that key is present too) or directly in the initial
   * context. Without a {@code data_source} key nothing is looked up.
   *
   * @param properties the JNDI configuration
   * @throws DataSourceException if a {@link NamingException} occurs
   */
  @Override
  public void setProperties(Properties properties) {
    try {
      final Properties env = getEnvProperties(properties);
      final InitialContext root = (env == null) ? new InitialContext() : new InitialContext(env);

      if (!properties.containsKey(DATA_SOURCE)) {
        return;
      }

      if (properties.containsKey(INITIAL_CONTEXT)) {
        // Two-step lookup: first the sub-context, then the data source inside it.
        final Context scope = (Context) root.lookup(properties.getProperty(INITIAL_CONTEXT));
        dataSource = (DataSource) scope.lookup(properties.getProperty(DATA_SOURCE));
      } else {
        // Direct lookup in the initial context.
        dataSource = (DataSource) root.lookup(properties.getProperty(DATA_SOURCE));
      }
    } catch (NamingException e) {
      throw new DataSourceException("There was an error configuring JndiDataSourceTransactionPool. Cause: " + e, e);
    }
  }

  /**
   * Returns the data source found by {@link #setProperties(Properties)}, or {@code null} if none
   * has been looked up.
   *
   * @return the data source, possibly {@code null}
   */
  @Override
  public DataSource getDataSource() {
    return dataSource;
  }

  /**
   * Extracts the entries whose key starts with {@code "env."}, stripping that prefix.
   *
   * @param allProps all configured properties
   * @return the extracted entries, or {@code null} when no key carries the prefix
   */
  private static Properties getEnvProperties(Properties allProps) {
    Properties collected = null;
    for (Entry<Object, Object> item : allProps.entrySet()) {
      final String name = (String) item.getKey();
      final String setting = (String) item.getValue();
      if (!name.startsWith(ENV_PREFIX)) {
        continue;
      }
      if (collected == null) {
        // Created lazily so that "no env entries" is reported as null.
        collected = new Properties();
      }
      collected.put(name.substring(ENV_PREFIX.length()), setting);
    }
    return collected;
  }
}
UnpooledDataSource
UnpooledDataSource 是非连接池数据源,主要用来获取 Connection。主要涉及了数据库驱动的注册、连接的参数设置和连接的获取。
UnpooledDataSource 覆写了 DataSource 接口定义的 getConnection() 及其重载方法,用于获取数据库连接;每次调用 UnpooledDataSource.getConnection() 方法时,都会创建一个新的数据库连接。
- 获取连接的一般方法:
1.加载驱动程序 Class.forName("com.mysql.jdbc.Driver");
2.获取数据库连接 Connection con = DriverManager.getConnection(url,user,password);
3.创建statement类对象,用来执行SQL语句,Statement statement = con.createStatement();
4.执行 SQL 语句,获取结果 ResultSet rs = statement.executeQuery(sql);
5.关闭 ResultSet、Connection 对象
/**
 * A {@link DataSource} without pooling: every {@code getConnection} call registers the driver
 * if necessary and then asks {@link DriverManager} for a connection.
 */
public class UnpooledDataSource implements DataSource {

  // Class loader used to load the driver class; when null, Resources.classForName is used.
  private ClassLoader driverClassLoader;
  // Additional properties passed to DriverManager when opening a connection.
  private Properties driverProperties;
  // Drivers already registered, keyed by driver class name.
  private static final Map<String, Driver> registeredDrivers = new ConcurrentHashMap<>();

  // Driver class name.
  private String driver;
  // Database URL.
  private String url;
  // User name.
  private String username;
  // Password.
  private String password;

  // Auto-commit setting applied to new connections; left untouched when null.
  private Boolean autoCommit;
  // Transaction isolation level applied to new connections; left untouched when null.
  private Integer defaultTransactionIsolationLevel;
  // Network timeout applied to new connections; left untouched when null.
  private Integer defaultNetworkTimeout;

  // Seed the cache with the drivers DriverManager already knows about.
  static {
    Enumeration<Driver> drivers = DriverManager.getDrivers();
    while (drivers.hasMoreElements()) {
      Driver driver = drivers.nextElement();
      registeredDrivers.put(driver.getClass().getName(), driver);
    }
  }

  public UnpooledDataSource() {
  }

  public UnpooledDataSource(String driver, String url, String username, String password) {
    this.driver = driver;
    this.url = url;
    this.username = username;
    this.password = password;
  }

  public UnpooledDataSource(String driver, String url, Properties driverProperties) {
    this.driver = driver;
    this.url = url;
    this.driverProperties = driverProperties;
  }

  public UnpooledDataSource(ClassLoader driverClassLoader, String driver, String url, String username, String password) {
    this.driverClassLoader = driverClassLoader;
    this.driver = driver;
    this.url = url;
    this.username = username;
    this.password = password;
  }

  public UnpooledDataSource(ClassLoader driverClassLoader, String driver, String url, Properties driverProperties) {
    this.driverClassLoader = driverClassLoader;
    this.driver = driver;
    this.url = url;
    this.driverProperties = driverProperties;
  }

  /**
   * Opens a new connection using the configured user name and password.
   *
   * @return a new connection
   * @throws SQLException if the driver cannot be initialized or the connection cannot be opened
   */
  @Override
  public Connection getConnection() throws SQLException {
    return doGetConnection(username, password);
  }

  /**
   * Opens a new connection using the given credentials.
   *
   * @param username the user name
   * @param password the password
   * @return a new connection
   * @throws SQLException if the driver cannot be initialized or the connection cannot be opened
   */
  @Override
  public Connection getConnection(String username, String password) throws SQLException {
    return doGetConnection(username, password);
  }

  /**
   * Sets the login timeout by delegating to {@link DriverManager#setLoginTimeout(int)}.
   *
   * @param loginTimeout the login timeout
   */
  @Override
  public void setLoginTimeout(int loginTimeout) {
    DriverManager.setLoginTimeout(loginTimeout);
  }

  @Override
  public int getLoginTimeout() {
    return DriverManager.getLoginTimeout();
  }

  @Override
  public void setLogWriter(PrintWriter logWriter) {
    DriverManager.setLogWriter(logWriter);
  }

  @Override
  public PrintWriter getLogWriter() {
    return DriverManager.getLogWriter();
  }

  public ClassLoader getDriverClassLoader() {
    return driverClassLoader;
  }

  public void setDriverClassLoader(ClassLoader driverClassLoader) {
    this.driverClassLoader = driverClassLoader;
  }

  public Properties getDriverProperties() {
    return driverProperties;
  }

  public void setDriverProperties(Properties driverProperties) {
    this.driverProperties = driverProperties;
  }

  public synchronized String getDriver() {
    return driver;
  }

  public synchronized void setDriver(String driver) {
    this.driver = driver;
  }

  public String getUrl() {
    return url;
  }

  public void setUrl(String url) {
    this.url = url;
  }

  public String getUsername() {
    return username;
  }

  public void setUsername(String username) {
    this.username = username;
  }

  public String getPassword() {
    return password;
  }

  public void setPassword(String password) {
    this.password = password;
  }

  public Boolean isAutoCommit() {
    return autoCommit;
  }

  public void setAutoCommit(Boolean autoCommit) {
    this.autoCommit = autoCommit;
  }

  public Integer getDefaultTransactionIsolationLevel() {
    return defaultTransactionIsolationLevel;
  }

  public void setDefaultTransactionIsolationLevel(Integer defaultTransactionIsolationLevel) {
    this.defaultTransactionIsolationLevel = defaultTransactionIsolationLevel;
  }

  /**
   * @since 3.5.2
   */
  public Integer getDefaultNetworkTimeout() {
    return defaultNetworkTimeout;
  }

  /**
   * Sets the default network timeout value to wait for the database operation to complete. See {@link Connection#setNetworkTimeout(java.util.concurrent.Executor, int)}
   *
   * @param defaultNetworkTimeout
   *          The time in milliseconds to wait for the database operation to complete.
   * @since 3.5.2
   */
  public void setDefaultNetworkTimeout(Integer defaultNetworkTimeout) {
    this.defaultNetworkTimeout = defaultNetworkTimeout;
  }

  /**
   * Builds the connection properties (driver properties plus {@code user} / {@code password}
   * when non-null) and opens the connection.
   *
   * @param username the user name, may be {@code null}
   * @param password the password, may be {@code null}
   * @return a new connection
   * @throws SQLException if the driver cannot be initialized or the connection cannot be opened
   */
  private Connection doGetConnection(String username, String password) throws SQLException {
    Properties props = new Properties();
    if (driverProperties != null) {
      props.putAll(driverProperties);
    }
    if (username != null) {
      props.setProperty("user", username);
    }
    if (password != null) {
      props.setProperty("password", password);
    }
    return doGetConnection(props);
  }

  /**
   * Initializes the driver, opens a connection through {@link DriverManager} and applies the
   * configured connection defaults.
   *
   * @param properties the connection properties handed to the driver
   * @return a new, configured connection
   * @throws SQLException if the driver cannot be initialized or the connection cannot be opened
   */
  private Connection doGetConnection(Properties properties) throws SQLException {
    initializeDriver();
    Connection connection = DriverManager.getConnection(url, properties);
    configureConnection(connection);
    return connection;
  }

  /**
   * Loads, instantiates and registers the configured driver unless it is already in
   * {@code registeredDrivers}.
   *
   * @throws SQLException if the driver class cannot be loaded, instantiated or registered; the
   *         original failure is attached as the cause
   */
  private synchronized void initializeDriver() throws SQLException {
    if (!registeredDrivers.containsKey(driver)) {
      Class<?> driverType;
      try {
        if (driverClassLoader != null) {
          driverType = Class.forName(driver, true, driverClassLoader);
        } else {
          driverType = Resources.classForName(driver);
        }
        // DriverManager requires the driver to be loaded via the system ClassLoader.
        // http://www.kfu.com/~nsayer/Java/dyn-jdbc.html
        Driver driverInstance = (Driver) driverType.getDeclaredConstructor().newInstance();
        // Register a DriverProxy (static nested class below) wrapping the real driver.
        DriverManager.registerDriver(new DriverProxy(driverInstance));
        registeredDrivers.put(driver, driverInstance);
      } catch (Exception e) {
        // Keep the message unchanged but chain the cause so the stack trace is not lost.
        throw new SQLException("Error setting driver on UnpooledDataSource. Cause: " + e, e);
      }
    }
  }

  /**
   * Applies the configured network timeout, auto-commit flag and transaction isolation level to
   * a new connection. Settings that are {@code null} are skipped.
   *
   * @param conn the connection to configure
   * @throws SQLException if the connection rejects one of the settings
   */
  private void configureConnection(Connection conn) throws SQLException {
    if (defaultNetworkTimeout != null) {
      // NOTE(review): a new single-thread executor is created for every connection and is
      // never shut down here - confirm whether this can leak threads with the drivers in use.
      conn.setNetworkTimeout(Executors.newSingleThreadExecutor(), defaultNetworkTimeout);
    }
    if (autoCommit != null && autoCommit != conn.getAutoCommit()) {
      conn.setAutoCommit(autoCommit);
    }
    if (defaultTransactionIsolationLevel != null) {
      conn.setTransactionIsolation(defaultTransactionIsolationLevel);
    }
  }

  /**
   * {@link Driver} that forwards every call except {@link #getParentLogger()} to a wrapped
   * driver instance.
   */
  private static class DriverProxy implements Driver {
    private final Driver driver;

    DriverProxy(Driver d) {
      this.driver = d;
    }

    @Override
    public boolean acceptsURL(String u) throws SQLException {
      return this.driver.acceptsURL(u);
    }

    @Override
    public Connection connect(String u, Properties p) throws SQLException {
      return this.driver.connect(u, p);
    }

    @Override
    public int getMajorVersion() {
      return this.driver.getMajorVersion();
    }

    @Override
    public int getMinorVersion() {
      return this.driver.getMinorVersion();
    }

    @Override
    public DriverPropertyInfo[] getPropertyInfo(String u, Properties p) throws SQLException {
      return this.driver.getPropertyInfo(u, p);
    }

    @Override
    public boolean jdbcCompliant() {
      return this.driver.jdbcCompliant();
    }

    @Override
    public Logger getParentLogger() {
      return Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
    }
  }

  @Override
  public <T> T unwrap(Class<T> iface) throws SQLException {
    throw new SQLException(getClass().getName() + " is not a wrapper.");
  }

  @Override
  public boolean isWrapperFor(Class<?> iface) throws SQLException {
    return false;
  }

  @Override
  public Logger getParentLogger() {
    // requires JDK version 1.6
    return Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
  }
}
数据源连接池
MyBatis 实现了数据源连接池,主要通过 PooledConnection、PoolState、PooledDataSource 实现了连接池功能。连接池负责保存空闲连接和活跃连接,并对外提供获取连接的入口;调用连接的 Connection.close() 方法时,不再是真正关闭连接,而是将连接放回连接池,供下次获取时复用。其实现方式是通过 JDK 动态代理创建 Connection 的代理对象,在调用 close() 方法时由 invoke() 方法进行拦截,将连接放回连接池。
UML
PooledDataSource 实现了简易数据库连接池的功能,它依赖的组件如图, PooledDataSource 创建新数据库连接的功能是依赖其中封装的 UnpooledDataSource对象实现的。
PooledDataSource 并不会直接管理 java.sql.Connection 对象,而是管理 PooledConnection 对象。在 PooledConnection 中封装了真正的数据库连接对象(java.sql.Connection)以及其代理对象,这里的代理对象是通过 JDK 动态代理产生的。
image.png
PooledConnection
连接池中的一个连接,包含一个连接的代理对象,用来实现调用 close() 方法时,进行拦截,将连接重新放入连接池中。
主要实现了 InvocationHandler 接口,创建了 connection 代理对象,通过 invoke() 方法拦截了 connection.close() 方法,将连接重新放入连接池中。
/**
 * One entry of the connection pool. Wraps a real {@link Connection} together with a JDK dynamic
 * proxy of it; the proxy routes {@code close()} back to the owning {@link PooledDataSource}
 * instead of closing the real connection.
 */
class PooledConnection implements InvocationHandler {

  private static final String CLOSE = "close";
  private static final Class<?>[] IFACES = new Class<?>[] { Connection.class };

  // Hash code of the real connection, captured at construction time.
  private final int hashCode;
  // Pool this connection belongs to; close() on the proxy hands the connection back to it.
  private final PooledDataSource dataSource;
  // The real database connection.
  private final Connection realConnection;
  // Dynamic proxy of the connection, with this object as its invocation handler.
  private final Connection proxyConnection;
  // Timestamp at which this connection was checked out of the pool.
  private long checkoutTimestamp;
  // Timestamp at which this object was created.
  private long createdTimestamp;
  // Timestamp of the last use.
  private long lastUsedTimestamp;
  // Code identifying the connection type (based on url + user + password).
  private int connectionTypeCode;
  // False once invalidated; guards against use after the connection was returned to the pool.
  private boolean valid;

  /**
   * Wraps the given connection as a pooled connection owned by the given data source.
   *
   * @param connection - the connection that is to be presented as a pooled connection
   * @param dataSource - the dataSource that the connection is from
   */
  public PooledConnection(Connection connection, PooledDataSource dataSource) {
    this.hashCode = connection.hashCode();
    this.realConnection = connection;
    this.dataSource = dataSource;
    this.createdTimestamp = System.currentTimeMillis();
    this.lastUsedTimestamp = System.currentTimeMillis();
    this.valid = true;
    final ClassLoader loader = Connection.class.getClassLoader();
    this.proxyConnection = (Connection) Proxy.newProxyInstance(loader, IFACES, this);
  }

  /**
   * Marks this connection as no longer valid.
   */
  public void invalidate() {
    valid = false;
  }

  /**
   * Tells whether the connection is usable: it must not have been invalidated, must wrap a
   * real connection and must pass the owning data source's ping check.
   *
   * @return True if the connection is usable
   */
  public boolean isValid() {
    if (!valid) {
      return false;
    }
    if (realConnection == null) {
      return false;
    }
    return dataSource.pingConnection(this);
  }

  /**
   * @return the wrapped real connection
   */
  public Connection getRealConnection() {
    return realConnection;
  }

  /**
   * @return the proxy standing in for the real connection
   */
  public Connection getProxyConnection() {
    return proxyConnection;
  }

  /**
   * @return the hash code of the real connection, or 0 if it is null
   */
  public int getRealHashCode() {
    if (realConnection == null) {
      return 0;
    }
    return realConnection.hashCode();
  }

  /**
   * @return the connection type code (based on url + user + password)
   */
  public int getConnectionTypeCode() {
    return connectionTypeCode;
  }

  /**
   * @param connectionTypeCode - the connection type code
   */
  public void setConnectionTypeCode(int connectionTypeCode) {
    this.connectionTypeCode = connectionTypeCode;
  }

  /**
   * @return the creation timestamp
   */
  public long getCreatedTimestamp() {
    return createdTimestamp;
  }

  /**
   * @param createdTimestamp - the creation timestamp
   */
  public void setCreatedTimestamp(long createdTimestamp) {
    this.createdTimestamp = createdTimestamp;
  }

  /**
   * @return the timestamp of the last use
   */
  public long getLastUsedTimestamp() {
    return lastUsedTimestamp;
  }

  /**
   * @param lastUsedTimestamp - the timestamp of the last use
   */
  public void setLastUsedTimestamp(long lastUsedTimestamp) {
    this.lastUsedTimestamp = lastUsedTimestamp;
  }

  /**
   * @return the time elapsed since the last use, in milliseconds
   */
  public long getTimeElapsedSinceLastUse() {
    final long now = System.currentTimeMillis();
    return now - lastUsedTimestamp;
  }

  /**
   * @return the time elapsed since creation, in milliseconds
   */
  public long getAge() {
    final long now = System.currentTimeMillis();
    return now - createdTimestamp;
  }

  /**
   * @return the timestamp at which this connection was checked out
   */
  public long getCheckoutTimestamp() {
    return checkoutTimestamp;
  }

  /**
   * @param timestamp the timestamp at which this connection was checked out
   */
  public void setCheckoutTimestamp(long timestamp) {
    this.checkoutTimestamp = timestamp;
  }

  /**
   * @return the time elapsed since the connection was checked out, in milliseconds
   */
  public long getCheckoutTime() {
    final long now = System.currentTimeMillis();
    return now - checkoutTimestamp;
  }

  @Override
  public int hashCode() {
    return hashCode;
  }

  /**
   * Compares by hash code: against another pooled connection the hash codes of the two real
   * connections are compared; against a plain {@link Connection} its hash code is compared with
   * the one captured at construction.
   *
   * @param obj - the other connection to test for equality
   * @see Object#equals(Object)
   */
  @Override
  public boolean equals(Object obj) {
    if (obj instanceof PooledConnection) {
      final PooledConnection other = (PooledConnection) obj;
      return realConnection.hashCode() == other.realConnection.hashCode();
    }
    if (obj instanceof Connection) {
      return hashCode == obj.hashCode();
    }
    return false;
  }

  /**
   * Invocation handler of the proxy. {@code close()} is intercepted and hands this connection
   * back to the pool; every other call is forwarded to the real connection.
   *
   * @param proxy - not used
   * @param method - the method to be executed
   * @param args - the parameters to be passed to the method
   * @see java.lang.reflect.InvocationHandler#invoke(Object, java.lang.reflect.Method, Object[])
   */
  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    final String invoked = method.getName();
    if (CLOSE.equals(invoked)) {
      // Return the connection to the pool instead of closing the real connection.
      dataSource.pushConnection(this);
      return null;
    }
    try {
      final boolean declaredOnObject = Object.class.equals(method.getDeclaringClass());
      if (!declaredOnObject) {
        // issue #579 toString() should never fail
        // throw an SQLException instead of a Runtime
        checkConnection();
      }
      return method.invoke(realConnection, args);
    } catch (Throwable t) {
      throw ExceptionUtil.unwrapThrowable(t);
    }
  }

  /**
   * Rejects use of a connection that has been invalidated.
   *
   * @throws SQLException if the connection is no longer valid
   */
  private void checkConnection() throws SQLException {
    if (valid) {
      return;
    }
    throw new SQLException("Error accessing PooledConnection. Connection is invalid.");
  }
}
PoolState
PoolState 表示连接池的状态,是用于管理 PooledConnection 对象状态的组件,通过 idleConnections 和 activeConnections 两个 PooledConnection 集合分别管理空闲状态的连接和活跃状态的连接。
/**
 * Holds the idle and active {@link PooledConnection} lists of a {@link PooledDataSource}
 * together with the pool's usage counters.
 */
public class PoolState {

  // The pool this state belongs to.
  protected PooledDataSource dataSource;

  // Connections currently idle.
  protected final List<PooledConnection> idleConnections = new ArrayList<>();
  // Connections currently active.
  protected final List<PooledConnection> activeConnections = new ArrayList<>();
  // Number of connection requests.
  protected long requestCount = 0;
  // Accumulated time spent obtaining connections.
  protected long accumulatedRequestTime = 0;
  // Accumulated checkout time, i.e. time between taking a connection and returning it.
  protected long accumulatedCheckoutTime = 0;
  // Number of connections considered overdue because they were not returned in time.
  protected long claimedOverdueConnectionCount = 0;
  // Accumulated checkout time of overdue connections.
  protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
  // Accumulated wait time.
  protected long accumulatedWaitTime = 0;
  // Number of times a request had to wait.
  protected long hadToWaitCount = 0;
  // Number of bad connections.
  protected long badConnectionCount = 0;

  public PoolState(PooledDataSource dataSource) {
    this.dataSource = dataSource;
  }

  public synchronized long getRequestCount() {
    return requestCount;
  }

  /**
   * @return accumulated request time divided by the request count, or 0 without requests
   */
  public synchronized long getAverageRequestTime() {
    if (requestCount == 0) {
      return 0;
    }
    return accumulatedRequestTime / requestCount;
  }

  /**
   * @return accumulated wait time divided by the wait count, or 0 if nothing had to wait
   */
  public synchronized long getAverageWaitTime() {
    if (hadToWaitCount == 0) {
      return 0;
    }
    return accumulatedWaitTime / hadToWaitCount;
  }

  public synchronized long getHadToWaitCount() {
    return hadToWaitCount;
  }

  public synchronized long getBadConnectionCount() {
    return badConnectionCount;
  }

  public synchronized long getClaimedOverdueConnectionCount() {
    return claimedOverdueConnectionCount;
  }

  /**
   * @return accumulated overdue checkout time divided by the overdue count, or 0 if none
   */
  public synchronized long getAverageOverdueCheckoutTime() {
    if (claimedOverdueConnectionCount == 0) {
      return 0;
    }
    return accumulatedCheckoutTimeOfOverdueConnections / claimedOverdueConnectionCount;
  }

  /**
   * @return accumulated checkout time divided by the request count, or 0 without requests
   */
  public synchronized long getAverageCheckoutTime() {
    if (requestCount == 0) {
      return 0;
    }
    return accumulatedCheckoutTime / requestCount;
  }

  public synchronized int getIdleConnectionCount() {
    return idleConnections.size();
  }

  public synchronized int getActiveConnectionCount() {
    return activeConnections.size();
  }

  /**
   * Renders the pool configuration and the current statistics as a multi-line report. The
   * password itself is never printed, only whether one is set.
   */
  @Override
  public synchronized String toString() {
    final StringBuilder report = new StringBuilder();
    final String maskedPassword = dataSource.getPassword() == null ? "NULL" : "************";
    report
        .append("\n===CONFINGURATION==============================================")
        .append("\n jdbcDriver ").append(dataSource.getDriver())
        .append("\n jdbcUrl ").append(dataSource.getUrl())
        .append("\n jdbcUsername ").append(dataSource.getUsername())
        .append("\n jdbcPassword ").append(maskedPassword)
        .append("\n poolMaxActiveConnections ").append(dataSource.poolMaximumActiveConnections)
        .append("\n poolMaxIdleConnections ").append(dataSource.poolMaximumIdleConnections)
        .append("\n poolMaxCheckoutTime ").append(dataSource.poolMaximumCheckoutTime)
        .append("\n poolTimeToWait ").append(dataSource.poolTimeToWait)
        .append("\n poolPingEnabled ").append(dataSource.poolPingEnabled)
        .append("\n poolPingQuery ").append(dataSource.poolPingQuery)
        .append("\n poolPingConnectionsNotUsedFor ").append(dataSource.poolPingConnectionsNotUsedFor);
    report
        .append("\n ---STATUS-----------------------------------------------------")
        .append("\n activeConnections ").append(getActiveConnectionCount())
        .append("\n idleConnections ").append(getIdleConnectionCount())
        .append("\n requestCount ").append(getRequestCount())
        .append("\n averageRequestTime ").append(getAverageRequestTime())
        .append("\n averageCheckoutTime ").append(getAverageCheckoutTime())
        .append("\n claimedOverdue ").append(getClaimedOverdueConnectionCount())
        .append("\n averageOverdueCheckoutTime ").append(getAverageOverdueCheckoutTime())
        .append("\n hadToWait ").append(getHadToWaitCount())
        .append("\n averageWaitTime ").append(getAverageWaitTime())
        .append("\n badConnectionCount ").append(getBadConnectionCount())
        .append("\n===============================================================");
    return report.toString();
  }
}
PooledDataSource
PooledDataSource 是一个简单的、同步的、线程安全的连接池。
PooledDataSource 中管理的真正的数据库连接对象是由 PooledDataSource 中封装的 UnpooledDataSource 对象创建的,并由 PoolState 管理所有连接的状态,把 UnpooledDataSource 包装成了 PooledDataSource, 从而形成了连接池。
主要的方法:
从连接池获取连接
private PooledConnection popConnection(String username, String password) throws SQLException
从连接池中,获取一个 PooledConnection 对象。
包含多种情况,1.PooledConnection 不存在,进行创建,2.存在直接从连接池中获取 等。
image.png
连接关闭重新放入连接池
protected void pushConnection(PooledConnection conn) throws SQLException
连接使用结束,关闭时,将连接重新放入连接池。
调用 Connection.close() 时,代理对象的 PooledConnection.invoke() 会拦截 close() 调用并转而调用 pushConnection(),将 PooledConnection 对象归还给连接池,同时将其从活跃连接集合 activeConnections 中移除。
检测连接是否有效
执行测试 SQL
/**
 * Checks whether a pooled connection is still usable.
 *
 * <p>A connection whose real connection is closed (or cannot be queried) is bad. Otherwise,
 * when pinging is enabled and the connection has been unused for longer than
 * {@code poolPingConnectionsNotUsedFor}, the ping query is executed; on failure the real
 * connection is closed and the connection is reported as bad.
 *
 * @param conn the pooled connection to check
 * @return {@code true} if the connection is considered usable
 */
protected boolean pingConnection(PooledConnection conn) {
  boolean open;
  try {
    open = !conn.getRealConnection().isClosed();
  } catch (SQLException e) {
    if (log.isDebugEnabled()) {
      log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
    }
    open = false;
  }
  if (!open) {
    return false;
  }

  // Only connections that sat unused for long enough are actually pinged.
  final boolean pingDue = poolPingEnabled
      && poolPingConnectionsNotUsedFor >= 0
      && conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor;
  if (!pingDue) {
    return true;
  }

  try {
    if (log.isDebugEnabled()) {
      log.debug("Testing connection " + conn.getRealHashCode() + " ...");
    }
    final Connection physical = conn.getRealConnection();
    try (Statement probe = physical.createStatement()) {
      probe.executeQuery(poolPingQuery).close();
    }
    if (!physical.getAutoCommit()) {
      physical.rollback();
    }
    if (log.isDebugEnabled()) {
      log.debug("Connection " + conn.getRealHashCode() + " is GOOD!");
    }
    return true;
  } catch (Exception e) {
    log.warn("Execution of ping query '" + poolPingQuery + "' failed: " + e.getMessage());
    try {
      // The ping failed, so close the real connection.
      conn.getRealConnection().close();
    } catch (Exception e2) {
      // ignore
    }
    if (log.isDebugEnabled()) {
      log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
    }
    return false;
  }
}
连接池属性修改
当修改 PooledDataSource 的字段时,例如数据库 URL、用户名、密码、autoCommit 配置等,都会调用 forceCloseAll() 方法,将所有数据库连接关闭,同时也会将所有相应的 PooledConnection 对象都设置为无效,清空 activeConnections 集合 和 idleConnections 集合。应用系统之后通过 PooledDataSource.getConnection() 获取连接时,会按照新的配置重新创建新的数据库连接以及相应的 PooledConnection 对象。
/**
 * Closes every active and idle connection of the pool.
 *
 * <p>Under the lock on {@code state}, the expected connection type code is recomputed from the
 * current url, user name and password; then each pooled connection is removed from its list,
 * invalidated, rolled back if auto-commit is off, and its real connection is closed. Failures
 * while closing an individual connection are ignored.
 */
public void forceCloseAll() {
  synchronized (state) {
    // Recompute the code identifying this pool's connection type.
    expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());

    // Drain the active connections, last element first.
    for (int idx = state.activeConnections.size() - 1; idx >= 0; idx--) {
      try {
        final PooledConnection pooled = state.activeConnections.remove(idx);
        pooled.invalidate();
        final Connection physical = pooled.getRealConnection();
        if (!physical.getAutoCommit()) {
          // Discard any uncommitted work before closing.
          physical.rollback();
        }
        physical.close();
      } catch (Exception e) {
        // ignore
      }
    }

    // Drain the idle connections the same way.
    for (int idx = state.idleConnections.size() - 1; idx >= 0; idx--) {
      try {
        final PooledConnection pooled = state.idleConnections.remove(idx);
        pooled.invalidate();
        final Connection physical = pooled.getRealConnection();
        if (!physical.getAutoCommit()) {
          physical.rollback();
        }
        physical.close();
      } catch (Exception e) {
        // ignore
      }
    }
  }
  if (log.isDebugEnabled()) {
    log.debug("PooledDataSource forcefully closed/removed all connections.");
  }
}
PooledDataSource
public class PooledDataSource implements DataSource {
private static final Log log = LogFactory.getLog(PooledDataSource.class);
// Manages the state of the connection pool and records its statistics.
private final PoolState state = new PoolState(this);
// Wrapped UnpooledDataSource used to create the real database connections; assigned in the constructors.
private final UnpooledDataSource dataSource;
// OPTIONAL CONFIGURATION FIELDS
// Maximum number of active connections.
protected int poolMaximumActiveConnections = 10;
// Maximum number of idle connections.
protected int poolMaximumIdleConnections = 5;
// Maximum checkout time (presumably milliseconds - TODO confirm against popConnection).
protected int poolMaximumCheckoutTime = 20000;
// Time a thread waits when no connection can be obtained (presumably milliseconds - TODO confirm).
protected int poolTimeToWait = 20000;
// Maximum number of bad connections tolerated.
protected int poolMaximumLocalBadConnectionTolerance = 3;
// Test SQL statement sent to the database when checking whether a connection is usable.
protected String poolPingQuery = "NO PING QUERY SET";
// Whether sending the test SQL statement is enabled.
protected boolean poolPingEnabled;
// A connection unused for longer than this many milliseconds is pinged with the test SQL statement.
protected int poolPingConnectionsNotUsedFor;
// Hash value derived from the database URL, user name and password, used to identify this connection pool.
private int expectedConnectionTypeCode;
/**
* Creates a pool backed by a new, unconfigured {@link UnpooledDataSource}.
*/
// NOTE(review): unlike the four-argument constructors below, this one does not compute
// expectedConnectionTypeCode - confirm that a later setter call (which runs forceCloseAll) is expected.
public PooledDataSource() {
dataSource = new UnpooledDataSource();
}
/**
* Creates a pool backed by the given {@link UnpooledDataSource}.
*
* @param dataSource the data source used to create the real connections
*/
// NOTE(review): expectedConnectionTypeCode is not computed here either - verify against its usages.
public PooledDataSource(UnpooledDataSource dataSource) {
this.dataSource = dataSource;
}
/**
* Creates a pool backed by a new {@link UnpooledDataSource} with the given driver, URL and credentials.
*/
public PooledDataSource(String driver, String url, String username, String password) {
dataSource = new UnpooledDataSource(driver, url, username, password);
// Compute the code identifying this pool from URL, user name and password.
expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
}
/**
* Creates a pool backed by a new {@link UnpooledDataSource} with the given driver, URL and driver properties.
*/
public PooledDataSource(String driver, String url, Properties driverProperties) {
dataSource = new UnpooledDataSource(driver, url, driverProperties);
expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
}
/**
* Same as the (driver, url, username, password) constructor, with an explicit class loader for the driver.
*/
public PooledDataSource(ClassLoader driverClassLoader, String driver, String url, String username, String password) {
dataSource = new UnpooledDataSource(driverClassLoader, driver, url, username, password);
expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
}
/**
* Same as the (driver, url, driverProperties) constructor, with an explicit class loader for the driver.
*/
public PooledDataSource(ClassLoader driverClassLoader, String driver, String url, Properties driverProperties) {
dataSource = new UnpooledDataSource(driverClassLoader, driver, url, driverProperties);
expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
}
/**
 * Obtains a connection from the pool using the credentials configured on the wrapped data
 * source. The pooled connection is taken via {@code popConnection} and its proxy connection
 * is handed to the caller.
 *
 * @return the proxy connection of the pooled connection
 * @throws SQLException if no connection can be obtained
 */
@Override
public Connection getConnection() throws SQLException {
  final String user = dataSource.getUsername();
  final String secret = dataSource.getPassword();
  final PooledConnection pooled = popConnection(user, secret);
  return pooled.getProxyConnection();
}
/**
 * Obtains a connection from the pool for the given credentials and returns its proxy
 * connection.
 *
 * @param username the user name
 * @param password the password
 * @return the proxy connection of the pooled connection
 * @throws SQLException if no connection can be obtained
 */
@Override
public Connection getConnection(String username, String password) throws SQLException {
  final PooledConnection pooled = popConnection(username, password);
  return pooled.getProxyConnection();
}
/**
* Sets the login timeout by delegating to {@link DriverManager#setLoginTimeout(int)}.
*
* @param loginTimeout the login timeout
*/
@Override
public void setLoginTimeout(int loginTimeout) {
DriverManager.setLoginTimeout(loginTimeout);
}
/**
* Returns the login timeout reported by {@link DriverManager#getLoginTimeout()}.
*
* @return the login timeout
*/
@Override
public int getLoginTimeout() {
return DriverManager.getLoginTimeout();
}
/**
 * Sets the log writer by delegating to the static {@link DriverManager#setLogWriter(PrintWriter)};
 * the writer is stored on {@code DriverManager}, not on this pool instance.
 *
 * @param logWriter the writer passed through to {@code DriverManager}
 */
@Override
public void setLogWriter(PrintWriter logWriter) {
DriverManager.setLogWriter(logWriter);
}
/**
 * Reads the log writer from the static {@link DriverManager#getLogWriter()}.
 *
 * @return the writer currently held by {@code DriverManager}
 */
@Override
public PrintWriter getLogWriter() {
  final PrintWriter writer = DriverManager.getLogWriter();
  return writer;
}
/**
 * Changes the JDBC driver on the underlying data source, then closes every pooled connection
 * via {@link #forceCloseAll()}.
 *
 * @param driver the JDBC driver class name
 */
public void setDriver(String driver) {
  this.dataSource.setDriver(driver);
  this.forceCloseAll();
}
/**
 * Changes the JDBC URL on the underlying data source, then closes every pooled connection
 * via {@link #forceCloseAll()}.
 *
 * @param url the JDBC URL
 */
public void setUrl(String url) {
  this.dataSource.setUrl(url);
  this.forceCloseAll();
}
/**
 * Changes the user name on the underlying data source, then closes every pooled connection
 * via {@link #forceCloseAll()}.
 *
 * @param username the database user
 */
public void setUsername(String username) {
  this.dataSource.setUsername(username);
  this.forceCloseAll();
}
/**
 * Changes the password on the underlying data source, then closes every pooled connection
 * via {@link #forceCloseAll()}.
 *
 * @param password the user's password
 */
public void setPassword(String password) {
  this.dataSource.setPassword(password);
  this.forceCloseAll();
}
/**
 * Changes the auto-commit setting on the underlying data source, then closes every pooled
 * connection via {@link #forceCloseAll()}.
 *
 * @param defaultAutoCommit the auto-commit flag forwarded to the underlying data source
 */
public void setDefaultAutoCommit(boolean defaultAutoCommit) {
  this.dataSource.setAutoCommit(defaultAutoCommit);
  this.forceCloseAll();
}
/**
 * Changes the default transaction isolation level on the underlying data source, then closes
 * every pooled connection via {@link #forceCloseAll()}.
 *
 * @param defaultTransactionIsolationLevel the isolation level forwarded to the underlying data source
 */
public void setDefaultTransactionIsolationLevel(Integer defaultTransactionIsolationLevel) {
  this.dataSource.setDefaultTransactionIsolationLevel(defaultTransactionIsolationLevel);
  this.forceCloseAll();
}
/**
 * Replaces the driver properties on the underlying data source, then closes every pooled
 * connection via {@link #forceCloseAll()}.
 *
 * @param driverProps the driver properties forwarded to the underlying data source
 */
public void setDriverProperties(Properties driverProps) {
  this.dataSource.setDriverProperties(driverProps);
  this.forceCloseAll();
}
/**
 * Configures the default network timeout, i.e. how long to wait for a database operation to
 * complete (see {@link Connection#setNetworkTimeout(java.util.concurrent.Executor, int)}).
 * The value is forwarded to the underlying data source and all pooled connections are then
 * closed via {@link #forceCloseAll()}.
 *
 * @param milliseconds
 *          the time in milliseconds to wait for the database operation to complete
 * @since 3.5.2
 */
public void setDefaultNetworkTimeout(Integer milliseconds) {
  this.dataSource.setDefaultNetworkTimeout(milliseconds);
  this.forceCloseAll();
}
/**
 * Sets the upper bound on simultaneously active (checked-out) connections and then closes
 * every pooled connection via {@link #forceCloseAll()}.
 *
 * @param poolMaximumActiveConnections the maximum number of active connections
 */
public void setPoolMaximumActiveConnections(int poolMaximumActiveConnections) {
  this.poolMaximumActiveConnections = poolMaximumActiveConnections;
  this.forceCloseAll();
}
/**
 * Sets the upper bound on idle connections kept in the pool and then closes every pooled
 * connection via {@link #forceCloseAll()}.
 *
 * @param poolMaximumIdleConnections the maximum number of idle connections
 */
public void setPoolMaximumIdleConnections(int poolMaximumIdleConnections) {
  this.poolMaximumIdleConnections = poolMaximumIdleConnections;
  this.forceCloseAll();
}
/**
 * Sets how many bad connections a single thread tolerates while it is requesting a new
 * {@link PooledConnection}. Unlike the other pool setters, this one only stores the value
 * and does not call {@link #forceCloseAll()}.
 *
 * @param poolMaximumLocalBadConnectionTolerance
 *          max tolerance for bad connections encountered by one thread
 *
 * @since 3.4.5
 */
public void setPoolMaximumLocalBadConnectionTolerance(int poolMaximumLocalBadConnectionTolerance) {
  final int tolerance = poolMaximumLocalBadConnectionTolerance;
  this.poolMaximumLocalBadConnectionTolerance = tolerance;
}
/**
 * Sets the maximum time a connection may stay checked out before it *may* be handed to
 * another caller, then closes every pooled connection via {@link #forceCloseAll()}.
 *
 * @param poolMaximumCheckoutTime the maximum checkout time
 */
public void setPoolMaximumCheckoutTime(int poolMaximumCheckoutTime) {
  this.poolMaximumCheckoutTime = poolMaximumCheckoutTime;
  this.forceCloseAll();
}
/**
 * Sets how long a caller waits before retrying to obtain a connection, then closes every
 * pooled connection via {@link #forceCloseAll()}.
 *
 * @param poolTimeToWait the time to wait
 */
public void setPoolTimeToWait(int poolTimeToWait) {
  this.poolTimeToWait = poolTimeToWait;
  this.forceCloseAll();
}
/**
 * Sets the SQL statement used to check whether a connection is usable, then closes every
 * pooled connection via {@link #forceCloseAll()}.
 *
 * @param poolPingQuery the ping query
 */
public void setPoolPingQuery(String poolPingQuery) {
  this.poolPingQuery = poolPingQuery;
  this.forceCloseAll();
}
/**
 * Turns the ping query on or off, then closes every pooled connection via
 * {@link #forceCloseAll()}.
 *
 * @param poolPingEnabled {@code true} if a connection should be checked before it is used
 */
public void setPoolPingEnabled(boolean poolPingEnabled) {
  this.poolPingEnabled = poolPingEnabled;
  this.forceCloseAll();
}
/**
 * Sets the inactivity threshold after which a connection is pinged to make sure it is still
 * good, then closes every pooled connection via {@link #forceCloseAll()}.
 *
 * @param milliseconds the number of milliseconds of inactivity that will trigger a ping
 */
public void setPoolPingConnectionsNotUsedFor(int milliseconds) {
  final int inactivityMillis = milliseconds;
  this.poolPingConnectionsNotUsedFor = inactivityMillis;
  this.forceCloseAll();
}
/**
 * @return the driver reported by the underlying data source
 */
public String getDriver() {
  return this.dataSource.getDriver();
}
/**
 * @return the URL reported by the underlying data source
 */
public String getUrl() {
  return this.dataSource.getUrl();
}
/**
 * @return the user name reported by the underlying data source
 */
public String getUsername() {
  return this.dataSource.getUsername();
}
/**
 * @return the password reported by the underlying data source
 */
public String getPassword() {
  return this.dataSource.getPassword();
}
/**
 * @return the auto-commit flag reported by the underlying data source
 */
public boolean isAutoCommit() {
  return this.dataSource.isAutoCommit();
}
/**
 * @return the default transaction isolation level reported by the underlying data source
 */
public Integer getDefaultTransactionIsolationLevel() {
  return this.dataSource.getDefaultTransactionIsolationLevel();
}
/**
 * @return the driver properties reported by the underlying data source
 */
public Properties getDriverProperties() {
  return this.dataSource.getDriverProperties();
}
/**
 * @return the default network timeout reported by the underlying data source
 * @since 3.5.2
 */
public Integer getDefaultNetworkTimeout() {
  return this.dataSource.getDefaultNetworkTimeout();
}
/**
 * @return the configured maximum number of active connections
 */
public int getPoolMaximumActiveConnections() {
  return this.poolMaximumActiveConnections;
}
/**
 * @return the configured maximum number of idle connections
 */
public int getPoolMaximumIdleConnections() {
  return this.poolMaximumIdleConnections;
}
/**
 * @return the configured per-thread tolerance for bad connections
 */
public int getPoolMaximumLocalBadConnectionTolerance() {
  return this.poolMaximumLocalBadConnectionTolerance;
}
/**
 * @return the configured maximum checkout time
 */
public int getPoolMaximumCheckoutTime() {
  return this.poolMaximumCheckoutTime;
}
/**
 * @return the configured time to wait before retrying to get a connection
 */
public int getPoolTimeToWait() {
  return this.poolTimeToWait;
}
/**
 * @return the configured ping query
 */
public String getPoolPingQuery() {
  return this.poolPingQuery;
}
/**
 * @return {@code true} if the ping query is enabled
 */
public boolean isPoolPingEnabled() {
  return this.poolPingEnabled;
}
/**
 * @return the configured inactivity threshold (milliseconds) that triggers a ping
 */
public int getPoolPingConnectionsNotUsedFor() {
  return this.poolPingConnectionsNotUsedFor;
}
/**
 * Closes all active and idle connections in the pool.
 *
 * <p>The setters of this class that change connection-relevant configuration (URL, user name,
 * password, auto-commit, ...) call this method. It recomputes the expected connection type
 * code from the current URL/user name/password, then empties both the active and the idle
 * list, invalidating each {@link PooledConnection} and closing its real connection. Later
 * calls to {@code getConnection()} therefore have to obtain connections anew.
 *
 * <p>Closing is best-effort: failures on individual connections are ignored so that the
 * remaining connections are still processed.
 */
public void forceCloseAll() {
  synchronized (state) {
    // Type code that connections returned to this pool are expected to carry from now on.
    expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
    // Drain the active list from the tail.
    for (int i = state.activeConnections.size(); i > 0; i--) {
      invalidateAndCloseQuietly(state.activeConnections.remove(i - 1));
    }
    // Drain the idle list the same way.
    for (int i = state.idleConnections.size(); i > 0; i--) {
      invalidateAndCloseQuietly(state.idleConnections.remove(i - 1));
    }
  }
  if (log.isDebugEnabled()) {
    log.debug("PooledDataSource forcefully closed/removed all connections.");
  }
}

/**
 * Invalidates the pooled wrapper, rolls back an uncommitted transaction and closes the real
 * connection, ignoring any failure along the way.
 *
 * <p>The close happens in a {@code finally} block so that a failing
 * {@code getAutoCommit()}/{@code rollback()} can no longer leave the real connection open.
 */
private void invalidateAndCloseQuietly(PooledConnection conn) {
  Connection realConn = null;
  try {
    conn.invalidate();
    realConn = conn.getRealConnection();
    if (!realConn.getAutoCommit()) {
      realConn.rollback();
    }
  } catch (Exception ignored) {
    // Best-effort: the connection is being discarded anyway.
  } finally {
    if (realConn != null) {
      try {
        realConn.close();
      } catch (Exception ignored) {
        // Best-effort: nothing more can be done for this connection.
      }
    }
  }
}
/**
 * @return the {@link PoolState} object this pool uses to track its connections and statistics
 */
public PoolState getPoolState() {
  return this.state;
}
/**
 * Computes the connection type code: the hash code of the URL, user name and password
 * concatenated in that order (a {@code null} part contributes the text "null").
 */
private int assembleConnectionTypeCode(String url, String username, String password) {
  final StringBuilder key = new StringBuilder();
  key.append(url).append(username).append(password);
  return key.toString().hashCode();
}
/**
 * Returns a connection to the pool once the caller is done with it.
 *
 * <p>The connection is removed from the active list. If it is still valid, an uncommitted
 * transaction is rolled back and the real connection is either re-wrapped in a fresh
 * {@link PooledConnection} and parked in the idle list (when there is room and the type code
 * matches this pool), or closed. In both cases the old wrapper is invalidated. An invalid
 * connection is only counted as bad.
 *
 * @param conn the pooled connection being handed back
 * @throws SQLException if querying auto-commit, rolling back or closing the real connection fails
 */
protected void pushConnection(PooledConnection conn) throws SQLException {
  synchronized (state) {
    // The connection is no longer checked out.
    state.activeConnections.remove(conn);

    if (!conn.isValid()) {
      if (log.isDebugEnabled()) {
        log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
      }
      state.badConnectionCount++; // count invalid pooled connections
      return;
    }

    // Keep the connection only if the idle list has room and it belongs to this pool.
    final boolean keepAsIdle = state.idleConnections.size() < poolMaximumIdleConnections
        && conn.getConnectionTypeCode() == expectedConnectionTypeCode;

    // Common to both outcomes: account for the checkout time and roll back open work.
    state.accumulatedCheckoutTime += conn.getCheckoutTime();
    if (!conn.getRealConnection().getAutoCommit()) {
      conn.getRealConnection().rollback();
    }

    if (keepAsIdle) {
      // Re-wrap the real connection so the old wrapper held by the caller can be invalidated.
      PooledConnection recycled = new PooledConnection(conn.getRealConnection(), this);
      state.idleConnections.add(recycled);
      recycled.setCreatedTimestamp(conn.getCreatedTimestamp());
      recycled.setLastUsedTimestamp(conn.getLastUsedTimestamp());
      conn.invalidate();
      if (log.isDebugEnabled()) {
        log.debug("Returned connection " + recycled.getRealHashCode() + " to pool.");
      }
      state.notifyAll(); // wake up threads waiting on the pool state
    } else {
      // Idle list is full or the connection does not belong to this pool: close it for real.
      conn.getRealConnection().close();
      if (log.isDebugEnabled()) {
        log.debug("Closed connection " + conn.getRealHashCode() + ".");
      }
      conn.invalidate();
    }
  }
}
/**
 * Checks a {@link PooledConnection} out of the pool.
 *
 * <p>Each loop iteration tries, in order: take an idle connection; create a new one if the
 * active limit is not reached; claim the oldest active connection if it has been checked out
 * longer than {@code poolMaximumCheckoutTime}; otherwise wait up to {@code poolTimeToWait}
 * milliseconds on the pool state. A candidate that fails {@code isValid()} is counted as bad
 * and the loop tries again.
 *
 * <p>New real connections are opened with {@code dataSource.getConnection()}; the given user
 * name and password are used here only to compute the connection type code.
 *
 * <p>If the waiting thread is interrupted, its interrupt flag is restored before the loop is
 * left, so callers can still observe the interruption.
 *
 * @param username user name used for the connection type code
 * @param password password used for the connection type code
 * @return a valid pooled connection that has been added to the active list
 * @throws SQLException if too many bad connections were seen, if no connection was obtained
 *           (for example because the wait was interrupted), or if a JDBC call fails
 */
private PooledConnection popConnection(String username, String password) throws SQLException {
  // Whether this request has already been counted in hadToWaitCount.
  boolean countedWait = false;
  PooledConnection conn = null;
  long t = System.currentTimeMillis();
  // Bad connections seen by this call only.
  int localBadConnectionCount = 0;
  while (conn == null) {
    synchronized (state) {
      if (!state.idleConnections.isEmpty()) {
        // Pool has available connection
        conn = state.idleConnections.remove(0);
        if (log.isDebugEnabled()) {
          log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
        }
      } else {
        // Pool does not have available connection
        if (state.activeConnections.size() < poolMaximumActiveConnections) {
          // Can create new connection
          conn = new PooledConnection(dataSource.getConnection(), this);
          if (log.isDebugEnabled()) {
            log.debug("Created connection " + conn.getRealHashCode() + ".");
          }
        } else {
          // Cannot create new connection: look at the first entry of the active list.
          PooledConnection oldestActiveConnection = state.activeConnections.get(0);
          long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
          if (longestCheckoutTime > poolMaximumCheckoutTime) {
            // Can claim overdue connection; update the overdue statistics first.
            state.claimedOverdueConnectionCount++;
            state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
            state.accumulatedCheckoutTime += longestCheckoutTime;
            state.activeConnections.remove(oldestActiveConnection);
            // Roll back whatever the overdue holder left uncommitted.
            if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
              try {
                oldestActiveConnection.getRealConnection().rollback();
              } catch (SQLException e) {
                /*
                  Just log a message for debug and continue to execute the following
                  statement like nothing happened.
                  Wrap the bad connection with a new PooledConnection, this will help
                  to not interrupt current executing thread and give current thread a
                  chance to join the next competition for another valid/good database
                  connection. At the end of this loop, bad conn will be set as null.
                */
                log.debug("Bad connection. Could not roll back");
              }
            }
            // New wrapper around the same real connection; no new database connection is opened.
            conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
            conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
            conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
            // The previous holder's wrapper must no longer be usable.
            oldestActiveConnection.invalidate();
            if (log.isDebugEnabled()) {
              log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
            }
          } else {
            // No idle connection, no room for a new one, nothing overdue: must wait.
            try {
              if (!countedWait) {
                state.hadToWaitCount++;
                countedWait = true;
              }
              if (log.isDebugEnabled()) {
                log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
              }
              long wt = System.currentTimeMillis();
              state.wait(poolTimeToWait);
              state.accumulatedWaitTime += System.currentTimeMillis() - wt;
            } catch (InterruptedException e) {
              // Restore the interrupt flag so the caller can see the interruption.
              Thread.currentThread().interrupt();
              break;
            }
          }
        }
      }
      if (conn != null) {
        // ping to server and check the connection is valid or not
        if (conn.isValid()) {
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          // Tag the connection with the type code for this URL/user/password.
          conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
          conn.setCheckoutTimestamp(System.currentTimeMillis());
          conn.setLastUsedTimestamp(System.currentTimeMillis());
          state.activeConnections.add(conn);
          // Request statistics.
          state.requestCount++;
          state.accumulatedRequestTime += System.currentTimeMillis() - t;
        } else {
          if (log.isDebugEnabled()) {
            log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
          }
          state.badConnectionCount++;
          localBadConnectionCount++;
          conn = null;
          // Give up once this call has seen more bad connections than
          // max idle connections + the configured local tolerance.
          if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
            if (log.isDebugEnabled()) {
              log.debug("PooledDataSource: Could not get a good connection to the database.");
            }
            throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
          }
        }
      }
    }
  }
  if (conn == null) {
    if (log.isDebugEnabled()) {
      log.debug("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
    }
    throw new SQLException("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
  }
  return conn;
}
/**
 * Method to check to see if a connection is still usable.
 *
 * <p>The real connection must not be closed. If pinging is enabled and the connection has
 * been unused for longer than {@code poolPingConnectionsNotUsedFor}, the ping query is
 * executed as well; when that fails the real connection is closed and the connection is
 * reported as unusable.
 *
 * @param conn - the connection to check
 * @return True if the connection is still usable
 */
protected boolean pingConnection(PooledConnection conn) {
  boolean open;
  try {
    open = !conn.getRealConnection().isClosed();
  } catch (SQLException e) {
    if (log.isDebugEnabled()) {
      log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
    }
    open = false;
  }

  // Closed connections are bad; with pinging disabled an open connection is good enough.
  if (!open || !poolPingEnabled) {
    return open;
  }
  // Only connections that have been unused for longer than the threshold are pinged.
  if (poolPingConnectionsNotUsedFor < 0 || conn.getTimeElapsedSinceLastUse() <= poolPingConnectionsNotUsedFor) {
    return open;
  }

  try {
    if (log.isDebugEnabled()) {
      log.debug("Testing connection " + conn.getRealHashCode() + " ...");
    }
    // Run the ping query against the real connection.
    Connection realConn = conn.getRealConnection();
    try (Statement statement = realConn.createStatement()) {
      statement.executeQuery(poolPingQuery).close();
    }
    if (!realConn.getAutoCommit()) {
      realConn.rollback();
    }
    if (log.isDebugEnabled()) {
      log.debug("Connection " + conn.getRealHashCode() + " is GOOD!");
    }
    return true;
  } catch (Exception e) {
    log.warn("Execution of ping query '" + poolPingQuery + "' failed: " + e.getMessage());
    try {
      conn.getRealConnection().close(); // the real connection is unusable, close it
    } catch (Exception e2) {
      //ignore
    }
    if (log.isDebugEnabled()) {
      log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
    }
    return false;
  }
}
/**
 * Unwraps a pooled connection to get to the 'real' connection.
 *
 * <p>If {@code conn} is a dynamic proxy whose invocation handler is a {@link PooledConnection},
 * the handler's real connection is returned; in every other case {@code conn} itself is returned.
 *
 * @param conn - the pooled connection to unwrap
 * @return The 'real' connection
 */
public static Connection unwrapConnection(Connection conn) {
  if (!Proxy.isProxyClass(conn.getClass())) {
    return conn;
  }
  final InvocationHandler proxyHandler = Proxy.getInvocationHandler(conn);
  return proxyHandler instanceof PooledConnection
      ? ((PooledConnection) proxyHandler).getRealConnection()
      : conn;
}
/**
 * Closes all pooled connections when this object is finalized.
 *
 * <p>{@code super.finalize()} runs in a {@code finally} block so that it is still invoked
 * when {@link #forceCloseAll()} throws.
 *
 * @throws Throwable whatever {@code forceCloseAll()} or {@code super.finalize()} throws
 */
@Override
protected void finalize() throws Throwable {
  try {
    forceCloseAll();
  } finally {
    super.finalize();
  }
}
/**
 * Always fails: this class does not act as a wrapper.
 *
 * @param iface the requested interface (not used)
 * @throws SQLException always, with a message naming this object's runtime class
 */
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
  final String message = getClass().getName() + " is not a wrapper.";
  throw new SQLException(message);
}
/**
 * Always reports that this object wraps nothing.
 *
 * @param iface the interface being asked about (not used)
 * @return always {@code false}
 */
@Override
public boolean isWrapperFor(Class<?> iface) {
return false;
}
/**
 * @return the logger registered under {@link Logger#GLOBAL_LOGGER_NAME}
 */
@Override
public Logger getParentLogger() {
  final Logger globalLogger = Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
  return globalLogger;
}
}
DataSource 初始化
在 mybatis-config.xml 中定义了 dataSource 节点
<!-- "default" names the environment that is used when no environment is specified explicitly; it matches the id below. -->
<environments default="development">
<environment id="development">
<transactionManager type="JDBC">
<!-- Placeholder property: both name and value are empty in this sample. -->
<property name="" value="" />
</transactionManager>
<!-- Data source of type UNPOOLED, configured through driver, url and username properties. -->
<dataSource type="UNPOOLED">
<property name="driver" value="org.hsqldb.jdbcDriver" />
<property name="url" value="jdbc:hsqldb:mem:ancestorref" />
<property name="username" value="sa" />
</dataSource>
</environment>
</environments>
XMLConfigBuilder 在解析 mybatis-config.xml 文件时,会解析其中的 dataSource 节点
/**
 * Processes the {@code environments} node: for every child whose id is the selected
 * environment, builds the transaction factory and the data source and stores the resulting
 * {@code Environment} in the configuration.
 *
 * @param context the {@code environments} node; nothing happens when it is {@code null}
 * @throws Exception if building the transaction factory or data source factory fails
 */
private void environmentsElement(XNode context) throws Exception {
  if (context == null) {
    return;
  }
  // No environment chosen on the builder: fall back to the node's "default" attribute.
  if (environment == null) {
    environment = context.getStringAttribute("default");
  }
  for (XNode child : context.getChildren()) {
    String id = child.getStringAttribute("id");
    if (!isSpecifiedEnvironment(id)) {
      continue;
    }
    // Create the TransactionFactory, then the DataSourceFactory and its DataSource.
    TransactionFactory txFactory = transactionManagerElement(child.evalNode("transactionManager"));
    DataSourceFactory dsFactory = dataSourceElement(child.evalNode("dataSource"));
    DataSource dataSource = dsFactory.getDataSource();
    // Assemble the Environment and write it back into the configuration object.
    Environment builtEnvironment = new Environment.Builder(id)
        .transactionFactory(txFactory)
        .dataSource(dataSource)
        .build();
    configuration.setEnvironment(builtEnvironment);
  }
}
/**
 * Creates the {@link DataSourceFactory} described by a {@code dataSource} configuration node.
 *
 * <p>The node's {@code type} attribute is resolved to a class, which is instantiated through
 * its no-argument constructor; the node's child properties are then passed to
 * {@code setProperties}.
 *
 * @param context the {@code dataSource} node
 * @return the configured factory
 * @throws Exception if the class cannot be instantiated; a {@code BuilderException} is thrown
 *           when {@code context} is {@code null}
 */
private DataSourceFactory dataSourceElement(XNode context) throws Exception {
  if (context == null) {
    throw new BuilderException("Environment declaration requires a DataSourceFactory.");
  }
  final String type = context.getStringAttribute("type");
  final Properties props = context.getChildrenAsProperties();
  final DataSourceFactory factory =
      (DataSourceFactory) resolveClass(type).getDeclaredConstructor().newInstance();
  factory.setProperties(props);
  return factory;
}