Mybatis源码之路

CachingExecutor

2020-09-11  本文已影响0人  93张先生

CachingExecutor

CachingExecutor 是一个 Executor 接口的装饰器,它为 Executor 对象增加了二级缓存的相关功能。
它封装了一个用于执行数据库操作的 Executor 对象,以及一个用于管理缓存的 TransactionalCacheManager 对象。
TransactionalCache 和 TransactionalCacheManager 是 CachingExecutor 依赖的两个组件。

组件以及二级缓存关系

image.png
public class CachingExecutor implements Executor {

  private final Executor delegate;
  private final TransactionalCacheManager tcm = new TransactionalCacheManager();

  public CachingExecutor(Executor delegate) {
    this.delegate = delegate;
    delegate.setExecutorWrapper(this);
  }

  @Override
  public Transaction getTransaction() {
    return delegate.getTransaction();
  }

  @Override
  public void close(boolean forceRollback) {
    try {
      //issues #499, #524 and #573
      if (forceRollback) {
        tcm.rollback();
      } else {
        tcm.commit();
      }
    } finally {
      delegate.close(forceRollback);
    }
  }

  @Override
  public boolean isClosed() {
    return delegate.isClosed();
  }

  @Override
  public int update(MappedStatement ms, Object parameterObject) throws SQLException {
    flushCacheIfRequired(ms);
    return delegate.update(ms, parameterObject);
  }

  /**
   * 查询过程
   * (1)获取 BoundSql 对象,创建查询语句对应的 CacheKey 对象
   * (2)检测是否开启了二级缓存,如果没有开启二级缓存,则直接调用底层 Executor 对象的 query() 方法查询数据库。如果开启了二级缓存,则继续后面的步骤
   * (3)检测查询操作是否包含输出类型的参数,如果是这种情况,则报错
   * (4)调用 TransactionalCacheManager.getObject()方法查询二级缓存,如果二级缓存中查找到相应的结果对象,则直接将该结果对象返回。
   * (5)如果二级缓存没有相应的结果对象,则调用底层 Executor 对象的 query() 方法,正如前面介绍的 ,它会先查询一级缓存,一级缓存未命中时,才会查询数据库。
   * 最后还会将得到的结果对象放入 TransactionalCache.entriesToAddOnCommit 集合中保存。
   *
   * @param ms
   * @param parameterObject
   * @param rowBounds
   * @param resultHandler
   * @param <E>
   * @return
   * @throws SQLException
   */

