Java

Okio源码框架分析

2021-09-05  本文已影响0人  wo883721

android 开发的一定听过 square 这家公司,也肯定使用过他们家的产品,比如 retrofit,picasso,okhttp,okio等等。
今天我们就来分析 Okio 这个开源框架的实现原理,它本来是大名鼎鼎的处理http 请求的开源框架 OkHttp 的一部分,后来被独立出来,专门来处理 javaIO 流的。

说起这个框架,曾经在几年前详细阅读过它的源码,还发现了源码中的一个 BUG,提交给官方,让他们修改了,当时感觉有点成就感,后来养成了阅读源码的习惯。

为了让 java 后端的人员也能使用 OkhttpOkio 这些框架,本文分析的 Okio 源码是基于1.17.5 版本的,而不是 2.0 以上 Kotlin 版本。

一. Source 和 Sink

Okio 是用来处理 IO 流的,那么最重要的两个类就是 SourceSinkSource 相当于 InputStream, Sink 相当于 OutputStream

很多人容易弄混 SourceSink,下面我们就好好分析分析。

1.1 Source 和 InputStream

我们说 Source 相当于 InputStream,也就是输入流,它的作用就是从流中读取数据。
怎么读取数据,就先看看它们两个的源码:

public interface Source extends Closeable {
  /**
   * Removes at least 1, and up to {@code byteCount} bytes from this and appends
   * them to {@code sink}. Returns the number of bytes read, or -1 if this
   * source is exhausted.
   */
  long read(Buffer sink, long byteCount) throws IOException;

  /** Returns the timeout for this source. */
  Timeout timeout();

  /**
   * Closes this source and releases the resources held by this source. It is an
   * error to read a closed source. It is safe to close a source more than once.
   */
  @Override void close() throws IOException;
}

Source 只有三个方法:

public abstract class InputStream implements Closeable {
    public abstract int read() throws IOException;
    public int read(byte b[]) throws IOException {
        return read(b, 0, b.length);
    }
    public int read(byte b[], int off, int len) throws IOException {
        if (b == null) {
            throw new NullPointerException();
        } else if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }

        int c = read();
        if (c == -1) {
            return -1;
        }
        b[off] = (byte)c;

        int i = 1;
        try {
            for (; i < len ; i++) {
                c = read();
                if (c == -1) {
                    break;
                }
                b[off + i] = (byte)c;
            }
        } catch (IOException ee) {
        }
        return i;
    }
    public long skip(long n) throws IOException {

        long remaining = n;
        int nr;

        if (n <= 0) {
            return 0;
        }

        int size = (int)Math.min(MAX_SKIP_BUFFER_SIZE, remaining);
        byte[] skipBuffer = new byte[size];
        while (remaining > 0) {
            nr = read(skipBuffer, 0, (int)Math.min(size, remaining));
            if (nr < 0) {
                break;
            }
            remaining -= nr;
        }

        return n - remaining;
    }
    public int available() throws IOException {
        return 0;
    }
    public void close() throws IOException {}
    public synchronized void mark(int readlimit) {}
    public synchronized void reset() throws IOException {
        throw new IOException("mark/reset not supported");
    }
    public boolean markSupported() {
        return false;
    }
}

InputStream 的方法比较多:

仔细比较 SourceInputStream ,你会发现:

1.2 Sink 和 OutputStream

我们说 Sink 相当于OutputStream,也就是输出流,它的作用就是向流中写入数据。
怎么写入数据,就先看看它们两个的源码:

public interface Sink extends Closeable, Flushable {
  /** Removes {@code byteCount} bytes from {@code source} and appends them to this. */
  void write(Buffer source, long byteCount) throws IOException;

  /** Pushes all buffered bytes to their final destination. */
  @Override void flush() throws IOException;

  /** Returns the timeout for this sink. */
  Timeout timeout();

  /**
   * Pushes all buffered bytes to their final destination and releases the
   * resources held by this sink. It is an error to write a closed sink. It is
   * safe to close a sink more than once.
   */
  @Override void close() throws IOException;
}

Sink 只有四个方法:

public abstract class OutputStream implements Closeable, Flushable {
    public abstract void write(int b) throws IOException;
    public void write(byte b[]) throws IOException {
        write(b, 0, b.length);
    }
    public void write(byte b[], int off, int len) throws IOException {
        if (b == null) {
            throw new NullPointerException();
        } else if ((off < 0) || (off > b.length) || (len < 0) ||
                   ((off + len) > b.length) || ((off + len) < 0)) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return;
        }
        for (int i = 0 ; i < len ; i++) {
            write(b[off + i]);
        }
    }
    public void flush() throws IOException {
    }
    public void close() throws IOException {
    }
}

