做一个高性能的java流式存储项目你需要知道的一些事儿

2023-11-22  本文已影响0人  江江的大猪

1、目前能够在网上搜到的java相关的高性能文件io文章都比较基础,想深入的话需要既了解java的文件操作api原理,又了解文件操作相关的系统调用,这就造成了学习困难
2、实际上elasticsearch、kafka、rocketmq里关于文件操作的java实现已经很好,本文也参考了其中不少代码实现
3、本文的每个结论均提供测试代码验证,方便读者在自己的机器上验证
4、本文贴出的jdk native源码版本是openjdk的jdk8-b120
5、本文需要读者对java文件操作、虚拟内存、物理内存、pagecache有一定基础了解
6、本文的测试代码需要在TEST_PATH目录下提前准备test1-test30文件(根据自己测试环境调整),可以通过fallocate -l 1G test1快速创建30个1g大小的测试文件
7、测试代码依赖如下,引入netty只是为了使用其中的PlatformDependent工具类,引入JNA是为了执行系统调用

        <dependency>
            <groupId>org.openjdk.jmh</groupId>
            <artifactId>jmh-core</artifactId>
            <version>1.37</version>
        </dependency>
        <dependency>
            <groupId>org.openjdk.jmh</groupId>
            <artifactId>jmh-generator-annprocess</artifactId>
            <version>1.37</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.101.Final</version>
        </dependency>
        <dependency>
            <groupId>net.java.dev.jna</groupId>
            <artifactId>jna</artifactId>
            <version>5.13.0</version>
        </dependency>

基础部分

FileChannel、MappedByteBuffer的初始化和释放

    FileChannel channel = new RandomAccessFile(TEST_PATH + "test1", "rw").getChannel();
    MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
    PlatformDependent.freeDirectBuffer(mappedByteBuffer);
    fileChannel.close();

完整源码:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/sun/nio/ch/FileChannelImpl.c

    Java_sun_nio_ch_FileChannelImpl_map0(JNIEnv *env, jobject this, jint prot, jlong off, jlong len)
    {
        if (prot == sun_nio_ch_FileChannelImpl_MAP_RO) {
            protections = PROT_READ;
            flags = MAP_SHARED;
        } else if (prot == sun_nio_ch_FileChannelImpl_MAP_RW) {
            protections = PROT_WRITE | PROT_READ;
            flags = MAP_SHARED;
        } else if (prot == sun_nio_ch_FileChannelImpl_MAP_PV) {
            protections =  PROT_WRITE | PROT_READ;
            flags = MAP_PRIVATE;
        }
        mapAddress = mmap64(
                0,                    /* Let OS decide location */
                len,                  /* Number of bytes to map */
                protections,          /* File permissions */
                flags,                /* Changes are shared */
                fd,                   /* File descriptor of mapped file */
                off);                 /* Offset into file */
    }

可以看到会对parent执行close,所以对FileChannel执行close就会对RandomAccessFile执行close。其实对RandomAccessFile执行close也会对FileChannel执行close,所以这俩关闭一个就行。一般我们只把RandomAccessFile当做获取FileChannel的工具人,不会保留它的引用,所以最终只会执行FileChannel的close

    protected void implCloseChannel() throws IOException {
        if (this.fileLockTable != null) {
            Iterator var1 = this.fileLockTable.removeAll().iterator();
            while(var1.hasNext()) {
                FileLock var2 = (FileLock)var1.next();
                synchronized(var2) {
                    if (var2.isValid()) {
                        this.nd.release(this.fd, var2.position(), var2.size());
                        ((FileLockImpl)var2).invalidate();
                    }
                }
            }
        }
        this.threads.signalAndWait();
        if (this.parent != null) {
            ((Closeable)this.parent).close();
        } else {
            this.nd.close(this.fd);
        }
    }

FileChannel write和DirectBuffer的关系

    static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
        if (var1 instanceof DirectBuffer) {
            return writeFromNativeBuffer(var0, var1, var2, var4);
        } else {
            int var5 = var1.position();
            int var6 = var1.limit();
            int var7 = var5 <= var6 ? var6 - var5 : 0;
            ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);
            int var10;
            try {
                var8.put(var1);
                var8.flip();
                var1.position(var5);
                int var9 = writeFromNativeBuffer(var0, var8, var2, var4);
                if (var9 > 0) {
                    var1.position(var5 + var9);
                }
                var10 = var9;
            } finally {
                Util.offerFirstTemporaryDirectBuffer(var8);
            }
            return var10;
        }
    }

