JAVA IO专题二:java NIO读取文件并通过socket

2021-01-14  本文已影响0人  挡不住的柳Willow

相关IO专题

JAVA IO专题一:java InputStream和OutputStream读取文件并通过socket发送,到底涉及几次拷贝
JAVA IO专题二:java NIO读取文件并通过socket发送,最少拷贝了几次?堆外内存和所谓的零拷贝到底是什么关系
JAVA IO专题三:java的内存映射和应用场景
JAVA IO专题四:java顺序IO原理以及对应的应用场景

内核的零拷贝

内核的零拷贝,指的是不需要消耗CPU资源,完全交给DMA来处理,内核空间的数据没有多余的拷贝。主要经历了这么几个发展历程:

一、传统的read + send

1、先调用操作系统的read函数,由DMA将文件拷贝到内核,然后CPU把内核数据拷贝到用户缓冲区(堆外内存)
2、调用操作系统的send函数,由CPU把用户缓冲区的数据拷贝到socket缓冲区,最后DMA把socket缓冲区数据拷贝到网卡进行发送。

这个过程中内核数据拷贝到用户空间,用户空间又拷贝回内存,有两次多余的拷贝。

二、sendfile初始版本

直接调用sendfile来发送文件,流程如下:
1、首先通过 DMA将数据从磁盘读取到内核
2、然后通过 CPU将数据从内核拷贝到socket缓冲区
3、最终通过 DMA将socket缓冲区数据拷贝到网卡发送

sendfile 与 read + send 方式相比,少了一次 CPU的拷贝。但是从上述过程中也可以发现从内核缓冲区拷贝到socket缓冲区是没必要的。

三、sendfile改进版本,真正的零拷贝

内核为2.4或者以上版本的linux系统上,改进后的处理过程如下:
1、DMA 将磁盘数据拷贝到内核缓冲区,向socket缓冲区中追加当前要发送的数据在内核缓冲区中的位置和偏移量
2、DMA gather copy 根据 socket缓冲区中的位置和偏移量,直接将内核缓冲区中的数据拷贝到网卡上。
经过上述过程,数据只经过了 2 次 copy 就从磁盘传送出去了。并且没有CPU的参与。

java的零拷贝

一、利用directBuffer

在上一篇文章JAVA IO专题一:java InputStream和OutputStream读取文件并通过socket发送,到底涉及几次拷贝中,我们提到了基于BIO读取文件发送消息,一共涉及六次拷贝,其中堆外和堆内内存的拷贝是多余的,我们可以利用directBuffer来减少这两次拷贝:

//打开文件通道
FileChannel fileChannel = FileChannel.open(Paths.get("/test.txt"));
//申请堆外内存
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
//读取到堆外内存
fileChannel.read(byteBuffer);
//打开socket通道
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9099));
//堆外内存写入socket通道
socketChannel.write(byteBuffer);

每一行代码都有清楚的注释,我们主要来看一下fileChannel.read、socketChannel.write做了什么:

//FileChannelImpl
public int read(ByteBuffer dst) throws IOException {
    ... 忽略了一堆不重要代码
        synchronized (positionLock) {
            int n = 0;
            int ti = -1;
            try {
                do {
                   // 调用IOUtil,根据文件描述符fd读取数据到直接缓冲区dst中
                    n = IOUtil.read(fd, dst, -1, nd);
                } while ((n == IOStatus.INTERRUPTED) && isOpen());
                return IOStatus.normalize(n);
            } finally {
                threads.remove(ti);
                end(n > 0);
                assert IOStatus.check(n);
            }
        }
    }
//IOUtil
static int read(FileDescriptor fd, ByteBuffer dst, long position,
                    NativeDispatcher nd)
        throws IOException
    {
        ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
        try {
            int n = readIntoNativeBuffer(fd, bb, position, nd);
            bb.flip();
            if (n > 0)
                dst.put(bb);
            return n;
        } finally {
            Util.offerFirstTemporaryDirectBuffer(bb);
        }
    }

private static int readIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb,
                                            long position, NativeDispatcher nd)
        throws IOException
    {
        int pos = bb.position();
        int lim = bb.limit();
        assert (pos <= lim);
        int rem = (pos <= lim ? lim - pos : 0);

        if (rem == 0)
            return 0;
        int n = 0;
        if (position != -1) {
            n = nd.pread(fd, ((DirectBuffer)bb).address() + pos,
                         rem, position);
        } else {
           //第一次读取会走到这里,否则走上面的分支
            n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem);
        }
        if (n > 0)
            bb.position(pos + n);
        return n;
    }
//FileDispatcherImpl
 int read(FileDescriptor fd, long address, int len) throws IOException {
        return read0(fd, address, len);
    }

