jvm

从c的角度看java bio

2018-06-08  本文已影响33人  xpbob

第一次学习java的时候,学习到IO的时候总感觉很奇怪,他有三个基本字节流文件IO类,FileInputStream,FileOutputStream,RandomAccessFile。自己本身是从C 学起的,学到C++,unix编程,一直都是拿着文件指针或者文件描述符来进行操作,也是可以跳读的。感觉java的文件操作把c的给分开细化了,由于初学java,并没有仔细的去思考过这个问题。后来知道jvm还有直接内存,就很好奇直接内存到底是什么,为什么java nio中很多都和直接内存相关,我在看视频的时候,里面的老师讲java nio用直接内存拷贝文件,压根没有走到用户态,用户态程序发了一条指令,然后文件就从内核态进行拷贝了。听到这里,我感觉java玩出了新高度,我在C里完全没见过的高度。于是我找了openjdk的代码来阅读,看看是如何实现的,看过源码后,很多问题都解决了,也明白了很多都是谎言。
下面主要来说一下java的阻塞io--bio。主要是说linux的实现。里面有一些linux c的库函数,我会做简单的介绍。

文件操作的过程

具体说之前,必须先普及一个操作系统知识,文件的读取和写入的大概的流程。这里说的是一般情况,用户态没有办法直接操作文件,必须通过系统调用通过内核进行操作。例如读取文件,是从磁盘到内核主存再到用户主存,文件写入是先从用户主存到内核主存再到磁盘。内存映射也是操作的一种方法,这种情况就不需要内核进行数据的拷贝,用户态可以操作内存一样操作文件。
所以说视频上讲的先不进入用户态,直接内核态进行文件拷贝的说法,就有点比较匪夷所思了。

java的操作

以FileInputStream为例来说明,FileOutputStream,RandomAccessFile可以用类似的方法来查看。

FileInputStream

    public FileInputStream(File file) throws FileNotFoundException {
        String name = (file != null ? file.getPath() : null);
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkRead(name);
        }
        if (name == null) {
            throw new NullPointerException();
        }
        if (file.isInvalid()) {
            throw new FileNotFoundException("Invalid file path");
        }
        //文件描述符
        fd = new FileDescriptor();
        fd.attach(this);
        path = name;
        //打开文件
        open(name);
    }

    private void open(String name) throws FileNotFoundException {
        open0(name);
    }

    private native void open0(String name) throws FileNotFoundException;

java里也是维护了文件描述符的,你也看到了,他只是new了这么一个FileDescriptor对象,也没做什么操作。可能比较疑惑,但是写过jni的人都了解,jni是运行native反调java的。文件描述符的设置我们下面在native部分说明。

JNIEXPORT void JNICALL
Java_java_io_FileInputStream_open0(JNIEnv *env, jobject this, jstring path) {
    fileOpen(env, this, path, fis_fd, O_RDONLY);
}

在open函数中,直接调用了fileOpen的方法,后面就直接找c的实现了,不会再单独从java找到调用jni的c的类。fileOpen在solaris\native\java\io\io_util_md.c中。

void
fileOpen(JNIEnv *env, jobject this, jstring path, jfieldID fid, int flags)
{
    WITH_PLATFORM_STRING(env, path, ps) {
        FD fd;

#if defined(__linux__) || defined(_ALLBSD_SOURCE)
        char *p = (char *)ps + strlen(ps) - 1;
        while ((p > ps) && (*p == '/'))
            *p-- = '\0';
#endif
        //打开文件
        fd = handleOpen(ps, flags, 0666);
        if (fd != -1) {
            //设置文件表示符
            SET_FD(this, fd, fid);
        } else {
            throwFileNotFoundException(env, path);
        }
    } END_PLATFORM_STRING(env, ps);
}

在fileOpen中打开了文件,并且把文件描述符设置回去了。这里才是java对象真正获取到文件描述符的地方。

#define open64 open

#define RESTARTABLE(_cmd, _result) do { \
    do { \
        _result = _cmd; \
    } while((_result == -1) && (errno == EINTR)); \
} while(0)

