如何做到数据变更的自动化?

2019-06-08  本文已影响0人  merjiezo

为什么会有数据变更

在以平台为级别的软件集合中,为了保证软件质量的可控,错误的可追溯,不可避免的会通过一些流程来约束、框定公司的各种条条框框,比如发布需要有流程才能发布,业界有JIRAITSM这种优秀的集成式软件来制定流程。

而其中数据变更也需要一个流程来控制,当线上需要发生数据变更的时候,团队需要先编写数据库脚本,编写完后提一个流程到DBA这边,DBA执行以后会告诉提变更的人具体变更的行数有多少,确认后DBA再做COMMIT,执行完后确认变更结果,至此一整个数据变更的业务流就完成了。

而这一切的一切都是需要人来参与的,人就代表着会出错,会有沟通成本,这显然不符合如今敏捷开发的思想,更何况你的生产网往往与办公网段处于物理隔离的时候,这种变更流程所带来的时间和空间成本变得比较大,很多很急的数据变更不能及时的相应,最后甚至会有平台内部的一些子团队会分配一个人力去专门提流程、发布。如果能把这个现有的流程做提取,自动化之后,团队的成本就能进一步的降低,解放出来的DBA也能够做更有意义的事情。

有了问题,开发人员和架构师就会对应参与进来,企图通过软件的手段来解决这个问题,在与DBA充分沟通、整理了大致的业务流之后,发现其实核心的业务执行流程很简单,下面就简单的列出如下。

数据变更的核心业务流

初步方案

本着又不是不能用的心境,先写个最简单的咯,其实就是简单的JDBC的执行

// 录入端传入decSqlListStr
// delete from tableName1; delete from tableName2;
// 简易实现
String decSqlListStr = decSqlListStr.trim();
String[] sqlArr = decSqlListStr.split("\\n*\\s*;\\n*\\s*");
int[] count = jdbcTemplate.batchUpdate(); // 返回批量内容

这个例子直接返回了允许结果集,用户可以直接查看对应的影响行数。

改进方案

上述的实现是最初的简单想法,很显然和上方的期望业务流比只做到了一部分,并且不提供回滚策略,但至少做到了数据变更自动化,可以不需要人参与自动执行了,但现今的这个操作是一个很危险的操作,对于提数据变更的人可能会出现不敢做变更的情况,典型的执行出去的SQL就像泼出去的水,再也回不来了!

其实只要对他的数据变更做一定的控制即可?这就诞生了第二个思路:录入数据影响范围,如果最终执行的数据变更不在提交的范围内部的话,程序会抛出异常,Spring框架的特性导致了抛出异常之后可以自动回滚。增量的代码如下所示。

// 在执行executeBatch下执行代码块
Long allCount = 0L;
for (int i = 0; i < resCount.length; i++) {
  allCount += resCount[i];
}
if (allCount < min && allCount > max) {  // min、max为传入的录入参数
  throw new EasiSqlException("您的语句总影响行数超过配置行数,无法操作,系统自动回滚");
}

这么做是一种折中的解决方案,来做到应用层的相对可控、可回滚。

但这么做也有一个问题,既然如此,执行数据变更的用户都尽量大了填,即使要做到数据变更范围不瞎填,还要在变更前count一下,那就失去了做自动化变更的意义了,那有没有一种完美的方案,来充分的做到上方画的业务流图呐。像DBA最常用的ToadNavicat都是怎么做分段式提交的呐?因为Java都是同步的,无法做异步获取提交。那其实这个业务流最核心的问题就是如何做到分段式的执行和提交。

分段式提交

数据库客户端在执行成功后会返回影响行数,然后再等待DBAcommit或者rollback,这种分段式提交可以让用户做二段式确认,可以让执行有回旋的余地。也是数据变更最常用的一种方案。

最终的方案(实现)

分段就代表了异步,说到Java的异步,就想到了异步的线程执行数据变更,当执行指令发出去后,建立一个数据库的提交线程,这个线程包含数据库连接、执行的语句、影响行数、线程ID号、开始时间、是否终止以及提交状态

数据库连接connection

数据库连接的缓存可以做到让数据库进行分段式的数据提交

影响行数excuteCount

执行完发送notify事件的时候可以发送影响的行数,便于操作人员判断

线程ID号threadId

标识了内容,保证notify事件的唯一性

开始时间statTimestemp

记录开始事件,来释放因为业务原因导致没触发提交、回滚的资源

是否终止end

由于机器的资源有限,因此需要定期清理执行完的线程,终止标识可以让清理程序抓去到后释放线程池的资源。

提交状态commitLevel

此为异步的核心,决定了一个数据变更事件是否提交的标识,当数据变更在数据库层执行完成后,线程定期检查状态标识,不同的标识对应不同的策略,分为三种状态:

  1. hold的线程等待状态,此时等待着指令触发提交或者回滚,此时会一直等待指令的触发
  2. commit的提交状态,此时触发数据提交
  3. rollback的回滚状态,此时触发数据执行
/***
 * 数据库层面的commit等待线程
 */
public class DBCommitThread extends Thread {

    private final Log LOG = LogFactory.getLog(ClearDBCommintTaskConfig.class);

    private final static int MAX_RETRY_TIME = 5;

    // 半小时不释放,自动回滚(毫秒)
    private final static Integer EXCUTE_MAX_MINUTE = 1800000;

