Mybatis随笔

Mybatis随笔(七) Executor解析

2020-03-28  本文已影响0人  sunyelw

上篇我们说到 DefaultSqlSession 是通过 Executor 来完成SQL的执行与返回结果的封装,这里看下 Executor 的具体实现。


1、总体结构

Executor 是一个顶级接口,先看下其提供的接口方法

Executor-method

大致分为几类

再看继承关系


Executor-类图

还记得这个Executor是怎么创建的吗,看下源码

protected ExecutorType defaultExecutorType = ExecutorType.SIMPLE;
protected boolean cacheEnabled = true;

public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
    executorType = executorType == null ? defaultExecutorType : executorType;
    executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
    Executor executor;
    if (ExecutorType.BATCH == executorType) {
        executor = new BatchExecutor(this, transaction);
    } else if (ExecutorType.REUSE == executorType) {
        executor = new ReuseExecutor(this, transaction);
    } else {
        executor = new SimpleExecutor(this, transaction);
    }
    if (cacheEnabled) {
        executor = new CachingExecutor(executor);
    }
    executor = (Executor) interceptorChain.pluginAll(executor);
    return executor;
}

第一,根据ExecutorType来创建对应的Executor,ExecutorType可以通过如下方式指定

<settings>
    <!-- 默认 SIMPLE-->
    <!--SIMPLE SimpleExecutor-->
    <!--REUSE ReuseExecutor-->
    <!--BATCH BatchExecutor-->
    <setting name="defaultExecutorType" value="REUSE"/>
</settings>

这里支持的三种都是BaseExecutor的实现类,这是一种策略模式的使用。
第二,拿到了 Executor 后,通过cacheEnabled来判断是否需要套一层CachingExecutor,其中cacheEnabled默认为true,也就是说,二级缓存是默认开启的,可以通过如下设置cacheEnabled的值

<settings>
    <!-- 二级缓存 -->
    <setting name="cacheEnabled" value="false"/>
</settings>

对于CachingExecutor,是一种装饰器模式,是一种相较于继承的加强方式。

接下来看下这些具体实现。

2、BaseExecutor

public abstract class BaseExecutor implements Executor {...}

BaseExecutor是一个实现了Executor接口的抽象类,实现了所有接口方法

BaseExecutor-method

其自定义有四个方法没有具体实现

// 调用: udpate
protected abstract int doUpdate(MappedStatement ms, Object parameter) throws SQLException;
// 调用: flushStatements
protected abstract List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException;
// 调用: query -> queryFromDatabase
protected abstract <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException;
// 调用: queryCursor
protected abstract <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql) throws SQLException;

看下 doUpdate 的调用

@Override
public int update(MappedStatement ms, Object parameter) throws SQLException {
    ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());
    if (closed) {
        throw new ExecutorException("Executor was closed.");
    }
    clearLocalCache();
    return doUpdate(ms, parameter);
}

就调用了两个重写方法

等于所有实现都是在doUpdate方法中,这种父类定好模板子类负责具体实现的模式就是模板模式,精髓在于步骤一致,但实现可以千差万别。

2.1 一级缓存

再看下 query 方法

@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
    BoundSql boundSql = ms.getBoundSql(parameter);
    // 一级缓存key
    CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql);
    return query(ms, parameter, rowBounds, resultHandler, key, boundSql);
}

这里主要想讲下这个一级缓存,先看下这个 CacheKey

// 五个重要参数
private final int multiplier;
private int hashcode;
private long checksum;
private int count;
// 8/21/2017 - Sonarlint flags this as needing to be marked transient.  While true if content is not serializable, this is not always true and thus should not be marked transient.
private List<Object> updateList;

// 每添加一个对象进来,五个参数都要跟着变
public void update(Object object) {
    int baseHashCode = object == null ? 1 : ArrayUtil.hashCode(object);
    count++;
    checksum += baseHashCode;
    baseHashCode *= count;
    hashcode = multiplier * hashcode + baseHashCode;
    updateList.add(object);
}