这里的调用链比较深,我们一步一步梳理:

  1. 调用fileChannel.read实际是走到了FileChannelImpl.read方法,然后走到n = IOUtil.read(fd, dst, -1, nd);调用IOUtil的read,传入了文件描述符、directBuffer
  2. IOUtil 调用自己的readIntoNativeBuffer方法,字面意思是讲数据读取到native缓存,即堆外内存
  3. IOUtil 的 readIntoNativeBuffer 方法调用n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem);,即NativeDispatcher 的read方法,传入文件描述符,堆外内存地址以及要读取的长度
  4. 这里的 NativeDispatcher 实现类为 FileDispatcherImpl,实际调用的是native方法read0,并传入了文件描述符、堆外内存地址和读取长度

我们简单看一下native的read0方法做了什么:

// 以下内容来自于 jdk/src/solairs/native/sun/nio/ch/FileDispatcherImpl.c

JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_read0(JNIEnv *env, jclass clazz,
                             jobject fdo, jlong address, jint len)
{
    //拿到文件描述符
    jint fd = fdval(env, fdo);
    //根据地址拿到堆外内存的指针
    void *buf = (void *)jlong_to_ptr(address);
  //直接调用系统函数read把文件描述符读取到buf中
    return convertReturnVal(env, read(fd, buf, len), JNI_TRUE);
}

可以看到native的read0方法是直接调用系统函数read,根据jvm传过来的堆外内存地址,将文件数据读取到堆外内存中(read方法的作用在内核零拷贝小节里已经提到了)。即直接操作堆外内存,而不使用DirectByteBuffer的时候,还需要将堆外内存拷贝到堆内进行读写JAVA IO专题一:java InputStream和OutputStream读取文件并通过socket发送,到底涉及几次拷贝),因此使用堆外内存+channel的方式,可以避免堆内外内存拷贝,一定程度上也能提高效率。

//SocketChannelImpl.java
  public int write(ByteBuffer buf) throws IOException {
        synchronized (writeLock) {
              ... 忽略不重要代码
            int n = 0;
            try {
                for (;;) {
                    //调用IOUtil.write写数据
                    n = IOUtil.write(fd, buf, -1, nd);
                    if ((n == IOStatus.INTERRUPTED) && isOpen())
                        continue;
                    return IOStatus.normalize(n);
                }
            } finally {
                writerCleanup();
            }
        }
    }
//IOUtil.java
static int write(FileDescriptor fd, ByteBuffer src, long position,
                     NativeDispatcher nd)
        throws IOException
    {
        if (src instanceof DirectBuffer)
             //directBuffer直接走这里
            return writeFromNativeBuffer(fd, src, position, nd);
    }

  private static int writeFromNativeBuffer(FileDescriptor fd, ByteBuffer bb,
                                             long position, NativeDispatcher nd) throws IOException
    {
        int pos = bb.position();
        int lim = bb.limit();
        assert (pos <= lim);
        int rem = (pos <= lim ? lim - pos : 0);

        int written = 0;
        if (rem == 0)
            return 0;
        if (position != -1) {
            written = nd.pwrite(fd,
                                ((DirectBuffer)bb).address() + pos,
                                rem, position);
        } else {
            //调用SocketDispatcher写数据
            written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem);
        }
        if (written > 0)
            bb.position(pos + written);
        return written;
    }

//SocketDispatcher.java
int write(FileDescriptor fd, long address, int len) throws IOException {
        //直接调用了FileDispatcherImpl的native方法write0
        return FileDispatcherImpl.write0(fd, address, len);
    }

在看native方法之前还是先做简单的梳理:

  1. socketChannel.write 实际调用了SocketChannelImpl.write,然后调用IOUtil.write(fd, buf, -1, nd); 传入文件描述符和堆外内存引用
  2. IOUtil.write调用自己的私有方法 writeFromNativeBuffer,内部调用了written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem);,将文件描述符、堆外内存地址交给了NativeDispatcher
  3. 此处的NativeDispatcher实际是 SocketDispatcher,里面直接调用了FileDispatcherImpl.write0(fd, address, len);native方法

接着跟踪FileDispatcherImpl.write0(fd, address, len);这个native方法:

// 以下内容来自于 jdk/src/solairs/native/sun/nio/ch/FileDispatcherImpl.c

JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_write0(JNIEnv *env, jclass clazz,
                              jobject fdo, jlong address, jint len)
{
    //转换文件描述符
    jint fd = fdval(env, fdo);
    //转换为堆外内存指针
    void *buf = (void *)jlong_to_ptr(address);
    //直接调用系统函数write将堆外内存数据发送出去
    return convertReturnVal(env, write(fd, buf, len), JNI_FALSE);
}

可以看到native的write0方法是直接调用系统函数write将堆外内存数据发送出去(write方法的作用在内核零拷贝小节里已经提到了)。

二、channel.transferTo

java中的零拷贝就是依赖操作系统的sendfile函数来实现的,提供了channel.transferTo方法,允许将一个channel的数据直接发送到另一个channel,接下来我们通过示例代码和具体的源码来分析和验证前面的说法。
示例代码如下:

