快速归档数据库数据

2020-06-08  本文已影响0人  一直在路上_求名

背景介绍

为什么进行数据归档?因为在实际工作中,数据都是需要保存在数据库的,但是由于数据库本身的特性,当数据量太大会对数据库的性能有较大的影响,因此需要减少数据,将一些过旧的数据进行归档。比如一些操作流水记录,这些数据本身没太大意义,但是由于业务要求可能需要给客服能查询出用户的流水信息,方便处理问题,但是这个数据并不需要存太长时间,所以就需要将旧数据归档到另一个数据库,从而提升系统的性能。在实际工作中数据归档是十分普遍的场景,而且每次归档的数据一般都是比较大量的数据,如何能高效的把数据归档,是一个十分重要的事情。

归档的方式

常见方式

在实际的工作过程中,由于 dao 层都是使用现有的框架,所以在归档的时候大多采用的方式就是使用 dao 层框架提供的方式进行归档。而最常见的 dao 层框架就是 mybatis,所以使用的最多的归档方式就是使用 mybatis 提供的 <foreach> 标签,来动态生成 sql 从而完成数据的批量插入。

<!-- foreach标签介绍 -->
<!-- 
collection="tests" 循环集合的名称,默认是list
item="test" 循环后的对象别名
separator="," 循环的分割符
-->
<insert id="insertTestList">
    insert into test (
            id,
            name,
            create_time
       ) values
      <foreach collection="tests"  item="test"  separator=",">
            (
              #{test.id, jdbcType=BIGINT},
              #{test.name, jdbcType=VARCHAR},
              #{test.createTime, jdbcType=TIMESTAMP}
            )
      </foreach>
</insert>

上面的示例就是最简单的一个使用 foreach 标签批量入库的操作,在正常的小批量操作是完全可以满足要求的。但是对于归档的操作,由于归档的数据量通常比较大,所以使用 foreach 标签并不能很好的处理。因为 mybatis 在处理标签的使用需要进行解析,会生成需对的中间对象,不光会影响批量插入的性能,还会导致 GC 频繁,可能会导致 CPU 使用率飙升。使用 foreach 标签归档数据时,每次归档 50 条数据左右性能是最好的。

高效方式

在使用 dao 层框架提供的批量入库操作时,由于框架本身的限制,并不能提供很好的性能,归档的速度也会比较慢。因此要想高效的归档,就必须放弃使用框架本身提供的方式,而使用 mysql 的原生方法进行归档。刚好 mysql 也提供了一种高效的数据导入的方式 load data infile 这个方式能非常高效的插入数据库。

/**
* 往 StringBuilder 里追加数据
* @param builder StringBuilder
* @param object  数据
* @param endFlag  结束的标识,true 就是结束
*/
private void builderAppend(StringBuilder builder, Object object , boolean endFlag) {
    builder.append(objectToString(object));
    if (endFlag) {
        builder.append("\n");
    } else {
        builder.append("\t");
    }
}

/**
* append数据时的一些逻辑处理,如果数据本身含有特殊符号需要进行替换
* @param object 数据
*/
private String objectToString(Object object) {
    if (null == object) {
        return "\\N";
     }
    if (object instanceof Date) {
          return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format((Date) object);
    } else {
         String str = String.valueOf(object);
         if (str.contains("\t")) {
              str = str.replaceAll("\t", " ");
          }
          if (str.contains("\n")) {
              str = str.replaceAll("\n" ," ");
          }
          return str;
      }
}
/**
* 批量插入数据
* @param list 数据列表
* @return 插入的行数
* @throws Exception 抛出的异常
*/
@Override
public int batchInsert(List<Test> list) throws Exception{
    String[] columnArray = TABLE_COLUMNS.split(",");
    StringBuilder sb = new StringBuilder();
    for (Test operateLog :list) {
         for (int i = 0, size = columnArray.length ; i < size ; i++) {
             if (Objects.isNull(columnArray[i])) {
                 continue;
             }
            builderAppend(sb , getObjectValue(columnArray[i] , operateLog) , i == size - 1);
         }
   }
  int insertRow = insertDataByLoadData(sb);
  log.info("insert归档积分操作记录表数量insertRow:"+insertRow+" 需插入的数量:"+list.size());
  return insertRow;
}
/**
* 通过 LOAD DATA LOCAL INFILE 批量导入数据到数据库
* @param builder 拼接的数据
*/
private int insertDataByLoadData(StringBuilder builder) throws SQLException, IOException {
        int rows = 0;
        InputStream input = null;
        try {
            byte[] bytes = builder.toString().getBytes();
            if (bytes.length > 0) {
                input = new ByteArrayInputStream(bytes);
                //批量插入数据。
                long beginTime = System.currentTimeMillis();
                rows = realInsertByInputStream(input);
                long endTime = System.currentTimeMillis();
                log.info(INSERT_SAL+":【插入" + rows + "行数据至MySql中,耗时" + (endTime - beginTime) + "ms。】");
            }
        } finally {
            if (null != input) {
                input.close();
            }
            if (null != conn) {
                conn.close();
            }
        }
        return rows;
    }
/**
 * 将数据从输入流批量导入到数据库。
 * @param inputStream   输入流。
 * @return int         成功插入的行数。
 */
private int realInsertByInputStream(InputStream inputStream) throws SQLException {
        if (null == inputStream) {
            log.info("输入流为NULL,没有数据导入。");
            return 0;
        }
        conn = Objects.requireNonNull(jdbcTemplate.getDataSource()).getConnection();
        PreparedStatement statement = conn.prepareStatement(INSERT_SAL);
        //mysql-connector-java 6
        if (statement.isWrapperFor(com.mysql.cj.api.jdbc.Statement.class)) {
            com.mysql.cj.jdbc.PreparedStatement mysqlStatement = statement.unwrap(com.mysql.cj.jdbc.PreparedStatement.class);
            mysqlStatement.setLocalInfileInputStream(inputStream);
            return mysqlStatement.executeUpdate();
        }
        return 0;
}

以上是如何使用 mybatis 提供的 load data infile ,在 Java 中的操作方式,这种方式需要注意的是:
1、需要数据库的支持,可以使用以下命令查看,和设置

查看
SHOW VARIABLES LIKE '%local%';
如果 local_infile 为 OFF 则需要设置,用下面的命令设置即可
SET GLOBAL local_infile=1;

2、由于使用的时候不同的 mysql-connector-java 的版本,可能会有稍许差别,所以需要注意。

总结

数据库归档是十分常见的操作,因此使用高效的归档方式是有必要的;
在工作中由于归档操作一般是晚上执行,但是如果效率太低,执行的时间太长,很可能会到白天才执行完成。而白天需要给用户提供很好的体验,所以如果归档太慢,很可能影响用户的整个体验。
最后,我整理了一份简单的归档demo,有需要可以借鉴。
https://github.com/zhanyiping/backup-data.git

上一篇下一篇

猜你喜欢

热点阅读