RandomAccessFile文件锁踩坑--write高并发引

2019-11-26  本文已影响0人  南风nanfeng

背景

多线程写入文件,要考虑线程同步问题,实现数据完整落盘磁盘备份。
操作系统:
win10:没问题
centos7:有问题

    public static void writeFileLock(String content, String filePath) {
        File file = new File(filePath);
        RandomAccessFile raf = null;
        FileChannel fileChannel = null;
        FileLock fileLock = null;
        try {
            raf = new RandomAccessFile(file, "rw");
            fileChannel = raf.getChannel();
            while (true) {
                try {
                    fileLock = fileChannel.tryLock();
                    if (fileLock != null) {
                        break;
                    }
                } catch (Exception e) {
                    Thread.sleep(0);
                }
            }
            raf.seek(raf.length());
            raf.write(content.getBytes());
            fileLock.release();
            fileChannel.close();
            raf.close();
        } catch (Exception e) {
            log.error("写文件异常", e);
            log.error("写入文件路径:{}, 文件内容:{}", filePath, content);
        }
    }

RandomAccessFile建立文件连接符,raf获取文件管道,文件管道获取文件锁,tryLock方法有两个特点:第一、非阻塞,调用后立刻返回;第二、没拿到锁可能返回null,也可以能抛出异常,所以if判断循环获取,异常块捕获异常再重新尝试获取锁,注意Thread.sleep(0)的作用并不是睡0秒,而是马上加入到可执行队列,等待cpu的时间分片。

这段代码承载线上的kafka多线程备份消息的任务,用lock协调多线程的写入同步,埋点监控发现,备份数据偶发遗漏,大概2.3亿数据,会有5条偏差,就是漏了。

下面记录压测思路及过程。

准备

压测代码:

private static final ExecutorService FILE_THREADS = Executors.newFixedThreadPool(100);

public void execute(String... strings) throws Exception {

        int cnt = 100 * 100 * 100;
        int idx = 1;
        long begin = 1574305200000L;
        while (idx <= cnt) {
            Map<String, Object> map = new HashMap<>();
            map.put("id", idx);
            map.put("time", begin);
            String timeDirectory = DateUtil.getBeforeOneHour("yyyyMMddHHmm", 8, begin);
            String mm = DateUtil.getBeforeOneHour("mm", 0, begin).concat(".txt");
            String json = JsonUtil.getJosnString(map).concat(System.getProperty("line.separator"));
            FILE_THREADS.execute(new PersistThread(timeDirectory, mm , json));
            if (idx % 10000 == 0) {
                begin += 60000L;
            }
            idx++;
        }
}

private class PersistThread extends Thread {

        String time;
        String filename;
        String content;

        PersistThread(String time, String filename, String content) {
            this.time = time;
            this.filename = filename;
            this.content = content;
        }

        @Override
        public void run() {
            String folder = "/data/job_project/txt/" + time + "/";
            FileUtil.createDirectory(folder);
            FileUtil.writeFileIO(content, folder + filename);
        }
}

创建100个线程的线程池,提交写入文件Thread任务,实现多线程写入文件,且文件目录、文件是动态创建的(模拟线上),id每自增1万创建一个时间戳目录,格式是:yyyyMMddHHmm,在目录下创建一个文件,写入1万行数据,相当于100个线程,动态写入100个目录下的100个文件中,每个文件写入1万行。

首先怀疑创建目录和文件:

代码如下:

    public static File createDirectory(String path) {
        File file = new File(path);
        if (!file.exists() && !file.isDirectory()) {
             file.mkdirs();
        }
        return file;
    }

    public static File createFile(String file) {
        File f = null;
        try {
            f = new File(file);
            if (!f.exists()) {
                f.createNewFile();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return f;
    }

创建目录和文件,逻辑都是先检查再创建,显然不是原子的,所以怀疑有没有可能是多线程环境中,目录重复创建导致,所以把代码优化成两次判断的同步方式,如下:

    public static File createDirectory(String path) {
        File file = new File(path);
        if (!file.exists() && !file.isDirectory()) {
            synchronized (FileUtil.class) {
                if (!file.exists() && !file.isDirectory()) {
                    file.mkdirs();
                }
            }
        }
        return file;
    }

    public static File createFile(String file) {
        File f = null;
        try {
            f = new File(file);
            if (!f.exists()) {
                synchronized (FileUtil.class) {
                    if (!f.exists()) {
                        f.createNewFile();
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return f;
    }

压入100w数据,观察结果,大失所望:

/data/job_project/txt/201911211100/00.txt lines: 9989
/data/job_project/txt/201911211101/01.txt lines: 9996
/data/job_project/txt/201911211102/02.txt lines: 9984
/data/job_project/txt/201911211103/03.txt lines: 9984
/data/job_project/txt/201911211104/04.txt lines: 9982

事实是绝大部分文件都漏了,下面把所有的目录和文件全部规划好,再试。
规划目录脚本:

#!/bin/sh
txt=/data/job_project/txt/*
for folder in $txt;do
    filename=${folder##*/}
    if [[ $filename = "f.sh" ]] || [[ $filename = "search.sh" ]];then
        echo "$filename is a shell file"
    else
        filename=${filename:10}
        filepath=${folder}/${filename}.txt
        #rm -f $filepath
        #touch $filepath
        lines=$(wc -l ${filepath} | awk '{print $1}')
        if [ $lines -ne 10000 ];then
            echo "$filepath lines: $lines"
        fi
    fi
done

结果仍然会漏数据。

为了彻底屏蔽创建目录和文件带来的影响,下面的压测前都创建好了文件和目录。

使用RandomAccessFile的rws方式同步写入文件。

测试结果:

/data/job_project/txt/201911211101/01.txt lines: 9998
/data/job_project/txt/201911211106/06.txt lines: 9999
/data/job_project/txt/201911211107/07.txt lines: 9999
/data/job_project/txt/201911211109/09.txt lines: 9999
/data/job_project/txt/201911211112/12.txt lines: 9999
/data/job_project/txt/201911211116/16.txt lines: 9998
/data/job_project/txt/201911211119/19.txt lines: 9999
/data/job_project/txt/201911211120/20.txt lines: 9998
...

压测过程十分缓慢,写入性能非常差,但是结果震惊,仍然漏了,仔细看了官网api注解:

     * <p>The <tt>"rwd"</tt> mode can be used to reduce the number of I/O
     * operations performed.  Using <tt>"rwd"</tt> only requires updates to the
     * file's content to be written to storage; using <tt>"rws"</tt> requires
     * updates to both the file's content and its metadata to be written, which
     * generally requires at least one more low-level I/O operation.
     *
     * <p>If there is a security manager, its {@code checkRead} method is
     * called with the pathname of the {@code file} argument as its
     * argument to see if read access to the file is allowed.  If the mode
     * allows writing, the security manager's {@code checkWrite} method is
     * also called with the path argument to see if write access to the file is
     * allowed.

rwd模式同步文件内容,rws模式同步文件内容和文件元数据,压测首选当然选择更严格的rws,结果仍然遗漏,此时已经开始怀疑jdk源码了。

调整close顺序,校验lock

第一处改动:
    if (fileLock != null) {
        break;
    }
多加一层校验,改成
    if (fileLock != null && fileLock.isValid()) {
        break;
    }

第二处改动:
    fileLock.release();
    fileChannel.close();
    raf.close();
调整close顺寻,改成:
    fileLock.release();
    raf.close();
    fileChannel.close();

测试结果:

/data/job_project/txt/201911211100/00.txt lines: 9989
/data/job_project/txt/201911211101/01.txt lines: 9996
/data/job_project/txt/201911211102/02.txt lines: 9984
/data/job_project/txt/201911211103/03.txt lines: 9984
/data/job_project/txt/201911211104/04.txt lines: 9982
...

结果显示,反而漏了更多数据,此时已经自闭了,但是还要接着撸。

使用channel写入缓冲区

public static void writeFileLock(String content, String filePath, String time) {
        File file = createFile(filePath);
        RandomAccessFile raf = null;
        FileChannel fileChannel = null;
        FileLock fileLock = null;
        try {
            raf = new RandomAccessFile(file, "rw");
            fileChannel = raf.getChannel();
            while (true) {
                try {
                    fileLock = fileChannel.tryLock();
                    if (fileLock != null && fileLock.isValid()) {
                        break;
                    }
                } catch (Exception e) {
                    Thread.sleep(0);
                }
            }
            fileChannel.write(ByteBuffer.wrap(content.getBytes()), fileChannel.size());
            fileLock.release();
            raf.close();
            fileChannel.close();
        } catch (Exception e) {
            log.error("写文件异常", e);
            log.error("写入文件路径:{}, 文件内容:{}", filePath, content);
        }
    }

改变写入方式,用nio的管道channel写入数据,结果仍然失望。

日志埋点——使用redis计数器

埋点代码:

    public static void writeFileLock(String content, String filePath, String time) {
        File file = createFile(filePath);
        RandomAccessFile raf = null;
        FileChannel fileChannel = null;
        FileLock fileLock = null;
        try {
            redisHelper.incr("filelock0:".concat(time));
            raf = new RandomAccessFile(file, "rw");
            fileChannel = raf.getChannel();
            while (true) {
                try {
                    fileLock = fileChannel.tryLock();
                    if (fileLock != null && fileLock.isValid()) {
                        break;
                    }
                } catch (Exception e) {
                    Thread.sleep(0);
                }
            }

            redisHelper.incr("filelock1:".concat(time));
            raf.seek(raf.length());
            redisHelper.incr("filelock2:".concat(time));
            raf.write(content.getBytes());
            redisHelper.incr("filelock3:".concat(time));
            fileLock.release();
            redisHelper.incr("filelock4:".concat(time));
            raf.close();
            redisHelper.incr("filelock5:".concat(time));
            fileChannel.close();
            redisHelper.incr("filelock6:".concat(time));
        } catch (Exception e) {
            log.error("写文件异常", e);
            log.error("写入文件路径:{}, 文件内容:{}", filePath, content);
        }
    }

此时对这段代码彻底失望,得找到数据在哪个位置漏掉的,所以使用了redis计数器,incr是线程安全得,所以能够很快发现到底哪里出问题了,问题马上浮出水面,心中窃喜。
再说明一下:redis的key包含目录名称,即一个目录一个文件一个key,埋点的密集显示出来必胜的信心。
结果是所有key的value都是完美的10000,毫无破绽,心如死灰,于是有同事提议,搞个反查,看看RangdomAccessFile的指针到底有没有更新。

判断RandomAccessFile的文件指针,是不是有没更新指针的情况

    long filelength = raf.length();
    raf.seek(filelength);
    raf.write(content.getBytes());
    if(filelength == raf.length()){
        log.error ( "errorrrrrrrrrrrrr: "+ content);
    }

如果write方法没有写入文件,那么文件指针必然没有更新,调用write后再反查文件指针是否更新,就能判断write是否有写入。结果仍然失望,预期的日志没有打印,说明write确实更新了文件指针,但是就是漏掉了几行数据,结合上述redis计数器埋点和文件指针判断,压测已经走进了死胡同,所有的情况都试过了,至少可以说两点:第一、文件锁没有问题,锁的线程没有逃逸出while循环;第二、测试的每一行代码都执行了到位了,没有哪一行没有执行的。百思不得其解,那就下班,次日再战。

java.io包+可重入锁的方式

昨天的压测可以说把所有情况都试过了,还有试过lock阻塞方式,fileChannel方式写入缓冲区,此处不表。今天决定换个思路,拒绝花里胡哨,就用jdk1.0版本的java.io包+ReentrantLock可重入锁的方式写,代码如下:

    public static void writeSyncFile(String content, String filePath) {
        try {
            fileLock.lock();
            File file = createFile(filePath);
            FileWriter fw = new FileWriter(file, true);
            BufferedWriter bw = new BufferedWriter(fw);
            bw.write(content);
            bw.flush();
            fw.close();
            bw.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            fileLock.unlock();
        }
    }

结果可想而知,每个目录的每个文件,都是完美的10000行,且由于使用了缓冲区,文件写入效率大幅提升,具体提升幅度没有严格计算,使用同步块的方式+写入buffer的方式大概2分钟就能写完,而使用上述方式可能要1小时以上,效率杠杠的。普通的文件io方式没有问题,于是同事提议,用FileOutputStream替代RandomAccessFile看看。

替换RandomAccessFile,使用FileOutputStream获取channel

决定抛弃RandomAccessFile,使用FileOutputStream获取channel,代码如下:

    public static void writeFileIO(String content, String path) {
        FileLock lock = null;
        try {
            FileChannel channel = new FileOutputStream(path, true).getChannel();
            while (true) {
                try {
                    lock = channel.lock();
                    if (lock != null && lock.isValid()) {
                        break;
                    }
                } catch (Exception e) {
                    Thread.sleep(10);
                }
            }
            channel.write(ByteBuffer.wrap(content.getBytes()));
            lock.release();
            channel.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

RandomAccessFile是任意读写的类,而FileOutputStream没有这个功能,要想追加写入文件末尾,在构造方法加个true就行,同样能实现我们想要的功能,第一次压测后,3分钟就出结果,100w数据压入100个文件,每个文件10000行,与预期结果完全相符,完美!乘胜追亚,再压1000w发现数据有误,结果是oom,压入的数据全部写入线程池的阻塞队列中了,于是调大内存到6g,还是如此,奈何机器资源有限,改压400w,结果数据与预期完全符合,此时水落石出,没有想到坑在RandomAccessFile这里,回过头来看这个类,虽然这个类的注释已经被看烂了,比较诡异的是jdk1.0就出的,但是作者未知,可能怕被人喷,嘿嘿嘿。

总结

1、代码不是复制粘特,光搜索谷歌百度,往往很多噪音。
2、高并发场景要多次严格压测,保证数据质量。
3、千万区分windows系统和linux系统,二者的文件系统完全不同,上述代码在windows完全没问题,但是linux就是状况百出。
4、怀疑精神,代码都是人写的,就会有bug,测试用例覆盖所有场景,测试各种可能性。

上一篇下一篇

猜你喜欢

热点阅读