    // JDBC执行
    private Connection connection;

    // 执行的SQL语句
    private String[] sqlList;

    private String threadId;

    private Long statTimestemp;

    private int[] excuteCount;

    private int retryTime = 0;

    public boolean end = false;

    // 0: hold, 1: commit, 2: rollback
    private String commitLevel = "0";

    public DBCommitThread (Connection connection, String[] sqlList, String threadId) {
        this.connection = connection;
        this.sqlList = sqlList;
        this.threadId = threadId;
    }

    @Override
    public void run () {
        Statement stmt = null;
        try {
            connection.setAutoCommit(false);
            statTimestemp = System.currentTimeMillis();
            stmt = connection.createStatement();
            for (int i = 0; i < sqlList.length; i++) {
                stmt.addBatch(sqlList[i]);
            }
            if (stmt != null) {
                // 执行
                int[] count = stmt.executeBatch();
                excuteCount = count;
                notifyToServer("1", null);

                while ("0".equals(this.commitLevel)) {
                    sleep(2000);
                    Long nowExcuteTime = System.currentTimeMillis() - statTimestemp;
                    if (DBCommitThread.EXCUTE_MAX_MINUTE < nowExcuteTime) {
                        // holding too long, rollback
                        this.commitLevel = "2";
                    }
                }
                // commit or rollback
                if ("1".equals(this.commitLevel)) {
                    LOG.info("------BATCH DB COMMIT------");
                    connection.commit();
                } else {
                    LOG.info("------BATCH DB ROLLBACK------");
                    connection.rollback();
                }
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            notifyToServer("0", e.getMessage());
            rollback();
        } finally {
            close(stmt, connection);
        }

        // End Thread
        if (!isInterrupted()) {
            isInterrupted();
            this.end = true;
        }
    }

    /***
     * outer call
     * @param commitLevel commit level
     */
    public void setCommitLevel (String commitLevel) {
        this.commitLevel = commitLevel;
    }

    public String getExcuteCount () {
        return JsonUtils.beanToJson(excuteCount);
    }

    private void rollback () {
        try {
            connection.rollback();
        } catch (SQLException se) {
            LOG.error(se.getMessage(), se);
        }
    }

    private void close (Statement stmt, Connection connection) {
        try {
            if (null != stmt) {
                stmt.close();
            }
            if (null != connection && !connection.isClosed()) {
                connection.setAutoCommit(true);
                connection.close();
            }
        } catch (SQLException se) {
            LOG.error(se.getMessage(), se);
        }
    }

    private void notifyToServer (String successStatus, String errMsg) {
      // 通知主程序更新数据变更状态,为第一张业务流程图里的第二级,此处实现不做展示
    }

}

线程池方法实现如下,清理Task为直接调用clearEndThread不做展示

public class ThreadFactory {

    public static Map<String, DBCommitThread> dbCommitThreadMap;

    static {
        ThreadFactory.dbCommitThreadMap = new ConcurrentHashMap<>();
    }

    // 创建线程,数据变更开始建立
    public static String createDBCommitThread (Connection connection, String[] sqlList) {
        // 超出内容
        if (dbCommitThreadMap.size() >= 20) {
            throw new EasiSqlException("Too mush executeBatch Thread");
        }
        String uuid = UUID.randomUUID().toString();
        DBCommitThread newThread = new DBCommitThread(connection, sqlList, uuid);
        newThread.start();
        dbCommitThreadMap.put(uuid, newThread);
        return uuid;
    }

    // 触发设置指令级别,二段式提交的提交指令
    public static void setCurrentThreadCommitLevel (String threadId, String commitLevel) {
        if (ThreadFactory.dbCommitThreadMap.containsKey(threadId)) {
            DBCommitThread dbCommitThread = ThreadFactory.dbCommitThreadMap.get(threadId);
            dbCommitThread.setCommitLevel(commitLevel);
        }
    }

    // 清理程序定期清理
    public static void clearEndThread () {
        Set<String> mapSet = ThreadFactory.dbCommitThreadMap.keySet();
        Iterator it = mapSet.iterator();
        while(it.hasNext()) {
            DBCommitThread dbCommitThread = (DBCommitThread) it.next();
            if (dbCommitThread.end) {
                it.remove();
            }
        }
    }
}

具体的业务流程对应上述的方法如下

  1. createDBCommitThread开始创建
  2. notifyToServer通知中央管理层更新变更状态和数据
  3. 等待指令触发setCurrentThreadCommitLevel,分别对应提交成功和提交回滚
  4. clearEndThread清理程序运行
  5. 中央控制程序关闭数据变更

结语

当然,整体的软件架构远远没有描述的这么简单,平台级别的软件体系涉及到了非常庞大的数据库个数,其中对数据执行层、管理层、用户验证层做了分布式的拆分,数据执行层参照微服务里面的sidecar设计,它是数据库的一个配套服务,执行层配合管理层做了服务发现,进行机器的统一管理,多活,对外公布API服务,死机器踢出,而数据变更自动化仅仅是数据中台设计中的一个子模块设计,究其本质,都是为了让应用的开发维护能更方便,可以给用户提供更优质、快速的数据服务。

顶层的代码设计,Hunter为管理层,watch dog为执行层,双方均支持可扩展
上一篇下一篇

猜你喜欢

热点阅读