FD
handleOpen(const char *path, int oflag, int mode) {
    FD fd;
    RESTARTABLE(open64(path, oflag, mode), fd);
    if (fd != -1) {
        struct stat64 buf64;
        int result;
        RESTARTABLE(fstat64(fd, &buf64), result);
        if (result != -1) {
            if (S_ISDIR(buf64.st_mode)) {
                close(fd);
                errno = EISDIR;
                fd = -1;
            }
        } else {
            close(fd);
            fd = -1;
        }
    }
    return fd;
}

为了方便阅读,我把重要的宏定义都列举了出来,open64实际就是open,RESTARTABLE其实做的就是把第一个方法运行结果赋值给第二个参数,说白了就是 fd=open64(path, oflag, mode),里面有循环保证运行。这里就能看到实际调用的就是open函数。
再说说read。

jint
readSingle(JNIEnv *env, jobject this, jfieldID fid) {
    jint nread;
    char ret;
    FD fd = GET_FD(this, fid);
    if (fd == -1) {
        JNU_ThrowIOException(env, "Stream Closed");
        return -1;
    }
    nread = IO_Read(fd, &ret, 1);
    if (nread == 0) { /* EOF */
        return -1;
    } else if (nread == -1) { /* error */
        JNU_ThrowIOExceptionWithLastError(env, "Read error");
    }
    return ret & 0xFF;
}

#define IO_Read handleRead

ssize_t
handleRead(FD fd, void *buf, jint len)
{
    ssize_t result;
    RESTARTABLE(read(fd, buf, len), result);
    return result;
}

read中,你最后会找到一个叫IO_Read的函数,实际这个也是宏定义,上面代码中我把这个宏对应的代码贴出,你能看到最后调用的是read函数。宏声明在solaris\native\java\io\io_util_md.h中。这里确实比较绕,使用了宏,而不是直接调用方法。

java堆和native堆

FileOutputStream,RandomAccessFile也是同相同的方法去看,发现都是比较熟悉的系统api的调用。还有一个想说的就是数组的读取,在看到用数组读取的时候你能看到这样的代码,这个代码在read的实现中(带数组的重载)。

            (*env)->SetByteArrayRegion(env, bytes, off, nread, (jbyte *)buf);

很多人不写jni,所以看着比较迷惑,这里把c的数组的值,赋值给java的数组,java的对象一般都是在java的堆中的,而native的代码是在native的栈或者堆中的,如果java想用,那么必须有个从native的堆到java的堆中拷贝的过程。这个麻烦的地方就是DirectByteBuffer存在的意义,DirectByteBuffer虽然是java堆中的对象,但是引用native的数据,DirectByteBuffer有点类似指针的意思。

FileChannel的读取

FileInputStream可以通过getChannel获取到FileChannel的对象,我们来看看FileChannel是怎么读取数据的。

    private static int readIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb,
                                            long position, NativeDispatcher nd)
        throws IOException
    {
        int pos = bb.position();
        int lim = bb.limit();
        assert (pos <= lim);
        int rem = (pos <= lim ? lim - pos : 0);

        if (rem == 0)
            return 0;
        int n = 0;
        if (position != -1) {
            n = nd.pread(fd, ((DirectBuffer)bb).address() + pos,
                         rem, position);
        } else {
            n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem);
        }
        if (n > 0)
            bb.position(pos + n);
        return n;
    }

在读取的时候会分开两种情况

#define pread64 pread

JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_read0(JNIEnv *env, jclass clazz,
                             jobject fdo, jlong address, jint len)
{
    jint fd = fdval(env, fdo);
    void *buf = (void *)jlong_to_ptr(address);

    return convertReturnVal(env, read(fd, buf, len), JNI_TRUE);
}

JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_pread0(JNIEnv *env, jclass clazz, jobject fdo,
                            jlong address, jint len, jlong offset)
{
    jint fd = fdval(env, fdo);
    void *buf = (void *)jlong_to_ptr(address);

    return convertReturnVal(env, pread64(fd, buf, len, offset), JNI_TRUE);
}

调用的也就是系统函数的read和pread。
使用FileChannel并且使用了DirectByteBuffer就可以省去拷贝到java堆空间的操作了,读取速度肯定是有提高的,但是java堆的堆空间是运行时就开辟出来的,native的得开始申请,这个也是有时间消耗的,所以具体的运行速度还是看情况的,单纯看文件读取到内存这块,毕竟还是省去了一部分操作,FileChannel效果更好。

map

#define mmap64 mmap

