Mybatis随笔(七) Executor解析
上篇我们说到 DefaultSqlSession 是通过 Executor 来完成SQL的执行与返回结果的封装,这里看下 Executor 的具体实现。
1、总体结构
Executor 是一个顶级接口,先看下其提供的接口方法

大致分为几类
- SQL具体执行(查询、更新、提交、回滚、刷新、清除缓存)
- 判断某些状态(是否关闭、是否缓存等)
- 辅助或加强(获取事务、创建缓存键等)
再看继承关系

- BaseExecutor 抽象类,
- ReuseExector
- CloseExector <ResultLoaderMap.<private>>
- BatchExector
- SimpleExector
- CachingExector
还记得这个Executor是怎么创建的吗,看下源码
protected ExecutorType defaultExecutorType = ExecutorType.SIMPLE;
protected boolean cacheEnabled = true;
public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
executorType = executorType == null ? defaultExecutorType : executorType;
executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
Executor executor;
if (ExecutorType.BATCH == executorType) {
executor = new BatchExecutor(this, transaction);
} else if (ExecutorType.REUSE == executorType) {
executor = new ReuseExecutor(this, transaction);
} else {
executor = new SimpleExecutor(this, transaction);
}
if (cacheEnabled) {
executor = new CachingExecutor(executor);
}
executor = (Executor) interceptorChain.pluginAll(executor);
return executor;
}
第一,根据ExecutorType来创建对应的Executor,ExecutorType可以通过如下方式指定
<settings>
<!-- 默认 SIMPLE-->
<!--SIMPLE SimpleExecutor-->
<!--REUSE ReuseExecutor-->
<!--BATCH BatchExecutor-->
<setting name="defaultExecutorType" value="REUSE"/>
</settings>
这里支持的三种都是BaseExecutor的实现类,这是一种策略模式
的使用。
第二,拿到了 Executor 后,通过cacheEnabled来判断是否需要套一层
CachingExecutor,其中cacheEnabled默认为true,也就是说,二级缓存是默认开启的,可以通过如下设置cacheEnabled的值
<settings>
<!-- 二级缓存 -->
<setting name="cacheEnabled" value="false"/>
</settings>
对于CachingExecutor,是一种装饰器模式
,是一种相较于继承的加强方式。
接下来看下这些具体实现。
2、BaseExecutor
public abstract class BaseExecutor implements Executor {...}
BaseExecutor是一个实现了Executor接口的抽象类,实现了所有接口方法

