Mybatis

MyBatis原理(二)——执行器Executor

2021-11-30  本文已影响0人  Lnstark

一、 执行器的分类

Executor类图.png

Mybatis里的执行器主要是一个接口Executor,一个抽象类BaseExecutor,以及3个实现类:

Executor提供了查询、更新(包含增删改)、提交、回滚、获取事务、关闭等方法。
抽象类BaseExecutor里实现了获取连接、一级缓存。
如果开了二级缓存,那么SqlSession会用CachingExecutor来处理二级缓存的逻辑,它里面包含一个BaseExecutor delegate,来处理正常SQL逻辑。即装饰者模式。

二、 缓存

我们知道myBatis里有一级缓存和二级缓存,一级缓存是会话级的,存在内存里,无法跨线程使用,二级缓存是应用级的,是可以跨线程的,可以存在内存里,也可以存在硬盘,或者第三方集成。开了二级缓存的话,会先使用二级缓存,再使用一级。

@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
  ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
  if (closed) {
    throw new ExecutorException("Executor was closed.");
  }
  // 如果查询栈是0且设置了每次查询前清空缓存,那么这里会执行清理一级缓存
  if (queryStack == 0 && ms.isFlushCacheRequired()) {
    clearLocalCache();
  }
  List<E> list;
  try {
    queryStack++;
    // 从缓存中拿数据
    list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
    if (list != null) {
      // 处理存储过程的出参逻辑
      handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
    } else {
      // 如果缓存没数据,那就去查库,并且将结果放到缓存里
      list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
    }
  } finally {
    queryStack--;
  }
  if (queryStack == 0) {
    for (DeferredLoad deferredLoad : deferredLoads) {
      deferredLoad.load();
    }
    deferredLoads.clear();
    // 如果localCacheScope设置为statement的话,那么每次查询完都会清空一级缓存
    if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
      clearLocalCache();
    }
  }
  return list;
}

我们可以看到1级缓存默认是存在的,且无法关闭,但我们可以设置localCacheScope为statement来变相的禁用1级缓存。
一级缓存命中条件:

运行时参数

  1. 同一个会话
  2. SQL语句、参数相同
  3. 相同的statementID
  4. RowBounds相同

操作相关

  1. 未手动清空缓存
  2. 未配置flushCache=true
  3. 未执行update语句
  4. 缓存作用域不是statement

二级缓存的设计牵扯到很多缓存相关的特点,比如溢出淘汰、过期清理、线程安全、序列化等。这么多功能如何设计呢?MyBatis把每个功能写成一个组件类,然后用装饰者加责任链的模式,将各个组件进行串联。在执行缓存的基本功能时,其它的缓存逻辑会沿着这个责任链依次往下传递。


二级缓存架构.png

如何启用?
我们看Configuration的newExecutor方法

public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
  executorType = executorType == null ? defaultExecutorType : executorType;
  executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
  Executor executor;
  if (ExecutorType.BATCH == executorType) {
    executor = new BatchExecutor(this, transaction);
  } else if (ExecutorType.REUSE == executorType) {
    executor = new ReuseExecutor(this, transaction);
  } else {
    executor = new SimpleExecutor(this, transaction);
  }
  if (cacheEnabled) {
    executor = new CachingExecutor(executor);
  }
  executor = (Executor) interceptorChain.pluginAll(executor);
  return executor;
}

可以看到设置cacheEnabled字段可以控制SqlSession的执行器是BaseExecutor还是CachingExecutor,cacheEnabled默认为true。开了cacheEnabled然后还要指定缓存空间,要么mapper.xml里加<cache/>或<cache-ref/>元素,要么mapper.java上加@CacheNamespace或@CacheNamespaceRef注解。一个mapper对应一个缓存空间。

如何用自定义缓存?
二级缓存的默认实现是PerpetualCache(其实一级缓存也是),里面就是简单的用HashMap来做。我们要实现自定义缓存的话,先创建一个类如RedisCache实现Cache接口,然后配置
<cache type="packagename.RedisCache">或者@CacheNamespace(implementation = RedisCache.class)就可以了。

命中条件
二级缓存的命中场景与一级缓存类似,不同点在于他不用在一个会话内,必须提交之后才生效。为什么呢?比如两个会话在修改同一数据,当会话二修改后,再将其查询出来,假如它实时填充到二级缓存,而会话一就能获取修改之后的数据,但实际上修改的数据回滚了,并没真正的提交到数据库。类似数据库的脏读。

为何二级缓存能够会话间共享?
我们先了解下缓存过程,首先看具体的CachingExecutor的query方法

@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
    throws SQLException {
  // 先从MappedStatement里取出二级缓存,这里的cache就是mapper对应的全局缓存空间
  Cache cache = ms.getCache();
  // 如果有的话
  if (cache != null) {
    // 如果设置了每次查询前清空缓存,那么这里会执行清理二级缓存
    flushCacheIfRequired(ms);
    // 如果开启了缓存并且自定义结果处理器为空,那么继续走缓存逻辑
    if (ms.isUseCache() && resultHandler == null) {
      ensureNoOutParams(ms, boundSql);
      @SuppressWarnings("unchecked")
      // 从缓存事务管理器里取出真正的缓存数据
      List<E> list = (List<E>) tcm.getObject(cache, key);
      // 如果没有的话那么执行查询
      if (list == null) {
        list = delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
        tcm.putObject(cache, key, list); // issue #578 and #116
      }
      return list;
    }
  }
  // 如果MappedStatement没有缓存的话就直接查询
  return delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}