JNIEXPORT jlong JNICALL
Java_sun_nio_ch_FileChannelImpl_map0(JNIEnv *env, jobject this,
                                     jint prot, jlong off, jlong len)
{
    void *mapAddress = 0;
    jobject fdo = (*env)->GetObjectField(env, this, chan_fd);
    jint fd = fdval(env, fdo);
    int protections = 0;
    int flags = 0;

    if (prot == sun_nio_ch_FileChannelImpl_MAP_RO) {
        protections = PROT_READ;
        flags = MAP_SHARED;
    } else if (prot == sun_nio_ch_FileChannelImpl_MAP_RW) {
        protections = PROT_WRITE | PROT_READ;
        flags = MAP_SHARED;
    } else if (prot == sun_nio_ch_FileChannelImpl_MAP_PV) {
        protections =  PROT_WRITE | PROT_READ;
        flags = MAP_PRIVATE;
    }
    //映射
    mapAddress = mmap64(
        0,                    /* Let OS decide location */
        len,                  /* Number of bytes to map */
        protections,          /* File permissions */
        flags,                /* Changes are shared */
        fd,                   /* File descriptor of mapped file */
        off);                 /* Offset into file */

    if (mapAddress == MAP_FAILED) {
        if (errno == ENOMEM) {
            JNU_ThrowOutOfMemoryError(env, "Map failed");
            return IOS_THROWN;
        }
        return handle(env, -1, "Map failed");
    }

    return ((jlong) (unsigned long) mapAddress);
}

FileChannel的map使用的就是mmap,这个是真正把数据映射到内存了,不需要再经过内核态的数据拷贝了。

Files.copy和FileChannel.transferTo的比较

jdk7引入了Files这个类,方便了很多文件操作,但是很多人认为这个操作过于方便,不适合大文件等等,应该使用transferTo,transferFrom。
下面我们来看看两者从理论分析上哪个更快

    public long transferTo(long position, long count,
                           WritableByteChannel target)
        throws IOException
    {
        ensureOpen();
        if (!target.isOpen())
            throw new ClosedChannelException();
        if (!readable)
            throw new NonReadableChannelException();
        if (target instanceof FileChannelImpl &&
            !((FileChannelImpl)target).writable)
            throw new NonWritableChannelException();
        if ((position < 0) || (count < 0))
            throw new IllegalArgumentException();
        long sz = size();
        if (position > sz)
            return 0;
        int icount = (int)Math.min(count, Integer.MAX_VALUE);
        if ((sz - position) < icount)
            icount = (int)(sz - position);

        long n;

        // Attempt a direct transfer, if the kernel supports it
        if ((n = transferToDirectly(position, icount, target)) >= 0)
            return n;

        // Attempt a mapped transfer, but only to trusted channel types
        if ((n = transferToTrustedChannel(position, icount, target)) >= 0)
            return n;

        // Slow path for untrusted targets
        return transferToArbitraryChannel(position, icount, target);
    }

这里使用了三种不同的尝试去拷贝文件
transferToDirectly最后调用的是transferTo0