FileChannel read和DirectBuffer的关系

    static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
        if (var1.isReadOnly()) {
            throw new IllegalArgumentException("Read-only buffer");
        } else if (var1 instanceof DirectBuffer) {
            return readIntoNativeBuffer(var0, var1, var2, var4);
        } else {
            ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());
            int var7;
            try {
                int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
                var5.flip();
                if (var6 > 0) {
                    var1.put(var5);
                }
                var7 = var6;
            } finally {
                Util.offerFirstTemporaryDirectBuffer(var5);
            }
            return var7;
        }
    }

FileChannel force参数true/false的区别

完整源码请看:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/sun/nio/ch/FileDispatcherImpl.c

    Java_sun_nio_ch_FileDispatcherImpl_force0(JNIEnv *env, jobject this, jobject fdo, jboolean md)
    {
        if (md == JNI_FALSE) {
            result = fdatasync(fd);
        } else {
            result = fsync(fd);
        }
    }

FileChannel的transferTo方法介绍

对该方法的解析可以看我之前的文章:https://www.jianshu.com/p/11ed05ca62ff

MappedByteBuffer load方法

    public final MappedByteBuffer load() {
        load0(mappingAddress(offset), length);
        // Read a byte from each page to bring it into memory. A checksum
        // is computed as we go along to prevent the compiler from otherwise
        // considering the loop as dead code.
        Unsafe unsafe = Unsafe.getUnsafe();
        int ps = Bits.pageSize();
        int count = Bits.pageCount(length);
        long a = mappingAddress(offset);
        byte x = 0;
        for (int i=0; i<count; i++) {
            x ^= unsafe.getByte(a);
            a += ps;
        }
        if (unused != 0)
            unused = x;
        return this;
    }

可以看出load0的作用是告诉操作系统这段内存接下来willneed,操作系统可以对这段内存做些优化处理,比如预读和提前加载之类的,可以加快后续对每个页都触发填充的效率
完整源码请看:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/java/nio/MappedByteBuffer.c

    Java_java_nio_MappedByteBuffer_load0(JNIEnv *env, jobject obj, jlong address, jlong len)
    {
        int result = madvise((caddr_t)a, (size_t)len, MADV_WILLNEED);
    }

MappedByteBuffer isLoad方法

    public final boolean isLoaded() {
        return isLoaded0(mappingAddress(offset), length, Bits.pageCount(length));
    }

完整源码请看:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/java/nio/MappedByteBuffer.c

    Java_java_nio_MappedByteBuffer_isLoaded0(JNIEnv *env, jobject obj, jlong address, jlong len, jint numPages)
    {
        jboolean loaded = JNI_TRUE;
        unsigned char *vec = (unsigned char *)malloc(numPages * sizeof(char));
        mincore(address, (size_t)len, vec);
        for (i=0; i<numPages; i++) {
            if (vec[i] == 0) {
                loaded = JNI_FALSE;
                break;
            }
        }
        return loaded;
    }

调用了MappedByteBuffer的load方法后,isLoad不一定一直为true

当pagecache不够用了的时候,操作系统会将cache按照规则释放或者放入swap区,测试代码如下,如果测试电脑的可用内存小于30g,基本上最后会显示false

    public static void main(String[] args) throws IOException, InterruptedException {
        RandomAccessFile r = new RandomAccessFile(TEST_PATH + "/test1", "rw");
        FileChannel fileChannel = r.getChannel();
        MappedByteBuffer byteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
        byteBuffer.load();
        new Thread(() -> {
            try {
                for (int i = 2; i < 31; i++) {
                    RandomAccessFile f = new RandomAccessFile(TEST_PATH + "test" + i, "rw");
                    FileChannel channel = f.getChannel();
                    MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
                    buffer.load();
                }
                TimeUnit.DAYS.sleep(1);
            } catch (Exception e) {
            }
        }).start();
        TimeUnit.SECONDS.sleep(20);
        System.out.println(byteBuffer.isLoaded());
    }

FileChannel和MappedByteBuffer读写测试

http://thinkinjava.cn/2019/05/12/2019/05-12-java-nio/

https://developer.aliyun.com/article/899469
https://dunwu.github.io/java-tutorial/pages/747d3e/#warmup

从FileChannel读到HeapByteBuffer
从FileChannel读到DirectByteBuffer
从HeapByteBuffer写入FileChannel
从DirectByteBuffer写入FileChannel
从DirectByteBuffer写入FileChannel并force(false)
从DirectByteBuffer写入FileChannel并force(true)
从MappedByteBuffer读到数组
从数组写入MappedByteBuffer
从HeapByteBuffer写入MappedByteBuffer
从DirectByteBuffer写入MappedByteBuffer
从DirectByteBuffer写入MappedByteBuffer并force

