Okio Source Code Analysis

2019-02-21  YocnZhao

This article is based on version:

compile 'com.squareup.okio:okio:2.2.2'

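A while ago I was reading OkHttp and found that it is built on Okio. I used to write a lot of file I/O code and had heard of Okio back then, but never actually used it. Now that the trail has led here, it is time to chew on this bone.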

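Without further ado, on to the source.
Okio is the facade class that the okio library exposes; in principle every call enters through the Okio class. Its class comment is brief: it provides the essential APIs for working with Okio.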

/** Essential APIs for working with Okio. */
public final class Okio {
  private Okio() {
  }
  public static BufferedSource buffer(Source source) {
    return new RealBufferedSource(source);
  }
  public static BufferedSink buffer(Sink sink) {
    return new RealBufferedSink(sink);
  }

  /** Returns a sink that writes to {@code out}. */
  public static Sink sink(OutputStream out) {
    return sink(out, new Timeout());
  }

  private static Sink sink(final OutputStream out, final Timeout timeout) {
    if (out == null) throw new IllegalArgumentException("out == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Sink() {
      @Override public void write(Buffer source, long byteCount) throws IOException {
        checkOffsetAndCount(source.size, 0, byteCount);
        while (byteCount > 0) {
          timeout.throwIfReached();
          Segment head = source.head;
          int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
          out.write(head.data, head.pos, toCopy);

          head.pos += toCopy;
          byteCount -= toCopy;
          source.size -= toCopy;

          if (head.pos == head.limit) {
            source.head = head.pop();
            SegmentPool.recycle(head);
          }
        }
      }
      @Override public void flush() throws IOException {out.flush();}
      @Override public void close() throws IOException {out.close();}
      @Override public Timeout timeout() {return timeout;}
      @Override public String toString() { return "sink(" + out + ")";}
    };
  }
}

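The Okio class essentially hands back two types, Sink and Source. In Okio these play the role of java.io's OutputStream and InputStream: they are the most basic I/O interfaces. Passing one to buffer() gives a BufferedSink or BufferedSource, and those are what we actually call methods on. Looking at the implementation, RealBufferedSource, almost all of its functionality is delegated to the Buffer class, so next we look at Buffer and the classes underneath it, Segment and SegmentPool.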

  1. Buffer
  2. Segment
  3. SegmentPool

These are prerequisites for reading on, so let's work out what each one does first. Starting with the simplest, SegmentPool:

final class SegmentPool {
  /** The maximum number of bytes to pool. */
  // TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments?
  static final long MAX_SIZE = 64 * 1024; // 64 KiB.
  /** Singly-linked list of segments. */
  static @Nullable Segment next;
  /** Total bytes in this pool. */
  static long byteCount;

  private SegmentPool() {
  }

  static Segment take() {
    synchronized (SegmentPool.class) {
      if (next != null) {
        Segment result = next;
        next = result.next;
        result.next = null;
        byteCount -= Segment.SIZE;
        return result;
      }
    }
    return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
  }

  /* Adds the segment to the front of the singly-linked list. */
  static void recycle(Segment segment) {
    if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
    if (segment.shared) return; // This segment cannot be recycled.
    synchronized (SegmentPool.class) {
      if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
      byteCount += Segment.SIZE; // the pool grows by one segment's size
      segment.next = next; // the recycled segment points at the current first segment
      segment.pos = segment.limit = 0; // reset pos and limit to 0
      next = segment; // the recycled segment becomes the new first segment
    }
  }
}

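SegmentPool is a singly-linked list of unused Segments, kept to avoid memory churn and zero-filling. All of its state is static and guarded by a class-level lock, so it acts as a thread-safe global singleton. It has two methods: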

  1. recycle(Segment segment) // put a segment into the pool
  2. take() // take a segment out of the pool

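The code is simple: recycle puts the newest segment at the front of the list and take removes the segment at the front, so the pool behaves like a stack (last in, first out). A Segment field named next marks the segment that will be handed out next. The maximum capacity is 64 * 1024 bytes and each segment is 8 * 1024 bytes, so the pool holds at most 8 Segments.
SegmentPool is purely a container for avoiding new Segment allocations: when a Segment is needed, take one from here instead of calling new Segment().
The cost of new Segment() is that when Java allocates a fresh block of memory, the runtime has to set every byte in it to zero before handing it over, which is often unnecessary. SegmentPool is how Okio avoids that work.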
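
To make the take/recycle behaviour concrete, here is a minimal sketch of the same idea using only the JDK. PoolSketch and its methods are hypothetical names invented for illustration; it pools bare byte arrays in an ArrayDeque rather than linked Segment objects, so it is not Okio's implementation, only the same bounded last-in-first-out policy.

```java
import java.util.ArrayDeque;

/** Hypothetical stand-in for SegmentPool: a bounded LIFO stack of reusable 8 KiB arrays. */
final class PoolSketch {
  static final int SEGMENT_SIZE = 8192;   // same value as Segment.SIZE
  static final long MAX_SIZE = 64 * 1024; // same value as SegmentPool.MAX_SIZE

  private static final ArrayDeque<byte[]> FREE = new ArrayDeque<>();
  private static long byteCount;

  /** Returns the most recently recycled array, or allocates a new one if the pool is empty. */
  static byte[] take() {
    synchronized (PoolSketch.class) {
      byte[] pooled = FREE.pollFirst();
      if (pooled != null) {
        byteCount -= SEGMENT_SIZE;
        return pooled;
      }
    }
    // Allocate (and zero-fill) outside the lock, as the original comment suggests.
    return new byte[SEGMENT_SIZE];
  }

  /** Puts an array at the front of the pool; returns false if the pool is full and it was dropped. */
  static boolean recycle(byte[] array) {
    synchronized (PoolSketch.class) {
      if (byteCount + SEGMENT_SIZE > MAX_SIZE) return false; // pool is full
      byteCount += SEGMENT_SIZE;
      FREE.addFirst(array);
      return true;
    }
  }

  /** Number of arrays currently held by the pool. */
  static int pooledCount() {
    synchronized (PoolSketch.class) {
      return FREE.size();
    }
  }
}
```

Recycling ten arrays into an empty PoolSketch keeps eight and drops two, which is the 64 KiB / 8 KiB limit described above.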

A Segment is one fragment of buffered data. The segments in a Buffer form a circular doubly-linked list; the segments in the pool form a singly-linked list.

final class Segment {
  /** The size of all segments in bytes. */
  static final int SIZE = 8192; // size of every segment's byte array
  /** Segments will be shared when doing so avoids {@code arraycopy()} of this many bytes. */
  static final int SHARE_MINIMUM = 1024; // minimum split size that is shared rather than copied
  final byte[] data; // the stored bytes
  /** The next byte of application data byte to read in this segment. */
  int pos; // index of the first readable byte, i.e. where the valid data starts
  /** The first byte of available data ready to be written to. */
  int limit; // index of the first writable byte, i.e. one past the last valid byte
  /** True if other segments or byte strings use the same byte array. */
  boolean shared; // whether the byte array is shared
  /** True if this segment owns the byte array and can append to it, extending {@code limit}. */
  boolean owner; // whether this segment may append to the array
  /** Next segment in a linked or circularly-linked list. */
  Segment next; // next Segment
  /** Previous segment in a circularly-linked list. */
  Segment prev; // previous Segment

  Segment() {
    this.data = new byte[SIZE];
    this.owner = true;
    this.shared = false;
  }

  Segment(byte[] data, int pos, int limit, boolean shared, boolean owner) {
    this.data = data;
    this.pos = pos;
    this.limit = limit;
    this.shared = shared;
    this.owner = owner;
  }
  final Segment sharedCopy() { // returns a new Segment backed by the same array; both are marked shared
    shared = true;
    return new Segment(data, pos, limit, true, false);
  }
  /** Returns a new segment that has its own private copy of the underlying byte array. */
  final Segment unsharedCopy() { // unlike sharedCopy, the array itself is cloned
    return new Segment(data.clone(), pos, limit, false, true);
  }
  /**
   * Removes this segment of a circularly-linked list and returns its successor.
   * Returns null if the list is now empty.
   */
  public final @Nullable Segment pop() {
    Segment result = next != this ? next : null;
    prev.next = next;
    next.prev = prev;
    next = null;
    prev = null;
    return result;
  }

  public final Segment push(Segment segment) {
    segment.prev = this;
    segment.next = next;
    next.prev = segment;
    next = segment;
    return segment;
  }
  /**
   * Splits this head of a circularly-linked list into two segments. The first
   * segment contains the data in {@code [pos..pos+byteCount)}. The second
   * segment contains the data in {@code [pos+byteCount..limit)}. This can be
   * useful when moving partial segments from one buffer to another.
   *
   * <p>Returns the new head of the circularly-linked list.
   */
  public final Segment split(int byteCount) {
    if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
    Segment prefix;

    // We have two competing performance goals:
    //  - Avoid copying data. We accomplish this by sharing segments.
    //  - Avoid short shared segments. These are bad for performance because they are readonly and
    //    may lead to long chains of short segments.
    // To balance these goals we only share segments when the copy will be large.
    if (byteCount >= SHARE_MINIMUM) {
      prefix = sharedCopy();
    } else {
      prefix = SegmentPool.take();
      // public static void arraycopy(Object src, int srcPos, Object dest, int destPos, int length)
      System.arraycopy(data, pos, prefix.data, 0, byteCount);
    }

    prefix.limit = prefix.pos + byteCount;
    pos += byteCount;
    prev.push(prefix);
    return prefix;
  }
...
}

Sticking to the key points: a Segment stores its data in data, a byte array of size 8192. pos is where the valid data starts and limit is where it ends:


[Figure: start (pos) and end (limit) of the valid data in data]
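
As a rough illustration of how pos and limit bracket the valid bytes, here is a small sketch. WindowSketch is a hypothetical class written for this article, not part of Okio; it only mirrors the two indices and the fixed 8192-byte array.

```java
import java.util.Arrays;

/** Hypothetical mini-segment showing how pos and limit delimit the valid data. */
final class WindowSketch {
  static final int SIZE = 8192;
  final byte[] data = new byte[SIZE];
  int pos;   // index of the next byte to read
  int limit; // index of the next byte to write (one past the last valid byte)

  /** Bytes that can still be read: [pos, limit). */
  int readable() {
    return limit - pos;
  }

  /** Free space left at the end of the array: [limit, SIZE). */
  int writable() {
    return SIZE - limit;
  }

  /** Appends src at limit and moves limit forward. */
  void write(byte[] src) {
    if (src.length > writable()) throw new IllegalArgumentException("segment full");
    System.arraycopy(src, 0, data, limit, src.length);
    limit += src.length;
  }

  /** Returns count bytes starting at pos and moves pos forward. */
  byte[] read(int count) {
    if (count < 0 || count > readable()) throw new IllegalArgumentException("not enough data");
    byte[] out = Arrays.copyOfRange(data, pos, pos + count);
    pos += count;
    return out;
  }
}
```

Writing moves limit, reading moves pos, and the segment is empty again once pos catches up with limit. That is the `pos == limit` check that appears throughout the code below.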

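The main methods above help explain these fields.
pop and push are straightforward operations on a circular doubly-linked list.
pop unlinks this segment from the list, sets its own prev and next to null, and returns its successor (or null if it was the only segment).
push inserts a new Segment right after this one.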


[Figure: pop and push illustrated]
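
The link manipulation is easier to follow without the byte array in the way. The sketch below uses a hypothetical RingNode class whose push and pop have the same shape as the Segment methods shown above; the ring() factory is an assumption added here so that a one-element circular list can be created.

```java
/** Hypothetical node type mirroring Segment's prev/next links, without any data. */
final class RingNode {
  final String name;
  RingNode prev;
  RingNode next;

  RingNode(String name) {
    this.name = name;
  }

  /** Creates a one-element ring: the node is its own prev and next. */
  static RingNode ring(String name) {
    RingNode n = new RingNode(name);
    n.prev = n;
    n.next = n;
    return n;
  }

  /** Inserts node right after this one and returns it (same shape as Segment.push). */
  RingNode push(RingNode node) {
    node.prev = this;
    node.next = next;
    next.prev = node;
    next = node;
    return node;
  }

  /** Unlinks this node and returns its successor, or null if it was the only node (like Segment.pop). */
  RingNode pop() {
    RingNode result = next != this ? next : null;
    prev.next = next;
    next.prev = prev;
    next = null;
    prev = null;
    return result;
  }
}
```

Building a ring a, b, c and popping b leaves a and c pointing at each other; popping the last remaining node returns null, which is how a Buffer's head becomes null when it runs empty.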
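Now split(int byteCount). byteCount is the number of bytes to split off, and it is validated first: a value that is zero or negative, or larger than the amount of valid data (limit - pos), is rejected.
If byteCount is at least SHARE_MINIMUM, the prefix is a sharedCopy() that shares this segment's array without copying. Otherwise an unused Segment is taken from SegmentPool and byteCount bytes are copied into it (from this segment's data starting at pos to prefix.data starting at 0). Then prefix.limit is set to prefix.pos + byteCount, this segment's start advances with pos += byteCount, and the new Segment is pushed after this segment's prev, which places it between prev and this segment.
[Figure: split illustrated]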
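
The share-or-copy decision can be sketched on its own, leaving out the linked list. SliceSketch below is a hypothetical class; it allocates a fresh array for small prefixes where Okio would take a Segment from SegmentPool, so treat it as an approximation of the logic rather than the real method.

```java
import java.util.Arrays;

/** Hypothetical slice type: a window [pos, limit) over a byte array, not linked into any list. */
final class SliceSketch {
  static final int SHARE_MINIMUM = 1024; // same threshold as Segment.SHARE_MINIMUM

  final byte[] data;
  int pos;
  int limit;

  SliceSketch(byte[] data, int pos, int limit) {
    this.data = data;
    this.pos = pos;
    this.limit = limit;
  }

  /** Cuts the first byteCount bytes off this slice and returns them as a new slice. */
  SliceSketch split(int byteCount) {
    if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
    SliceSketch prefix;
    if (byteCount >= SHARE_MINIMUM) {
      // Large prefix: point at the same array, no bytes are copied.
      prefix = new SliceSketch(data, pos, pos + byteCount);
    } else {
      // Small prefix: copy into a fresh array (Okio would reuse a pooled Segment here).
      prefix = new SliceSketch(Arrays.copyOfRange(data, pos, pos + byteCount), 0, byteCount);
    }
    pos += byteCount; // this slice now starts where the prefix ends
    return prefix;
  }
}
```

Splitting 2048 bytes off a slice yields a prefix backed by the same array, while splitting 100 bytes yields a prefix with its own small copy. The trade-off is the one stated in the source comment: sharing avoids copying, but many short shared segments would be read-only and would form long chains.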

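That is roughly all there is to Segment. Now for Buffer. Buffer is a big class, so let's start from a simple use case: suppose we have a txt file and want to read its contents and print them. We might write something like this: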

private static void readSimpleString(File file) {
    // try-with-resources closes the BufferedSource, which also closes the underlying Source
    try (BufferedSource bufferedSource = Okio.buffer(Okio.source(file))) {
        String read = bufferedSource.readString(StandardCharsets.UTF_8);
        LogUtil.d("read->" + read); // LogUtil is the author's own logging helper
    } catch (IOException e) {
        e.printStackTrace();
    }
}

What we are actually operating on is the BufferedSource, so let's look at its readString (implemented in RealBufferedSource):

 @Override public String readString(Charset charset) throws IOException {
    if (charset == null) throw new IllegalArgumentException("charset == null");

    buffer.writeAll(source);
    return buffer.readString(charset);
  }

  @Override public String readString(long byteCount, Charset charset) throws IOException {
    require(byteCount);
    if (charset == null) throw new IllegalArgumentException("charset == null");
    return buffer.readString(byteCount, charset);
  }

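readString(Charset) first pulls everything from the source into the buffer with buffer.writeAll(source) and then delegates to the buffer's readString. Here it is in the Buffer class: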

@Override public String readString(Charset charset) {
    try {
      return readString(size, charset);
    } catch (EOFException e) {
      throw new AssertionError(e);
    }
  }

  @Override public String readString(long byteCount, Charset charset) throws EOFException {
    checkOffsetAndCount(size, 0, byteCount);
    if (charset == null) throw new IllegalArgumentException("charset == null");
    if (byteCount > Integer.MAX_VALUE) {
      throw new IllegalArgumentException("byteCount > Integer.MAX_VALUE: " + byteCount);
    }
    if (byteCount == 0) return "";

    Segment s = head;
    if (s.pos + byteCount > s.limit) { // the requested bytes extend past the head segment, so use readByteArray
      // If the string spans multiple segments, delegate to readBytes().
      return new String(readByteArray(byteCount), charset);
    }
    // The head segment alone holds all the requested bytes.
    String result = new String(s.data, s.pos, (int) byteCount, charset);
    s.pos += byteCount;
    size -= byteCount;
    // If the segment is now fully read, unlink it, make its successor the head, and recycle it.
    if (s.pos == s.limit) {
      head = s.pop();
      SegmentPool.recycle(s);
    }

    return result;
  }

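byteCount is checked first: if s.pos + byteCount does not exceed the head's limit, a single segment is enough and we are done. Often, though, one Segment is not enough, so let's continue into readByteArray: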

@Override public byte[] readByteArray(long byteCount) throws EOFException {
    checkOffsetAndCount(size, 0, byteCount);
    if (byteCount > Integer.MAX_VALUE) {
      throw new IllegalArgumentException("byteCount > Integer.MAX_VALUE: " + byteCount);
    }

    byte[] result = new byte[(int) byteCount];
    readFully(result);
    return result;
  }

It allocates a byte array of byteCount bytes and calls readFully:

 @Override public void readFully(byte[] sink) throws EOFException {
    int offset = 0;
    while (offset < sink.length) {
      int read = read(sink, offset, sink.length - offset);
      if (read == -1) throw new EOFException();
      offset += read;
    }
  }

This is a while loop with offset as its condition: it does not stop until offset reaches sink.length. Next, into read:

 @Override public int read(byte[] sink, int offset, int byteCount) {
    checkOffsetAndCount(sink.length, offset, byteCount);
    // Start from the current head Segment.
    Segment s = head;
    if (s == null) return -1;
    int toCopy = Math.min(byteCount, s.limit - s.pos); // bytes to copy: the smaller of byteCount and the head's valid data
    System.arraycopy(s.data, s.pos, sink, offset, toCopy); // copy the data

    s.pos += toCopy; // advance the head's pos past the copied bytes
    size -= toCopy; // the buffer now holds toCopy fewer bytes

    if (s.pos == s.limit) { // fully read: recycle into SegmentPool (or drop it if the pool is full)
      head = s.pop(); // s's next becomes the new head
      SegmentPool.recycle(s);
    }

    return toCopy;
  }

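read takes three parameters: the destination byte array, the offset already filled, and how many bytes are still wanted. The read logic is now clear: copy the current head's data into the destination array sink, use Segment.pop() from above to make next the new head, and let the while loop in readFully keep going until everything has been read.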
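
The interplay between read (one segment at a time) and readFully (loop until full) can be reproduced with plain JDK types. ChunkBuffer below is a hypothetical simplification: it keeps whole byte arrays in an ArrayDeque plus one read position for the head chunk, instead of Segments with pos and limit.

```java
import java.io.EOFException;
import java.util.ArrayDeque;

/** Hypothetical mini-buffer: a queue of chunks plus a read position inside the head chunk. */
final class ChunkBuffer {
  private final ArrayDeque<byte[]> chunks = new ArrayDeque<>();
  private int headPos; // read position inside chunks.peekFirst()

  /** Adds a chunk at the tail; empty chunks are ignored. */
  void append(byte[] chunk) {
    if (chunk.length > 0) chunks.addLast(chunk);
  }

  /** Copies at most byteCount bytes from the head chunk only; returns -1 when the buffer is empty. */
  int read(byte[] sink, int offset, int byteCount) {
    byte[] head = chunks.peekFirst();
    if (head == null) return -1;
    int toCopy = Math.min(byteCount, head.length - headPos);
    System.arraycopy(head, headPos, sink, offset, toCopy);
    headPos += toCopy;
    if (headPos == head.length) { // head exhausted: drop it so the next chunk becomes the head
      chunks.pollFirst();
      headPos = 0;
    }
    return toCopy;
  }

  /** Keeps calling read until sink is full, in the same way as Buffer.readFully. */
  void readFully(byte[] sink) throws EOFException {
    int offset = 0;
    while (offset < sink.length) {
      int read = read(sink, offset, sink.length - offset);
      if (read == -1) throw new EOFException();
      offset += read;
    }
  }
}
```

A single read call never crosses a chunk boundary, which is why the caller needs the loop: filling an 11-byte array from chunks of 3, 6 and 2 bytes takes three read calls.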
That completes the source path for reading a String with Okio.
Now let's look at writing:

final class RealBufferedSink implements BufferedSink {
  public final Buffer buffer = new Buffer();
  public final Sink sink;
  boolean closed;

  RealBufferedSink(Sink sink) {
    if (sink == null) throw new NullPointerException("sink == null");
    this.sink = sink;
  }

  @Override public Buffer buffer() {
    return buffer;
  }
...
  @Override public BufferedSink writeString(String string, Charset charset) throws IOException {
    if (closed) throw new IllegalStateException("closed");
    buffer.writeString(string, charset);
    return emitCompleteSegments();
  }

  @Override public BufferedSink writeByte(int b) throws IOException {
    if (closed) throw new IllegalStateException("closed");
    buffer.writeByte(b);
    return emitCompleteSegments();
  }

  @Override public BufferedSink emitCompleteSegments() throws IOException {
    if (closed) throw new IllegalStateException("closed");
    long byteCount = buffer.completeSegmentByteCount();
    if (byteCount > 0) sink.write(buffer, byteCount);
    return this;
  }
...
}

Every write method ends by calling emitCompleteSegments(), which first computes the number of bytes that can be written out:

/**
   * Returns the number of bytes in segments that are not writable. This is the
   * number of bytes that can be flushed immediately to an underlying sink
   * without harming throughput.
   */
  public final long completeSegmentByteCount() {
    long result = size;
    if (result == 0) return 0;

    // Omit the tail if it's still writable.
    Segment tail = head.prev;
    if (tail.limit < Segment.SIZE && tail.owner) {
      result -= tail.limit - tail.pos;
    }

    return result;
  }
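
The effect of this method is easiest to see with numbers. The sketch below repeats the same arithmetic on plain values; EmitSketch and completeBytes are hypothetical names, and the tail segment is described by its pos, limit and owner flag instead of a real Segment.

```java
/** Hypothetical helper reproducing the arithmetic of completeSegmentByteCount on plain numbers. */
final class EmitSketch {
  static final int SEGMENT_SIZE = 8192; // same value as Segment.SIZE

  /**
   * @param size      total bytes currently in the buffer
   * @param tailPos   pos of the tail segment
   * @param tailLimit limit of the tail segment
   * @param tailOwner whether the tail segment owns its array and can still be appended to
   * @return the number of bytes that would be emitted to the sink right away
   */
  static long completeBytes(long size, int tailPos, int tailLimit, boolean tailOwner) {
    if (size == 0) return 0;
    long result = size;
    if (tailLimit < SEGMENT_SIZE && tailOwner) {
      result -= tailLimit - tailPos; // hold back the partially filled tail
    }
    return result;
  }
}
```

With 100 bytes sitting in a single writable segment the result is 0, so nothing reaches the sink yet. With one full segment followed by a 10-byte tail the result is 8192, so only the full segment is emitted. The held-back bytes go out once the tail fills up or when flush or close is called.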

It then calls sink.write to write them. Recall that the Sink returned by Okio.sink is an anonymous inner class; this is its write implementation:

@Override public void write(Buffer source, long byteCount) throws IOException {
        checkOffsetAndCount(source.size, 0, byteCount);
        while (byteCount > 0) {
          timeout.throwIfReached();
          Segment head = source.head;
          int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
          out.write(head.data, head.pos, toCopy);

          head.pos += toCopy;
          byteCount -= toCopy;
          source.size -= toCopy;

          if (head.pos == head.limit) {
            source.head = head.pop();
            SegmentPool.recycle(head);
          }
        }
      }

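Again it works segment by segment, looping to write them into the OutputStream. Bytes still held back in the buffer go out when flush or close is called, and then everything has been written.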
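
For comparison, the same draining loop can be written against a queue of plain byte arrays. DrainSketch is a hypothetical helper, and unlike Okio it copies the unread remainder of a partially written chunk instead of advancing a pos index, so it only illustrates the control flow.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayDeque;
import java.util.Arrays;

/** Hypothetical helper: writes the first byteCount bytes of a chunk queue to an OutputStream. */
final class DrainSketch {
  static void write(ArrayDeque<byte[]> chunks, long byteCount, OutputStream out) throws IOException {
    while (byteCount > 0) {
      byte[] head = chunks.pollFirst();
      if (head == null) throw new IllegalArgumentException("not enough data");
      int toCopy = (int) Math.min(byteCount, head.length);
      out.write(head, 0, toCopy);
      byteCount -= toCopy;
      if (toCopy < head.length) {
        // Partially consumed: keep the unread remainder at the front of the queue.
        chunks.addFirst(Arrays.copyOfRange(head, toCopy, head.length));
      }
    }
  }
}
```

Asking for 6 bytes from the chunks "abcd" and "efgh" writes "abcdef" and leaves "gh" queued, just as the real loop leaves a half-read head segment in the Buffer.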

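Let's summarize what we have covered so far:

  1. The Okio class is the overall entry point. We obtain a BufferedSource to read, and BufferedSource ultimately uses the Buffer class for the actual logic.
  2. Segment is the class that actually holds the byte array, and it provides a few basic methods:
    2.1. pop: removes itself from the list and returns the next Segment
    2.2. push: inserts a new Segment after itself
    2.3. split: cuts a new Segment off the front and links it into the list, sharing or copying the data depending on SHARE_MINIMUM
  3. SegmentPool keeps Segments that are no longer in use, to avoid memory churn and zero-filling (the cost of the JVM zeroing newly allocated memory). It holds at most 8 Segments; any beyond that are simply dropped.
    3.1. recycle(Segment segment) // put a segment into the pool
    3.2. take() // take a segment out of the pool
  4. Buffer is the class that does the actual read and write logic. readString uses the while loop in readFully to call read repeatedly, and read uses Segment.pop() to advance to the next Segment and read its data until done.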