如何做到数据变更的自动化?
为什么会有数据变更
在以平台为级别的软件集合中,为了保证软件质量的可控,错误的可追溯,不可避免的会通过一些流程来约束、框定公司的各种条条框框,比如发布需要有流程才能发布,业界有JIRA
、ITSM
这种优秀的集成式软件来制定流程。
而其中数据变更也需要一个流程来控制,当线上需要发生数据变更的时候,团队需要先编写数据库脚本,编写完后提一个流程到DBA
这边,DBA
执行以后会告诉提变更的人具体变更的行数有多少,确认后DBA
再做COMMIT
,执行完后确认变更结果,至此一整个数据变更的业务流就完成了。
而这一切的一切都是需要人来参与的,人就代表着会出错,会有沟通成本,这显然不符合如今敏捷开发的思想,更何况你的生产网往往与办公网段处于物理隔离的时候,这种变更流程所带来的时间和空间成本变得比较大,很多很急的数据变更不能及时的相应,最后甚至会有平台内部的一些子团队会分配一个人力去专门提流程、发布。如果能把这个现有的流程做提取,自动化之后,团队的成本就能进一步的降低,解放出来的DBA
也能够做更有意义的事情。
有了问题,开发人员和架构师就会对应参与进来,企图通过软件的手段来解决这个问题,在与DBA充分沟通、整理了大致的业务流之后,发现其实核心的业务执行流程很简单,下面就简单的列出如下。
数据变更的核心业务流初步方案
本着又不是不能用的心境,先写个最简单的咯,其实就是简单的JDBC
的执行
// 录入端传入decSqlListStr
// delete from tableName1; delete from tableName2;
// 简易实现
String decSqlListStr = decSqlListStr.trim();
String[] sqlArr = decSqlListStr.split("\\n*\\s*;\\n*\\s*");
int[] count = jdbcTemplate.batchUpdate(); // 返回批量内容
这个例子直接返回了允许结果集,用户可以直接查看对应的影响行数。
改进方案
上述的实现是最初的简单想法,很显然和上方的期望业务流比只做到了一部分,并且不提供回滚策略,但至少做到了数据变更自动化,可以不需要人参与自动执行了,但现今的这个操作是一个很危险的操作,对于提数据变更的人可能会出现不敢做变更的情况,典型的执行出去的SQL
就像泼出去的水,再也回不来了!
其实只要对他的数据变更做一定的控制即可?这就诞生了第二个思路:录入数据影响范围,如果最终执行的数据变更不在提交的范围内部的话,程序会抛出异常,Spring
框架的特性导致了抛出异常之后可以自动回滚。增量的代码如下所示。
// 在执行executeBatch下执行代码块
Long allCount = 0L;
for (int i = 0; i < resCount.length; i++) {
allCount += resCount[i];
}
if (allCount < min && allCount > max) { // min、max为传入的录入参数
throw new EasiSqlException("您的语句总影响行数超过配置行数,无法操作,系统自动回滚");
}
这么做是一种折中的解决方案,来做到应用层的相对可控、可回滚。
但这么做也有一个问题,既然如此,执行数据变更的用户都尽量大了填,即使要做到数据变更范围不瞎填,还要在变更前count
一下,那就失去了做自动化变更的意义了,那有没有一种完美的方案,来充分的做到上方画的业务流图呐。像DBA
最常用的Toad
、Navicat
都是怎么做分段式提交的呐?因为Java都是同步的,无法做异步获取提交。那其实这个业务流最核心的问题就是如何做到分段式的执行和提交。
分段式提交
数据库客户端在执行成功后会返回影响行数,然后再等待
DBA
的commit
或者rollback
,这种分段式提交可以让用户做二段式确认,可以让执行有回旋的余地。也是数据变更最常用的一种方案。
最终的方案(实现)
分段就代表了异步,说到Java的异步,就想到了异步的线程执行数据变更,当执行指令发出去后,建立一个数据库的提交线程,这个线程包含数据库连接、执行的语句、影响行数、线程ID号、开始时间、是否终止以及提交状态
数据库连接
connection
数据库连接的缓存可以做到让数据库进行分段式的数据提交
影响行数
excuteCount
执行完发送notify事件的时候可以发送影响的行数,便于操作人员判断
线程ID号
threadId
标识了内容,保证notify事件的唯一性
开始时间
statTimestemp
记录开始事件,来释放因为业务原因导致没触发提交、回滚的资源
是否终止
end
由于机器的资源有限,因此需要定期清理执行完的线程,终止标识可以让清理程序抓去到后释放线程池的资源。
提交状态
commitLevel
此为异步的核心,决定了一个数据变更事件是否提交的标识,当数据变更在数据库层执行完成后,线程定期检查状态标识,不同的标识对应不同的策略,分为三种状态:
hold
的线程等待状态,此时等待着指令触发提交或者回滚,此时会一直等待指令的触发commit
的提交状态,此时触发数据提交rollback
的回滚状态,此时触发数据执行
/***
* 数据库层面的commit等待线程
*/
public class DBCommitThread extends Thread {
private final Log LOG = LogFactory.getLog(ClearDBCommintTaskConfig.class);
private final static int MAX_RETRY_TIME = 5;
// 半小时不释放,自动回滚(毫秒)
private final static Integer EXCUTE_MAX_MINUTE = 1800000;
// JDBC执行
private Connection connection;
// 执行的SQL语句
private String[] sqlList;
private String threadId;
private Long statTimestemp;
private int[] excuteCount;
private int retryTime = 0;
public boolean end = false;
// 0: hold, 1: commit, 2: rollback
private String commitLevel = "0";
public DBCommitThread (Connection connection, String[] sqlList, String threadId) {
this.connection = connection;
this.sqlList = sqlList;
this.threadId = threadId;
}
@Override
public void run () {
Statement stmt = null;
try {
connection.setAutoCommit(false);
statTimestemp = System.currentTimeMillis();
stmt = connection.createStatement();
for (int i = 0; i < sqlList.length; i++) {
stmt.addBatch(sqlList[i]);
}
if (stmt != null) {
// 执行
int[] count = stmt.executeBatch();
excuteCount = count;
notifyToServer("1", null);
while ("0".equals(this.commitLevel)) {
sleep(2000);
Long nowExcuteTime = System.currentTimeMillis() - statTimestemp;
if (DBCommitThread.EXCUTE_MAX_MINUTE < nowExcuteTime) {
// holding too long, rollback
this.commitLevel = "2";
}
}
// commit or rollback
if ("1".equals(this.commitLevel)) {
LOG.info("------BATCH DB COMMIT------");
connection.commit();
} else {
LOG.info("------BATCH DB ROLLBACK------");
connection.rollback();
}
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
notifyToServer("0", e.getMessage());
rollback();
} finally {
close(stmt, connection);
}
// End Thread
if (!isInterrupted()) {
isInterrupted();
this.end = true;
}
}
/***
* outer call
* @param commitLevel commit level
*/
public void setCommitLevel (String commitLevel) {
this.commitLevel = commitLevel;
}
public String getExcuteCount () {
return JsonUtils.beanToJson(excuteCount);
}
private void rollback () {
try {
connection.rollback();
} catch (SQLException se) {
LOG.error(se.getMessage(), se);
}
}
private void close (Statement stmt, Connection connection) {
try {
if (null != stmt) {
stmt.close();
}
if (null != connection && !connection.isClosed()) {
connection.setAutoCommit(true);
connection.close();
}
} catch (SQLException se) {
LOG.error(se.getMessage(), se);
}
}
private void notifyToServer (String successStatus, String errMsg) {
// 通知主程序更新数据变更状态,为第一张业务流程图里的第二级,此处实现不做展示
}
}
线程池方法实现如下,清理Task为直接调用
clearEndThread
不做展示
public class ThreadFactory {
public static Map<String, DBCommitThread> dbCommitThreadMap;
static {
ThreadFactory.dbCommitThreadMap = new ConcurrentHashMap<>();
}
// 创建线程,数据变更开始建立
public static String createDBCommitThread (Connection connection, String[] sqlList) {
// 超出内容
if (dbCommitThreadMap.size() >= 20) {
throw new EasiSqlException("Too mush executeBatch Thread");
}
String uuid = UUID.randomUUID().toString();
DBCommitThread newThread = new DBCommitThread(connection, sqlList, uuid);
newThread.start();
dbCommitThreadMap.put(uuid, newThread);
return uuid;
}
// 触发设置指令级别,二段式提交的提交指令
public static void setCurrentThreadCommitLevel (String threadId, String commitLevel) {
if (ThreadFactory.dbCommitThreadMap.containsKey(threadId)) {
DBCommitThread dbCommitThread = ThreadFactory.dbCommitThreadMap.get(threadId);
dbCommitThread.setCommitLevel(commitLevel);
}
}
// 清理程序定期清理
public static void clearEndThread () {
Set<String> mapSet = ThreadFactory.dbCommitThreadMap.keySet();
Iterator it = mapSet.iterator();
while(it.hasNext()) {
DBCommitThread dbCommitThread = (DBCommitThread) it.next();
if (dbCommitThread.end) {
it.remove();
}
}
}
}
具体的业务流程对应上述的方法如下
createDBCommitThread
开始创建notifyToServer
通知中央管理层更新变更状态和数据- 等待指令触发
setCurrentThreadCommitLevel
,分别对应提交成功和提交回滚clearEndThread
清理程序运行- 中央控制程序关闭数据变更
结语
当然,整体的软件架构远远没有描述的这么简单,平台级别的软件体系涉及到了非常庞大的数据库个数,其中对数据执行层、管理层、用户验证层做了分布式的拆分,数据执行层参照微服务里面的sidecar
设计,它是数据库的一个配套服务,执行层配合管理层做了服务发现,进行机器的统一管理,多活,对外公布API服务,死机器踢出,而数据变更自动化仅仅是数据中台设计中的一个子模块设计,究其本质,都是为了让应用的开发维护能更方便,可以给用户提供更优质、快速的数据服务。