// Cachekey相同的条件是五个参数都相同
@Override
public boolean equals(Object object) {
    if (this == object) {
        return true;
    }
    if (!(object instanceof CacheKey)) {
        return false;
    }

    final CacheKey cacheKey = (CacheKey) object;
    if (hashcode != cacheKey.hashcode) {
        return false;
    }
    if (checksum != cacheKey.checksum) {
        return false;
    }
    if (count != cacheKey.count) {
        return false;
    }
    for (int i = 0; i < updateList.size(); i++) {
        Object thisObject = updateList.get(i);
        Object thatObject = cacheKey.updateList.get(i);
        if (!ArrayUtil.equals(thisObject, thatObject)) {
            return false;
        }
    }
    return true;
}

继续看下一级缓存的键是如何创建的

@Override
public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
    if (closed) {
        throw new ExecutorException("Executor was closed.");
    }
    CacheKey cacheKey = new CacheKey();
    // 1.SQL方法唯一ID
    cacheKey.update(ms.getId());
    // 2.分页参数-初始偏移量
    cacheKey.update(rowBounds.getOffset());
    // 3.分页参数-查询条数
    cacheKey.update(rowBounds.getLimit());
    // 4.执行SQL 预编译那种带占位符?的
    cacheKey.update(boundSql.getSql());
    List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
    TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry();
    // mimic DefaultParameterHandler logic
    // 5.实际参数值
    for (ParameterMapping parameterMapping : parameterMappings) {
        if (parameterMapping.getMode() != ParameterMode.OUT) {
            Object value;
            String propertyName = parameterMapping.getProperty();
            if (boundSql.hasAdditionalParameter(propertyName)) {
                value = boundSql.getAdditionalParameter(propertyName);
            } else if (parameterObject == null) {
                value = null;
            } else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
                value = parameterObject;
            } else {
                MetaObject metaObject = configuration.newMetaObject(parameterObject);
                value = metaObject.getValue(propertyName);
            }
            cacheKey.update(value);
        }
    }
    // 6.环境,也就是数据库的编码,保证是同一个数据库
    if (configuration.getEnvironment() != null) {
        // issue #176
        cacheKey.update(configuration.getEnvironment().getId());
    }
    return cacheKey;
}

可以看到缓存key创建依赖六个值

这六个参数加一起可以唯一确定一条SQL,继续看下使用

@SuppressWarnings("unchecked")
@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
    if (closed) {
        throw new ExecutorException("Executor was closed.");
    }
    if (queryStack == 0 && ms.isFlushCacheRequired()) {
        clearLocalCache();
    }
    List<E> list;
    try {
        queryStack++;
        // 1.从缓存中获取
        list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
        if (list != null) {
            // 2.拿到了直接处理
            handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
        } else {
            // 3.没拿到就进行查询
            list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
        }
    } finally {
        queryStack--;
    }
    // 延迟加载
    if (queryStack == 0) {
        for (DeferredLoad deferredLoad : deferredLoads) {
            deferredLoad.load();
        }
        // issue #601
        deferredLoads.clear();
        if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
            // issue #482
            clearLocalCache();
        }
    }
    return list;
}

这里有一个好玩的就是延迟加载,后面有机会再说

原来一级缓存就是一个localCache

protected PerpetualCache localCache;

再瞅一眼PerpetualCache是个何方神圣

public class PerpetualCache implements Cache {

  private final String id;

  private final Map<Object, Object> cache = new HashMap<>();

  public PerpetualCache(String id) {
    this.id = id;
  }

  @Override
  public String getId() {
    return id;
  }

  @Override
  public int getSize() {
    return cache.size();
  }

  @Override
  public void putObject(Object key, Object value) {
    cache.put(key, value);
  }

  @Override
  public Object getObject(Object key) {
    return cache.get(key);
  }

  @Override
  public Object removeObject(Object key) {
    return cache.remove(key);
  }

  @Override
  public void clear() {
    cache.clear();
  }

