MybatisJava技术升华其他零散知识点

Java 批量导入百万级数据到mysql

2020-06-28  本文已影响0人  雪飘千里

需求:把一个500M的txt文件导入mysql数据库,数据量大概有几千万。

1、项目架构

项目采用微服务架构,上传文件的后台管理系统是作为应用层,业务处理的作为服务层,应用层和服务层都有多个节点。

如果是单一节点处理,那么读取500M的文件(可能更大),并把几百万的数据要快速的写入数据库,是非常有难度的。但是我们可以利用微服务多个节点分散处理,即应用层只读取数据,读10000条就放入线程池向服务层发起请求,而服务层有多个节点,可以把应用层的请求平均到多个节点处理,在应用层每个节点先做数据解析,然后插入mysql。

2、服务层处理

2.1 文件读取

文件读取我们用BufferedReader,BufferedReader使用装饰器模式,它的IO行为是每次读进来8K的数据到缓冲区(当然,缓冲区的大小我们是可以通过构造器修改的),如果需要使用数据的时候,再直接从缓冲区里面拿出数据来使用。

而FileReader的read方法,每调用一次就会read一次file,进行一次IO。不管是多次read还是一次性的read,都不是很优雅的在read文件的方式。多次read必然会产生多次IO,一次性的read如果遇到很大的文件,对内存是极不友好的。

所以BufferedReader既能提高的读取速度,又节省了IO的次数,是一种比较优雅的读取文件的方式。
BufferedWriter和FIleWriter同理。

研究一下BufferedReader的源码,就会发现,BufferedReader中对文件的读取还是通过FileReader来实现的,BufferedReader只是对其读取到的数据做一下缓冲,api如下。其中buffer操作的api和java nio中对buffer的操作类似。

image.png

有缓冲区 VS 没有缓冲区

我们这里的需求是顺序读取,如果是随机读取,则使用RandomAccessFile。
所谓随机读取,就是说我们需要自由访问文件的任意位置(指定位置读,指定位置写),所以如果需要访问文件的部分内容,RandomAccessFile将是更好的选择。所以当我们要下载一个大文件时,可以通过多线程使用RandomAccessFile来实现。同样的,对于文件的切割、合并,使用RandomAccessFile效率都会很高。

2.2 业务处理

BufferedReader读取文件后,每读10000行,就丢入线程池,然后调用服务层处理,应用层这里不做任何业务逻辑处理,因为应用层必然是用一个节点处理业务,但是=我们通过http调用服务层后,是可以通过多个节点处理的,这样就可以提升处理效率。


ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 50, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        
BufferedReader bufferedReader = null;
        try {
            bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream("E:\\code\\esg-cemp-core-nonactivity-01394067\\logs\\system\\system2.log"), StandardCharsets.UTF_8));
       
            List<String> lines = new ArrayList<>();
            String lineTXT="";
            long count = 0;
            while ((lineTXT = bufferedReader.readLine()) != null) {
               if(count % 10000 == 0){
                   executorService.execute(new Runnable() {
                       @Override
                       public void run() {
                           //调用微服务
                           System.out.println("------调用微服务--------");
                       }
                   });
                   lines.clear();
               }
               lines.add(lineTXT);
               count++;
            }

            if(lines.size()>0){
                //调用微服务
                System.out.println("-------调用微服务-------");
            }
            //及时关闭线程池,对于这种使用不是很频繁的线程池使用完毕以后,可以及时关闭以节省资源
           //shutdown关闭以后,就不能再提交任务了,
            threadPoolExecutor.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }

3、业务层处理

3.1 业务处理

业务处理比较简单,就是把String 切割成多个字段,然后变换成PO对象。

3.2 mysql 插入

扩展:

1.可以在sql语句后携带分号,实现多语句执行。
2.可以执行批处理,同时发出多个SQL语句。
#{}:是以预编译的形式,将参数设置到sql语句中;PreparedStatement;防止sql注入
${}:取出的值直接拼装在sql语句中;会有安全问题;
如下所示:
select * from tbl_employee where id=${id} and last_name=#{lastName}
Preparing: select * from tbl_employee where id=2 and last_name=?

如果id为 '2 or 1=1',则会发生数据泄露,这就是sql注入。
Preparing: select * from tbl_employee where id=2 or 1=1 and last_name=?
//当前活跃的,即正在被使用的连接数
System.out.println(dataSource.getActiveCount()); ——活跃连接都在activeConnections(Map)中

//最大连接数
System.out.println("========="+dataSource.getMaxActive()); 

//poolingCount值代表剩余可用的连接数,每次从末尾拿走连接
System.out.println(dataSource.getPoolingCount());  ——剩余可用连接都在connections(数组)中

//activeCount + poolingCount只能小于等于maxActive
activeCount + poolingCount <= maxActive

上一篇下一篇

猜你喜欢

热点阅读