  @Override
  public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
    // 步骤1:获取 BoundSql 对象,解析 BoundSql
    BoundSql boundSql = ms.getBoundSql(parameterObject);
    // 创建 CacheKey 对象
    CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
    return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
  }

  @Override
  public <E> Cursor<E> queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) throws SQLException {
    flushCacheIfRequired(ms);
    return delegate.queryCursor(ms, parameter, rowBounds);
  }

  @Override
  public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
      throws SQLException {
    // 获取查询语句所在命名空间对应的二级缓存
    Cache cache = ms.getCache();
    // 步骤2:是否开启了二级缓存功能
    if (cache != null) {
      flushCacheIfRequired(ms);   // 根据 <select> 节点的配置,决定是否需要清空二级缓存
      // 检测 SQL 节点的 useCache 配置以及是否使用了 resultHandler 配置
      if (ms.isUseCache() && resultHandler == null) {
        //步骤3: 二级缓存不能保存输出类型的参数 如果查询操作调用了包含输出参数的存储过程,则报错
        ensureNoOutParams(ms, boundSql);
        // 步骤4:查询二级缓存
        @SuppressWarnings("unchecked")
        List<E> list = (List<E>) tcm.getObject(cache, key);
        if (list == null) {
          // 步骤5:二级缓存没用相应的结果对象,调用封装的 Executor 对象的 query() 方法,这个 query() 方法会先查询一级缓存
          list = delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
          // 将查询结果保存到 TransactionalCache.entriesToAddOnCommit 集合中
          tcm.putObject(cache, key, list); // issue #578 and #116
        }
        return list;
      }
    }
    // 没有启动二级缓存,直接调用底层 Executor 执行数据数据库查询操作
    return delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
  }

  /**
   * 调用底层 Executor 的 flushStatements() 方法
   * @return
   * @throws SQLException
   */
  @Override
  public List<BatchResult> flushStatements() throws SQLException {
    return delegate.flushStatements();
  }

  /**
   * 首先调用底层 Executor 对象对应的方法完成事务的提交,然后再调用 TransactionalCacheManager 的对应方法完成对二级缓存的相应操作
   * @param required
   * @throws SQLException
   */
  @Override
  public void commit(boolean required) throws SQLException {
    delegate.commit(required);  // 调用底层的 Executor 提交事务
    tcm.commit();   //遍历所有相关的 TransactionalCache 对象执行 commit() 方法
  }
  /**
   * 首先调用底层 Executor 对象对应的方法完成事务的回滚,然后再调用 TransactionalCacheManager 的对应方法完成对二级缓存的相应操作
   * @param required
   * @throws SQLException
   */
  @Override
  public void rollback(boolean required) throws SQLException {
    try {
      delegate.rollback(required); // 调用底层的 Executor 回滚事务
    } finally {
      if (required) {
        tcm.rollback(); //遍历所有相关的 TransactionalCache 对象执行 rollback() 方法
      }
    }
  }

  /**
   * 二级缓存不能保存输出类型的参数 如果查询操作调用了包含输出参数的存储过程,则报错
   * @param ms
   * @param boundSql
   */
  private void ensureNoOutParams(MappedStatement ms, BoundSql boundSql) {
    if (ms.getStatementType() == StatementType.CALLABLE) {
      for (ParameterMapping parameterMapping : boundSql.getParameterMappings()) {
        if (parameterMapping.getMode() != ParameterMode.IN) {
          throw new ExecutorException("Caching stored procedures with OUT params is not supported.  Please configure useCache=false in " + ms.getId() + " statement.");
        }
      }
    }
  }

  @Override
  public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
    return delegate.createCacheKey(ms, parameterObject, rowBounds, boundSql);
  }

  @Override
  public boolean isCached(MappedStatement ms, CacheKey key) {
    return delegate.isCached(ms, key);
  }

  @Override
  public void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType) {
    delegate.deferLoad(ms, resultObject, property, key, targetType);
  }

  /**
   * 清空一级缓存
   */
  @Override
  public void clearLocalCache() {
    delegate.clearLocalCache();
  }

  /**
   * 是否要求刷新 一级和二级缓存
   * @param ms
   */
  private void flushCacheIfRequired(MappedStatement ms) {
    // 获取 二级缓存
    Cache cache = ms.getCache();
    if (cache != null && ms.isFlushCacheRequired()) {
      tcm.clear(cache);
    }
  }

  @Override
  public void setExecutorWrapper(Executor executor) {
    throw new UnsupportedOperationException("This method should not be called");
  }

}

TransactionalCacheManager

TransactionalCacheManager 用于管理 CachingExecutor 使用的二级缓存对象,其中只定义了一个 transactionalCaches,它的 key 是对应的 CachingExecutor 使用的二级缓存对象,value 是相应的 TransactionalCache 对象,在该 TransactionalCache 中封装了对应的二级缓存对象,也就是这里的 key。

public class TransactionalCacheManager {

  private final Map<Cache, TransactionalCache> transactionalCaches = new HashMap<>();

  /**
   * 调用指定二级缓存对应的 TransactionalCache 对象对应的方法
   * @param cache
   */
  public void clear(Cache cache) {
    getTransactionalCache(cache).clear();
  }

  /**
   * 调用指定二级缓存对应的 TransactionalCache 对象对应的方法
   * @param cache
   */
  public Object getObject(Cache cache, CacheKey key) {
    return getTransactionalCache(cache).getObject(key);
  }

  /**
   * 调用指定二级缓存对应的 TransactionalCache 对象对应的方法
   * @param cache
   * @param key
   * @param value
   */
  public void putObject(Cache cache, CacheKey key, Object value) {
    getTransactionalCache(cache).putObject(key, value);
  }