  @Override
  public boolean equals(Object o) {
    if (getId() == null) {
      throw new CacheException("Cache instances require an ID.");
    }
    if (this == o) {
      return true;
    }
    if (!(o instanceof Cache)) {
      return false;
    }
    Cache otherCache = (Cache) o;
    return getId().equals(otherCache.getId());
  }

  @Override
  public int hashCode() {
    if (getId() == null) {
      throw new CacheException("Cache instances require an ID.");
    }
    return getId().hashCode();
  }
}
protected BaseExecutor(Configuration configuration, Transaction transaction) {
    this.transaction = transaction;
    this.deferredLoads = new ConcurrentLinkedQueue<>();
    // here
    // here
    // here
    this.localCache = new PerpetualCache("LocalCache");
    this.localOutputParameterCache = new PerpetualCache("LocalOutputParameterCache");
    this.closed = false;
    this.configuration = configuration;
    this.wrapper = this;
}

然后如果没查到是queryFromDatabase方法,不出意料肯定会有塞的动作

private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    List<E> list;
    localCache.putObject(key, EXECUTION_PLACEHOLDER);
    try {
        // 1.查询结果
        list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
    } finally {
        localCache.removeObject(key);
    }
    // 2.反塞缓存
    localCache.putObject(key, list);
    if (ms.getStatementType() == StatementType.CALLABLE) {
        localOutputParameterCache.putObject(key, parameter);
    }
    return list;
}

再来想一个东西,这里的缓存都是通过Map来实现的,那么作为缓存Key对象有两个方法必须要重写,一个就是前面看过的 equals 方法,还有一个就是 hashCode 方法

class CacheKey
// 重写hashCode方法
@Override
public int hashCode() {
    return hashcode;
}

如果不重写 hashCode 方法,那么任何两个对象的 hashCode 值是不会相同的,也就导致了使用 Map 这种方式判断是否一致完全失效,因为永远不会相同。那么可不可以全部返回一个常量比如1,如下

@Override
public int hashCode() {
    return 1;
}

这种可以通过 Map 的判断是否同一个对象(取决于 equals 方法)的校验,但会导致一个现象:这个 Map 中数组只有一位有值,这个值是一个链表,当链表达到 64 位会变成一颗红黑树(1.8,链表长度过8且总量过64)

所以推荐的写法是,重写 equals 方法时就要一起重写 hashCode 方法,hashCode 方法的返回值不能是一个常量,最好跟随 equals 方法中判断的全部或几个属性的变化而变化。

这里CacheKey就是直接使用了一个属性作为 hashCode 返回,PerpetualCache 就是使用了其 id 的 hashCode 值返回。

有兴趣可以看下 HashMap 的源码实现

再提一点,一级缓存的存在可以显著优化重复查询的性能,但当数据有变更的时候就需要及时清理缓存以免造成脏读

@Override
public void clearLocalCache() {
    if (!closed) {
        localCache.clear();
        localOutputParameterCache.clear();
    }
}

看下调用时机

其他都好理解,就一个查询方法注解 @Flush 需要注意下

3.SimpleExector

从名字就知道这是最简单的一种Executor,拼接完SQL后直接执行并返回结果集,没有多余操作

SimpleExecutor-method

可以看到就是简单重写了BaseExecutor四个抽象方法再加上一个是处理SQL执行器的方法

简单看下 doQuery 的实现

@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
    Statement stmt = null;
    try {
        Configuration configuration = ms.getConfiguration();
        StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
        stmt = prepareStatement(handler, ms.getStatementLog());
        return handler.query(stmt, resultHandler);
    } finally {
        closeStatement(stmt);
    }
}

其他更新操作类似上述步骤。

4.ReuseExector

顾名思义,这是一种可以重用的Executor

ReuseExecutor-method

相较于 SimpleExecutor,多了三个方法,这三个方法就是其可重用的秘密

// 缓存Map
private final Map<String, Statement> statementMap = new HashMap<>();
// 判断当前SQL是否存在缓存Statement
private boolean hasStatementFor(String sql) {
    try {
        Statement statement = statementMap.get(sql);
        return statement != null && !statement.getConnection().isClosed();
    } catch (SQLException e) {
        return false;
    }
}
// 从缓存中获取Statement
private Statement getStatement(String s) {
    return statementMap.get(s);
}
// 将Statement塞入缓存
private void putStatement(String sql, Statement stmt) {
    statementMap.put(sql, stmt);
}