JNIEXPORT jlong JNICALL
Java_sun_nio_ch_FileChannelImpl_transferTo0(JNIEnv *env, jobject this,
                                            jobject srcFDO,
                                            jlong position, jlong count,
                                            jobject dstFDO)
{
    jint srcFD = fdval(env, srcFDO);
    jint dstFD = fdval(env, dstFDO);

#if defined(__linux__)
    off64_t offset = (off64_t)position;
    jlong n = sendfile64(dstFD, srcFD, &offset, (size_t)count);
    if (n < 0) {
        if (errno == EAGAIN)
            return IOS_UNAVAILABLE;
        if ((errno == EINVAL) && ((ssize_t)count >= 0))
            return IOS_UNSUPPORTED_CASE;
        if (errno == EINTR) {
            return IOS_INTERRUPTED;
        }
        JNU_ThrowIOExceptionWithLastError(env, "Transfer failed");
        return IOS_THROWN;
    }
    return n;
#elif defined (__solaris__)
    sendfilevec64_t sfv;
    size_t numBytes = 0;
    jlong result;

    sfv.sfv_fd = srcFD;
    sfv.sfv_flag = 0;
    sfv.sfv_off = (off64_t)position;
    sfv.sfv_len = count;

    result = sendfilev64(dstFD, &sfv, 1, &numBytes);

    /* Solaris sendfilev() will return -1 even if some bytes have been
     * transferred, so we check numBytes first.
     */
    if (numBytes > 0)
        return numBytes;
    if (result < 0) {
        if (errno == EAGAIN)
            return IOS_UNAVAILABLE;
        if (errno == EOPNOTSUPP)
            return IOS_UNSUPPORTED_CASE;
        if ((errno == EINVAL) && ((ssize_t)count >= 0))
            return IOS_UNSUPPORTED_CASE;
        if (errno == EINTR)
            return IOS_INTERRUPTED;
        JNU_ThrowIOExceptionWithLastError(env, "Transfer failed");
        return IOS_THROWN;
    }
    return result;
#elif defined(__APPLE__)
    off_t numBytes;
    int result;

    numBytes = count;

    result = sendfile(srcFD, dstFD, position, &numBytes, NULL, 0);

    if (numBytes > 0)
        return numBytes;

    if (result == -1) {
        if (errno == EAGAIN)
            return IOS_UNAVAILABLE;
        if (errno == EOPNOTSUPP || errno == ENOTSOCK || errno == ENOTCONN)
            return IOS_UNSUPPORTED_CASE;
        if ((errno == EINVAL) && ((ssize_t)count >= 0))
            return IOS_UNSUPPORTED_CASE;
        if (errno == EINTR)
            return IOS_INTERRUPTED;
        JNU_ThrowIOExceptionWithLastError(env, "Transfer failed");
        return IOS_THROWN;
    }

    return result;

#elif defined(_AIX)
    jlong max = (jlong)java_lang_Integer_MAX_VALUE;
    struct sf_parms sf_iobuf;
    jlong result;

    if (position > max)
        return IOS_UNSUPPORTED_CASE;

    if (count > max)
        count = max;

    memset(&sf_iobuf, 0, sizeof(sf_iobuf));
    sf_iobuf.file_descriptor = srcFD;
    sf_iobuf.file_offset = (off_t)position;
    sf_iobuf.file_bytes = count;

    result = send_file(&dstFD, &sf_iobuf, SF_SYNC_CACHE);

    /* AIX send_file() will return 0 when this operation complete successfully,
     * return 1 when partial bytes transfered and return -1 when an error has
     * Occured.
     */
    if (result == -1) {
        if (errno == EWOULDBLOCK)
            return IOS_UNAVAILABLE;
        if ((errno == EINVAL) && ((ssize_t)count >= 0))
            return IOS_UNSUPPORTED_CASE;
        if (errno == EINTR)
            return IOS_INTERRUPTED;
        if (errno == ENOTSOCK)
            return IOS_UNSUPPORTED;
        JNU_ThrowIOExceptionWithLastError(env, "Transfer failed");
        return IOS_THROWN;
    }

    if (sf_iobuf.bytes_sent > 0)
        return (jlong)sf_iobuf.bytes_sent;

    return IOS_UNSUPPORTED_CASE;
#else
    return IOS_UNSUPPORTED_CASE;
#endif
}

这里最后发现使用是sendfile的调用

  private static final long MAPPED_TRANSFER_SIZE = 8L*1024L*1024L;


   private long transferToTrustedChannel(long position, long count,
                                          WritableByteChannel target)
        throws IOException
    {
        boolean isSelChImpl = (target instanceof SelChImpl);
        if (!((target instanceof FileChannelImpl) || isSelChImpl))
            return IOStatus.UNSUPPORTED;

        // Trusted target: Use a mapped buffer
        long remaining = count;
        while (remaining > 0L) {
            long size = Math.min(remaining, MAPPED_TRANSFER_SIZE);
            try {
                MappedByteBuffer dbb = map(MapMode.READ_ONLY, position, size);
                try {
                    // ## Bug: Closing this channel will not terminate the write
                    int n = target.write(dbb);
                    assert n >= 0;
                    remaining -= n;
                    if (isSelChImpl) {
                        // one attempt to write to selectable channel
                        break;
                    }
                    assert n > 0;
                    position += n;
                } finally {
                    unmap(dbb);
                }
            } catch (ClosedByInterruptException e) {
                // target closed by interrupt as ClosedByInterruptException needs
                // to be thrown after closing this channel.
                assert !target.isOpen();
                try {
                    close();
                } catch (Throwable suppressed) {
                    e.addSuppressed(suppressed);
                }
                throw e;
            } catch (IOException ioe) {
                // Only throw exception if no bytes have been written
                if (remaining == count)
                    throw ioe;
                break;
            }
        }
        return count - remaining;
    }