仔细比较 SinkOutputStream ,你会发现:

1.3 小结

SourceSink 都比较简单,方法很少。

你会发现有一个重要类 Buffer ,它在SourceSink 的方法中都出现了,那它的作用是什么?

Buffer 代表缓存区,它其实是Okio框架的精髓。

而仔细阅读它的源码,你会惊讶地发现,虽然Buffer 中方法非常多,但是它的成员变量只有两个:

所以我们先介绍 Segment

二. Segment

我们知道 Buffer 靠一个链表来存储缓存区的字节数据,而这个链表中的每个节点都是一个 Segment 对象。
Segment 表示缓存区Buffer的一个片段,下面我们来分析这个类。

2.1 成员属性

  /** The size of all segments in bytes. */
  static final int SIZE = 8192;

  /** Segments will be shared when doing so avoids {@code arraycopy()} of this many bytes. */
  static final int SHARE_MINIMUM = 1024;

  final byte[] data;

  /** The next byte of application data byte to read in this segment. */
  int pos;

  /** The first byte of available data ready to be written to. */
  int limit;

  /** True if other segments or byte strings use the same byte array. */
  boolean shared;

  /** True if this segment owns the byte array and can append to it, extending {@code limit}. */
  boolean owner;

  /** Next segment in a linked or circularly-linked list. */
  Segment next;

  /** Previous segment in a circularly-linked list. */
  Segment prev;

这些字段的作用:

2.2 构造方法

  Segment() {
    this.data = new byte[SIZE];
    this.owner = true;
    this.shared = false;
  }

  Segment(byte[] data, int pos, int limit, boolean shared, boolean owner) {
    this.data = data;
    this.pos = pos;
    this.limit = limit;
    this.shared = shared;
    this.owner = owner;
  }

如果是空惨构造,那么创建的Segment就是独享的片段;否则就根据传递的参数来创建Segment

2.3 sharedCopy 方法

  final Segment sharedCopy() {
    shared = true;
    return new Segment(data, pos, limit, true, false);
  }

返回当前片段的共享片段,当前片段的 shared 属性也设置为 true

共享片段的好处就是不用进行字节数组的复制,共享同一个字节数组,在某些只需要读的场景非常有效。
共享片段也分为两种:

  • 一种是拥有共享数据的片段(即拥有data 字节数组权限的片段,ownertrue),它可以继续写入数据,不会影响共享数据的。
    注意它不一定是当前片段,因为当前片段可能 ownerfalse,也可以继续生成共享片段的。
  • 一种是获取共享数据的片段(这里就是方法返回值),它只能读取共享数据,不能向片段中写入任何数据,因为它没有这个 data 字节数据的权限,ownerfalse

2.4 unsharedCopy 方法

  /** Returns a new segment that its own private copy of the underlying byte array. */
  final Segment unsharedCopy() {
    return new Segment(data.clone(), pos, limit, false, true);
  }

用当前片段生成一个独享片段,所以使用 data.clone() 复制出一个新的字节数组,这样就与当前片段的字节数组data 没有关联了。

当前片段可以是共享片段,也可以是独享片段。

2.5 pop 方法

  /**
   * Removes this segment of a circularly-linked list and returns its successor.
   * Returns null if the list is now empty.
   */
  public final @Nullable Segment pop() {
    Segment result = next != this ? next : null;
    prev.next = next;
    next.prev = prev;
    next = null;
    prev = null;
    return result;
  }

将当前片段从双向链表中删除,并返回链表中下一个片段。

next != this 来判断链表是否有后继片段,因为是双向链表,当链表中只有一个数据的时候, next == prev == this
记住双向链表中要删除一个元素,那么就要改变四条引用线,就如上面代码显示。
注意返回值是当前被删除元素的下一个元素。

2.6 push 方法

  /**
   * Appends {@code segment} after this segment in the circularly-linked list.
   * Returns the pushed segment.
   */
  public final Segment push(Segment segment) {
    segment.prev = this;
    segment.next = next;
    next.prev = segment;
    next = segment;
    return segment;
  }

在双向链表中,当前片段后面插入传入片段 segment,并返回这个插入片段segment

在当前片段后面插入 segment,那么也是改变四条引用线。
返回插入的片段 segment