就是用一个Map来存储使用的Statement,以节省每次重复处理Statement的时间,缺点就是会额外占用一部分内存,也是一种空间换时间的思想,具体处理在获取Statement方法prepareStatement

private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
    Statement stmt;
    BoundSql boundSql = handler.getBoundSql();
    String sql = boundSql.getSql();
    // 1.判断是否存在缓存
    if (hasStatementFor(sql)) {
        // 2.存在就从缓存中直接拿
        stmt = getStatement(sql);
        applyTransactionTimeout(stmt);
    } else {
        // 3.不存在就创建后塞入缓存
        Connection connection = getConnection(statementLog);
        stmt = handler.prepare(connection, transaction.getTimeout());
        putStatement(sql, stmt);
    }
    handler.parameterize(stmt);
    return stmt;
}

而清除的时候就要多一步对缓存的清理

statementMap.clear();

5.BatchExector

这是一个批量执行以优化执行性能的执行器

BatchExecutor-method

除了四个抽象方法之外多了一个 doFlushStatements 方法

分别来看下查询和更新方法,更新:

@Override
public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException {
    final Configuration configuration = ms.getConfiguration();
    final StatementHandler handler = configuration.newStatementHandler(this, ms, parameterObject, RowBounds.DEFAULT, null, null);
    final BoundSql boundSql = handler.getBoundSql();
    final String sql = boundSql.getSql();
    final Statement stmt;
    // 1.是否与上一条执行的SQL为相同对象, 参数可以不同
    if (sql.equals(currentSql) && ms.equals(currentStatement)) {
        // 2.0 一致的话则为同一条SQL
        int last = statementList.size() - 1;
        // 2.1 获取 Statement
        stmt = statementList.get(last);
        // 2.2 设置超时时间
        applyTransactionTimeout(stmt);
        // 2.3 预编译
        handler.parameterize(stmt);//fix Issues 322
        BatchResult batchResult = batchResultList.get(last);
        // 2.4 设置参数
        batchResult.addParameterObject(parameterObject);
    } else {
        // 3.0 非同一个SQL, 则需要重新创建 Statement
        Connection connection = getConnection(ms.getStatementLog());
        // 3.1 创建 Statement
        stmt = handler.prepare(connection, transaction.getTimeout());
        // 3.2 预编译
        handler.parameterize(stmt);    //fix Issues 322
        // 3.3 设置批量执行器参数
        currentSql = sql;
        currentStatement = ms;
        statementList.add(stmt);
        // 3.3 设置SQL执行参数
        batchResultList.add(new BatchResult(ms, sql, parameterObject));
    }
    // 4.加入批处理
    handler.batch(stmt);
    return BATCH_UPDATE_RETURN_VALUE;
}

大胆猜测一下执行语句会在哪,整个批量执行器除了四个抽象方法就只剩下一个方法了,不妨看下其实现

