ShardingSphere-jdbc sql执行过程

2021-08-31  本文已影响0人  甜甜起司猫_

ShardingSphere-jdbc sql执行过程

执行过程

  1. 调用ShardingSphereDataSource的getConnection方法
  2. 在DriverStateContext中的STATE容器中拿到之前在构造ShardingSphereDataSource时已存入的OKDriverState
  3. 通过OKDriverState的getConnection方法构造ShardingSphereConnection
  4. 通过调用ShardingSphereConnection中的prepareStatement方法,构造ShardingSpherePreparedStatement
  5. 在ShardingSpherePreparedStatement的构造方法中,完成多个变量的初始化
    5.1 jdbcExecutor,本质是ShardingSphereDataSource中生成的executorEngine,存放在metaDataContexts
    5.2 ShardingSphereSQLParserEngine,sql解析引擎,根据不同的数据库类型创建,用于把sql解析成SQLStatement
    5.3 BatchPreparedStatementExecutor,封装需要执行的sql单元JDBCExecutionUnit
  6. 调用jdbcExecutor中的execute方法将sql的执行封装成任务由executorEngine中的线程池调度执行
  7. 调用ListenableFuture的get方法获取sql的执行结果

方法解析

example模块例子中,调用以下sql

CREATE TABLE IF NOT EXISTS t_order (order_id BIGINT NOT NULL AUTO_INCREMENT, user_id INT NOT NULL, address_id BIGINT NOT NULL, status VARCHAR(50), PRIMARY KEY (order_id))

调用链如下:

    /**
     * Obtains a connection by delegating to {@code DriverStateContext.getConnection}.
     *
     * <p>The call forwards this object's schema name, its data source map and its context manager,
     * together with the transaction type currently returned by {@code TransactionTypeHolder.get()}.</p>
     *
     * @return the connection returned by {@code DriverStateContext.getConnection}
     */
    public Connection getConnection() {
        return DriverStateContext.getConnection(schemaName, getDataSourceMap(), contextManager, TransactionTypeHolder.get());
    }
    /**
     * Registered driver states, keyed by the value each one returns from {@code getType()}.
     */
    private static final Map<String, DriverState> STATES;
    
    static {
        // TODO add singleton cache with TypedSPI init
        ShardingSphereServiceLoader.register(DriverState.class);
        // Fill a local map first, then publish it to the constant in a single assignment.
        Map<String, DriverState> statesByType = new HashMap<>();
        for (DriverState driverState : ShardingSphereServiceLoader.getSingletonServiceInstances(DriverState.class)) {
            statesByType.put(driverState.getType(), driverState);
        }
        STATES = statesByType;
    }

    /**
     * Looks up the driver state registered for the current state and asks it for a connection.
     *
     * <p>The lookup key is the value of
     * {@code contextManager.getMetaDataContexts().getStateContext().getCurrentState()}.
     * No null check is performed on the lookup result, so an unregistered state fails
     * when the connection is requested from it.</p>
     *
     * @param schemaName schema name forwarded to the driver state
     * @param dataSourceMap data sources forwarded to the driver state
     * @param contextManager context manager; also the source of the current state
     * @param transactionType transaction type forwarded to the driver state
     * @return the connection produced by the selected driver state
     */
    public static Connection getConnection(final String schemaName, final Map<String, DataSource> dataSourceMap, final ContextManager contextManager, final TransactionType transactionType) {
        DriverState currentDriverState = STATES.get(contextManager.getMetaDataContexts().getStateContext().getCurrentState());
        return currentDriverState.getConnection(schemaName, dataSourceMap, contextManager, transactionType);
    }
/**
 * Driver state registered under the type key {@code "OK"}.
 *
 * <p>Requests for a connection are answered with a newly created {@code ShardingSphereConnection}.</p>
 */
public final class OKDriverState implements DriverState {
    
    /**
     * Creates a new {@code ShardingSphereConnection} from the given arguments.
     *
     * @param schemaName schema name passed to the connection
     * @param dataSourceMap data sources passed to the connection
     * @param contextManager context manager passed to the connection
     * @param transactionType transaction type passed to the connection
     * @return a new {@code ShardingSphereConnection}
     */
    @Override
    public Connection getConnection(final String schemaName, final Map<String, DataSource> dataSourceMap, final ContextManager contextManager, final TransactionType transactionType) {
        // Honor the transaction type supplied by the caller rather than re-reading
        // TransactionTypeHolder here, which silently ignored the parameter.
        return new ShardingSphereConnection(schemaName, dataSourceMap, contextManager, transactionType);
    }
    
    /**
     * Returns the key under which this state is registered.
     *
     * @return the literal {@code "OK"}
     */
    @Override
    public String getType() {
        return "OK";
    }
}

