数据库中间件 Sharding-JDBC 源码分析 —— SQL
1. 概述
越过千山万水(SQL 解析、SQL 路由、SQL 改写),我们终于来到了 SQL 执行。开森不开森?!
2. ExecutorEngine
ExecutorEngine,SQL执行引擎。
分表分库,需要执行的 SQL 数量从单条变成了多条,此时有两种方式执行:
- 串行执行 SQL
- 并行执行 SQL
前者,编码容易,性能较差,总耗时是多条 SQL 执行时间累加。
后者,编码复杂,性能较好,总耗时约等于执行时间最长的 SQL。
ExecutorEngine 当然采用的是后者,并行执行 SQL。
2.1 ListeningExecutorService
Guava( Java 工具库 ) 提供的继承自 ExecutorService 的线程服务接口,提供创建 ListenableFuture 功能。ListenableFuture 接口,继承 Future 接口,有如下好处:
我们强烈地建议你在代码中多使用 ListenableFuture 来代替 JDK 的 Future, 因为:
1. 大多数 Futures 方法中需要它。
2. 转到 ListenableFuture 编程比较容易。
3. Guava 提供的通用公共类封装了公共的操作方方法,不需要提供 Future 和 ListenableFuture 的扩展方法。
传统 JDK中 的 Future 通过异步的方式计算返回结果:在多线程运算中可能在没有结束就返回结果。
ListenableFuture 可以允许你注册回调方法(callbacks),在运算(多线程执行)完成的时候进行调用。这样简单的改进,使得可以明显的支持更多的操作,这样的功能在 JDK concurrent 中的 Future 是不支持的。
下文我们看 Sharding-JDBC 是如何通过 ListenableFuture 简化并发编程的。
先看 ExecutorEngine 如何初始化 ListeningExecutorService:
public final class ExecutorEngine implements AutoCloseable {
private final ListeningExecutorService executorService;
public ExecutorEngine(final int executorSize) {
executorService = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(
executorSize, executorSize, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ShardingJDBC-%d").build()));
MoreExecutors.addDelayedShutdownHook(executorService, 60, TimeUnit.SECONDS);
}
...
}
- 一个分片数据源( ShardingDataSource ) 独占 一个 SQL执行引擎( ExecutorEngine )。
-
MoreExecutors#listeningDecorator()
创建 ListeningExecutorService,这样 #submit(), #invokeAll() 可以返回 ListenableFuture。 - 默认情况下,线程池大小为 8。可以根据实际业务需要,设置 ShardingProperties 进行调整。
-
setNameFormat()
并发编程时,一定要对线程名字做下定义,这样排查问题会方便很多。 -
MoreExecutors#addDelayedShutdownHook()
,应用关闭时,等待所有任务全部完成再关闭。默认配置等待时间为 60 秒,建议将等待时间做成可配的。
2.2 关闭
数据源关闭时,会调用 ExecutorEngine 也进行关闭。
// ShardingDataSource.java
@Override
public void close() {
executorEngine.close();
}
// ExecutorEngine
@Override
public void close() {
executorService.shutdownNow();
try {
executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (final InterruptedException ignored) {
}
if (!executorService.isTerminated()) {
throw new ShardingJdbcException("ExecutorEngine can not been terminated");
}
}
- shutdownNow() 尝试使用 Thread.interrupt() 打断正在执行中的任务,未执行的任务不再执行。
- awaitTermination() 因为 #shutdownNow() 打断不是立即结束,需要一个过程,因此这里等待了 5 秒。
- 等待 5 秒后,线程池不一定已经关闭,此时抛出异常给上层。建议打印下日志,记录出现这个情况。
2.3 执行 SQL 任务
ExecutorEngine 对外暴露executeStatement()
, executePreparedStatement()
, executeBatch()
三个方法分别提供给 StatementExecutor、PreparedStatementExecutor、BatchPreparedStatementExecutor 调用。而这三个方法,内部调用的都是execute()
私有方法。
// ExecutorEngine.java
private <T> List<T> execute(
final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits,
final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) throws SQLException {
if (baseStatementUnits.isEmpty()) {
return Collections.emptyList();
}
OverallExecutionEvent event = new OverallExecutionEvent(sqlType, baseStatementUnits.size());
// 发布执行之前事件
EventBusInstance.getInstance().post(event);
Iterator<? extends BaseStatementUnit> iterator = baseStatementUnits.iterator();
BaseStatementUnit firstInput = iterator.next();
// 第二个任务开始所有 SQL任务 提交线程池【异步】执行任务
ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), parameterSets, executeCallback);
T firstOutput;
List<T> restOutputs;
try {
// 第一个任务【同步】执行任务
firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);
// 等待第二个任务开始所有 SQL任务完成
restOutputs = restFutures.get();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
event.setException(ex);
event.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
// 发布执行失败事件
EventBusInstance.getInstance().post(event);
ExecutorExceptionHandler.handleException(ex);
return null;
}
event.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
// 发布执行成功事件
EventBusInstance.getInstance().post(event);
// 返回结果
List<T> result = Lists.newLinkedList(restOutputs);
result.add(0, firstOutput);
return result;
}
第一个任务【同步】调用executeInternal()
执行任务。
private <T> T syncExecute(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) throws Exception {
// 【同步】执行任务
return executeInternal(sqlType, baseStatementUnit, parameterSets, executeCallback, ExecutorExceptionHandler.isExceptionThrown(), ExecutorDataMap.getDataMap());
}
第二个开始的任务提交线程池异步调用executeInternal()
执行任务。
private <T> ListenableFuture<List<T>> asyncExecute(
final SQLType sqlType, final Collection<BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
List<ListenableFuture<T>> result = new ArrayList<>(baseStatementUnits.size());
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
for (final BaseStatementUnit each : baseStatementUnits) {
// 提交线程池【异步】执行任务
result.add(executorService.submit(new Callable<T>() {
@Override
public T call() throws Exception {
return executeInternal(sqlType, each, parameterSets, executeCallback, isExceptionThrown, dataMap);
}
}));
}
// 返回 ListenableFuture
return Futures.allAsList(result);
}
我们注意下Futures.allAsList(result)
和 restOutputs=restFutures.get()
。神器 Guava 简化并发编程的好处就提现出来了。 ListenableFuture#get()
当所有任务都成功时,返回所有任务执行结果;当任何一个任务失败时,马上抛出异常,无需等待其他任务执行完成。
为什么会分同步执行和异步执行呢?猜测,当 SQL 执行是单表时,只要进行第一个任务的同步调用,性能更加优秀。
// ExecutorEngine.java
private <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback,
final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception {
synchronized (baseStatementUnit.getStatement().getConnection()) {
T result;
ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
ExecutorDataMap.setDataMap(dataMap);
List<AbstractExecutionEvent> events = new LinkedList<>();
if (parameterSets.isEmpty()) {
// 生成 Event
events.add(getExecutionEvent(sqlType, baseStatementUnit, Collections.emptyList()));
}
for (List<Object> each : parameterSets) {
events.add(getExecutionEvent(sqlType, baseStatementUnit, each));
}
// EventBus 发布 EventExecutionType.BEFORE_EXECUTE
for (AbstractExecutionEvent event : events) {
EventBusInstance.getInstance().post(event);
}
try {
// 执行回调函数
result = executeCallback.execute(baseStatementUnit);
} catch (final SQLException ex) {
// EventBus 发布 EventExecutionType.EXECUTE_FAILURE
for (AbstractExecutionEvent each : events) {
each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
each.setException(ex);
EventBusInstance.getInstance().post(each);
ExecutorExceptionHandler.handleException(ex);
}
return null;
}
// EventBus 发布 EventExecutionType.EXECUTE_SUCCESS
for (AbstractExecutionEvent each : events) {
each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
EventBusInstance.getInstance().post(each);
}
return result;
}
}
result=executeCallback.execute(baseStatementUnit)
执行回调函数。StatementExecutor,PreparedStatementExecutor,BatchPreparedStatementExecutor 通过传递执行回调函数( ExecuteCallback )实现给 ExecutorEngine 实现并行执行。
public interface ExecuteCallback<T> {
/**
* 执行任务.
*
* @param baseStatementUnit 语句对象执行单元
* @return 处理结果
* @throws Exception 执行期异常
*/
T execute(BaseStatementUnit baseStatementUnit) throws Exception;
}
synchronized(baseStatementUnit.getStatement().getConnection())
,这里加锁的原因是,虽然 MySQL、Oracle 的 Connection 实现是线程安全的。但是数据库连接池实现的 Connection 不一定是线程安全,例如 Druid 的线程池 Connection 非线程安全。
3. Executor
Executor,执行器,目前一共有三个执行器。不同的执行器对应不同的执行单元 (BaseStatementUnit)。
执行器类 | 执行器名 | 执行单元 |
---|---|---|
StatementExecutor | 静态语句对象执行单元 | StatementUnit |
PreparedStatementExecutor | 预编译语句对象请求的执行器 | PreparedStatementUnit |
BatchPreparedStatementExecutor | 批量预编译语句对象请求的执行器 | BatchPreparedStatementUnit |
3.1 StatementExecutor
StatementExecutor,多线程执行静态语句对象请求的执行器,一共有三类方法:
- executeQuery() 执行 SQL 查询
public List<ResultSet> executeQuery() throws SQLException {
return executorEngine.executeStatement(sqlType, statementUnits, new ExecuteCallback<ResultSet>() {
@Override
public ResultSet execute(final BaseStatementUnit baseStatementUnit) throws Exception {
return baseStatementUnit.getStatement().executeQuery(baseStatementUnit.getSqlExecutionUnit().getSql());
}
});
}
- executeUpdate() 执行 SQL 更新
public int executeUpdate() throws SQLException {
return executeUpdate(new Updater() {
@Override
public int executeUpdate(final Statement statement, final String sql) throws SQLException {
return statement.executeUpdate(sql);
}
});
}
- execute() 执行 SQL
public boolean execute() throws SQLException {
return execute(new Executor() {
@Override
public boolean execute(final Statement statement, final String sql) throws SQLException {
return statement.execute(sql);
}
});
}
3.2 PreparedStatementExecutor
PreparedStatementExecutor,多线程执行预编译语句对象请求的执行器。比 StatementExecutor 多了parameters
参数,方法逻辑上基本一致,就不重复分享啦。
3.3 BatchPreparedStatementExecutor
BatchPreparedStatementExecutor,多线程执行批量预编译语句对象请求的执行器。
// BatchPreparedStatementExecutor.java
public final class BatchPreparedStatementExecutor {
private final ExecutorEngine executorEngine;
private final DatabaseType dbType;
private final SQLType sqlType;
private final Collection<BatchPreparedStatementUnit> batchPreparedStatementUnits;
private final List<List<Object>> parameterSets;
/**
* Execute batch.
*
* @return execute results
* @throws SQLException SQL exception
*/
public int[] executeBatch() throws SQLException {
return accumulate(executorEngine.executeBatch(sqlType, batchPreparedStatementUnits, parameterSets, new ExecuteCallback<int[]>() {
@Override
public int[] execute(final BaseStatementUnit baseStatementUnit) throws Exception {
return baseStatementUnit.getStatement().executeBatch();
}
}));
}
// 计算每个语句的更新数量
private int[] accumulate(final List<int[]> results) {
int[] result = new int[parameterSets.size()];
int count = 0;
// 每个语句按照顺序,读取到其对应的每个分片SQL影响的行数进行累加
for (BatchPreparedStatementUnit each : batchPreparedStatementUnits) {
for (Map.Entry<Integer, Integer> entry : each.getJdbcAndActualAddBatchCallTimesMap().entrySet()) {
int value = null == results.get(count) ? 0 : results.get(count)[entry.getValue()];
if (DatabaseType.Oracle == dbType) {
result[entry.getKey()] = value;
} else {
result[entry.getKey()] += value;
}
}
count++;
}
return result;
}
}
眼尖的同学会发现,为什么有 BatchPreparedStatementExecutor,而没有 BatchStatementExecutor 呢?目前 Sharding-JDBC 不支持 Statement 批量操作,只能进行 PreparedStatement 的批操作。
4. ExecutionEvent
AbstractExecutionEvent,SQL 执行事件抽象接口。
public abstract class AbstractExecutionEvent {
// 事件编号
@Getter
private final String id = UUID.randomUUID().toString();
// 事件类型
@Getter
@Setter
private EventExecutionType eventExecutionType = EventExecutionType.BEFORE_EXECUTE;
@Setter
private Exception exception;
public Optional<? extends Exception> getException() {
return Optional.fromNullable(exception);
}
}
AbstractExecutionEvent 的子类关系图为:
- DMLExecutionEvent:DML类 SQL 执行时事件
- DQLExecutionEvent:DQL类 SQL 执行时事件
EventExecutionType,事件触发类型。
- BEFORE_EXECUTE:执行前
- EXECUTE_SUCCESS:执行成功
- EXECUTE_FAILURE:执行失败
4.1 EventBus
那究竟有什么用途呢? Sharding-JDBC 使用 Guava(没错,又是它)的 EventBus 实现了事件的发布和订阅。从上文 ExecutorEngine#executeInternal()
我们可以看到每个分片 SQL 执行的过程中会发布相应事件:
- 执行 SQL 前:发布类型类型为 BEFORE_EXECUTE 的事件
- 执行 SQL 成功:发布类型类型为 EXECUTE_SUCCESS 的事件
- 执行 SQL 失败:发布类型类型为 EXECUTE_FAILURE 的事件
怎么订阅事件呢(目前 Sharding-JDBC 是没有订阅这些事件的,只是提供了事件发布订阅的功能而已)?非常简单,例子如下:
EventBusInstance.getInstance().register(new Runnable() {
@Override
public void run() {
}
@Subscribe // 订阅
@AllowConcurrentEvents // 是否允许并发执行,即线程安全
public void listen(final DMLExecutionEvent event) { // DMLExecutionEvent
System.out.println("DMLExecutionEvent:" + event.getSql() + "\t" + event.getEventExecutionType());
}
@Subscribe // 订阅
@AllowConcurrentEvents // 是否允许并发执行,即线程安全
public void listen2(final DQLExecutionEvent event) { //DQLExecutionEvent
System.out.println("DQLExecutionEvent:" + event.getSql() + "\t" + event.getEventExecutionType());
}
});
- register() 任何类都可以,并非一定需要使用 Runnable 类。此处例子单纯因为方便
- @Subscribe 注解在方法上,实现对事件的订阅
- @AllowConcurrentEvents 注解在方法上,表示线程安全,允许并发执行
- 方法上的参数对应的类即是订阅的事件。例如, #listen() 订阅了 DMLExecutionEvent 事件
- EventBus#post() 发布事件,同步调用订阅逻辑
5. 结语
SQL 执行完毕之后,执行结果封装在ResultSet
对象中,如:
Statement stmt =con.createStatement( ResultSet.TYPE_SCROLL_INSENSITIVE,ResultSet.CONCUR_UPDATABLE);
ResultSet rs = stmt.executeQuery("SELECT a, b FROM TABLE2");
多个 SQL 执行结果就会有多个ResultSet
,必然需要进行合并。下一篇文章我们将探讨 SQL 结果归并,敬请关注~