Okio 源码分析
square在开源社区的贡献是卓越的,这里是square在Android领域贡献的开源项目。
1. okio概念
-
okio是一个由square公司开发的开源库,它弥补了Java.io和java.nio的不足,能够更方便快速的读取、存储和处理数据。
-
okio有自己的流类型Source和Sink,对应于java.io的InputStream和OutputStream。
-
okio内部引入了ByteString和Buffer,提升了效率和性能。
-
okio引入了超时机制。
-
okio规模不大,代码精巧,是源码学习的好素材(okio-1.6.0.jar):
2. Source和Sink
Source代表输入流,Sink代表输出流,Source和Sink的实现逻辑基本相似,以Source为例,学习一下它的实现原理,首先来看一下Source的源码:
package okio;
import java.io.Closeable;
import java.io.IOException;
public interface Source extends Closeable {
/**
* 将此source输入流中的数据移动到sink中(至少1字节,至多byteCount字节)
* 返回移动的字节数,source读完为空时返回-1
*/
long read(Buffer sink, long byteCount) throws IOException;
/** 超时机制 */
Timeout timeout();
/**
* 关闭此source输入流并释放此source输入流持有的所有资源
* 关闭后的source输入流不能再进行读取
* 及时关闭source输入流
*/
@Override void close() throws IOException;
}
source相比于java.io的InputStream精简很多,它的具体功能通过装饰器模式在它的装饰类中实现,整体的认识一下Source和它的装饰器的实现关系:
这里写图片描述GzipSource为支持gzip压缩的实现类,InflaterSource为GzipSource服务,用于压缩;ForwardingSource是一个具有委托功能的抽象类。
其中BufferedSource为实现Source支持缓冲区的子类接口,其中定义了缓冲区及多种类型的读方法,源码如下:
package okio;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
public interface BufferedSource extends Source {
Buffer buffer();
boolean exhausted() throws IOException;
void require(long byteCount) throws IOException;
boolean request(long byteCount) throws IOException;
byte readByte() throws IOException;
short readShort() throws IOException;
short readShortLe() throws IOException;
int readInt() throws IOException;
......
}
RealBufferedSource为BufferedSource的实现类,通常情况下我们对输入流的操作都是在操作RealBufferedSource,RealBufferedSource类中有两个主要参数,一个是Source对象,一个是新建的Buffer对象,而各种读方法都是通过Buffer来具体实现的,比如readByteArray方法:
@Override public byte[] readByteArray() throws IOException {
buffer.writeAll(source);
return buffer.readByteArray();
}
可见虽然这个类叫RealBufferedSource,但是实际上只是一个保存Buffer对象的一个代理实现,真正的实现都是在Buffer中实现的,而正是通过Buffer的应用,才实现了okio的高效性。
3. Buffer
Buffer是BufferedSink和BufferedSource的实现类,因此它既可以用来读数据,也可以用来写数据。在Buffer的注释中说明了okio的高效性:
-
copy数组时仅仅改变底层字节数组的所有权,而不是把数据从一块内存复制到另一块内存中
-
如ArrayList一样根据需要动态分配内存大小
-
避免了数组创建时的zero-fill,降低了GC的频率。
Buffer是通过Segment和SegmentPool来实现以上高效功能的,Segment译为片段,okio将数据也就是Buffer分割成片段,同时Segment有前置节点和后置节点,构成了一个双向循环链表,如图:
Buffer和Segment的关系
分片之间使用链表连接,片中使用数组存储,兼具读的连续性和写的可插入性,Segment中并不是随意的使用数组存储数据,其内部维护着一个固定长度的字节数组。Segment源码分析如下:
static final int SIZE = 2048;
final byte[] data;
int pos;
int limit;
boolean shared;
boolean owner;
Segment next;
Segment prev;
SIZE就是Segment维护的数组固定长度,data用来存储数据,pos,limit就是开始和结束点的index,shared标志此Segment是否被其他Segment引用即共享数据,owner标志此Segment是否只被自己引用即独享数据,next、prev分别指向后置节点和前置节点。
public Segment pop() {
Segment result = next != this ? next : null;
prev.next = next;
next.prev = prev;
next = null;
prev = null;
return result;
}
public Segment push(Segment segment) {
segment.prev = this;
segment.next = next;
next.prev = segment;
next = segment;
return segment;
}
pop方法移除了自己,首先将自己的前后两个节点连接起来,然后将自己的前后引用置空,这样就脱离了整个双向链表,然后返回next;push方法就是在当前和next引用中间插入一个segment进来,并且返回插入的segment,这两个都是寻常的双向链表的操作。
public void writeTo(Segment sink, int byteCount) {
if (!sink.owner) throw new IllegalArgumentException();
if (sink.limit + byteCount > SIZE) {
if (sink.shared) throw new IllegalArgumentException();
if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
sink.limit -= sink.pos;
sink.pos = 0;
}
System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
sink.limit += byteCount;
pos += byteCount;
}
}
writeTo方法将此Segment中的数据移动到sink片段中,其中的owner和Shared用来判断如果是共享片端就无法改变片段数据,sink.limit + byteCount > SIZE 即当要写的字节大小加上原来的字节数(尾节点索引)大于Segment的最大值时抛出异常,但是也存在一种情况就是虽然尾节点索引和写入字节大小加起来超过,但是可能是由于前面的read方法取出数据时导致pos索引后移(pos>0),这时就先执行移动操作,使用系统的System.arraycopy方法将从pos开始的数据移动到从0开始的位置,然后重置pos为0,limit为移动后的尾节点索引,然后再从limit位置写入数据。
public Segment split(int byteCount) {
if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
Segment prefix = new Segment(this);
prefix.limit = prefix.pos + byteCount;
pos += byteCount;
prev.push(prefix);
return prefix;
}
split译为分割,该方法从此Segment中分割出一个新Segment,其中新Segment包含pos(pos+byteCount)的数据,原Segment包含(pos+byteCount)limit的数据,其中数据并没有真正进行移动,通过改变pos、limit索引值,避免了copy操作。且原Segment和新Segment的shared都被置为true,标志数据不可再进行写入改动,此方法在Buffer的write方法中调用,主要为了实现在移动数据时直接操作Segment而不是data,这样在写数据时可以达到很高的效率。
public void compact() {
if (prev == this) throw new IllegalStateException();
if (!prev.owner) return;
int byteCount = limit - pos;
int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
if (byteCount > availableByteCount) return;
writeTo(prev, byteCount);
pop();
SegmentPool.recycle(this);
}
compact译为压缩,此方法判断前置片段的空闲容量是否能容纳此片段的数据,如果能容纳则将此片段的数据移动到前置片段,然后回收此片段,可以防止十分短的数据占据一整个Segment而浪费空间的现象。
相比于Segment,SegmentPool就十分精简了,它只有三个静态变量和两个方法:
static final long MAX_SIZE = 64 * 1024;
static Segment next;
static long byteCount;
MAX_SIZE 表示片段池的最大容量,此版本的数值表示片端池中最多可以容纳32个片段,next将片段连接成单向链表;byteCount表示片端池当前包含的总字节数。
static Segment take() {
synchronized (SegmentPool.class) {
if (next != null) {
Segment result = next;
next = result.next;
result.next = null;
byteCount -= Segment.SIZE;
return result;
}
}
return new Segment();
}
take方法用于从片段池中取片段,当next为空即片段池为空时,新建一个片段返回(此新建片段在用完后回收进片段池),若片段池不为空则返回池中第一个片段,并将这个片段从片段池中移除,其中加锁防止多线程同时取数据。
static void recycle(Segment segment) {
if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
if (segment.shared) return; // This segment cannot be recycled.
synchronized (SegmentPool.class) {
if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
byteCount += Segment.SIZE;
segment.next = next;
segment.pos = segment.limit = 0;
next = segment;
}
}
recycle方法用于将不用的Segment回收至片段池,且首先要判断即将回收的Segment的next和prev是否已经置空,即是否已经从Segment双向链表中正确的移除,然后还要判断此Segment是否为共享状态,为共享状态表示此Segment可能还要执行一些操作,比如正在移动Segment,此时也不能进行回收,最后一层判断片段池能否容纳此Segment。符合以上三个条件后进行回收,将此Segment参数初始化,加入单向链表,等待被取出使用。
下面通过Buffer的write方法分析Buffer是如何通过Segment和SegmentPool实现高效性的:
@Override public void write(Buffer source, long byteCount) {
if (source == null) throw new IllegalArgumentException("source == null");
//不能将数据移动给自身,没有意义
if (source == this) throw new IllegalArgumentException("source == this");
//判断参数的合理性
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
// 判断要移动的数据是否全部存储在source的第一个片段中
if (byteCount < (source.head.limit - source.head.pos)) {
//目标片段tail为当前片段的前置片段
Segment tail = head != null ? head.prev : null;
if (tail != null && tail.owner
&& (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {
//若目标片段tail可用且容量足以存储数据
// 将source头片段中的数据移动到目标片段tail
source.head.writeTo(tail, (int) byteCount);
source.size -= byteCount;
size += byteCount;
return;
} else {
//若目标片段tail不可用或容量不足以存储数据
//使用split方法分割source片段,并将分割得到的待写入数据片段作为source的第一个片段
source.head = source.head.split((int) byteCount);
}
}
// 将source第一个片段移动到目标片段
Segment segmentToMove = source.head;
long movedByteCount = segmentToMove.limit - segmentToMove.pos;
//移除source第一个片段,并将source的后置节点前移作为source第一个片段,相当于堆栈的出栈
source.head = segmentToMove.pop();
if (head == null) {
//若待写入Buffer(即this)中没有任何片段,则直接将头片段指向数据片段
head = segmentToMove;
head.next = head.prev = head;
} else {
//否则将数据片段移动到头片段的前置片段,并尝试压缩
Segment tail = head.prev;
tail = tail.push(segmentToMove);
tail.compact();
}
//重置容量,若没有移动完则循环
source.size -= movedByteCount;
size += movedByteCount;
byteCount -= movedByteCount;
}
}
当我们想要把一个Buffer中的数据移动到另一个Buffer时,上面方法可能会处理的几种情景如下(【91%,61%】表示一个Buffer中有两个片段,91%表示此片段中已有数据占片段最大容量的91%):
将 【72%】写入 【91%,61%】 --> 【91%,61%,72%】
将 【99%,3%】写入 【100%,2%】 --> 【100%,2%,99%,3%】
将 【3%,99%】写入 【100%,2%】 --> 【100%,5%,99%】
将 【92%,82%】的头片段的30%写入 【51%,91%】 --> 首先把源Buffer分割成【30%,62%,82%】,移动 --> 【51%,91%,30%】
结合这几种情景再去分析write方法,逻辑就十分清晰了,其中有一点就是在程序中的逻辑往Buffer中插入数据是往前置节点插入的,而这些情景将数据插入到了尾部,其实这并不矛盾,因为片段本身就是双向链表,只要你插入数据的顺序和读取数据的顺序相对应就可以,上面的情景主要为了分析数据移动的可能结果。这里分析了在Buffer中是怎么通过Segment实现高效的,没有涉及到SegmentPool,SegmentPool的应用十分简单,只有take取片段和recycle回收片段两个方法,这里就不再展开了。
4. okio的超时机制
okio的超时机制让IO不会因为异常阻塞在某个未知的错误上,这让上层不会错过一个可能导致系统崩溃的超时异常,超时机制使okio更加稳定。比如通过okio将srcFile文件内容复制给dstFile:
sink = Okio.sink(dstFile);
source = Okio.source(srcFile);
Buffer buf = new Buffer();
for (long readCount; (readCount = source.read(buf, 2048)) != -1; ) {
sink.write(buf, readCount);
}
点击第一行的sink方法查看:
public static Sink sink(File file) throws FileNotFoundException {
if (file == null) throw new IllegalArgumentException("file == null");
return sink(new FileOutputStream(file));
}
继续查看sink()方法:
public static Sink sink(final OutputStream out) {
return sink(out, new Timeout());
}
可以看到此处引入了一个Timeout实例,即Sink输入流引入超时机制,继续跟踪sink方法查看如何让这个Timeout实例跟我们的sink挂钩:
private static Sink sink(final OutputStream out, final Timeout timeout) {
if (out == null) throw new IllegalArgumentException("out == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");
return new Sink() {
@Override public void write(Buffer source, long byteCount) throws IOException {
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
timeout.throwIfReached();
......
//写入操作
......
可以看到在每次开始写入操作之前,调用了timeout的throwIfReached方法,查看throwIfReached方法:
public void throwIfReached() throws IOException {
if (Thread.interrupted()) {
throw new InterruptedIOException("thread interrupted");
}
if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
throw new InterruptedIOException("deadline reached");
}
}
在每次开始写的时候就调用此方法去判断写入的是否超时,同样Source也是一样的操作。但是查看okio源码可以发现除了Timeout之外,还有一个AsyncTimeout类,AsyncTimeout继承于Timeout,Timeout用于同步计时,即在同一个线程中执行我们的okio操作和计时,而AsyncTimeout扩展了异步计时功能,当我们对scoket包装时就引入了异步超时机制:
public static Sink sink(final Socket socket) throws IOException {
if (socket == null) throw new IllegalArgumentException("socket == null");
AsyncTimeout timeout = timeout(socket);
Sink sink = sink(socket.getOutputStream(), timeout);
return timeout.sink(sink);
}
之所以在socket写时采取异步超时,这完全是由socket自身的性质决定的,socket经常会阻塞自己,导致可能无法使用同步超时。timeout(socket)方法首先把socket封装到AsyncTimeout中,然后返回的是经过AsyncTimeout 中重新包装过的sink,AsyncTimeout 中sink方法如下:
public final Sink sink(final Sink sink) {
return new Sink() {
@Override public void write(Buffer source, long byteCount) throws IOException {
boolean throwOnTimeout = false;
enter();
try {
sink.write(source, byteCount);
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
@Override public void flush() throws IOException {
boolean throwOnTimeout = false;
enter();
try {
sink.flush();
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
@Override public void close() throws IOException {
boolean throwOnTimeout = false;
enter();
try {
sink.close();
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
@Override public Timeout timeout() {
return AsyncTimeout.this;
}
@Override public String toString() {
return "AsyncTimeout.sink(" + sink + ")";
}
};
}
其中使用装饰器模式构造了一个新的sink,除了保留原来sink的方法之外,还增加了异步超时的操作,在write、flush和close开始都调用了enter方法:
public final void enter() {
if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
long timeoutNanos = timeoutNanos();
boolean hasDeadline = hasDeadline();
if (timeoutNanos == 0 && !hasDeadline) {
return; // No timeout and no deadline? Don't bother with the queue.
}
inQueue = true;
scheduleTimeout(this, timeoutNanos, hasDeadline);
}
然后查看scheduleTimeout方法:
private static synchronized void scheduleTimeout(
AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
// Start the watchdog thread and create the head node when the first timeout is scheduled.
if (head == null) {
head = new AsyncTimeout();
new Watchdog().start();
}
......
......
到这里终于看到“异步”操作了,new Watchdog().start()开启了用来判断是否超时的线程,到此为止只大概分析了异步超时机制是如何引入的,其中还涉及到其他比较复杂的wait,链表,加锁等操作,感兴趣的可以阅读源码详细学习。
5. byteString类型
byteString类型为一个不可变的字节序列,看方法就能知道它的功能:
这里写图片描述byteString的高效性体现在byte[]类型和String类型之间的转换以及各种常用字符工具的集成。byteString内部以两种类型的变量记录了同个数据:
byte[] data;
transient String utf8;
这样能够在byte[]和String转换上基本没有开销,同样的也需要保存两份引用,这是明显的空间换时间的方式,节省了new String(byte[] data)这样的CPU开销。由于它包含了双倍的数据,所占的内存相对比较大,所以适用于不太长的数据,这样便不必考虑带来的内存问题。String类型用transient修饰,表示该变量不会进行序列化和反序列化,表明当我们序列化byteString对它传输时,只序列化了byte[]类型,因为我们并不想传输双倍的数据而降低性能。
okio的内容基本上分析完了,下面附上一张okio的框架图:
这里写图片描述最后再来总结一下okio的优点:
1. 使用简便
-
Buffer是处理可变byte序列的利器,它可以根据使用情况自动增长,在使用过程中不用关心位置的处理
-
Java.IO读取不同类型数据要用DataInputStream来包装,使用缓存要使用BufferedOutputStream,而在okio中BufferedSink/BufferedSource就具有以上所有功能
-
okio提供了方便压缩及字符常用处理工具
2. 速度快
-
okio采用了segment机制进行内存共享,极大减少了copy操作带来的时间消耗,加快了读写速度
-
okio引入ByteString使其在byte[]与String之间转换速度非常快
3. 稳定
- okio提供了超时机制,不仅在IO操作上加上超时的判定,包括close,flush之类的方法中都有超时机制
4. 内存消耗小
-
虽然okio在byteString采用空间换时间,但是对内存也做了极致的优化,总体还是极大提高了性能
-
okio的segment机制进行内存复用,上传大文件时完全不用考虑OOM
参考文章:
http://www.jianshu.com/p/f033a64539a1