FileChannel读到DirectBuffer比读到HeapBuffer性能好,如上文所述,符合预期
FileChannel写入DirectBuffer比HeapBuffer,如上文所述,符合预期
FileChannel force(false)比force(true)好一些
MappedByteBuffer读到数组中有压倒性读取优势,比FileChannel高两个数量级
MappedByteBuffer写入数组数据性能好一些,写入DirectBuffer和HeapBuffer差不多
MappedByteBuffer比FileChannel的写入性能、force性能都好
mac上测试结果差不多,但是MappedByteBuffer从数组、HeapBuffer、DirectBuffer写入差距不像linux上明显

Benchmark                                               Mode  Cnt      Score   Error   Units
MyBenchmark.fileChannelRead2DirectBuffer               thrpt         968.349          ops/ms
MyBenchmark.fileChannelRead2HeapBuffer                 thrpt         669.176          ops/ms
MyBenchmark.fileChannelWriteFromDirectBuffer           thrpt         286.462          ops/ms
MyBenchmark.fileChannelWriteFromDirectBufferForce      thrpt           3.992          ops/ms
MyBenchmark.fileChannelWriteFromDirectBufferForceMeta  thrpt           3.169          ops/ms
MyBenchmark.fileChannelWriteFromHeapBuffer             thrpt         258.994          ops/ms
MyBenchmark.mappedRead2Array                           thrpt       67872.596          ops/ms
MyBenchmark.mappedWriteFromArray                       thrpt        1675.428          ops/ms
MyBenchmark.mappedWriteFromDirectBuffer                thrpt        1585.909          ops/ms
MyBenchmark.mappedWriteFromDirectBufferForce           thrpt           6.651          ops/ms
MyBenchmark.mappedWriteFromHeapBuffer                  thrpt        1559.150          ops/ms
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(1)
@Warmup(iterations = 3, time = 10)
@Measurement(iterations = 1, time = 10)
@Threads(16)
@State(Scope.Benchmark)
public class MyBenchmark {
    private static final int SIZE = pageSize;
    private FileChannel fileChannel;
    private MappedByteBuffer mappedByteBuffer;
    private final ThreadLocal<ByteBuffer> fileChannelRead2HeapBufferThreadLocal = new ThreadLocal<>();
    private final ThreadLocal<ByteBuffer> fileChannelRead2DirectBufferThreadLocal = new ThreadLocal<>();
    private final ThreadLocal<byte[]> mappedRead2ArrayThreadLocal = new ThreadLocal<>();
    private final byte[] srcByteArray = new byte[SIZE];
    private final ByteBuffer srcHeapBuffer = ByteBuffer.allocate(SIZE);
    private final ByteBuffer srcDirectBuffer = ByteBuffer.allocateDirect(SIZE);

    @Setup
    public void setup() throws IOException {
        for (int i = 0; i < SIZE; i++) {
            srcByteArray[i] = 9;
        }
        srcHeapBuffer.put(srcByteArray, 0, SIZE);
        srcHeapBuffer.flip();
        srcDirectBuffer.put(srcByteArray, 0, SIZE);
        srcDirectBuffer.flip();
        RandomAccessFile r = new RandomAccessFile(TEST_PATH + "test1", "rw");
        fileChannel = r.getChannel();
        mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
    }

    @TearDown
    public void tearDown() throws IOException {
        PlatformDependent.freeDirectBuffer(mappedByteBuffer);
        fileChannel.close();
    }

    @Benchmark
    public void fileChannelRead2HeapBuffer(Blackhole blackhole) throws IOException {
        ByteBuffer byteBuffer = fileChannelRead2HeapBufferThreadLocal.get();
        if (byteBuffer == null) {
            byteBuffer = ByteBuffer.allocate(SIZE);
            fileChannelRead2HeapBufferThreadLocal.set(byteBuffer);
        }
        blackhole.consume(fileChannel.read(byteBuffer.slice(), 0L));
    }

    @Benchmark
    public void fileChannelRead2DirectBuffer(Blackhole blackhole) throws IOException {
        ByteBuffer byteBuffer = fileChannelRead2DirectBufferThreadLocal.get();
        if (byteBuffer == null) {
            byteBuffer = ByteBuffer.allocateDirect(SIZE);
            fileChannelRead2DirectBufferThreadLocal.set(byteBuffer);
        }
        blackhole.consume(fileChannel.read(byteBuffer.slice(), 0L));
    }

    @Benchmark
    public void fileChannelWriteFromHeapBuffer(Blackhole blackhole) throws IOException {
        blackhole.consume(fileChannel.write(srcHeapBuffer.slice(), 0L));
    }

    @Benchmark
    public void fileChannelWriteFromDirectBuffer(Blackhole blackhole) throws IOException {
        blackhole.consume(fileChannel.write(srcDirectBuffer.slice(), 0L));
    }

    @Benchmark
    public void fileChannelWriteFromDirectBufferForce(Blackhole blackhole) throws IOException {
        blackhole.consume(fileChannel.write(srcDirectBuffer.slice(), 0L));
        fileChannel.force(false);
    }

