Android程序员

[搞定开源] 第二篇 okio 1.14原理

2018-07-13  本文已影响0人  展翅而飞

Android开源项目原理系列

[搞定开源] 第一篇 okhttp 3.10原理

okio是okhttp的io部分,可以单独使用。知道它的存在后,我就爱不释手,因为它真的十分精彩。

按官方的介绍,okio是对java io和nio的补充,以便更加容易处理数据输入输出。

demo

private void write(File file) throws IOException {
    try (Sink sink = Okio.sink(file);
         BufferedSink bufferedSink = Okio.buffer(sink)) {

        bufferedSink.writeUtf8("寥落古行宫").writeUtf8(System.lineSeparator());
        bufferedSink.writeUtf8("宫花寂寞红").writeUtf8(System.lineSeparator());
        bufferedSink.writeUtf8("白头宫女在").writeUtf8(System.lineSeparator());
        bufferedSink.writeUtf8("闲坐说玄宗").writeUtf8(System.lineSeparator());
    }
}

private void read(File file) throws IOException {
    try (Source source = Okio.source(file);
         BufferedSource bufferedSource = Okio.buffer(source)) {

        for (String line; (line = bufferedSource.readUtf8Line()) != null; ) {
            System.out.println(line);
        }
    }
}

先来看okio的基本使用,例子是一个写方法、一个读方法,Sink和Source对应java io的OutputStream和InputStream。

try后面加了个括号,这是java7对try-catch的改进。结束try里的代码时,自动执行括号对象的close方法(对象需要实现Closeable),io代码的福音。

okio的代码不多,借助idea的show diagrams功能,将okio全部类和继承关系打印出来:

okio类

ByteString

ByteString是一个不可变的字节串,引用官方对ByteString诙谐的介绍:

ByteString is String's long-lost brother, making it easy to treat binary data as a value.

final byte[] data;
transient String utf8; // Lazily computed.

数据在ByteString里同时保存为byte[]和String,所以两者的转换易如反掌(都存成两份了,哈),典型的空间换时间。注意到String标记成transient,所以在序列化中,只处理byte[],无必要浪费功夫多理String。

有了ByteString,各种字符转换和处理就很容易了(一些工具类可以干掉了),挑几个方法看看:

public String utf8() {
  String result = utf8;
  return result != null ? result : (utf8 = new String(data, Util.UTF_8));
}
public String base64() {
  return Base64.encode(data);
}
public ByteString md5() {
  return digest("MD5");
}
public ByteString sha1() {
  return digest("SHA-1");
}

Source和Sink

okio对数据的处理和java io类似,使用了流的概念。Source是输入流,Sink是输出流,分别定义了基础的read/write方法。

我们知道,java io是“装饰者模式”的经典实现,在InputStream/OutputStream的基础上,有很多装饰类,动态地为流添加各种各样的功能。这是个很好的设计模式,但个人感觉类多了点,而Source/Sink的实现类只有几个,用得最多是buffer相关的类,提供大部分方法,简单些又不失功能。

网络上偷张《Head First设计模式》讲装饰者模式的图:


java io装饰者

Buffer

和ByteString处理不可变字节串相对,Buffer处理的是可变字节串。定义在BufferedSource/BufferedSink,对应的实现类是RealBufferedSource/RealBufferedSink。回顾上图okio所有类,Buffer在Source/Sink的中间,打通两者的操作。

每个BufferedSource/BufferedSink都会创建自己的Buffer,一一对应:

public final Buffer buffer = new Buffer();

segment

缓存操作的数据结构是segment,可以想象成一块块片段组成了整个buffer。

final byte[] data;
int pos;
int limit;
boolean shared;
boolean owner;
Segment next;
Segment prev;

segment其实就是长度固定的数组(SIZE=8192),两个指针pos和limit分别指示开始和结束的index,next和prev将segment构成一个双向链表。segmen还有两个布尔标记共享:

okio对缓存操作的核心思想就是:

在buffer之间共享segment。

在java io中,流与流的缓存是无联系的,如果输入流的数据要写入输出流,需要将数据复制过去。在okio中,相同的动作,只需要将segment从一个buffer分配给另一个buffer。好处,显而易见。


上面几个是操作segment的函数,在后文会用到,早点准备。函数的逻辑实质是链表的操作,其中push/pop将segment加入或移出链表,小学水平,路过。

public void writeTo(Segment sink, int byteCount) {
  if (!sink.owner) throw new IllegalArgumentException();
  if (sink.limit + byteCount > SIZE) {
    // We can’t fit byteCount bytes at the sink’s current position. Shift sink first.
    if (sink.shared) throw new IllegalArgumentException();
    if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
    System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
    sink.limit -= sink.pos;
    sink.pos = 0;
  }

  System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
  sink.limit += byteCount;
  pos += byteCount;
}

