工作生活NIO

Java Nio 之高级搬运工(FileChannel)二

2019-09-25  本文已影响0人  Unyielding_L

Java Nio 系列
Java Nio 之Buffer
Java Nio 之直接内存
Java Nio 之高级搬砖工(FileChannel) 一

前言

前段时间同事分享了一篇文章给我:为什么Kafka速度这么快? ,这篇文章相信大家也都看了。这篇文章说Kafka 有个作弊的技能 :直接从文件某个位置处读取某个长度的字节直接发送给消费者,不需要读到应用程序里然后缓存在ByteBuffer 然后再往 客户端写;当时就对这项技术很着迷,上网搜了很多资料 很纳闷它是怎么实现的;上周在介绍FileChannel 的时候本来想只写一篇文章的,后来看到了它的map 、transeferTo以及TranseferFrom 方法就觉得一篇文章写不完,因为冗长的文章谁都不想看,所以另写一篇来研究一下 FileChannel 的高性能之处,以及介绍下Kafka是怎么使用的。

谈谈零拷贝

牢骚一下

Kafka 的高性能的重要点之一就在零拷贝上。零拷贝不是真的零拷贝,只不过是减少了拷贝的次数,为的不是减少DMA的拷贝次数,而是CPU 的拷贝次数,为啥呢?因为拷贝是个很简单的操作,占着CPU 的时间片简直就是高射炮打蚊子。

传统 Linux 服务器 传输数据 的流程

零拷贝的两种实现方式

mmap + write 方式

void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
int munmap(void *addr, size_t length);
调用该函数可以解除 映射对象与addr 处开始的length 长度的内存空间的映射关系

addr mmap 函数返回的映射区域首地址
length 映射区域的长度

int msync ( void * addr , size_t len, int flags) ;
一般情况下 对映射空间的共享内容更改不会直接写到文件里,当然执行完 munmap 函数也可以,除了执行它,还可以执行 msync 函数来将修改的共享内容同步到文件

说说mmap + write 的 流程

来看看Java 下的mmap

抽象类 MappedByteBuffer

定义

直接字节缓冲区,其内容是文件的内存映射区域。可由FileChannel#map 方法创建。该类通过增加对内存映射区域的特定操作扩展了ByteFuffer 类。映射字节缓冲区与它所映射的文件直到它自己被垃圾回收之前都是存在的。

tips

映射字节缓冲区的内容任何时候都可以被修改,例如,映射文件对应的区域被当前程序或者其他程序所更改。至于是否发生或者什么时候发生,都由操作系统来决定。
映射字节缓冲区的部分或者全部在任何时候都会变得不可访问,例如,映射的文件被截断了。尝试访问不可访问的映射字节的缓冲区的那一部分,将会有不友好的异常抛出。需要强烈的提醒,避免让当前程序或者其他程序对这个映射文件进行操作,除了读或者写它的内容。

方法

load() 该方法会尽最大可能将映射文件里的内容加载到物理内存中,可能会在加载的时候导致一些页面错误和IO操作。
isLoad() 返回 映射文件内容是否驻留在物理内存中。
force() 对映射内存区域的写入,并不会直接同步到文件中, 在解除映射关系的时候修改的内容才会同步到文件中。 调用该方法会将对映射区域的修改同步到磁盘,这就与上面的方法msync方法对应。

FileChannel # map 方法

方法签名

public abstract MappedByteBuffer map(MapMode mode,long position, long size)  throws IOException;

参数小解

mode 为MapMode 中的READ_ONLY,READ_WRITE,PRIVATE中的其中一个,分别表示 只读,可读可写和写时复制与上述 mmap 方法中flags参数对应
position 从文件的哪里开始映射,对应上述 mmap 方法 中的offset参数
size 从文件position处开始映射多少个字节

Java 语言实现 mmap+write

简述:将文件a.txt 中的0到14个字节发给服务端

package zym.netty.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

/**
 * file channel map study
 *
 * @author 24160
 */
public class FileChannelMapStudy {

    public static final String FILE_CHANNEL_MAP_STUDY_TXT = "a.txt";
    public static final int INT_BYTES_LENGTH = 4;

    public static void main(String[] args) {
        prepareEnviroment();
        try (FileChannel fileChannel = FileChannel.open(Paths.get(FILE_CHANNEL_MAP_STUDY_TXT), StandardOpenOption.READ)) {
            long size = fileChannel.size();
            //将a.txt 文件映射到内存缓冲区,从0位置处映射,映射10个字节长度,该映射内存缓冲区只可读
            MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, 14);
            //创建一个SocketChannel实例
            SocketChannel client = SocketChannel.open();
            //连接服务端
            client.connect(new InetSocketAddress("127.0.0.1", 8080));
            //写文件内容到服务端
            client.write(mappedByteBuffer);
            //读取文件内容 网络协议为 head + body  如6zengyi
            ByteBuffer head = ByteBuffer.allocate(INT_BYTES_LENGTH);
            while (client.read(head) != 0) {}
            //切换读写模式
            head.flip();
            //读取body
            ByteBuffer body = ByteBuffer.allocate(head.getInt());
            while (client.read(body) != 0) {}
            //切换读写模式
            body.flip();
            System.out.println(String.format("发送字节成功,服务端返回:%s", new String(body.array())));

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void prepareEnviroment() {
        try (FileChannel fileChannel = FileChannel.open(Paths.get(FILE_CHANNEL_MAP_STUDY_TXT), StandardOpenOption.CREATE,StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            //将a.txt 映射文件到内存映射区域,模式为可读可写
            MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 14);
            //放进去一个int 为10
            mappedByteBuffer.putInt(10);
            mappedByteBuffer.put("zengyiming".getBytes());
            //强制刷盘
            mappedByteBuffer.force();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

服务端代码详见:NioServer.java
下面我们来看看kafka 是如何使用mmap,kafka AbstractIndex.scala 代码片段

 @volatile
  protected var mmap: MappedByteBuffer = {
    val newlyCreated = file.createNewFile()
    val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")
    try {
      /* 如果是新创建则给file 预留分配空间 maxIndexSize 不超过50MB 单位为字节 */
      if(newlyCreated) {
        if(maxIndexSize < entrySize)
          throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
        raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
      }

      /* memory-map the file */
    /* 开始内存映射文件*/
      _length = raf.length()
      val idx = {
        if (writable)
          /*如果可写,则映射模式为可读可写*/
          raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length)
        else
        /*若可读,则映射模式为可读*/
          raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length)
      }
      /* set the position in the index for the next entry */
    /*为下一个条目 设置 buffer 中的position值*/
      if(newlyCreated)
        idx.position(0)
      else
        // if this is a pre-existing index, assume it is valid and set position to last entry
        //如果这是一个预先存在的索引,则假设它有效并将位置设置为最后一个条目
        idx.position(roundDownToExactMultiple(idx.limit(), entrySize))
      idx
    } finally {
      CoreUtils.swallow(raf.close(), AbstractIndex)
    }
  }

kafka 的索引文件是映射到内存映射区域的,对消息偏移量的读写都是基于MappedByteBuffer 之上,当然牛逼的kafka 作者们 发明了一个简单且缓存命中友好的二叉查找算法,这个算法有机会和大家聊下。

上一篇 下一篇

猜你喜欢

热点阅读