@Override
public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
    try {
        List<BatchResult> results = new ArrayList<>();
        // 1.是否回滚
        if (isRollback) {
            // 1.0 回滚
            return Collections.emptyList();
        }
        // 1.1 非回滚, 遍历已经添加入statementList的对象逐个执行批量的操作
        for (int i = 0, n = statementList.size(); i < n; i++) {
            Statement stmt = statementList.get(i);
            applyTransactionTimeout(stmt);
            BatchResult batchResult = batchResultList.get(i);
            try {
                // 2.批量执行并把影响行数放入结果集中
                batchResult.setUpdateCounts(stmt.executeBatch());
                MappedStatement ms = batchResult.getMappedStatement();
                List<Object> parameterObjects = batchResult.getParameterObjects();
                KeyGenerator keyGenerator = ms.getKeyGenerator();
                if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) {
                    Jdbc3KeyGenerator jdbc3KeyGenerator = (Jdbc3KeyGenerator) keyGenerator;
                    jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects);
                } else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) { //issue #141
                    for (Object parameter : parameterObjects) {
                        keyGenerator.processAfter(this, ms, stmt, parameter);
                    }
                }
                // 3.批量执行完成关闭
                // Close statement to close cursor #1109
                closeStatement(stmt);
            } catch (BatchUpdateException e) {
                // 4.发生异常处理异常信息
                StringBuilder message = new StringBuilder();
                message.append(batchResult.getMappedStatement().getId())
                .append(" (batch index #")
                .append(i + 1)
                .append(")")
                .append(" failed.");
                if (i > 0) {
                    message.append(" ")
                    .append(i)
                    .append(" prior sub executor(s) completed successfully, but will be rolled back.");
                }
                throw new BatchExecutorException(message.toString(), e, results, batchResult);
            }
            results.add(batchResult);
        }
        return results;
    } finally {
        // 5.确保资源关闭
        for (Statement stmt : statementList) {
            closeStatement(stmt);
        }
        // 6.属性清空方便GC或下一次批量
        currentSql = null;
        statementList.clear();
        batchResultList.clear();
    }
}

果然在其中看到了执行的语句

为什么是分开的?我们想下一般这种批量的代码是怎么写的,我都是这样写的:

SqlSessionFactory factory = new SqlSessionFactoryBuilder().build(is);
SqlSession sqlSession = factory.openSession(ExecutorType.BATCH, true);
PreparedStatement ps = sqlSession.getConnection().prepareStatement("update account set name = 'java' where id = ?");
for (int i = 0; i < 100; i++) {
    ps.setInt(1, i);
    // 添加到批量处理
    ps.addBatch();
}
// 批量执行
ps.executeBatch();

或者

@Test
public void batchInsert() {
    String config = "resources/mybatis-config.xml";
    SqlSession sqlSession = null;
    try (InputStream is = Resources.getResourceAsStream(config)) {
        SqlSessionFactory factory = new SqlSessionFactoryBuilder().build(is);
        sqlSession = factory.openSession(ExecutorType.BATCH, false);
        AccountMapper accountMapper = sqlSession.getMapper(AccountMapper.class);
        for (int i = 0; i < 100; i++) {
            accountMapper.insertSelective(new Account("name_" + i, i));
        }
        sqlSession.commit();
    } catch (Exception ignore) {
        assert sqlSession != null;
        sqlSession.rollback();
        log.error(ignore);
    } finally {
        if (null != sqlSession ) sqlSession.close();
    }
}

可见这种方式是由开发者自己控制多少条SQL执行一次,也合理。

flushStatements 的调用时机

更新方法包含了 新增/删除/更新 三种操作

再看下query 方法

@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql)throws SQLException {
    Statement stmt = null;
    try {
        // 1.查询之前会先刷新一遍, 没执行的赶紧执行了
        flushStatements();
        Configuration configuration = ms.getConfiguration();
        StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameterObject, rowBounds, resultHandler, boundSql);
        Connection connection = getConnection(ms.getStatementLog());
        // 2.获取执行对象Statement
        stmt = handler.prepare(connection, transaction.getTimeout());
        // 3.预编译
        handler.parameterize(stmt);
        // 4.执行
        return handler.query(stmt, resultHandler);
    } finally {
        // 5.关闭
        closeStatement(stmt);
    }
}

这个流程总体上没问题,就一个点很值得思考,那就是第一步的刷新。

先不说为什么要刷新,而是加了这个刷新对批量执行器有什么影响吗?

举个栗子:
我们需要更新十条SQL,查询一条,有以下两种写法
1.不中断同类型的更新SQL

因为十条更新是一次批量,所以总共连接两次
2.中断同类型的更新SQL

因为每五条更新是一次批量,在执行查询时会自动把之前存下的更新SQL执行了,所以总共连接三次。

所以我们最好不要在批量更新语句中间夹杂着查询语句,或者说尽量把更新语句放在一起执行,这样效率更高。


下一篇预告 StatementHandler ~

上一篇 下一篇

猜你喜欢

热点阅读