transferToTrustedChannel是通过了mmap,一次最大是使用8m。

transferToArbitraryChannel下面代码有个一次分配的最大值8192。只选取长度小的来申请空间。

    private static final int TRANSFER_SIZE = 8192;

    private long transferFromArbitraryChannel(ReadableByteChannel src,
                                              long position, long count)
        throws IOException
    {
        // Untrusted target: Use a newly-erased buffer
        int c = (int)Math.min(count, TRANSFER_SIZE);
        ByteBuffer bb = Util.getTemporaryDirectBuffer(c);
        long tw = 0;                    // Total bytes written
        long pos = position;
        try {
            Util.erase(bb);
            while (tw < count) {
                bb.limit((int)Math.min((count - tw), (long)TRANSFER_SIZE));
                // ## Bug: Will block reading src if this channel
                // ##      is asynchronously closed
                int nr = src.read(bb);
                if (nr <= 0)
                    break;
                bb.flip();
                int nw = write(bb, pos);
                tw += nw;
                if (nw != nr)
                    break;
                pos += nw;
                bb.clear();
            }
            return tw;
        } catch (IOException x) {
            if (tw > 0)
                return tw;
            throw x;
        } finally {
            Util.releaseTemporaryDirectBuffer(bb);
        }
    }

重要的方法就是里面的read和write了。

    private int readInternal(ByteBuffer dst, long position) throws IOException {
        assert !nd.needsPositionLock() || Thread.holdsLock(positionLock);
        int n = 0;
        int ti = -1;
        try {
            begin();
            ti = threads.add();
            if (!isOpen())
                return -1;
            do {
                n = IOUtil.read(fd, dst, position, nd);
            } while ((n == IOStatus.INTERRUPTED) && isOpen());
            return IOStatus.normalize(n);
        } finally {
            threads.remove(ti);
            end(n > 0);
            assert IOStatus.check(n);
        }
    }

read走到了IOUtil.read,最后就是上面readIntoNativeBuffer的方法,最后调用的就是底层的read和pread。write最后走到的就是pwrite和write的系统调用。方法的位置在solaris\native\sun\nio\ch\FileDispatcherImpl.c
Files的实现在sun\nio\fs\UnixCopyFile.java中调用了native方法transfer。

JNIEXPORT void JNICALL
Java_sun_nio_fs_UnixCopyFile_transfer
    (JNIEnv* env, jclass this, jint dst, jint src, jlong cancelAddress)
{
    char buf[8192];
    volatile jint* cancel = (jint*)jlong_to_ptr(cancelAddress);

    for (;;) {
        ssize_t n, pos, len;
        RESTARTABLE(read((int)src, &buf, sizeof(buf)), n);
        if (n <= 0) {
            if (n < 0)
                throwUnixException(env, errno);
            return;
        }
        if (cancel != NULL && *cancel != 0) {
            throwUnixException(env, ECANCELED);
            return;
        }
        pos = 0;
        len = n;
        do {
            char* bufp = buf;
            bufp += pos;
            RESTARTABLE(write((int)dst, bufp, len), n);
            if (n == -1) {
                throwUnixException(env, errno);
                return;
            }
            pos += n;
            len -= n;
        } while (len > 0);
    }
}

这里的buffer也一样是8192。系统调用也是read和write。
相比之下transferTo的效果要更好一些。
笔者以前根据jdk7的IO特性,写了一个工具包https://gitee.com/xpbob/commonIO里面有响应的代码,可以在不同的环境下做一下测试。

总结

java bio中最终都是系统函数的调用,外面说的各种神奇的地方或多或少都有偏差,所以想更好的理解java,一定的c功底还是需要的。
很多人理解java nio直接就是非阻塞io,其实nio是new io的简称,从代码的角度看,旧的io是所有的数据都在java堆中的,而新的io其实更多的io数据在直接内存里,减少了native堆到java堆的拷贝。

上一篇下一篇

猜你喜欢

热点阅读