DriverStateContext中的成员变量STATES,是在TypedSPI调用init的时候初始化,也就是在生成ShardingSphereDataSource的过程中,这里实际是从OKDriverState中getConnection,构建一个ShardingSphereConnection

    /**
     * Creates a {@code ShardingSpherePreparedStatement} for the given SQL, bound to this connection.
     *
     * @param sql SQL text handed to the prepared statement constructor
     * @return a new {@code ShardingSpherePreparedStatement}
     * @throws SQLException if the prepared statement constructor throws it
     */
    public PreparedStatement prepareStatement(final String sql) throws SQLException {
        return new ShardingSpherePreparedStatement(this, sql);
    }
    /**
     * Creates a prepared statement with default result set options.
     *
     * <p>Delegates to the full constructor with {@code ResultSet.TYPE_FORWARD_ONLY},
     * {@code ResultSet.CONCUR_READ_ONLY}, {@code ResultSet.HOLD_CURSORS_OVER_COMMIT}
     * and generated-key return disabled.</p>
     *
     * @param connection owning connection
     * @param sql SQL text
     * @throws SQLException if the delegated constructor throws it
     */
    public ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql) throws SQLException {
        this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, false);
    }
    /**
     * Full constructor: validates the SQL text, parses it, and wires up the executors used later.
     *
     * @param connection owning connection; source of the metadata contexts, schema name and transaction-hold flag
     * @param sql SQL text; must be neither {@code null} nor empty
     * @param resultSetType result set type, used only when {@code returnGeneratedKeys} is {@code false}
     * @param resultSetConcurrency result set concurrency, used only when {@code returnGeneratedKeys} is {@code false}
     * @param resultSetHoldability result set holdability, used only when {@code returnGeneratedKeys} is {@code false}
     * @param returnGeneratedKeys when {@code true}, the statement option is built as {@code new StatementOption(true)}
     * @throws SQLException if {@code sql} is {@code null} or empty
     */
    private ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql,
                                            final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys) throws SQLException {
        // Reject unusable SQL before any state is set up.
        if (Strings.isNullOrEmpty(sql)) {
            throw new SQLException(SQLExceptionConstant.SQL_STRING_NULL_OR_EMPTY);
        }
        this.connection = connection;
        this.metaDataContexts = connection.getContextManager().getMetaDataContexts();
        this.sql = sql;
        this.statements = new ArrayList<>();
        this.parameterSets = new ArrayList<>();
        // The parser engine is created for the trunk database type of this connection's schema and used once.
        this.sqlStatement = new ShardingSphereSQLParserEngine(
                DatabaseTypeRegistry.getTrunkDatabaseTypeName(this.metaDataContexts.getMetaData(connection.getSchemaName()).getResource().getDatabaseType()))
                .parse(sql, true);
        this.parameterMetaData = new ShardingSphereParameterMetaData(this.sqlStatement);
        if (returnGeneratedKeys) {
            this.statementOption = new StatementOption(true);
        } else {
            this.statementOption = new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability);
        }
        // One JDBCExecutor instance is shared by the driver, federate and batch executors below.
        JDBCExecutor sharedExecutor = new JDBCExecutor(this.metaDataContexts.getExecutorEngine(), connection.isHoldTransaction());
        this.driverJDBCExecutor = new DriverJDBCExecutor(connection.getSchemaName(), this.metaDataContexts, sharedExecutor);
        this.rawExecutor = new RawExecutor(this.metaDataContexts.getExecutorEngine(), connection.isHoldTransaction(), this.metaDataContexts.getProps());
        // TODO Consider FederateRawExecutor
        this.federateExecutor = new FederateJDBCExecutor(connection.getSchemaName(), this.metaDataContexts.getOptimizeContextFactory(), this.metaDataContexts.getProps(), sharedExecutor);
        this.batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(this.metaDataContexts, sharedExecutor, connection.getSchemaName());
        this.kernelProcessor = new KernelProcessor();
    }