2.7 split 方法

  public final Segment split(int byteCount) {
    if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
    Segment prefix;

    // We have two competing performance goals:
    //  - Avoid copying data. We accomplish this by sharing segments.
    //  - Avoid short shared segments. These are bad for performance because they are readonly and
    //    may lead to long chains of short segments.
    // To balance these goals we only share segments when the copy will be large.
    if (byteCount >= SHARE_MINIMUM) {
      prefix = sharedCopy();
    } else {
      prefix = SegmentPool.take();
      System.arraycopy(data, pos, prefix.data, 0, byteCount);
    }

    prefix.limit = prefix.pos + byteCount;
    pos += byteCount;
    prev.push(prefix);
    return prefix;
  }

将当前片段根据 byteCount 值分成两个片段,第一个片段包含[pos..pos+ byteccount]中的数据。第二个片段包含[pos+ byteccount ..limit]中的数据。

这个方法在两个缓存区Buffer 部分片段数据移动的时候非常有用,后面我们会介绍。

我们来分析方法流程:

2.8 compact 方法

  /**
   * Call this when the tail and its predecessor may both be less than half
   * full. This will copy data so that segments can be recycled.
   */
  public final void compact() {
    if (prev == this) throw new IllegalStateException();
    if (!prev.owner) return; // Cannot compact: prev isn't writable.
    int byteCount = limit - pos;
    int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
    if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
    writeTo(prev, byteCount);
    pop();
    SegmentPool.recycle(this);
  }

将当前片段和它的前一个片段的可用数据进行合并,来回收多余片段。
方法流程:

2.9 writeTo 方法

  public final void writeTo(Segment sink, int byteCount) {
    if (!sink.owner) throw new IllegalArgumentException();
    if (sink.limit + byteCount > SIZE) {
      // We can't fit byteCount bytes at the sink's current position. Shift sink first.
      if (sink.shared) throw new IllegalArgumentException();
      if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
      System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
      sink.limit -= sink.pos;
      sink.pos = 0;
    }

    System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
    sink.limit += byteCount;
    pos += byteCount;
  }

byteCount 个字节数据从当前片段移动到片段sink 中。
方法流程:

三. SegmentPool

分析完 Segment,我们简单分析一下 SegmentPool, 它表示 Segment 的实例池。

实例池的作用就是回收使用过的对象实例,当需要新对象时,可以从池中获取,而不用新的创建。
针对大对象的时候,会使用这个方式,避免频繁 GC
例如这里的 Segment对象,因为它的实例中有 8192 个字节的 data

这个类基本上都是静态属性和静态方法。

3.1 静态属性

  /** The maximum number of bytes to pool. */
  // TODO: Is 64 KiB a good maximum size? Do we ever have that many idle segments?
  static final long MAX_SIZE = 64 * 1024; // 64 KiB.

  /** Singly-linked list of segments. */
  static @Nullable Segment next;

  /** Total bytes in this pool. */
  static long byteCount;

3.2 take 方法

  static Segment take() {
    synchronized (SegmentPool.class) {
      if (next != null) {
        Segment result = next;
        next = result.next;
        result.next = null;
        byteCount -= Segment.SIZE;
        return result;
      }
    }
    return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
  }

从实例池中获取 Segment 对象实例,如果 next == null,表示实例池为空,那么就创建一个新的Segment 对象实例返回。

3.3 recycle 方法

  static void recycle(Segment segment) {
    if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
    if (segment.shared) return; // This segment cannot be recycled.
    synchronized (SegmentPool.class) {
      if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
      byteCount += Segment.SIZE;
      segment.next = next;
      segment.pos = segment.limit = 0;
      next = segment;
    }
  }

回收Segment 对象实例。
方法流程

四. Buffer

缓存区Buffer,你可以把它想象成一个无限的字节数组(Segment 的链表实现)。
那么对缓存区Buffer就两种:

那么有人要问了,那我干嘛要用Buffer,直接用字节数组byte[] 不就可以了么?

是的,你就直接创建一个字节数组byte[],直接存入字节数据,然后读取存入的字节数据,是完全可以的。

但是直接创建字节数组byte[]有两个问题:

  • 首先该创建多大的字节数组,假如我们完全读取输入流中的数据,但是我们不知道输入流数据的大小,就不知道创建长度是多少的字节数组。也就是说单纯的字节数组byte[],没办法动态调节大小的能力。
  • 直接创建字节数组byte[],使用完了之后,它就会被丢弃,等待GC 回收。但是频繁地创建和回收字节数组,本身是很耗性能的,其实使用完的字节数组,我们完全可以重复使用它。

