Ok I/O 简析

2019-06-09  本文已影响0人  莫库施勒

java 的NIO和阻塞I/O

  1. 阻塞I/O通信模式:调用InputStream.read()方法时是阻塞的,它会一直等到数据到来时才返回
  2. NIO通信模式:在JDK1.4开始,是一种非阻塞I/O,在Java NIO的服务端由一个专门的线程来处理所有I/O事件,并负责分发;线程之间通讯通过wait和notify等方式。

输入与输出

Sink 与 Source
在JDK里面有InputStream和OutputStream两个接口,Source和Sink类似于InputStream和OutputStream,是io操作的顶级接口类,这两个接口均实现了Closeable接口。所以可以把Source简单的看成InputStream,Sink简单看成OutputStream。
子类
其中 BufferedXXX 对 write() 方法扩展了许多参数,是一个接口,而 RealBufferedXXX 则是真正的具体实现
其他子类

Segment

okio将数据分割成一块块的片段,内部维护者固定长度的byte[]数组,同时segment拥有前面节点和后面节点,构成一个双向循环链表。
分片中使用数组存储,兼具读的连续性,以及写的可插入性,对比单一使用链表或者数组,是一种折中的方案,读写更快,而且有个好处根据需求改动分片的大小来权衡读写的业务操作,另外,segment也有一些内置的优化操作


segment

compact()方法(压缩机制)
除了写入数据之外,segment还有一个优化的技巧,因为每个segment的片段size是固定的,为了防止经过长时间的使用后,每个segment中的数据被分割的十分严重,可能一个很小的数据却占据了整个segment,所以有了一个压缩机制。

  public void compact() {
    //上一个节点就是自己,意味着就一个节点,无法压缩,抛异常
    if (prev == this) throw new IllegalStateException();
     //如果上一个节点不是自己的,所以你是没有权利压缩的
    if (!prev.owner) return; // Cannot compact: prev isn't writable.
    //能进来说明,存在上一个节点,且上一个节点是自己的,可以压缩
    //记录当前Segment具有的数据,数据大小为limit-pos
    int byteCount = limit - pos;
    // 统计前结点是否被共享,如果共享则只记录Size-limit大小,
    // 如果没有被共享,则加上pre.pos之前的空位置;
    //本行代码主要是获取前一个segment的可用空间。
    // 先判断prev是否是共享的,如果是共享的,则只记录SIZE-limit,
    // 如果没有共享则记录SIZE-limit加上prev.pos之前的空位置
    int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
   //判断prev的空余空间是否能够容纳Segment的全部数据,不能容纳则返回
    if (byteCount > availableByteCount) return; 
    //能容纳则将自己的这个部分数据写入上一个Segment
    writeTo(prev, byteCount);
    //讲当前Segment从Segment链表中移除
    pop();
    //回收该Segment
    SegmentPool.recycle(this);
  }

总结下上述代码:如果前面的Segment是共享的,那么不可写,也不能压缩,接着判断前一个的剩余大小是否比当前空间大,如果有足够的空间来容纳数据,调用前面的writeTo方法写入数据,写完以后,移除当前segment,并回收segment。

split()方法(共享机制)
为了减少数据复制带来的性能开销。
先把Segment一分为二,将(pos + 1, pos + btyeCount - 1)的内容给新的Segment,将(pos + byteCount, limit - 1)的内容留给自己.

SegemtnPool

SegmentPool是一个Segment池,由一个单项链表构成。该池负责Segment的回收和闲置Segment管理,也就是说Buffer使用的Segment是从Segment单项链表中取出的,这样有效的避免了GC频率.

//一个Segment记录的最大长度是8192,因此SegmentPool只能存储8个Segment
static final long MAX_SIZE = 64 * 1024;
//该SegmentPool存储了一个回收Segment的链表
static Segment next;
//该值记录了当前所有Segment的总大小,最大值是为MAX_SIZE
static long byteCount;  

它的两个重要的方法 take(), recycle()

  static Segment take() {
    synchronized (SegmentPool.class) {
      if (next != null) {
        Segment result = next;
        next = result.next;
        result.next = null;
        byteCount -= Segment.SIZE;
        return result;
      }
    }
    return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
  }

  static void recycle(Segment segment) {
    //如果这个要回收的Segment被前后引用,则无法回收
    if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
    //如果这个要回收的Segment的数据是分享的,则无法回收
    if (segment.shared) return; // This segment cannot be recycled.
    //加锁
    synchronized (SegmentPool.class) {
      //如果 这个空间已经不足以再放入一个空的Segment,则不回收
      if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
      //设置SegmentPool的池大小
      byteCount += Segment.SIZE;
      //segment的下一个指向头
      segment.next = next;
      //设置segment的可读写位置为0
      segment.pos = segment.limit = 0;
      //设置当前segment为头
      next = segment;
    }
  }

