程序员

浅析MySQL的批量提交的优化

2018-12-22  本文已影响3人  ElseF

公众号请关注ElseF,ElseF(Else Figure),是一个专注于「分享」国内外高质量的科技类文章和新闻的自媒体。维护者主要来自国内外大型互联网公司和创业公司,主要专注于与互联网相关的精彩内容。


微信公众号

Batch提交

Batch提交是用一个Connection来对若干个SQL进行一次性、批量提交给MySQL处理。但有一个问题是,如果某个SQL 挂了而其他的并不会受影响,这样就无法保证原子性,只能再封装一层Transaction来实现。 同时,Batch处理并不是每个Server都支持的,需要看具体的产品和版本号。

代码

我们通过观察具体的客户端实现代码来了解批量提交是如何实现的 spring-jdbc-5.1.3.RELEASE中的实现如下:

@Override
    public int[] batchUpdate(String sql, SqlParameterSource[] batchArgs) {
        if (batchArgs.length == 0) {
            return new int[0];
        }

        ParsedSql parsedSql = getParsedSql(sql);
        PreparedStatementCreatorFactory pscf = getPreparedStatementCreatorFactory(parsedSql, batchArgs[0]);
        // 调用另外一个batchUpdate方法
        return getJdbcOperations().batchUpdate(
                pscf.getSql(),
                new BatchPreparedStatementSetter() {
                    @Override
                    public void setValues(PreparedStatement ps, int i) throws SQLException {
                        Object[] values = NamedParameterUtils.buildValueArray(parsedSql, batchArgs[i], null);
                        pscf.newPreparedStatementSetter(values).setValues(ps);
                    }
                    @Override
                    public int getBatchSize() {
                        return batchArgs.length;
                    }
                });
    }

第二个batchUpdate方法如下:

@Override
    public int[] batchUpdate(String sql, final BatchPreparedStatementSetter pss) throws DataAccessException {
        if (logger.isDebugEnabled()) {
            logger.debug("Executing SQL batch update [" + sql + "]");
        }

        // 调用execute方法,传入sql和回调接口
        int[] result = execute(sql, (PreparedStatementCallback<int[]>) ps -> {
            try {
                int batchSize = pss.getBatchSize();
                InterruptibleBatchPreparedStatementSetter ipss =
                        (pss instanceof InterruptibleBatchPreparedStatementSetter ?
                        (InterruptibleBatchPreparedStatementSetter) pss : null);
                if (JdbcUtils.supportsBatchUpdates(ps.getConnection())) {
                    // 如果支持批处理操作则依次将每个SQL的参数进行填充
                    for (int i = 0; i < batchSize; i++) {
                        pss.setValues(ps, i);
                        if (ipss != null && ipss.isBatchExhausted(i)) {
                            break;
                        }
                        ps.addBatch();
                    }
                    // 执行批处理函数(底层交给jdbc driver的实现类去执行,如mysql-connector-java)
                    return ps.executeBatch();
                }
                else {
                    List<Integer> rowsAffected = new ArrayList<>();
                    for (int i = 0; i < batchSize; i++) {
                        pss.setValues(ps, i);
                        if (ipss != null && ipss.isBatchExhausted(i)) {
                            break;
                        }
                        rowsAffected.add(ps.executeUpdate());
                    }
                    int[] rowsAffectedArray = new int[rowsAffected.size()];
                    for (int i = 0; i < rowsAffectedArray.length; i++) {
                        rowsAffectedArray[i] = rowsAffected.get(i);
                    }
                    return rowsAffectedArray;
                }
            }
            finally {
                if (pss instanceof ParameterDisposer) {
                    ((ParameterDisposer) pss).cleanupParameters();
                }
            }
        });

        Assert.state(result != null, "No result array");
        return result;
    }

调用的execute方法如下:

public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action)
            throws DataAccessException {

        Assert.notNull(psc, "PreparedStatementCreator must not be null");
        Assert.notNull(action, "Callback object must not be null");
        if (logger.isDebugEnabled()) {
            String sql = getSql(psc);
            logger.debug("Executing prepared SQL statement" + (sql != null ? " [" + sql + "]" : ""));
        }

        Connection con = DataSourceUtils.getConnection(obtainDataSource());
        PreparedStatement ps = null;
        try {
            ps = psc.createPreparedStatement(con);
            applyStatementSettings(ps);
            T result = action.doInPreparedStatement(ps);
            handleWarnings(ps);
            return result;
        }
        catch (SQLException ex) {
            // Release Connection early, to avoid potential connection pool deadlock
            // in the case when the exception translator hasn't been initialized yet.
            if (psc instanceof ParameterDisposer) {
                ((ParameterDisposer) psc).cleanupParameters();
            }
            String sql = getSql(psc);
            JdbcUtils.closeStatement(ps);
            ps = null;
            DataSourceUtils.releaseConnection(con, getDataSource());
            con = null;
            throw translateException("PreparedStatementCallback", sql, ex);
        }
        finally {
            if (psc instanceof ParameterDisposer) {
                ((ParameterDisposer) psc).cleanupParameters();
            }
            JdbcUtils.closeStatement(ps);
            DataSourceUtils.releaseConnection(con, getDataSource());
        }
    }

上面是spring-jdbc的batchUpdate逻辑的实现,但是具体的业务执行是由Server来执行的,所以不同的Server的实现机制不同可能对性能有较大影响。 以MySQL为例,在mysql-connector-java-5.1.43的实现如下:

    public boolean canRewriteAsMultiValueInsertAtSqlLevel() throws SQLException {
        return this.parseInfo.canRewriteAsMultiValueInsert;
    }

    @Override
        protected long[] executeBatchInternal() throws SQLException {
            synchronized (checkClosed().getConnectionMutex()) {

                if (this.connection.isReadOnly()) {
                    throw new SQLException(Messages.getString("PreparedStatement.25") + Messages.getString("PreparedStatement.26"),
                            SQLError.SQL_STATE_ILLEGAL_ARGUMENT);
                }

                if (this.batchedArgs == null || this.batchedArgs.size() == 0) {
                    return new long[0];
                }

                // we timeout the entire batch, not individual statements
                int batchTimeout = this.timeoutInMillis;
                this.timeoutInMillis = 0;

                resetCancelledState();

                try {
                    statementBegins();

                    clearWarnings();
                    // 注意这里的条件,batchHasPlainStatements默认初始化就是false,而rewriteBatchedStatements
                    // 需要在jdbc url里制定,否则不会走合并SQL的逻辑,如jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&rewriteBatchedStatements=true
                    if (!this.batchHasPlainStatements && this.connection.getRewriteBatchedStatements()) {

                        if (canRewriteAsMultiValueInsertAtSqlLevel()) {
                            return executeBatchedInserts(batchTimeout);
                        }

                        if (this.connection.versionMeetsMinimum(4, 1, 0) && !this.batchHasPlainStatements && this.batchedArgs != null
                                && this.batchedArgs.size() > 3 /* cost of option setting rt-wise */) {
                            return executePreparedBatchAsMultiStatement(batchTimeout);
                        }
                    }

                    return executeBatchSerially(batchTimeout);
                } finally {
                    this.statementExecuting.set(false);

                    clearBatch();
                }
            }
        }

    /** * Rewrites the already prepared statement into a multi-value insert * statement of 'statementsPerBatch' values and executes the entire batch * using this new statement. * * @return update counts in the same fashion as executeBatch() * * @throws SQLException */
    protected long[] executeBatchedInserts(int batchTimeout) throws SQLException {
        ......
    }

如何整合

根据DML的类型是insert、update还是delete,会走不同方法:

总结

如果想要达到MySQL真正batchUpdate效果,需要有以下几个条件:

References

本文首次发布于 ElseF’s Blog, 作者 @stuartlau ,转载请保留原文链接.

上一篇下一篇

猜你喜欢

热点阅读