而缓存区Buffer 就解决了上面问题,通过Segment链表来动态调整存储数据大小,通过SegmentPool 来重复使用片段Segment

并且缓存区Buffer提供了很多方法,将各个来源的字节数据存入到缓存区中,或者将缓存区中存储的字节数据写入到其他地方。

4.1 与IO流的交互

它既可以从输入流中读取数据,存到它字节数组中;也可以将存储的数据写入到输出流中。

4.1.1 输入流中读取数据

  /** Read and exhaust bytes from {@code in} to this. */
  public final Buffer readFrom(InputStream in) throws IOException {
    readFrom(in, Long.MAX_VALUE, true);
    return this;
  }

  /** Read {@code byteCount} bytes from {@code in} to this. */
  public final Buffer readFrom(InputStream in, long byteCount) throws IOException {
    if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
    readFrom(in, byteCount, false);
    return this;
  }

  private void readFrom(InputStream in, long byteCount, boolean forever) throws IOException {
    if (in == null) throw new IllegalArgumentException("in == null");
    while (byteCount > 0 || forever) {
      Segment tail = writableSegment(1);
      int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
      int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
      if (bytesRead == -1) {
        if (tail.pos == tail.limit) {
          // We allocated a tail segment, but didn't end up needing it. Recycle!
          head = tail.pop();
          SegmentPool.recycle(tail);
        }
        if (forever) return;
        throw new EOFException();
      }
      tail.limit += bytesRead;
      size += bytesRead;
      byteCount -= bytesRead;
    }
  }

主要分析最后一个方法,它有三个参数:

方法流程分析

你们有没有注意到,这个方法实现逻辑上是有小问题的,判断是否要继续从输入流中读取数据,是通过 byteCount > 0 || forever

  • byteCount > 0 表示没有读取想要的字节个数,那么继续从输入流中读取。
  • forever = true 说明 while 循环为true,一直要读取到 bytesRead == -1,即输入流结尾,才能返回。

但是不管forever 是否为true,方法结尾都调用了byteCount -= bytesRead;,都将byteCount的值减去bytesRead,那么循环下去,会将这个值byteCount变成0
这个时候你会发现陷入了死循环,因为 in.read(tail.data, tail.limit, maxToCopy) 方法,当maxToCopy == 0 的时候,返回值 bytesRead 也是 0,那么就会一直死循环。
虽然当前实现中,forever == true 只有调用 readFrom(InputStream in) 方法才会出现,并且将byteCount 设置成了Long.MAX_VALUE,足够读取了。但是从代码实现逻辑来说,这里就是bug;当forever == true 的时候,就不需要执行byteCount -= bytesRead

后来我将这个问题反馈给了官方,官方给了我下面的问答:


Okio_2.jpg

4.1.2 向输出流中写入数据

  /** Write the contents of this to {@code out}. */
  public final Buffer writeTo(OutputStream out) throws IOException {
    return writeTo(out, size);
  }

  /** Write {@code byteCount} bytes from this to {@code out}. */
  public final Buffer writeTo(OutputStream out, long byteCount) throws IOException {
    if (out == null) throw new IllegalArgumentException("out == null");
    checkOffsetAndCount(size, 0, byteCount);

    Segment s = head;
    while (byteCount > 0) {
      int toCopy = (int) Math.min(byteCount, s.limit - s.pos);
      out.write(s.data, s.pos, toCopy);

      s.pos += toCopy;
      size -= toCopy;
      byteCount -= toCopy;

      if (s.pos == s.limit) {
        Segment toRecycle = s;
        head = s = toRecycle.pop();
        SegmentPool.recycle(toRecycle);
      }
    }

    return this;
  }

将缓存区Buffer 中的byteCount 个字节数据写入输出流out 中。
方法流程

4.1.3 copy数据到输出流

 /** Copy the contents of this to {@code out}. */
  public final Buffer copyTo(OutputStream out) throws IOException {
    return copyTo(out, 0, size);
  }

  /**
   * Copy {@code byteCount} bytes from this, starting at {@code offset}, to
   * {@code out}.
   */
  public final Buffer copyTo(OutputStream out, long offset, long byteCount) throws IOException {
    if (out == null) throw new IllegalArgumentException("out == null");
    checkOffsetAndCount(size, offset, byteCount);
    if (byteCount == 0) return this;

    // Skip segments that we aren't copying from.
    Segment s = head;
    for (; offset >= (s.limit - s.pos); s = s.next) {
      offset -= (s.limit - s.pos);
    }

    // Copy from one segment at a time.
    for (; byteCount > 0; s = s.next) {
      int pos = (int) (s.pos + offset);
      int toCopy = (int) Math.min(s.limit - pos, byteCount);
      out.write(s.data, pos, toCopy);
      byteCount -= toCopy;
      offset = 0;
    }
    return this;
  }