writeTo将当前segment中的数据写入另一个segment,写入前需要判断owner和sheared。目标segment的空间需要足够,但有可能是read导致pos向后移,这个时候pos会大于0。已读完的数据可以覆盖,所以先移一移数据。剩下的,就是当前segment数组内容复制到目标segment数组。

public void compact() {
  if (prev == this) throw new IllegalStateException();
  if (!prev.owner) return; // Cannot compact: prev isn‘t writable.
  int byteCount = limit - pos;
  int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
  if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
  writeTo(prev, byteCount);
  pop();
  SegmentPool.recycle(this);
}

为了节约内存空间,空闲的segment会尝试合并前一个segment。如果空间足够容纳,当前segment的内容writeTo前一个segment,然后回收当前segment。

public Segment split(int byteCount) {
  if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
  Segment prefix;

  // We have two competing performance goals:
  //  - Avoid copying data. We accomplish this by sharing segments.
  //  - Avoid short shared segments. These are bad for performance because they are readonly and
  //    may lead to long chains of short segments.
  // To balance these goals we only share segments when the copy will be large.
  if (byteCount >= SHARE_MINIMUM) {
    prefix = sharedCopy();
  } else {
    prefix = SegmentPool.take();
    System.arraycopy(data, pos, prefix.data, 0, byteCount);
  }

  prefix.limit = prefix.pos + byteCount;
  pos += byteCount;
  prev.push(prefix);
  return prefix;
}

split方法将一个segment分割成两个segment,前者保存的数据范围:pos..pos+byteCount,后者保存的数据范围:pos+byteCount..limit。在创建新segment时,会根据内容多少控制segment是否shared(size>=1024),避免共享太细的segment。

SegmentPool

为了提高性能,无用的segment不会白白丢弃,SegmentPool提供了回收机制(享元模式,另一个例子是Handler发送的Message)。

static void recycle(Segment segment) {
  if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
  if (segment.shared) return; // This segment cannot be recycled.
  synchronized (SegmentPool.class) {
    if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
    byteCount += Segment.SIZE;
    segment.next = next;
    segment.pos = segment.limit = 0;
    next = segment;
  }
}

recycle方法将无用的segment放入SegmentPool的单链表存着,下次重复使用时,调用take取出来:

static Segment take() {
  synchronized (SegmentPool.class) {
    if (next != null) {
      Segment result = next;
      next = result.next;
      result.next = null;
      byteCount -= Segment.SIZE;
      return result;
    }
  }
  return new Segment(); // Pool is empty. Don’t zero-fill while holding a lock.
}

有存货就从链表里取出来,无就新建。

write

okio的缓存实质就是对segment读写,停一分钟想想对segment的write/read需要考虑什么问题。

好了,segment里数组的长度固定,我们读写的数据可能是各种格式:Int、Long、String等。有可能读写发生在一个segment,也可能需要跨多个segment。

具体来看例子里调用BufferedSink的writeUtf8,里面调用buffer的writeUtf8。写入一段string会比较长,writeUtf8进行分拆处理,核心是对分拆的字符调用writeByte方法。

@Override public Buffer writeByte(int b) {
  Segment tail = writableSegment(1);
  tail.data[tail.limit++] = (byte) b;
  size += 1;
  return this;
}

写入操作就是两步:1、获取一个剩余空间足够的segment;2、数据写入segment里的数组。

Segment writableSegment(int minimumCapacity) {
  if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();

  if (head == null) {
    head = SegmentPool.take(); // Acquire a first segment.
    return head.next = head.prev = head;
  }

  Segment tail = head.prev;
  if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
    tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
  }
  return tail;
}

writableSegment入参是最低要求的空间长度,如果head不存在,直接新建;否则通过head.prev取得链表尾,检查长度并返回。其他一堆write方法,根据数据类型,控制写入长度,大同小异。

最精华的地方来了,来看write(Buffer source, long byteCount) ,入参是另一个buffer:

@Override public void write(Buffer source, long byteCount) {
  if (source == null) throw new IllegalArgumentException("source == null");
  if (source == this) throw new IllegalArgumentException("source == this");
  checkOffsetAndCount(source.size, 0, byteCount);

  while (byteCount > 0) {
    //1
    if (byteCount < (source.head.limit - source.head.pos)) {
      Segment tail = head != null ? head.prev : null;
      if (tail != null && tail.owner
          && (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {
        //2
        source.head.writeTo(tail, (int) byteCount);
        source.size -= byteCount;
        size += byteCount;
        return;
      } else {
        //3
        source.head = source.head.split((int) byteCount);
      }
    }

    // 4
    Segment segmentToMove = source.head;
    long movedByteCount = segmentToMove.limit - segmentToMove.pos;
    source.head = segmentToMove.pop();
    if (head == null) {
      head = segmentToMove;
      head.next = head.prev = head;
    } else {
      Segment tail = head.prev;
      tail = tail.push(segmentToMove);
      tail.compact();
    }
    source.size -= movedByteCount;
    size += movedByteCount;
    byteCount -= movedByteCount;
  }
}

从别的buffer读取内容写入当前buffer,正是前文提到的,okio利用segment共享提高性能的体现。来源buffer需要考虑跨segment,目标buffer也需要考虑跨segment。

进入循环,每次操作一段数据,从byteCount减去本次操作数据的长度,直到byteCount为零。mark1判断是否source.head包括所有要操作的数据,是的话就比较简单:

source:[aabbbb] -> [bbbb]
sink:[head]-[10%]  -> [head]-[10%+aa]

mark2,source.head剩余[aa]需要输入,尾segment能够容纳,直接调用writeTo,最终[aa]复制进尾segment。

source:[aabbbb] -> [aa]-[bbbb]
sink:[head]-[80%] -> [head]-[80%]-[aa]

mark3,source[aabbbb]还剩一段[aa]需要写入,尾segment不能容纳,无法使用writeTo。这个时候,需要将source分割成[aa]-[bbbb]两个,在mark4中将[aa]接入尾segment。

mark4就是source的segment接入当前buffer的过程,当操作完成后,会尝试compact节约空间。

纵观上述的过程,只有mark2有复制数据的动作,而且也是必须的,节约了内存。其他情况都是将segment指向不同的buffer,非常节约性能,真是666。

read

再来看read方法,例子里调用的readUtf8Line最终调用了buffer的readString:

@Override public String readString(long byteCount, Charset charset) throws EOFException {
   checkOffsetAndCount(size, 0, byteCount);
   if (charset == null) throw new IllegalArgumentException("charset == null");
   if (byteCount > Integer.MAX_VALUE) {
     throw new IllegalArgumentException("byteCount > Integer.MAX_VALUE: " + byteCount);
   }
   if (byteCount == 0) return "";

   Segment s = head;
   if (s.pos + byteCount > s.limit) {
     // If the string spans multiple segments, delegate to readBytes().
     return new String(readByteArray(byteCount), charset);
   }

   String result = new String(s.data, s.pos, (int) byteCount, charset);
   s.pos += byteCount;
   size -= byteCount;

   if (s.pos == s.limit) {
     head = s.pop();
     SegmentPool.recycle(s);
   }

   return result;
 }

首先获取head所在的segment,判断读取的长度是否大于limit:

@Override public void readFully(byte[] sink) throws EOFException {
  int offset = 0;
  while (offset < sink.length) {
    int read = read(sink, offset, sink.length - offset);
    if (read == -1) throw new EOFException();
    offset += read;
  }
}

readByteArray创建了个字节数组存放结果,交由readFully处理。既然需要跨segment,少不了循环逐段读取,readFully的循环里调用了read方法。

@Override public int read(byte[] sink, int offset, int byteCount) {
  checkOffsetAndCount(sink.length, offset, byteCount);

  Segment s = head;
  if (s == null) return -1;
  int toCopy = Math.min(byteCount, s.limit - s.pos);
  System.arraycopy(s.data, s.pos, sink, offset, toCopy);

  s.pos += toCopy;
  size -= toCopy;

  if (s.pos == s.limit) {
    head = s.pop();
    SegmentPool.recycle(s);
  }

  return toCopy;
}

read方法和上面说的readString方法类似,从segment读取的结果复制到目标数组。

okhttp和okio

okhttp连接上的流Http1Codec的输入输出用的就是okio:

final BufferedSource source;
final BufferedSink sink;

Timeout

okio对数据的读写提供超时机制,例如从InputSream获取Source(适配器,Sink/Source和OutputStream/InputSteam可以随意切换),有个入参Timeout,表示对读取数据有时间要求。具体Timeout的代码不打算细说,讲个大概。

超时的指定有两种:

Timeout类很常规,是个同步超时类,除此之外,AsyncTimeout类继承Timeout,提供异步超时机制。在适配Socket时用的是AsyncTimeout,因为socket在等待数据时会阻塞,需要异步处理。

AsyncTimeout互相之间形成链表,根据超时时间有序排序。有个唯一守护线程WatchDog,实现超时到达马上反馈。io阻塞时线程停顿,需要通过别人告知,这就是异步超时的原理。

第一次知道异步超时处理,wonderful。

总结

okio说:“java io很好,也有不足的地方,由我来优化”。

小而美,精巧强大,缓存部分对segment的操作非常值得学习。只言片语不能尽述,RTFSC。

不care有无人看我写的笔记,反正我会了,哈哈。

上一篇下一篇

猜你喜欢

热点阅读