  /**
   * 遍历 transactionalCaches 集合,调用 TransactionalCache 相应的方法
   */
  public void commit() {
    for (TransactionalCache txCache : transactionalCaches.values()) {
      txCache.commit();
    }
  }
  /**
   * 遍历 transactionalCaches 集合,调用 TransactionalCache 相应的方法
   */
  public void rollback() {
    for (TransactionalCache txCache : transactionalCaches.values()) {
      txCache.rollback();
    }
  }

  /**
   * 如果集合中不包含 TransactionalCache,则创建一个,并放入 transactionalCaches 中
   * @param cache
   * @return
   */
  private TransactionalCache getTransactionalCache(Cache cache) {
    return transactionalCaches.computeIfAbsent(cache, TransactionalCache::new);
  }

}

TransactionalCache

TransactionalCache 主要用于保存 某个 SqlSession 的 某个事务中需要向 某个二级缓存中添加的缓存 的数据。
名字为 TransactionalCache 主要是针对解决了二级缓存中一些事务的问题,所以起名字为 TransactionalCache。

The 2nd level cache transactional buffer.
This class holds all cache entries that are to be added to the 2nd level cache during a Session.

public class TransactionalCache implements Cache {

  private static final Log log = LogFactory.getLog(TransactionalCache.class);
  // 底层封装的二级缓存所对应的 Cache 对象,以存储 namespace 为单位的 Cache 对象,默认为 PerpetualCache
  private final Cache delegate;
  // 当改字段为 true 时,则表示当前 TransactionalCache 不可查询,且提交事务时会将底层 Cache 清空
  private boolean clearOnCommit;
  // 暂时记录添加到 TransactionalCache 中的数据。在事务提交时,会将其中的数据添加到二级缓存中
  private final Map<Object, Object> entriesToAddOnCommit;
  // 记录缓存未命中的 CacheKey 对象
  private final Set<Object> entriesMissedInCache;

  public TransactionalCache(Cache delegate) {
    this.delegate = delegate;
    this.clearOnCommit = false;
    this.entriesToAddOnCommit = new HashMap<>();
    this.entriesMissedInCache = new HashSet<>();
  }

  @Override
  public String getId() {
    return delegate.getId();
  }

  @Override
  public int getSize() {
    return delegate.getSize();
  }

  /**
   * 它首先会查询底层的二级缓存,并将为命中的 key 记录到 entriesMissedInCache,之后根据 clearOnCommit 字段的值决定具体的返回值
   * @param key The key
   * @return
   */
  @Override
  public Object getObject(Object key) {
    // issue #116
    // 查询底层的 Cache 是否包含了指定的 key
    Object object = delegate.getObject(key);
    // 如果底层缓存对象中不包含改缓存项,则将该 key 记录到 entriesMissedInCache 集合中
    if (object == null) {
      entriesMissedInCache.add(key);
    }
    // issue #146
    if (clearOnCommit) {
      return null;
    } else {
      // 返回层底层 Cache 中查询到的对象
      return object;
    }
  }

  /**
   * 该方法并没有直接将结果对象记录到其封装的二级缓存中,而是暂时保存在 entriesToAddOnCommit 集合中,
   * 在事务提交时才会将这些结果对象从 entriesToAddOnCommit 集合 添加到二级缓存中。
   * @param key Can be any object but usually it is a {@link CacheKey}
   * @param object
   */
  @Override
  public void putObject(Object key, Object object) {
    entriesToAddOnCommit.put(key, object);
  }

  @Override
  public Object removeObject(Object key) {
    return null;
  }

  /**
   * 清空 entriesToAddOnCommit 集合,并设置 clearOnCommit 为 true
   */
  @Override
  public void clear() {
    clearOnCommit = true;
    entriesToAddOnCommit.clear();
  }

  /**
   * 会根据 clearOnCommit 字段的值决定是否清空二级缓存,然后调用 flushPendingEntries() 方法将 entriesToAddOnCommit 集合中记录的结果对象保存到二级缓存中
   */
  public void commit() {
    // 在事务提交前,清空二级缓存
    if (clearOnCommit) {
      delegate.clear();
    }
    // 将 entriesToAddOnCommit 集合中记录的结果对象保存到二级缓存中
    flushPendingEntries();
    // 重置 clearOnCommit 值,清空 entriesToAddOnCommit 和 entriesMissedInCache 集合
    reset();
  }