将缓存区Bufferoffset 位置开始,复制 byteCount 个字节数据到输出流中。
方法流程

copyTowriteTo 都可以向输出流中,写入当前缓存区中的数据。
它们之间的区别就是:

  • writeTo 方法,向输出流写入数据后,本身缓存区中的数据也相当于要丢弃了,也就是说会改变链表中的片段Segment,并且缓存区数据大小size 值也要改变。
  • copyTo 方法不会改变链表中的片段Segment,以及缓存区数据大小值size
  • copyTo方法可以指定位置offset 位置开始复制,而writeTo方法,必须从开始位置的数据写入到输出流中。

4.2 与其他缓存区Buffer 的交互

缓存区也可以从其他缓存区中读取字节数据,存储到当前缓存区中;
也可以将当前缓存中存储的字节数据写入到其他缓存区中。

这个是缓存区Buffer 的精髓,两个缓存区直接数据转移,非常高效。

还记得SourceSink 类么?

  • Sourceread(Buffer sink, long byteCount) 方法将读取输出流Source中的字节数据,存入到缓存区sink中。
  • Sinkwrite(Buffer source, long byteCount) 方法将缓存区source中存储的数据写入到输出流Sink 中。
    而它们直接大部分情况下,就用的是缓存区之间的数据迁移,因为我们大部分都使用它们的子类BufferedSourceBufferedSink