通过调用ShardingSphereConnection的prepareStatement方法,构造一个ShardingSpherePreparedStatement,在ShardingSpherePreparedStatement的构造方法中初始化各种成员变量,与后续sql的执行有关

    /**
     * Executes all input groups of the given context, either serially or in parallel.
     *
     * @param executionGroupContext context whose input groups are executed
     * @param firstCallback callback passed on as the first callback
     * @param callback callback passed on as the general callback
     * @param serial {@code true} to use {@code serialExecute}, {@code false} to use {@code parallelExecute}
     * @param <I> type of input value
     * @param <O> type of return value
     * @return the execution results, or an empty list when there are no input groups
     * @throws SQLException if the delegated execution throws it
     */
    public <I, O> List<O> execute(final ExecutionGroupContext<I> executionGroupContext,
                                  final ExecutorCallback<I, O> firstCallback, final ExecutorCallback<I, O> callback, final boolean serial) throws SQLException {
        // Nothing to run: return early with an empty result.
        if (executionGroupContext.getInputGroups().isEmpty()) {
            return Collections.emptyList();
        }
        if (serial) {
            return serialExecute(executionGroupContext.getInputGroups().iterator(), firstCallback, callback);
        }
        return parallelExecute(executionGroupContext.getInputGroups().iterator(), firstCallback, callback);
    }
    /**
     * Runs the first execution group through {@code syncExecute} and the rest through {@code asyncExecute},
     * then merges all results.
     *
     * <p>The remaining groups are handed to {@code asyncExecute} before the first group is run.
     * The first group uses {@code firstCallback} when it is non-null, otherwise {@code callback}.</p>
     *
     * @param executionGroups iterator over the groups; must have at least one element
     * @param firstCallback optional callback for the first group, may be {@code null}
     * @param callback callback for the remaining groups, and for the first group when {@code firstCallback} is {@code null}
     * @return the first group's results followed by the results of the remaining groups
     * @throws SQLException if execution or result collection throws it
     */
    private <I, O> List<O> parallelExecute(final Iterator<ExecutionGroup<I>> executionGroups, final ExecutorCallback<I, O> firstCallback, final ExecutorCallback<I, O> callback) throws SQLException {
        ExecutionGroup<I> headGroup = executionGroups.next();
        Collection<ListenableFuture<Collection<O>>> pendingFutures = asyncExecute(executionGroups, callback);
        ExecutorCallback<I, O> headCallback = firstCallback;
        if (null == headCallback) {
            headCallback = callback;
        }
        Collection<O> headResults = syncExecute(headGroup, headCallback);
        return getGroupResults(headResults, pendingFutures);
    }
    /**
     * Submits every remaining execution group for asynchronous execution.
     *
     * @param executionGroups iterator positioned at the first group to submit; it is fully consumed
     * @param callback callback applied to each group
     * @return one future per submitted group, in iteration order
     */
    private <I, O> Collection<ListenableFuture<Collection<O>>> asyncExecute(final Iterator<ExecutionGroup<I>> executionGroups, final ExecutorCallback<I, O> callback) {
        Collection<ListenableFuture<Collection<O>>> futures = new LinkedList<>();
        executionGroups.forEachRemaining(group -> futures.add(asyncExecute(group, callback)));
        return futures;
    }
    /**
     * Submits one execution group to the executor service.
     *
     * <p>The submitted task calls {@code callback.execute} with the group's inputs, {@code false},
     * and the data map captured by this method.</p>
     *
     * @param executionGroup group whose inputs are executed by the task
     * @param callback callback invoked inside the submitted task
     * @return the future returned by the executor service for the submitted task
     */
    private <I, O> ListenableFuture<Collection<O>> asyncExecute(final ExecutionGroup<I> executionGroup, final ExecutorCallback<I, O> callback) {
        // Read the data map here, before submitting, so the task receives the value seen by the submitting code.
        // NOTE(review): presumably ExecutorDataMap is thread-bound and would differ inside the worker — confirm.
        Map<String, Object> dataMap = ExecutorDataMap.getValue();
        return executorServiceManager.getExecutorService().submit(() -> callback.execute(executionGroup.getInputs(), false, dataMap));
    }
    /**
     * Merges the results of the first group with the results of all pending futures.
     *
     * <p>Each future is waited on in iteration order. A failure while waiting is passed to
     * {@code throwException}.</p>
     *
     * @param firstResults results already available for the first group
     * @param restFutures futures for the remaining groups
     * @return a list holding {@code firstResults} followed by each future's results
     * @throws SQLException if {@code throwException} raises it for a failed or interrupted future
     */
    private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<ListenableFuture<Collection<O>>> restFutures) throws SQLException {
        List<O> result = new LinkedList<>(firstResults);
        for (ListenableFuture<Collection<O>> each : restFutures) {
            try {
                result.addAll(each.get());
            } catch (final InterruptedException ex) {
                // Catching InterruptedException clears the interrupt flag; restore it so callers
                // further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                return throwException(ex);
            } catch (final ExecutionException ex) {
                return throwException(ex);
            }
        }
        return result;
    }
  1. 把执行sql封装成线程池任务丢入线程池中调度执行
  2. 把执行sql任务的调用结果封装成future收集起来
  3. 通过future.get()获取sql执行结果
    /**
     * Executes the grouped units through {@code jdbcExecutor} and then refreshes metadata.
     *
     * <p>{@code refreshMetadata} is called only after {@code jdbcExecutor.execute} has returned.</p>
     *
     * @param executionGroupContext grouped execution units to run
     * @param routeUnits route units passed to {@code refreshMetadata}
     * @param callback callback passed to {@code jdbcExecutor.execute}
     * @param sqlStatement statement passed to {@code refreshMetadata}
     * @param <T> type of each execution result
     * @return the results returned by {@code jdbcExecutor.execute}
     * @throws SQLException if execution or the metadata refresh throws it
     */
    private <T> List<T> doExecute(final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext, final Collection<RouteUnit> routeUnits,
                                  final JDBCExecutorCallback<T> callback, final SQLStatement sqlStatement) throws SQLException {
        List<T> executionResults = jdbcExecutor.execute(executionGroupContext, callback);
        // Refresh only once execution has completed.
        refreshMetadata(sqlStatement, routeUnits);
        return executionResults;
    }
上一篇下一篇

猜你喜欢

热点阅读