可以看到缓存由事务缓存管理器TransactionalCacheManager(简称tcm)来管理。

public class TransactionalCacheManager {

  private final Map<Cache, TransactionalCache> transactionalCaches = new HashMap<>();

  // 根据全局缓存空间拿到暂存空间,并清楚暂存数据
  public void clear(Cache cache) {
    getTransactionalCache(cache).clear();
  }

  // 根据全局缓存空间拿到暂存空间,然后取出数据
  public Object getObject(Cache cache, CacheKey key) {
    return getTransactionalCache(cache).getObject(key);
  }

  // 根据全局缓存空间拿到暂存空间,然后存进去数据
  public void putObject(Cache cache, CacheKey key, Object value) {
    getTransactionalCache(cache).putObject(key, value);
  }

  // 会话提交时执行CachingExecutor的commit,它里面执行tcm的这个方法
  // TransactionalCache会把暂存缓存刷到全局缓存空间
  public void commit() {
    for (TransactionalCache txCache : transactionalCaches.values()) {
      txCache.commit();
    }
  }

  // ...

  // 根据全局缓存空间拿到TransactionalCache
  // 没有的话创建一个TransactionalCache(delegate为全局缓存空间),它里面有暂存空间
  private TransactionalCache getTransactionalCache(Cache cache) {
    return transactionalCaches.computeIfAbsent(cache, TransactionalCache::new);
  }

}

它里面包含了一个Map<Cache, TransactionalCache>,key其实就是全局缓存空间。TransactionalCache就是我们上面提到的传递链模式中的一环

它里面有一个Cache delegate(即下一环,这里其实就是全局缓存空间),一个Map<Object, Object> entriesToAddOnCommit,即暂存空间,等会话提交或者关闭时,这里的数据会flush到delegate里,即全局缓存空间里,也就是二级缓存。

public class TransactionalCache implements Cache {
  private static final Log log = LogFactory.getLog(TransactionalCache.class);

  // 下一环,现在这里就是全局缓存空间
  private final Cache delegate;
  private boolean clearOnCommit;
  // 暂存空间
  private final Map<Object, Object> entriesToAddOnCommit;
  private final Set<Object> entriesMissedInCache;

  // ...

  // tcm的commit会执行这个方法
  public void commit() {
    if (clearOnCommit) {
      delegate.clear();
    }
    flushPendingEntries();
    reset();
  }

  // ...
  // 将暂存空间刷到delegate里
  private void flushPendingEntries() {
    for (Map.Entry<Object, Object> entry : entriesToAddOnCommit.entrySet()) {
      delegate.putObject(entry.getKey(), entry.getValue());
    }
    for (Object entry : entriesMissedInCache) {
      if (!entriesToAddOnCommit.containsKey(entry)) {
        delegate.putObject(entry, null);
      }
    }
  }
}

大致结构如下图:

二级缓存结构及原理.png
何时清空二级缓存?
mapper里的SQL语句有个叫flushCache的配置,他对应MappedStatement的flushCacheRequired字段,表示是否清空缓存。查询语句默认false,增删改语句默认true。上面可以看到在query方法里有一行flushCacheIfRequired(ms),如果查询开启了这个字段,那么每次查询前都会清空缓存。更新的时候也会调用flushCacheIfRequired方法。
private void flushCacheIfRequired(MappedStatement ms) {
  Cache cache = ms.getCache();
  // 如果缓存不为空,且开启了flushCache,那就清空tcm里对应的暂存区
  if (cache != null && ms.isFlushCacheRequired()) {
    tcm.clear(cache);
  }
}

再来看tcm里的clear:

public void clear(Cache cache) {
  // 获取TransactionalCache然后clear
  getTransactionalCache(cache).clear();
}

再看TransactionalCache的clear方法:

public void clear() {
  clearOnCommit = true;
  entriesToAddOnCommit.clear();
}

它设置clearOnCommit 为true,然后清空暂存区。等到提交的时候会将全局缓存空间里的内容清除掉。

public void commit() {
  // 上面将clearOnCommit 设置为true,这里就清除delegate的缓存了
  if (clearOnCommit) {
    delegate.clear();
  }
  flushPendingEntries();
  // reset方法再将clearOnCommit设置为false
  reset();
}

另外有一个细节,在查询二级缓存的时候,假如clearOnCommit为true,那么就不返回数据,因为该缓存即将被清除。

public Object getObject(Object key) {
  // issue #116
  Object object = delegate.getObject(key);
  if (object == null) {
    entriesMissedInCache.add(key);
  }
  // issue #146
  // clearOnCommit为true说明提交后要清除缓存,直接返回null
  if (clearOnCommit) {
    return null;
  } else {
    return object;
  }
}

参考资料

B站——MyBatis源码解析大合集
源码阅读网
深入浅出mybatis之缓存机制

上一篇下一篇

猜你喜欢

热点阅读