4.2.1 向当前缓存区中写入数据

  @Override public void write(Buffer source, long byteCount) {
 if (source == null) throw new IllegalArgumentException("source == null");
    if (source == this) throw new IllegalArgumentException("source == this");
    checkOffsetAndCount(source.size, 0, byteCount);

    while (byteCount > 0) {
      // Is a prefix of the source's head segment all that we need to move?
      if (byteCount < (source.head.limit - source.head.pos)) {
        Segment tail = head != null ? head.prev : null;
        if (tail != null && tail.owner
            && (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {
          // Our existing segments are sufficient. Move bytes from source's head to our tail.
          source.head.writeTo(tail, (int) byteCount);
          source.size -= byteCount;
          size += byteCount;
          return;
        } else {
          // We're going to need another segment. Split the source's head
          // segment in two, then move the first of those two to this buffer.
          source.head = source.head.split((int) byteCount);
        }
      }

      // Remove the source's head segment and append it to our tail.
      Segment segmentToMove = source.head;
      long movedByteCount = segmentToMove.limit - segmentToMove.pos;
      source.head = segmentToMove.pop();
      if (head == null) {
        head = segmentToMove;
        head.next = head.prev = head;
      } else {
        Segment tail = head.prev;
        tail = tail.push(segmentToMove);
        tail.compact();
      }
      source.size -= movedByteCount;
      size += movedByteCount;
      byteCount -= movedByteCount;
    }
  }

从其他缓存区source 中读取byteCount个字节数据写入到当前缓存区中。

这个是缓存区之间数据交互,最重要的方法,其他方法都是基于它的。
你就把它当成 write(byte[] source, int offset, int byteCount), 将缓存区source看成字节数组,就是将缓存区source中的一部分数据写入到当前缓存区中。

方法流程

这个方法已经分析完毕了,你会发现两个缓存区数据迁移的时候,大部分情况下,都是直接片段迁移,而不需要复制字节数据,所以非常快速高效。

4.2.2 从当前缓存区读取数据

  @Override public long read(Buffer sink, long byteCount) {
    if (sink == null) throw new IllegalArgumentException("sink == null");
    if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
    if (size == 0) return -1L;
    if (byteCount > size) byteCount = size;
    sink.write(this, byteCount);
    return byteCount;
  }

从当前缓存区中读取 byteCount个字节数据,写入到缓存区sink 中。
你会发现方法中直接调用了 sink.write(this, byteCount) 来实现功能。

仔细分析一下,sink.write(this, byteCount) 就是从来源缓存区this 中读取byteCount个字节数据,写入到缓存区sink 中。

4.2.3 将当前缓存区内复制到其他缓存区

  /** Copy {@code byteCount} bytes from this, starting at {@code offset}, to {@code out}. */
  public final Buffer copyTo(Buffer out, long offset, long byteCount) {
    if (out == null) throw new IllegalArgumentException("out == null");
    checkOffsetAndCount(size, offset, byteCount);
    if (byteCount == 0) return this;

    out.size += byteCount;

    // Skip segments that we aren't copying from.
    Segment s = head;
    for (; offset >= (s.limit - s.pos); s = s.next) {
      offset -= (s.limit - s.pos);
    }

    // Copy one segment at a time.
    for (; byteCount > 0; s = s.next) {
      Segment copy = s.sharedCopy();
      copy.pos += offset;
      copy.limit = Math.min(copy.pos + (int) byteCount, copy.limit);
      if (out.head == null) {
        out.head = copy.next = copy.prev = copy;
      } else {
        out.head.prev.push(copy);
      }
      byteCount -= copy.limit - copy.pos;
      offset = 0;
    }

    return this;
  }

将当前缓存区从offset位置开始的byteCount 个字节数据复制到缓存区out 中。
方法大体流程

4.3 与字节数组的交互

4.3.1 将字节数组中的数据写入缓存区

  @Override public Buffer write(byte[] source) {
    if (source == null) throw new IllegalArgumentException("source == null");
    return write(source, 0, source.length);
  }

  @Override public Buffer write(byte[] source, int offset, int byteCount) {
    if (source == null) throw new IllegalArgumentException("source == null");
    checkOffsetAndCount(source.length, offset, byteCount);

    int limit = offset + byteCount;
    while (offset < limit) {
      Segment tail = writableSegment(1);

      int toCopy = Math.min(limit - offset, Segment.SIZE - tail.limit);
      System.arraycopy(source, offset, tail.data, tail.limit, toCopy);

      offset += toCopy;
      tail.limit += toCopy;
    }

    size += byteCount;
    return this;
  }

将字节数组source 的一部分写入到当前缓存区中。
仔细分析方法:

4.3.2 将缓存区字节数据读取到字节数组中

  @Override public int read(byte[] sink) {
    return read(sink, 0, sink.length);
  }
  @Override public int read(byte[] sink, int offset, int byteCount) {
    checkOffsetAndCount(sink.length, offset, byteCount);

    Segment s = head;
    if (s == null) return -1;
    int toCopy = Math.min(byteCount, s.limit - s.pos);
    System.arraycopy(s.data, s.pos, sink, offset, toCopy);

    s.pos += toCopy;
    size -= toCopy;

    if (s.pos == s.limit) {
      head = s.pop();
      SegmentPool.recycle(s);
    }
    return toCopy;
  }

将当前缓存区一定数量字节数据写入到字节数组sink 中。
方法分析:

注意: read 方法没有使用循环,来保证一定读取byteCount个字节数据。那么因为读取方法,不强求一定要读取的字节个数,而是返回值会告诉调用者实际读取字节的个数。

4.4 小结

Buffer 类中还有很多方法,大家自己分析就可以了。
你只要记住,Buffer 就是一个内存中存储字节数据的类,它可以动态地扩展存储空间的大小,并且有高效的内存回收和复用。

五. BufferedSource 和 BufferedSink

我们知道 Source 代表输入流,Sink 代表输出流,它们分别有:

但是你想用它们的时候, 还需要自己创建一个缓存区Buffer,才能使用。而且它们提供的方法也的确太有限了,为此,Okio 提供了 BufferedSourceBufferedSink 两个类,让我们方便使用输入流和输出流。

5.1 BufferedSource

对于输入流,可能我们有下面要求:

BufferedSource 中还有很多其他的方法,大家自己看一下。

5.2 BufferedSink

对于输出流,可能我们有下面要求:

BufferedSink 中还有很多其他的方法,大家自己看一下。

六. RealBufferedSource 和 RealBufferedSink

它们是BufferedSourceBufferedSink的实现类。
SourceSink 进行缓存区包装,什么意思呢?

简单地说,就是它们内部都有一个缓存区 Buffer 属性,任何对输入流或者输出流的操作,先将数据存入缓存区,再进行操作。下面我们就进行分析。

6.1 RealBufferedSource

6.1.1 成员属性

  public final Buffer buffer = new Buffer();
  public final Source source;
  boolean closed;

6.1.2 构造方法

  RealBufferedSource(Source source) {
    if (source == null) throw new NullPointerException("source == null");
    this.source = source;
  }

只有一个构造函数,必须包裹一个输入流 source,因为RealBufferedSource 就是辅助输入流 source存在的。

6.1.3 read(Buffer sink, long byteCount)

  @Override public long read(Buffer sink, long byteCount) throws IOException {
    if (sink == null) throw new IllegalArgumentException("sink == null");
    if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
    if (closed) throw new IllegalStateException("closed");

    if (buffer.size == 0) {
      long read = source.read(buffer, Segment.SIZE);
      if (read == -1) return -1;
    }

    long toRead = Math.min(byteCount, buffer.size);
    return buffer.read(sink, toRead);
  }

这个是 Source 类的方法,表示从输入流中读取一定数量的字节数据存入缓存区sink中。
方法分析

6.1.4 require(long byteCount) 和 request(long byteCount)

  @Override public void require(long byteCount) throws IOException {
    if (!request(byteCount)) throw new EOFException();
  }

  @Override public boolean request(long byteCount) throws IOException {
    if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
    if (closed) throw new IllegalStateException("closed");
    while (buffer.size < byteCount) {
      if (source.read(buffer, Segment.SIZE) == -1) return false;
    }
    return true;
  }

这两个方法作用就是当前缓存区buffer 的数据不够 byteCount 的大小了,要从包裹输入流 source 读取一定的数据。

6.1.5 read 字节数组的方法

  @Override public int read(byte[] sink) throws IOException {
    return read(sink, 0, sink.length);
  }

  @Override public int read(byte[] sink, int offset, int byteCount) throws IOException {
    checkOffsetAndCount(sink.length, offset, byteCount);

    if (buffer.size == 0) {
      long read = source.read(buffer, Segment.SIZE);
      if (read == -1) return -1;
    }

    int toRead = (int) Math.min(byteCount, buffer.size);
    return buffer.read(sink, offset, toRead);
  }

你会发现和 read(Buffer sink, long byteCount) 方法相似,从包裹输入流 source 读取数据存入当前缓存区buffer 中,然后再通过缓存区的read 方法,将数据写入到字节数组 sink 中。

RealBufferedSource 剩下的方法,请大家自行分析。

6.2 RealBufferedSink

6.2.1 成员属性

  public final Buffer buffer = new Buffer();
  public final Sink sink;
  boolean closed;

6.2.2 构造方法

  RealBufferedSink(Sink sink) {
    if (sink == null) throw new NullPointerException("sink == null");
    this.sink = sink;
  }

只有一个构造函数,必须包裹一个输出流sink,因为RealBufferedSink就是辅助输出流sink存在的。

6.2.3 write(Buffer source, long byteCount)

  @Override public void write(Buffer source, long byteCount)
      throws IOException {
    if (closed) throw new IllegalStateException("closed");
    buffer.write(source, byteCount);
    emitCompleteSegments();
  }

这个是 Sink 类的方法,表示将来源缓存区source 中的byteCount 个字节数据写入到输出流中。
方法分析

6.2.4 emitCompleteSegments 方法

  @Override public BufferedSink emitCompleteSegments() throws IOException {
    if (closed) throw new IllegalStateException("closed");
    long byteCount = buffer.completeSegmentByteCount();
    if (byteCount > 0) sink.write(buffer, byteCount);
    return this;
  }

6.2.4 write 字节数组

  @Override public BufferedSink write(byte[] source) throws IOException {
    if (closed) throw new IllegalStateException("closed");
    buffer.write(source);
    return emitCompleteSegments();
  }

  @Override public BufferedSink write(byte[] source, int offset, int byteCount) throws IOException {
    if (closed) throw new IllegalStateException("closed");
    buffer.write(source, offset, byteCount);
    return emitCompleteSegments();
  }

都是向缓存区buffer 中写入数据,最后调用 emitCompleteSegments 方法,看是否向包裹输出流sink 中。

RealBufferedSink 剩下的方法,请大家自行分析。

七. Okio

这其实是一个工具类,方便我们来获取 SourceSink

7.1 获取 Source

想要获取一个 Source 输出流对象,其实有很多个方法。

7.1.1 从文件中获取

  /** Returns a source that reads from {@code file}. */
  public static Source source(File file) throws FileNotFoundException {
    if (file == null) throw new IllegalArgumentException("file == null");
    return source(new FileInputStream(file));
  }

7.1.2 从输入流中获取

  /** Returns a source that reads from {@code in}. */
  public static Source source(InputStream in) {
    return source(in, new Timeout());
  }

  private static Source source(final InputStream in, final Timeout timeout) {
    if (in == null) throw new IllegalArgumentException("in == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Source() {
      @Override public long read(Buffer sink, long byteCount) throws IOException {
        if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
        if (byteCount == 0) return 0;
        try {
          timeout.throwIfReached();
          Segment tail = sink.writableSegment(1);
          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
          if (bytesRead == -1) {
            if (tail.pos == tail.limit) {
              // We allocated a tail segment, but didn't end up needing it. Recycle!
              sink.head = tail.pop();
              SegmentPool.recycle(tail);
            }
            return -1;
          }
          tail.limit += bytesRead;
          sink.size += bytesRead;
          return bytesRead;
        } catch (AssertionError e) {
          if (isAndroidGetsocknameError(e)) throw new IOException(e);
          throw e;
        }
      }

      @Override public void close() throws IOException {
        in.close();
      }

      @Override public Timeout timeout() {
        return timeout;
      }

      @Override public String toString() {
        return "source(" + in + ")";
      }
    };
  }

可以看到创建了Source 一个内部类,通过的输入流的 in.read(tail.data, tail.limit, maxToCopy) 方法获取数据,写入到缓存区sink 中。

7.1.3 从路径中获取

  public static Source source(Path path, OpenOption... options) throws IOException {
    if (path == null) throw new IllegalArgumentException("path == null");
    return source(Files.newInputStream(path, options));
  }

7.1.4 从 socket 中获取

  public static Source source(Socket socket) throws IOException {
    if (socket == null) throw new IllegalArgumentException("socket == null");
    if (socket.getInputStream() == null) throw new IOException("socket's input stream == null");
    AsyncTimeout timeout = timeout(socket);
    Source source = source(socket.getInputStream(), timeout);
    return timeout.source(source);
  }

7.2 获取 Sink

7.2.1 从文件中获取

  /** Returns a sink that writes to {@code file}. */
  public static Sink sink(File file) throws FileNotFoundException {
    if (file == null) throw new IllegalArgumentException("file == null");
    return sink(new FileOutputStream(file));
  }

7.2.2 从输出流中获取

  /** Returns a sink that writes to {@code out}. */
  public static Sink sink(OutputStream out) {
    return sink(out, new Timeout());
  }

  private static Sink sink(final OutputStream out, final Timeout timeout) {
    if (out == null) throw new IllegalArgumentException("out == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Sink() {
      @Override public void write(Buffer source, long byteCount) throws IOException {
        checkOffsetAndCount(source.size, 0, byteCount);
        while (byteCount > 0) {
          timeout.throwIfReached();
          Segment head = source.head;
          int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
          out.write(head.data, head.pos, toCopy);

          head.pos += toCopy;
          byteCount -= toCopy;
          source.size -= toCopy;

          if (head.pos == head.limit) {
            source.head = head.pop();
            SegmentPool.recycle(head);
          }
        }
      }

      @Override public void flush() throws IOException {
        out.flush();
      }

      @Override public void close() throws IOException {
        out.close();
      }

      @Override public Timeout timeout() {
        return timeout;
      }

      @Override public String toString() {
        return "sink(" + out + ")";
      }
    };
  }

可以看到创建了Sink一个内部类,从来源缓存区source 中获取字节数据,通过输出流的out.write(head.data, head.pos, toCopy) 写入。

7.2.3 从路径中获取

  /** Returns a sink that writes to {@code path}. */
  @IgnoreJRERequirement // Should only be invoked on Java 7+.
  public static Sink sink(Path path, OpenOption... options) throws IOException {
    if (path == null) throw new IllegalArgumentException("path == null");
    return sink(Files.newOutputStream(path, options));
  }

7.2.4 从 socket 中获取

  public static Sink sink(Socket socket) throws IOException {
    if (socket == null) throw new IllegalArgumentException("socket == null");
    if (socket.getOutputStream() == null) throw new IOException("socket's output stream == null");
    AsyncTimeout timeout = timeout(socket);
    Sink sink = sink(socket.getOutputStream(), timeout);
    return timeout.sink(sink);
  }

7.3 获取缓存流

  public static BufferedSource buffer(Source source) {
    return new RealBufferedSource(source);
  }

  public static BufferedSource buffer(Source source) {
    return new RealBufferedSource(source);
  }
上一篇 下一篇

猜你喜欢

热点阅读