    @Benchmark
    public void fileChannelWriteFromDirectBufferForceMeta(Blackhole blackhole) throws IOException {
        blackhole.consume(fileChannel.write(srcDirectBuffer.slice(), 0L));
        fileChannel.force(true);
    }

    @Benchmark
    public void mappedRead2Array(Blackhole blackhole) {
        byte[] array = mappedRead2ArrayThreadLocal.get();
        if (array == null) {
            array = new byte[SIZE];
            mappedRead2ArrayThreadLocal.set(array);
        }
        blackhole.consume(mappedByteBuffer.slice().get(array, 0, SIZE));
    }

    @Benchmark
    public void mappedWriteFromArray(Blackhole blackhole) {
        blackhole.consume(mappedByteBuffer.slice().put(srcByteArray, 0, SIZE));
    }

    @Benchmark
    public void mappedWriteFromHeapBuffer(Blackhole blackhole) {
        blackhole.consume(mappedByteBuffer.slice().put(srcHeapBuffer.slice()));
    }

    @Benchmark
    public void mappedWriteFromDirectBuffer(Blackhole blackhole) {
        blackhole.consume(mappedByteBuffer.slice().put(srcDirectBuffer.slice()));
    }

    @Benchmark
    public void mappedWriteFromDirectBufferForce(Blackhole blackhole) {
        blackhole.consume(mappedByteBuffer.slice().put(srcDirectBuffer.slice()));
        mappedByteBuffer.force();
    }
}

进阶部分

java工程中通过JNA调用libc标准库函数

对JNA不做展开介绍,读者可以自行查阅学习:https://github.com/java-native-access/jna/blob/master/www/GettingStarted.md

import com.sun.jna.Library;
import com.sun.jna.Native;
import com.sun.jna.NativeLong;
import com.sun.jna.Platform;
import com.sun.jna.Pointer;

public interface LibC extends Library {
    LibC INSTANCE = Native.load(Platform.isWindows() ? "msvcrt" : "c", LibC.class);
    // 是不是很熟悉,大家学的第一个函数应该就是这个吧:)
    void printf(String format, Object... args);
    // 本文的核心,后面详细介绍
    int mlock(Pointer var1, NativeLong var2);
}

通过mlock将锁定物理内存,避免内存被swap,保持物理内存常驻

mlock不做详细展开,读者可以自行查阅,推荐文章如下
http://www.daileinote.com/computer/linux_sys/32
https://www.cnblogs.com/linhaostudy/p/15972330.html

    public static void main(String[] args) throws IOException, InterruptedException {
        RandomAccessFile r = new RandomAccessFile(TEST_PATH + "test1", "rw");
        FileChannel fileChannel = r.getChannel();
        MappedByteBuffer byteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
        long address = PlatformDependent.directBufferAddress(byteBuffer);
        Pointer p = new Pointer(address);
        System.out.println(LibC.INSTANCE.mlock(p, new NativeLong(1024 * 1024 * 1024)));
        System.out.println(byteBuffer.isLoaded());
}

该测试程序可以永远执行下去,mlock的返回值也一直会是0(系统调用返回值0代表正常)
读者可以试试只调用mlock,但是不释放MappedByteBuffer的情况,小心电脑死机 : )

    public static void main(String[] args) throws IOException, InterruptedException {
        try {
            while (true) {
                for (int i = 1; i < 31; i++) {
                    RandomAccessFile f = new RandomAccessFile(TEST_PATH + "test" + i, "rw");
                    FileChannel channel = f.getChannel();
                    MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
                    long add = PlatformDependent.directBufferAddress(buffer);
                    Pointer p = new Pointer(add);
                    System.out.println(LibC.INSTANCE.mlock(p, new NativeLong(1024 * 1024 * 1024)));
                    channel.close();
                    PlatformDependent.freeDirectBuffer(buffer);
                }
            }
        } catch (Exception e) {
        }

https://github.com/java-native-access/jna/blob/master/www/GettingStarted.md

https://github.com/elastic/elasticsearch/blob/951640b73f71909013f57645cd30e1f19d8c2323/server/src/main/java/org/elasticsearch/bootstrap/JNAKernel32Library.java#L30

获取程序运行机器的操作系统页大小

    private int pageSize = AccessController.doPrivileged((PrivilegedAction<Integer>) () -> {
        try {
            Class<?> bitsClass = Class.forName("java.nio.Bits", false, PlatformDependent.getSystemClassLoader());
            Method pageSizeMethod = bitsClass.getDeclaredMethod("pageSize");
            pageSizeMethod.setAccessible(true);
            return (Integer) pageSizeMethod.invoke(null);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    });

总结

上一篇 下一篇

猜你喜欢

热点阅读