其自定义有四个方法没有具体实现
// 调用: udpate
protected abstract int doUpdate(MappedStatement ms, Object parameter) throws SQLException;
// 调用: flushStatements
protected abstract List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException;
// 调用: query -> queryFromDatabase
protected abstract <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException;
// 调用: queryCursor
protected abstract <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql) throws SQLException;
看下 doUpdate 的调用
@Override
public int update(MappedStatement ms, Object parameter) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());
if (closed) {
throw new ExecutorException("Executor was closed.");
}
clearLocalCache();
return doUpdate(ms, parameter);
}
就调用了两个重写方法
- clearLocalCache
- doUpdate
等于所有实现都是在doUpdate方法中,这种父类定好模板子类负责具体实现的模式就是模板模式
,精髓在于步骤一致,但实现可以千差万别。
2.1 一级缓存
再看下 query 方法
@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
BoundSql boundSql = ms.getBoundSql(parameter);
// 一级缓存key
CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql);
return query(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
这里主要想讲下这个一级缓存,先看下这个 CacheKey
- 五个属性
// 五个重要参数
private final int multiplier;
private int hashcode;
private long checksum;
private int count;
// 8/21/2017 - Sonarlint flags this as needing to be marked transient. While true if content is not serializable, this is not always true and thus should not be marked transient.
private List<Object> updateList;
// 每添加一个对象进来,五个参数都要跟着变
public void update(Object object) {
int baseHashCode = object == null ? 1 : ArrayUtil.hashCode(object);
count++;
checksum += baseHashCode;
baseHashCode *= count;
hashcode = multiplier * hashcode + baseHashCode;
updateList.add(object);
}
// Cachekey相同的条件是五个参数都相同
@Override
public boolean equals(Object object) {
if (this == object) {
return true;
}
if (!(object instanceof CacheKey)) {
return false;
}
final CacheKey cacheKey = (CacheKey) object;
if (hashcode != cacheKey.hashcode) {
return false;
}
if (checksum != cacheKey.checksum) {
return false;
}
if (count != cacheKey.count) {
return false;
}
for (int i = 0; i < updateList.size(); i++) {
Object thisObject = updateList.get(i);
Object thatObject = cacheKey.updateList.get(i);
if (!ArrayUtil.equals(thisObject, thatObject)) {
return false;
}
}
return true;
}
- CacheKey总共有五个属性
- CacheKey每增加一个对象,其内部五个属性都会发生变化
- 判断两个CacheKey相同的条件就是这个五个属性必须完全一致
继续看下一级缓存的键是如何创建的
@Override
public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
if (closed) {
throw new ExecutorException("Executor was closed.");
}
CacheKey cacheKey = new CacheKey();
// 1.SQL方法唯一ID
cacheKey.update(ms.getId());
// 2.分页参数-初始偏移量
cacheKey.update(rowBounds.getOffset());
// 3.分页参数-查询条数
cacheKey.update(rowBounds.getLimit());
// 4.执行SQL 预编译那种带占位符?的
cacheKey.update(boundSql.getSql());
List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry();
// mimic DefaultParameterHandler logic
// 5.实际参数值
for (ParameterMapping parameterMapping : parameterMappings) {
if (parameterMapping.getMode() != ParameterMode.OUT) {
Object value;
String propertyName = parameterMapping.getProperty();
if (boundSql.hasAdditionalParameter(propertyName)) {
value = boundSql.getAdditionalParameter(propertyName);
} else if (parameterObject == null) {
value = null;
} else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
value = parameterObject;
} else {
MetaObject metaObject = configuration.newMetaObject(parameterObject);
value = metaObject.getValue(propertyName);
}
cacheKey.update(value);
}
}
// 6.环境,也就是数据库的编码,保证是同一个数据库
if (configuration.getEnvironment() != null) {
// issue #176
cacheKey.update(configuration.getEnvironment().getId());
}
return cacheKey;
}
可以看到缓存key创建依赖六个值
- SQL方法唯一ID
- 分页参数-初始偏移量
- 分页参数-查询条数
- 执行SQL 预编译那种带占位付?的
- 实际参数值
- 环境编码,也就是数据库的编码,保证是同一个数据库
这六个参数加一起可以唯一确定一条SQL,继续看下使用
@SuppressWarnings("unchecked")
@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
if (closed) {
throw new ExecutorException("Executor was closed.");
}
if (queryStack == 0 && ms.isFlushCacheRequired()) {
clearLocalCache();
}
List<E> list;
try {
queryStack++;
// 1.从缓存中获取
list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
if (list != null) {
// 2.拿到了直接处理
handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
} else {
// 3.没拿到就进行查询
list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
} finally {
queryStack--;
}
// 延迟加载
if (queryStack == 0) {
for (DeferredLoad deferredLoad : deferredLoads) {
deferredLoad.load();
}
// issue #601
deferredLoads.clear();
if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
// issue #482
clearLocalCache();
}
}
return list;
}
这里有一个好玩的就是延迟加载,后面有机会再说
原来一级缓存就是一个localCache
protected PerpetualCache localCache;
再瞅一眼PerpetualCache是个何方神圣
public class PerpetualCache implements Cache {
private final String id;
private final Map<Object, Object> cache = new HashMap<>();
public PerpetualCache(String id) {
this.id = id;
}
@Override
public String getId() {
return id;
}
@Override
public int getSize() {
return cache.size();
}
@Override
public void putObject(Object key, Object value) {
cache.put(key, value);
}
@Override
public Object getObject(Object key) {
return cache.get(key);
}
@Override
public Object removeObject(Object key) {
return cache.remove(key);
}
@Override
public void clear() {
cache.clear();
}
@Override
public boolean equals(Object o) {
if (getId() == null) {
throw new CacheException("Cache instances require an ID.");
}
if (this == o) {
return true;
}
if (!(o instanceof Cache)) {
return false;
}
Cache otherCache = (Cache) o;
return getId().equals(otherCache.getId());
}
@Override
public int hashCode() {
if (getId() == null) {
throw new CacheException("Cache instances require an ID.");
}
return getId().hashCode();
}
}
- PerpetualCache 实现 Cache 接口来实现缓存功能
- PerpetualCache 内部维护一个Map<Object, Object>对象 cache
- PerpetualCache 内部维护一个String类型的id,这个ID仅用于标识缓存类别,比如 localCache 就是
LocalCache
,是在构造方法中完成初始化
protected BaseExecutor(Configuration configuration, Transaction transaction) {
this.transaction = transaction;
this.deferredLoads = new ConcurrentLinkedQueue<>();
// here
// here
// here
this.localCache = new PerpetualCache("LocalCache");
this.localOutputParameterCache = new PerpetualCache("LocalOutputParameterCache");
this.closed = false;
this.configuration = configuration;
this.wrapper = this;
}
- 缓存的取/塞/删 都是对这个对象的操作
然后如果没查到是queryFromDatabase方法,不出意料肯定会有塞的动作
private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
List<E> list;
localCache.putObject(key, EXECUTION_PLACEHOLDER);
try {
// 1.查询结果
list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
} finally {
localCache.removeObject(key);
}
// 2.反塞缓存
localCache.putObject(key, list);
if (ms.getStatementType() == StatementType.CALLABLE) {
localOutputParameterCache.putObject(key, parameter);
}
return list;
}
- 查询结果
- 反塞缓存
再来想一个东西,这里的缓存都是通过Map来实现的,那么作为缓存Key对象有两个方法必须要重写,一个就是前面看过的 equals 方法,还有一个就是 hashCode 方法
class CacheKey
// 重写hashCode方法
@Override
public int hashCode() {
return hashcode;
}
如果不重写 hashCode 方法,那么任何两个对象的 hashCode 值是不会相同的,也就导致了使用 Map 这种方式判断是否一致完全失效,因为永远不会相同。那么可不可以全部返回一个常量比如1,如下
@Override
public int hashCode() {
return 1;
}
这种可以通过 Map 的判断是否同一个对象(取决于 equals 方法)的校验,但会导致一个现象:这个 Map 中数组只有一位有值,这个值是一个链表,当链表达到 64 位会变成一颗红黑树(1.8,链表长度过8且总量过64)
所以推荐的写法是,重写 equals 方法时就要一起重写 hashCode 方法,hashCode 方法的返回值不能是一个常量,最好跟随 equals 方法中判断的全部或几个属性的变化而变化。
这里CacheKey就是直接使用了一个属性作为 hashCode 返回,PerpetualCache 就是使用了其 id 的 hashCode 值返回。
有兴趣可以看下 HashMap 的源码实现
再提一点,一级缓存的存在可以显著优化重复查询的性能,但当数据有变更的时候就需要及时清理缓存以免造成脏读
@Override
public void clearLocalCache() {
if (!closed) {
localCache.clear();
localOutputParameterCache.clear();
}
}
看下调用时机
- 更新前
- 提交前
- 回滚前
- 延迟加载
- 查询前(方法注解 @Flush 强制不走缓存)
其他都好理解,就一个查询方法注解 @Flush 需要注意下
3.SimpleExector
从名字就知道这是最简单的一种Executor,拼接完SQL后直接执行并返回结果集,没有多余操作