  /**
   * 将 entriesMissedInCache 集合中记录的缓存项从二级缓存中删除,并清空 entriesToAddOnCommit 和 entriesMissedInCache 集合
   */
  public void rollback() {
    // 将 entriesMissedInCache 集合中记录的缓存项从二级缓存中删除
    unlockMissedEntries();
    // 清空 entriesToAddOnCommit 和 entriesMissedInCache 集合
    reset();
  }

  /**
   * 重置 clearOnCommit 值,清空 entriesToAddOnCommit 和 entriesMissedInCache 集合
   */
  private void reset() {
    clearOnCommit = false;
    entriesToAddOnCommit.clear();
    entriesMissedInCache.clear();
  }

  /**
   * 将 entriesToAddOnCommit 集合中记录的结果对象保存到二级缓存中
   */
  private void flushPendingEntries() {
    for (Map.Entry<Object, Object> entry : entriesToAddOnCommit.entrySet()) {
      delegate.putObject(entry.getKey(), entry.getValue());
    }
    // 遍历 entriesMissedInCache 集合,缓存为命中的,且 entriesToAddOnCommit 集合中不包含的缓存项, 添加到二级缓冲中
    for (Object entry : entriesMissedInCache) {
      if (!entriesToAddOnCommit.containsKey(entry)) {
        delegate.putObject(entry, null);
      }
    }
  }

  /**
   * 将 entriesMissedInCache 集合中记录的缓存项从二级缓存中删除
   */
  private void unlockMissedEntries() {
    for (Object entry : entriesMissedInCache) {
      try {
        delegate.removeObject(entry);
      } catch (Exception e) {
        log.warn("Unexpected exception while notifiying a rollback to the cache adapter."
            + "Consider upgrading your cache adapter to the latest version.  Cause: " + e);
      }
    }
  }

}
线程安全

不同的 CachingExecutor 对象由不同的线程操作,二级缓存会不会存在线程不安全的问题呢?

CacheBuilder.build() 方法,其中回调用 setStandardDecorators() 方法,为 PerpetualCache 类型的 Cache 对象添加 SynchronizedCache 装饰器,从而保证了二级线程安全。

事务提交

为什么要在事务提交时才将 TransactionalCache. entriesToAddOnCommit 集合中缓存的数据写入到二级缓存,而不是像一级缓存那样,将每次查询结果都直接写入二级缓存呢?

这是为了防止出现“脏读” 情况 最终实现的效果有点类似于“不可重复读”的事务隔离级别。假设当前数据库的隔离级别是“不可重复读”,先
后开启 Tl、T2 两个事务,如图 3- 52 示,在事务 Tl 添加了记录A, 之后查询A记录,最后提交事务,事务T2 会查询记录A。 如果事务 Tl 查询记录A时,就将A对应的结果对象放人二级缓存,则在事务 T2 第一次查询记录A时即可从二级缓存中直接获取其对应的结果对象。此时 Tl 仍然未提交,这就出现了"脏读"的情况,显然不是用户期的结果。


image.png

按照 CacheExecutor 的本身实现,事务T1查询记录A时二级缓存未命中,会查询数据库,因为是同一事务,所以可以查询到记录A 并得到相应的结果对象,并且会将记录 A 保存到 TransactionalCache .entriesToAddOnCommit 集合中。而事务T2第一次查询记录A时,二级缓存未命中,则会访问数据库,因为是不同个事务,数据库的“不可重复读”隔离级别会保证事务T2无法查询到记录A,这样就避免了上面的“脏读”的场景。在图 3-52 中,事务T1提交时会将 entriesToAddOnCommit 集合中的数据添加到二级缓存中,所以事务T2第二次查询记录A时,二级缓存才会命中,这就导致了同一事务中多次读取的结果不一致,也就是 “不可重复读”的场景。

上一篇 下一篇

猜你喜欢

热点阅读