//打开socketChannel
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9099));
//
FileChannel fileChannel = FileChannel.open(Paths.get("/test.txt"));
fileChannel.transferTo(0, fileChannel.size(), socketChannel);

只用了一行代码fileChannel.transferTo(0, fileChannel.size(), socketChannel);就把文件数据写到了socket,继续看源码:

//FileChannelImpl.java
public long transferTo(long position, long count,
                           WritableByteChannel target)
        throws IOException
    {
         ... 忽略不重要代码
        long sz = size();
        if (position > sz)
            return 0;
        int icount = (int)Math.min(count, Integer.MAX_VALUE);
        if ((sz - position) < icount)
            icount = (int)(sz - position);

        long n;
          //先尝试直接tranfer,如果内核支持的话
        if ((n = transferToDirectly(position, icount, target)) >= 0)
            return n;
        //尝试mappedTransfer,只适用于受信任的channel类型
        if ((n = transferToTrustedChannel(position, icount, target)) >= 0)
            return n;
          //channel不受信任的话,会走最慢的方式
        return transferToArbitraryChannel(position, icount, target);
    }

// FileChannelimpl.java
private long transferToDirectly(long position, int icount,
                                    WritableByteChannel target)
        throws IOException
    {
        if (!transferSupported)
            //系统不支持就直接返回
            return IOStatus.UNSUPPORTED;

        FileDescriptor targetFD = null;
        if (target instanceof FileChannelImpl) { //如果目标是fileChannel则走这里
            if (!fileSupported)
                return IOStatus.UNSUPPORTED_CASE;
            targetFD = ((FileChannelImpl)target).fd;
        } else if (target instanceof SelChImpl) { 
            //SocketChannel实现了SelChImpl接口,因此会走这里
            if ((target instanceof SinkChannelImpl) && !pipeSupported)
                return IOStatus.UNSUPPORTED_CASE;
            //给targetFD赋值
            targetFD = ((SelChImpl)target).getFD();
        }
        if (targetFD == null)
            return IOStatus.UNSUPPORTED;
        //将fileChannel和socketChannel对应的fd转换为具体的值
        int thisFDVal = IOUtil.fdVal(fd);
        int targetFDVal = IOUtil.fdVal(targetFD);
        //不支持自己给自己传输
        if (thisFDVal == targetFDVal) 
            return IOStatus.UNSUPPORTED;

        long n = -1;
        int ti = -1;
        try {
            begin();
            ti = threads.add();
            if (!isOpen())
                return -1;
            do { 
                //调用native方法transferTo0
                n = transferTo0(thisFDVal, position, icount, targetFDVal);
            } while ((n == IOStatus.INTERRUPTED) && isOpen());
            if (n == IOStatus.UNSUPPORTED_CASE) {
                if (target instanceof SinkChannelImpl)
                    pipeSupported = false;
                if (target instanceof FileChannelImpl)
                    fileSupported = false;
                return IOStatus.UNSUPPORTED_CASE;
            }
            if (n == IOStatus.UNSUPPORTED) {
                // Don't bother trying again
                transferSupported = false;
                return IOStatus.UNSUPPORTED;
            }
            return IOStatus.normalize(n);
        } finally {
            threads.remove(ti);
            end (n > -1);
        }
    }

代码有点长:

  1. 调用FileChannelImpl的transferTo,会尝试三种情况,如果系统支持零拷贝,则走 transferToDirectly
  2. transferToDirectly 方法前面做了各种判断,其实可以理解为直接调用了n = transferTo0(thisFDVal, position, icount, targetFDVal);native方法

再来跟踪transferTo0:

// 以下内容来自于 jdk/src/solairs/native/sun/nio/ch/FileChannelImpl.c
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_FileChannelImpl_transferTo0(JNIEnv *env, jobject this,
                                            jint srcFD,
                                            jlong position, jlong count,
                                            jint dstFD)
{
#if defined(__linux__)
    off64_t offset = (off64_t)position;
    //直接调用sendfile
    jlong n = sendfile64(dstFD, srcFD, &offset, (size_t)count);
    if (n < 0) {
        if (errno == EAGAIN)
            return IOS_UNAVAILABLE;
        if ((errno == EINVAL) && ((ssize_t)count >= 0))
            return IOS_UNSUPPORTED_CASE;
        if (errno == EINTR) {
            return IOS_INTERRUPTED;
        }
        JNU_ThrowIOExceptionWithLastError(env, "Transfer failed");
        return IOS_THROWN;
    }
    return n;
}

这个方法里其实有linux、solaris、APPLE等多个平台的实现,这里只截取linux下的实现,可以看到是直接调用了系统函数sendfile来实现的数据发送,具体的拷贝次数则要看linux内核的版本了。

总结

参考文章

Java 堆外内存、零拷贝、直接内存以及针对于NIO中的FileChannel的思考
JavaIO原理剖析之 网络IO

上一篇下一篇

猜你喜欢

热点阅读