可以看到就是简单重写了BaseExecutor四个抽象方法再加上一个是处理SQL执行器的方法
简单看下 doQuery 的实现
@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
Statement stmt = null;
try {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
stmt = prepareStatement(handler, ms.getStatementLog());
return handler.query(stmt, resultHandler);
} finally {
closeStatement(stmt);
}
}
- 获取配置类
- 创建StatementHandler
- 执行SQL
其他更新操作类似上述步骤。
4.ReuseExector
顾名思义,这是一种可以重用的Executor

相较于 SimpleExecutor,多了三个方法,这三个方法就是其可重用的秘密
// 缓存Map
private final Map<String, Statement> statementMap = new HashMap<>();
// 判断当前SQL是否存在缓存Statement
private boolean hasStatementFor(String sql) {
try {
Statement statement = statementMap.get(sql);
return statement != null && !statement.getConnection().isClosed();
} catch (SQLException e) {
return false;
}
}
// 从缓存中获取Statement
private Statement getStatement(String s) {
return statementMap.get(s);
}
// 将Statement塞入缓存
private void putStatement(String sql, Statement stmt) {
statementMap.put(sql, stmt);
}
就是用一个Map来存储使用的Statement,以节省每次重复处理Statement的时间,缺点就是会额外占用一部分内存,也是一种空间换时间的思想,具体处理在获取Statement方法prepareStatement
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Statement stmt;
BoundSql boundSql = handler.getBoundSql();
String sql = boundSql.getSql();
// 1.判断是否存在缓存
if (hasStatementFor(sql)) {
// 2.存在就从缓存中直接拿
stmt = getStatement(sql);
applyTransactionTimeout(stmt);
} else {
// 3.不存在就创建后塞入缓存
Connection connection = getConnection(statementLog);
stmt = handler.prepare(connection, transaction.getTimeout());
putStatement(sql, stmt);
}
handler.parameterize(stmt);
return stmt;
}
- 缓存存在就从缓存中直接拿
- 缓存不存在就创建后塞入缓存
而清除的时候就要多一步对缓存的清理
statementMap.clear();
5.BatchExector
这是一个批量执行以优化执行性能的执行器