ByteString

ByteString存储的是不可变比特序列.final btye[] data
本质就是一个byte序列(数组),以制定的编码格式进行解码。目前支持的解码规则有hex,base64和UTF-8等。

Buffer

Buffer存储的是可变比特序列,需要注意的是Buffer内部对比特数据的存储不是直接使用一个byte数组那么简单,它使用了一种新的数据类型Segment进行存储。

Buffer持有一个Segment的引用,所以通过这个引用能拿到整个链表中的所有数据。
同时Buffer实现了三个接口,读,写以及clone。

  //看到注释的第一句话,我就知道了是深拷贝,哈哈!
  /** Returns a deep copy of this buffer. */
  @Override public Buffer clone() {
    //先new了一个Buffer对象
    Buffer result = new Buffer();
    //如果size==0,说明这个Buffer是空的,所以直接返回即可
    if (size == 0) return result;
    //如果size!=0,说明这个Buffer是有数据的,然后吧head指向这个copy的head,PS大家回想下Segment的这个构造函数,里面是怎么操作的?
    result.head = new Segment(head);
    //然后设置copy的head的next和prev的值
    result.head.next = result.head.prev = result.head;
    //开始遍历这个Buffer持有的Segment链了
    for (Segment s = head.next; s != head; s = s.next) {
      result.head.prev.push(new Segment(s));
    }
    result.size = size;
    return result;
  }

超时

 private static final class Watchdog extends Thread {
    public Watchdog() {
      super("Okio Watchdog");
      setDaemon(true);
    }

    public void run() {
      while (true) {
        try {
          AsyncTimeout timedOut;
          synchronized (AsyncTimeout.class) {
            timedOut = awaitTimeout();

            // Didn't find a node to interrupt. Try again.
            if (timedOut == null) continue;

            // The queue is completely empty. Let this thread exit and let another watchdog thread
            // get created on the next call to scheduleTimeout().
            if (timedOut == head) {
              head = null;
              return;
            }
          }

          // Close the timed out node.
          timedOut.timedOut();
        } catch (InterruptedException ignored) {
        }
      }
    }
  }

这里的WatchDog只是一个继承于Thread的一类,里面的run方法执行的就是超时的判断,之所以在socket写时采取异步超时,这完全是由socket自身的性质决定的,socket经常会阻塞自己,导致下面的事情执行不了。

enter() 和 exit()

  public final Source source(final Source source) {
    return new Source() {
      @Override public long read(Buffer sink, long byteCount) throws IOException {
        boolean throwOnTimeout = false;
        enter();
        try {
          long result = source.read(sink, byteCount);
          throwOnTimeout = true;
          return result;
        } catch (IOException e) {
          throw exit(e);
        } finally {
          exit(throwOnTimeout);
        }
      }
 }

private static synchronized void scheduleTimeout(
      AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
    //head==null,表明之前没有,本次是第一次操作,开启Watchdog守护线程
    if (head == null) {
      head = new AsyncTimeout();
      new Watchdog().start();
    }

    long now = System.nanoTime();
    //如果有 deadLine,并且超时时长不为0
    if (timeoutNanos != 0 && hasDeadline) {
      //对比最长限制和超时时长,选择最小的那个值
      node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
    } else if (timeoutNanos != 0) {
      //如果没有最长限制,但是超时时长不为0,则使用超时时长
      node.timeoutAt = now + timeoutNanos;
    } else if (hasDeadline) {
      //如果有最长限制,但是超时时长为0,则使用最长限制
      node.timeoutAt = node.deadlineNanoTime();
    } else {
     //如果既没有最长限制,和超时时长,则抛异常
      throw new AssertionError();
    }

    // 按照排序顺序插入
    long remainingNanos = node.remainingNanos(now);
    for (AsyncTimeout prev = head; true; prev = prev.next) {
      //如果下一个为null或者剩余时间比下一个短 就插入node
      if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
        node.next = prev.next;
        prev.next = node;
        if (prev == head) {
          // 唤醒 watchdog
          AsyncTimeout.class.notify(); 
        }
        break;
      }
    }
  }

Okio的特点

上一篇下一篇

猜你喜欢

热点阅读