fescar

Fescar - RM Executor执行过程介绍

2019-02-13  本文已影响6人  晴天哥_王志

开篇

 这篇文章的目的是介绍ExecuteTemplate中调用的Executor的执行过程。

Executor的类图

Executor.png

说明:

InsertExecutor执行过程

InsertExecutor.jpg

说明:

Executor源码介绍 - 执行过程

StatementProxy

public class StatementProxy<T extends Statement> extends AbstractStatementProxy<T> {

    @Override
    public ResultSet executeQuery(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, new StatementCallback<ResultSet, T>() {
            @Override
            public ResultSet execute(Statement statement, Object... args) throws SQLException {
                return statement.executeQuery((String) args[0]);
            }
        }, sql);
    }
}

说明:

ExecuteTemplate

public class ExecuteTemplate {

    public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        return execute(null, statementProxy, statementCallback, args);
    }


    public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {

        if (!RootContext.inGlobalTransaction()) {
            // Just work as original statement
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }

        // sqlRecognizer 执行过程中传入值为null,走的分支代码
        if (sqlRecognizer == null) {

           //  SQLVisitorFactory.get是一个核心代码,会具体分析
            sqlRecognizer = SQLVisitorFactory.get(
                statementProxy.getTargetSQL(),
                statementProxy.getConnectionProxy().getDbType());
        }
        Executor<T> executor = null;
        if (sqlRecognizer == null) {
            executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
        } else {
            switch (sqlRecognizer.getSQLType()) {
                case INSERT:
                    executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case UPDATE:
                    executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case DELETE:
                    executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case SELECT_FOR_UPDATE:
                    executor = new SelectForUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                    break;
                default:
                    executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
                    break;
            }
        }
        T rs = null;
        try {
            rs = executor.execute(args);

        } catch (Throwable ex) {
            if (ex instanceof SQLException) {
                throw (SQLException) ex;
            } else {
                // Turn everything into SQLException
                new SQLException(ex);
            }
        }
        return rs;
    }
}

说明:

BaseTransactionalExecutor

public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor {

    protected StatementProxy<S> statementProxy;
    protected StatementCallback<T, S> statementCallback;
    protected SQLRecognizer sqlRecognizer;
    private TableMeta tableMeta;

    public BaseTransactionalExecutor(StatementProxy<S> statementProxy, 
                   StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {
        this.statementProxy = statementProxy;
        this.statementCallback = statementCallback;
        this.sqlRecognizer = sqlRecognizer;
    }

    @Override
    public Object execute(Object... args) throws Throwable {
        String xid = RootContext.getXID();
        statementProxy.getConnectionProxy().bind(xid);
        return doExecute(args);
    }

    protected abstract Object doExecute(Object... args) throws Throwable;
}

说明:

AbstractDMLBaseExecutor

public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDMLBaseExecutor.class);

    public AbstractDMLBaseExecutor(StatementProxy<S> statementProxy, 
       StatementCallback<T, S> statementCallback, SQLRecognizer sqlRecognizer) {
        super(statementProxy, statementCallback, sqlRecognizer);
    }

    @Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else {
            return executeAutoCommitFalse(args);
        }
    }

    protected T executeAutoCommitFalse(Object[] args) throws Throwable {
        TableRecords beforeImage = beforeImage();
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        TableRecords afterImage = afterImage(beforeImage);
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }

    protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        T result = null;
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        LockRetryController lockRetryController = new LockRetryController();
        try {
            connectionProxy.setAutoCommit(false);
            while (true) {
                try {
                    result = executeAutoCommitFalse(args);
                    connectionProxy.commit();
                    break;
                } catch (LockConflictException lockConflict) {
                    connectionProxy.getTargetConnection().rollback();
                    lockRetryController.sleep(lockConflict);
                }
            }

        } catch (Exception e) {
            // when exception occur in finally,this exception will lost, so just print it here
            LOGGER.error("exception occur", e);
            throw e;
        } finally {
            connectionProxy.setAutoCommit(true);
        }
        return result;
    }
}

说明:

InsertExecutor

public class InsertExecutor<T, S extends Statement> extends AbstractDMLBaseExecutor<T, S> {

    public InsertExecutor(StatementProxy statementProxy, 
        StatementCallback statementCallback, SQLRecognizer sqlRecognizer) {
        super(statementProxy, statementCallback, sqlRecognizer);
    }

    @Override
    protected TableRecords beforeImage() throws SQLException {
        return TableRecords.empty(getTableMeta());
    }

    @Override
    protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
        SQLInsertRecognizer recogizier = (SQLInsertRecognizer)sqlRecognizer;
        List<String> insertColumns = recogizier.getInsertColumns();
        TableMeta tmeta = getTableMeta();
        TableRecords afterImage = null;
        if (tmeta.containsPK(insertColumns)) {
            // insert values including PK
            List<Object> pkValues = null;
            String pk = tmeta.getPkName();
            for (int paramIdx = 0; paramIdx < insertColumns.size(); paramIdx++) {
                if (insertColumns.get(paramIdx).equalsIgnoreCase(pk)) {
                    if (statementProxy instanceof PreparedStatementProxy) {
                        pkValues = ((PreparedStatementProxy) statementProxy).getParamsByIndex(paramIdx);
                    } else {
                        List<List<Object>> insertRows = recogizier.getInsertRows();
                        pkValues = new ArrayList<>(insertRows.size());
                        for (List<Object> row : insertRows) {
                            pkValues.add(row.get(paramIdx));
                        }
                    }
                    break;
                }
            }
            if (pkValues == null) {
                throw new ShouldNeverHappenException();
            }
            afterImage = getTableRecords(pkValues);

        } else {
            // PK is just auto generated
            Map<String, ColumnMeta> pkMetaMap = getTableMeta().getPrimaryKeyMap();
            if (pkMetaMap.size() != 1) {
                throw new NotSupportYetException();
            }
            ColumnMeta pkMeta = pkMetaMap.values().iterator().next();
            if (!pkMeta.isAutoincrement()) {
                throw new ShouldNeverHappenException();
            }

            ResultSet genKeys = null;
            try {
                genKeys = statementProxy.getTargetStatement().getGeneratedKeys();
            } catch (SQLException e) {
                // java.sql.SQLException: Generated keys not requested. You need to
                // specify Statement.RETURN_GENERATED_KEYS to
                // Statement.executeUpdate() or Connection.prepareStatement().
                if ("S1009".equalsIgnoreCase(e.getSQLState())) {
                    genKeys = statementProxy.getTargetStatement().executeQuery("SELECT LAST_INSERT_ID()");
                } else {
                    throw e;
                }
            }
            List<Object> pkValues = new ArrayList<>();
            while (genKeys.next()) {
                Object v = genKeys.getObject(1);
                pkValues.add(v);
            }

            afterImage = getTableRecords(pkValues);

        }

        if (afterImage == null) {
            throw new SQLException("Failed to build after-image for insert");
        }

        return afterImage;
    }
}

说明:

期待

 该篇文章把Executor的执行过程讲解清楚了,后续针对Executor中涉及的通用功能代码进行介绍。

上一篇 下一篇

猜你喜欢

热点阅读