除了四个抽象方法之外多了一个 doFlushStatements 方法
分别来看下查询和更新方法,更新:
@Override
public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException {
final Configuration configuration = ms.getConfiguration();
final StatementHandler handler = configuration.newStatementHandler(this, ms, parameterObject, RowBounds.DEFAULT, null, null);
final BoundSql boundSql = handler.getBoundSql();
final String sql = boundSql.getSql();
final Statement stmt;
// 1.是否与上一条执行的SQL为相同对象, 参数可以不同
if (sql.equals(currentSql) && ms.equals(currentStatement)) {
// 2.0 一致的话则为同一条SQL
int last = statementList.size() - 1;
// 2.1 获取 Statement
stmt = statementList.get(last);
// 2.2 设置超时时间
applyTransactionTimeout(stmt);
// 2.3 预编译
handler.parameterize(stmt);//fix Issues 322
BatchResult batchResult = batchResultList.get(last);
// 2.4 设置参数
batchResult.addParameterObject(parameterObject);
} else {
// 3.0 非同一个SQL, 则需要重新创建 Statement
Connection connection = getConnection(ms.getStatementLog());
// 3.1 创建 Statement
stmt = handler.prepare(connection, transaction.getTimeout());
// 3.2 预编译
handler.parameterize(stmt); //fix Issues 322
// 3.3 设置批量执行器参数
currentSql = sql;
currentStatement = ms;
statementList.add(stmt);
// 3.3 设置SQL执行参数
batchResultList.add(new BatchResult(ms, sql, parameterObject));
}
// 4.加入批处理
handler.batch(stmt);
return BATCH_UPDATE_RETURN_VALUE;
}
- 判断与上一条执行对象是否为同一个SQL,是则添加打到批处理,不是则新建执行对象Statement
- 这里并没有看到执行的语句
大胆猜测一下执行语句会在哪,整个批量执行器除了四个抽象方法就只剩下一个方法了,不妨看下其实现
@Override
public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
try {
List<BatchResult> results = new ArrayList<>();
// 1.是否回滚
if (isRollback) {
// 1.0 回滚
return Collections.emptyList();
}
// 1.1 非回滚, 遍历已经添加入statementList的对象逐个执行批量的操作
for (int i = 0, n = statementList.size(); i < n; i++) {
Statement stmt = statementList.get(i);
applyTransactionTimeout(stmt);
BatchResult batchResult = batchResultList.get(i);
try {
// 2.批量执行并把影响行数放入结果集中
batchResult.setUpdateCounts(stmt.executeBatch());
MappedStatement ms = batchResult.getMappedStatement();
List<Object> parameterObjects = batchResult.getParameterObjects();
KeyGenerator keyGenerator = ms.getKeyGenerator();
if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) {
Jdbc3KeyGenerator jdbc3KeyGenerator = (Jdbc3KeyGenerator) keyGenerator;
jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects);
} else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) { //issue #141
for (Object parameter : parameterObjects) {
keyGenerator.processAfter(this, ms, stmt, parameter);
}
}
// 3.批量执行完成关闭
// Close statement to close cursor #1109
closeStatement(stmt);
} catch (BatchUpdateException e) {
// 4.发生异常处理异常信息
StringBuilder message = new StringBuilder();
message.append(batchResult.getMappedStatement().getId())
.append(" (batch index #")
.append(i + 1)
.append(")")
.append(" failed.");
if (i > 0) {
message.append(" ")
.append(i)
.append(" prior sub executor(s) completed successfully, but will be rolled back.");
}
throw new BatchExecutorException(message.toString(), e, results, batchResult);
}
results.add(batchResult);
}
return results;
} finally {
// 5.确保资源关闭
for (Statement stmt : statementList) {
closeStatement(stmt);
}
// 6.属性清空方便GC或下一次批量
currentSql = null;
statementList.clear();
batchResultList.clear();
}
}
果然在其中看到了执行的语句
为什么是分开的?我们想下一般这种批量的代码是怎么写的,我都是这样写的:
SqlSessionFactory factory = new SqlSessionFactoryBuilder().build(is);
SqlSession sqlSession = factory.openSession(ExecutorType.BATCH, true);
PreparedStatement ps = sqlSession.getConnection().prepareStatement("update account set name = 'java' where id = ?");
for (int i = 0; i < 100; i++) {
ps.setInt(1, i);
// 添加到批量处理
ps.addBatch();
}
// 批量执行
ps.executeBatch();
或者
@Test
public void batchInsert() {
String config = "resources/mybatis-config.xml";
SqlSession sqlSession = null;
try (InputStream is = Resources.getResourceAsStream(config)) {
SqlSessionFactory factory = new SqlSessionFactoryBuilder().build(is);
sqlSession = factory.openSession(ExecutorType.BATCH, false);
AccountMapper accountMapper = sqlSession.getMapper(AccountMapper.class);
for (int i = 0; i < 100; i++) {
accountMapper.insertSelective(new Account("name_" + i, i));
}
sqlSession.commit();
} catch (Exception ignore) {
assert sqlSession != null;
sqlSession.rollback();
log.error(ignore);
} finally {
if (null != sqlSession ) sqlSession.close();
}
}
可见这种方式是由开发者自己控制多少条SQL执行一次,也合理。
flushStatements 的调用时机
- 查询
- 回滚
- 提交(不过批量执行的实现就是返回一个空集合)
更新方法包含了 新增/删除/更新 三种操作
再看下query 方法
@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql)throws SQLException {
Statement stmt = null;
try {
// 1.查询之前会先刷新一遍, 没执行的赶紧执行了
flushStatements();
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameterObject, rowBounds, resultHandler, boundSql);
Connection connection = getConnection(ms.getStatementLog());
// 2.获取执行对象Statement
stmt = handler.prepare(connection, transaction.getTimeout());
// 3.预编译
handler.parameterize(stmt);
// 4.执行
return handler.query(stmt, resultHandler);
} finally {
// 5.关闭
closeStatement(stmt);
}
}
- 查询之前会先刷新一遍, 没执行的赶紧执行了
- 获取执行对象Statement
- 预编译
- 执行
- 关闭
这个流程总体上没问题,就一个点很值得思考,那就是第一步的刷新。
先不说为什么要刷新,而是加了这个刷新对批量执行器有什么影响吗?
举个栗子:
我们需要更新十条SQL,查询一条,有以下两种写法
1.不中断同类型的更新SQL
- 先执行十条更新SQL
- 再执行一条查询
因为十条更新是一次批量,所以总共连接两次
2.中断同类型的更新SQL
- 先执行五条更新SQL
- 再执行一条查询
- 先执行五条更新SQL
因为每五条更新是一次批量,在执行查询时会自动把之前存下的更新SQL执行了,所以总共连接三次。
所以我们最好不要在批量更新语句中间夹杂着查询语句,或者说尽量把更新语句放在一起执行,这样效率更高。
下一篇预告 StatementHandler ~