Okio精简高效的IO库
本篇文章已授权微信公众号 guolin_blog (郭霖)独家发布
本节主要讲讲Okhttp底层使用的IO库--Okio,Okio同样是Square公司推出的增强型IO处理库,旨在增强原生Java IO流的处理,以更加简便,高效的方式处理IO流操作。接下来我会从以下方面来分析它。
- Okio的特点和优势
- Okio结构分析
- Okio的流程分析,读与写的实现
- Buffer写数据的精华操作
- Buffer缓存的总结
- TimeOut超时机制
- Gzip压缩简要分析
- 总结
1. Okio的特点和优势
我们知道Java原生的IO处理已经很强大了,有针对字节和字符的输入输出接口,实现有缓存的处理,以及各种子类实现比如文件的(FileInputStream和FileOutputStream),数据的(DataInputStream和DataOutputStream),对象的(ObjectInputStream和ObjectOutputStream)等等。为什么Square还要搞出个Okio来呢?其实吧,我们要明白,Okio不是用来完全取代原生IO的,事实上它本身也是基于原生IO之上的,比如要从文件得到一个文件输入流,还是得通过FileIntputStream来得到,所以Okio的用意不是取代,而是在某些场合作更加优化的使用,意思就是你原生IO有些地方没有做好,我要用我自己的方式得到更高效简便的IO处理。那么Okio具体有哪些优势呢?主要有以下:
1.精简的api接口。
我们知道原生的Java IO流使用是比较复杂的,基础的字节流接口有InpuStream和OutputStream,字符流接口有Reader和Writer,每个接口都有很多实现的子类,里面大量使用了装饰着模式。假如我要创建一个DataOutputStream用于将一些数据类型数据输出到文件中,我可能需要经历FileOutputStream->BufferedOutputStream->DataOutputStream的创建过程。而如果使用Okio来操作的话可以很简单。
File file = new File(Environment.getExternalStorageDirectory() + "/" + "output.txt");
String name = "xiaoming";
int age = 11;
//使用原生IO
DataOutputStream dos = null;
try {
dos = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file)));
dos.writeChars(name);
dos.writeInt(age);
}catch (IOException ex){
ex.printStackTrace();
}finally {
try {
if(dos != null){
dos.close();
}
}catch (Exception ex){
ex.printStackTrace();
}
}
//使用Okio
try {
Okio.buffer(Okio.sink(file)).writeUtf8(name).writeInt(age).close();
} catch (FileNotFoundException e) {
e.printStackTrace();
}catch (IOException ex){
ex.printStackTrace();
}
从上面可以看出Okio可以很方便的使用链式调用,一句代码就可以完成流的创建->操作->关闭,是不是有一气呵成的感觉。
Okio为了精简我们常用的字节字符流操作,抽象出了BufferedSource缓存流输入和BufferedSink缓存流输出接口,它们具有处理字节和字符,和数据缓存的功能。这么看,是不是Okio一个接口实现了原生IO中三个接口的功能呢。
2.性能高效的缓存处理功能。
这应该是推出Okio的关键原因吧,Okio不满足于原生IO中BufferedOutputStream等缓存流的简单粗暴的缓存处理,转而自己使用更高效的方式处理处理流中数据缓存的操作。
我们看看原生中缓存的处理。
原生中缓存的处理是这样的,每个buffer流中维护一个默认大小8192的buf数组,自己负责创建和回收,当缓存的数据快要满时(buf剩余空间不足以存储这一次的数据时),就会将buf中的缓存数据全部输出,然后重新缓存。如果一次操作的数据大小大于缓存的大小(默认8192),那么缓存就没法使用了,因为缓存一次性存不了这么多数据。
然后原生中的缓存与缓存之间没有直接的交流,这样造成的影响是,输入流中的数据转移到输出流中是:输入buf -> 临时byte数组 -> 输出buf,经历两次拷贝。
原生中的缓存功能看起来并不高效,存在不少问题,Okio要改变这种窘境。
- Buffer交互增强。首先是缓存交互的设计,我们看Source的read和Sink的write方法都是直接将对方的Buffer传入进去。这样的用意就是,增加输入和输出之间缓存的交互,减少额外的临时空间用于数据拷贝。例如从输入流中读取数据写入到输出流的这个过程,是先从Source流读取数据到Source缓存Buffer,然后从Source缓存Buffer将数据转移到Sink缓存BUffer(注意,这里是直接从Source的Buffer缓存转移数据到Sink的Buffer,相比原生的临时byte数组是不是少了一次数据拷贝过程?),最后到Sink缓存BUffer将数据flush到输出流中。
- Buffer结构优化,数据转移优化。不像原生中使用一个默认8192的大数组存放缓存数据,Okio转而使用一个个大小2048的小数组(这段数组数据封装为Segment),采用双链表的方式组织在一起,形成一条前后循环的以小段数组为单位的链表。那么这样的结构有一个潜在的好处就是,操作的数据是以一小段数组(segment)为单位,从source流的缓存数据中转移到sink流缓数据中的过程,大部分请求下就不必像原生中对字节数据一个个拷贝过去,而是可以直接指向过去,注意,是直接指向,将source中的该小段数组(Segment)从source的缓存链表中移除,添加到sink中缓存链表中,不需要任何的数据拷贝工作,这显然可以节省很多cpu时间。
- 灵活智能的Segment数据控制。同时这个segment又是一个很灵活智能的小段数组,它会考虑和它的前一个segment进行数据的合并(compact),节省出一个segment空间。同时它也支持将一个segment分割(split)为两个相连的segment,这样的话可以将原来[offset, limit]的数据,分割为[offset, offset+byteCount]和[offset+byteCount, limit]两段segment数据,这样的好处在于,分离出来后,操作更灵活,比如可以将[offset, offset+byteCount]这段segment直接从source缓存指向sink缓存,免去了数据拷贝工作,而其实这种分离操作,只是逻辑上的分离,这两个分离的segment其实还是用的通过一个数组数据,也就是说它们共享同一段数据,只是它们标记数据范围不一样而已。
- Segment数据池的引入。你以为就这样了吗,Okio为了防止一个个的segment数据被频繁的创建和销毁,使用了一个SegmentPool用于维护使用完成的segment数据,由它来管理segment的销毁或循环使用。SegmentPool提供了64*1024的最大大小,也就是说它可以容纳最多32个segment数据。它提供了take用于获取可使用的segment和recycle用来判定是否回收。这样的好处在于,segment数据和Buffer是独立分开的,Buffer只负责segment的使用,而不负责对它的创建和销毁,转而由SegmentPool来进行管理,这样的话,segment就像是公有资源一样,Buffer使用完了之后交还给SegmentPool,下一个Buffer需要了再从SegmentPool中获取就可以了,这样大大的减少了内存的分配和回收的开销。
可见Okio为了提高IO缓存的高效处理性能可谓是煞费苦心,从输入输出缓存的直接数据对接,到内部Segment结构的引入,以Segment为单位进行数据操作的高效,以及Segment池的引入等等不一而足,为的就是尽可能快的完成IO操作。
3.TimeOut超时的引入
我们知道原生IO中是没有超时这个机制的,如果在输入或输出过程中发生阻塞,那么在这个过程中就没有好的方式对它进行中断操作,在抛出IO异常前可能会一直阻塞下去,这显然不时我们想要的结果,我们希望如果如果它在阻塞到一定时间后能够抛出异常告诉我们发生TimeOut超时了。因此Okio推出了TimeOut机制,实现有TimeOut(同步计时的超时处理)和AsyncTimeOut(异步计时的超时处理)。
- 同步超时TimeOut。TimeOut会在每次的读写操作中判断是否到达了超时时间,进而做超时处理,因而它有个缺点,就是如果在读写方法中,它一直阻塞,那么TimeOut的计时方法也将被阻塞,这样的超时情况,它也无能为力去判断了。
- 异步超时AsyncTimeOut。而AsyncTimeOut则不同,有一个单独的线程Watchdog(姑且称它为看门狗吧)用于监控这些AsyncTimeOut是否超时,如果某个AsyncTimeOut超时了,它就汪汪两声调用AsyncTimeOut的timedOut方法,而你需要做的是实现这个timedOut方法,比如在Okio实现Socket的超时中,它实现的timedOut是关闭Socket,这样如果一直阻塞,看门狗发现超时了,会调用AsyncTimeOut的timedOut,然后就会关闭关闭Socket,这样系统就会抛出IO异常,阻塞也就中止了。
我想在描述了以上优点和它大致的实现原理之后,你已经对Okio已经有初步的了解了。大概知道了它有什么功能,是怎么样的设计理念。接下来对Okio中重要的类作一个简单的介绍,能让你快速熟悉类的结构。
2. Okio结构分析
Okio有一些重要类:
Source,Sink
Okio中封装的输入流接口和输出流接口,对应原生IO的InputStream和OutputStream。分别引入了read方法用于直接将数据读取到传入的Sink的Buffer缓存中,和引入了write方法用于直接从传入的Source缓存中读取数据并写入到自己的Buffer缓存中。然后还有timeout提供超时接口。
BufferedSource,BufferedSink
带有缓存功能的Source接口,Sink接口,分别继承自Source和Sink。同时提供一系列读写字节,字符数据的接口。
Okio
Okio是Okio库的入口类,也是工厂类,它提供source方法得到一个Source输入流,提供sink方法得到一个Sink输出流,提供buffer方法得到具有缓存功能的Source或者Sink对象。它提供对File,Socket,和(InputStream,OutputStream)三种类型的源进行操作,可见,Okio其实是构建在(InputStream,OutputStream)之上的,得到封装之后的(Source,Sink)。
Segment
一小段数组数据的链式节点封装结构,由它们可以组成一个循环连接的双向链表队列。
- 该Segment结构包含pre和next可以找到前一个和后一个的Segment节点。
- 内部维护着一段固定大小2048的数组数据,pos记录下一个可读的位置,limit记录下一个可写的位置,因此,0到pos的数据部分是已经标记读取过了的无效数据区域,pos到limit之间的就是该Segment的有效数据区域,limit到Segment大小部分是还可以再写入数据的区域。
- pop用于弹出当前Segment并返回下一个Segment。
- push用于压入一个Segment到当前Segment的后面。
- split用于一个Segment分割成两个相连的Segment,之前Segment的有效数据区域[offset,limit]被分割成前一个Segment的有效区域[offset,offset+byteCount]和后一个Segment的有效区域[offset+byteCount,limit]部分。
- compact用于考虑将当前Segment和前一个Segment的进行合并。如果前一个Segment的可写区域[limt,Segment.SIZE]大于当前Segment的有效数据区域[pos, limit],则可以将当前Segment的有效数据写到前一个Segment中去,然后删除当前Segment,节省一个Segment空间。
SegmentPool
管理Segment的池,使用单链表记录无用的Segment,提供了take获取一个可用的Segment,提供recycle将无用的Segment进行回收或维护。如果SegmentPool中的Segment的数量小于32个,recycle时会将它加入到单链表中记录起来,同时重置pos和limit以方便后期的Segment重用。如果超过了32个了,则recycle不进行任何操作,这将导致该Segment没有任何引用了,也就将会被回收了。
Buffer
Okio的核心类,用于数据缓存的管理,它实现了BufferedSource和BufferedSink接口,它还支持两个Buffer之间数据的转移(copyTo,注意是转移,不是拷贝,转移的话就是数据指向发送改变了,速度不是拷贝能比的),这就是为啥Buffer这么牛逼的原因了,因为它是唯一一个既能进行读取数据管理,又能进行写入数据管理,而且相互之间还能直接数据转移操作,真是神一样的存在。
RealBufferedSource,RealBufferedSink
RealBufferedSource是缓存Source接口的具体实现,继承自BufferedSource。同时提供一系列读取字节,字符数据的接口。内部的操作基本都是有Buffer来参与处理的,首先会调用request来读取source里的一段数据到Buffer中,然后后续的读取数据都是从Buffer中读的。
RealBufferedSink缓存Sink接口的具体实现,继承自BufferedSink。同时提供一系列写入字节,字符数据的接口。内部的操作基本都是有Buffer来参与处理的,首先会将数据写到Buffer中,然后调用emitCompleteSegments,如果Buffer存储缓存数据的size小于Segment大小的一半,即1024的话,不会可以继续缓存,否则会将缓存的内容全部写到输出中。
3. Okio的流程
上面已经说明了大部分Okio相关类的信息和作用了,那么Okio是怎么样的一个执行流程呢?我想从一个简单的入口,来逐步分析Okio在此期间经历了些什么。下面是将srcFile文件的数据全部复制到文件destFile中
try {
Okio.buffer(Okio.sink(destFile)).writeAll(Okio.buffer(Okio.source(srcFile)));
} catch (FileNotFoundException e) {
e.printStackTrace();
}catch (IOException ex){
ex.printStackTrace();
}
看看,一句话就搞定了两个文件之间的数据复制。它其实等价于
try {
RealBufferedSource source = Okio.buffer(Okio.sink(srcFile));
RealBufferedSink sink = Okio.buffer(Okio.sink(destFile));
sink.writeAll(source);
} catch (FileNotFoundException e) {
e.printStackTrace();
}catch (IOException ex){
ex.printStackTrace();
}
也就是进入到RealBufferedSink.writeAll方法了,我们看里面做了些什么
final class RealBufferedSink implements BufferedSink {
@Override public long writeAll(Source source) throws IOException {
if (source == null) throw new IllegalArgumentException("source == null");
long totalBytesRead = 0;
//循环中,不断从source读取信息到Buffer中
for (long readCount; (readCount = source.read(buffer, Segment.SIZE)) != -1; ) {
totalBytesRead += readCount;
//考虑将Buffer的数据输出到sink
emitCompleteSegments();
}
return totalBytesRead;
}
}
读取数据的处理
我们先看看source.read(buffer, Segment.SIZE)是怎么完成读取工作的
final class RealBufferedSource implements BufferedSource {
@Override public long read(Buffer sink, long byteCount) throws IOException {
if (sink == null) throw new IllegalArgumentException("sink == null");
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (closed) throw new IllegalStateException("closed");
if (buffer.size == 0) {
//如果当前source中的buffer缓存没有数据,则先读取一部分数据到缓存中,省的每次都从source读取
long read = source.read(buffer, Segment.SIZE);
if (read == -1) return -1;
}
//接着就是从buffer中读取byteCount个数据到sink中,但是byteCount大小不能超过buffer缓存的大小,因为当前只有这么多缓存数据
long toRead = Math.min(byteCount, buffer.size);
return buffer.read(sink, toRead);
}
}
注意不要被上面的sink和buffer给迷惑了,sink表示要写入到Sink的缓存Buffer,而buffer是当前source的缓存Buffer。上面做的就是首先从source读取一段数据到自己的buffer中,然后再从buffer读取数据到对方的sink缓存中,虽然指定了要读取byteCount个数据,但是实际能读的大小要看buffer的size,最大不会超过Segment.SIZE。
这里你可能会疑惑 source.read(buffer, Segment.SIZE) 和 buffer.read(sink, toRead) 是怎么实现的。
我们先看看source.read(buffer, Segment.SIZE)
public final class Okio {
private static Source source(final InputStream in, final Timeout timeout) {
if (in == null) throw new IllegalArgumentException("in == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");
return new Source() {
@Override public long read(Buffer sink, long byteCount) throws IOException {
//这里就是不带缓存的Source的读取方式
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (byteCount == 0) return 0;
//判断是否超时(同步方式)
timeout.throwIfReached();
//这里获取sink缓存中最后一个Segment,准备将数据读取到这个Segment中
Segment tail = sink.writableSegment(1);
//判断这个Segment还能容纳多少数据
int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
//这里是从原生的InputStream输入流中读取数据到Segment的数组中
int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
if (bytesRead == -1) return -1;
tail.limit += bytesRead;
sink.size += bytesRead;
return bytesRead;
}
@Override public void close() throws IOException {
in.close();
}
@Override public Timeout timeout() {
return timeout;
}
@Override public String toString() {
return "source(" + in + ")";
}
};
}
}
它的真实实现是在这里的,就是先找到sink缓存的最后一个Segment,然后将从InputStream输入流中读取数据到该Segment的数组中。所以它完成的操作就是从输入流读取一部分数据到buffer缓存中。 接着再看 buffer.read(sink, toRead) 是怎么实现的。
public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
@Override public long read(Buffer sink, long byteCount) {
if (sink == null) throw new IllegalArgumentException("sink == null");
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (size == 0) return -1L;
if (byteCount > size) byteCount = size;
//调用write方法,将自身Buffer的部分数据写到sink的Buffer中
sink.write(this, byteCount);
return byteCount;
}
}
很简单就是,Buffer的read操作就相当于调用对方Buffer的write写入数据,其实就是两个Buffer之间数据的传递过程,这是重点部分,我们在后面重点讲解。接下来我们看Buffer有了数据之后是怎么输出到输出流的。
写入数据的处理
数据读取到Buffer缓存之后,我们再看emitCompleteSegments是干嘛的
final class RealBufferedSink implements BufferedSink {
@Override public BufferedSink emitCompleteSegments() throws IOException {
if (closed) throw new IllegalStateException("closed");
//这里是计算buffer是否有Segment的数据已经满了,如果有的话,就会将满了的Segment数据写入到sink中
long byteCount = buffer.completeSegmentByteCount();
if (byteCount > 0) sink.write(buffer, byteCount);
return this;
}
}
public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
/**
* Returns the number of bytes in segments that are not writable. This is the
* number of bytes that can be flushed immediately to an underlying sink
* without harming throughput.
*/
public long completeSegmentByteCount() {
// result是Buffer中所有的Segment的有效数据大小
long result = size;
if (result == 0) return 0;
// Omit the tail if it's still writable.
Segment tail = head.prev;
if (tail.limit < Segment.SIZE && tail.owner) {
result -= tail.limit - tail.pos;
}
//tail.limit - tail.pos; 是最后一个Segment的有效数据大小
//所以result如果大于0,说明Buffer中至少有两个Segment了,也就是有Segment满了。
return result;
}
}
在emitCompleteSegments中判断Buffer中的是否有两个以上的Segment了(也就是说有Segment满了),如果有的话,会将之前满了的Segment数据全部输出到sink中(留下最后一段未满的Segment数据继续作为缓存用),也就是将满的那部分缓存数据flush到输出流中。
4. Buffer写数据的精华操作
Buffer的write(Buffer source, long byteCount)方法是Buffer缓存处理中的精华操作,它描述的是将一个Buffer的数据转移到另一个Buffer中时,是怎么样的一个处理过程。这里保留了英文注释,让你能更原汁原味的了解其中的含义。
public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
@Override public void write(Buffer source, long byteCount) {
// Move bytes from the head of the source buffer to the tail of this buffer
// while balancing two conflicting goals: don't waste CPU and don't waste
// memory.
//
//
// Don't waste CPU (ie. don't copy data around).
//
// Copying large amounts of data is expensive. Instead, we prefer to
// reassign entire segments from one buffer to the other.
//
//
// Don't waste memory.
//
// As an invariant, adjacent pairs of segments in a buffer should be at
// least 50% full, except for the head segment and the tail segment.
//
// The head segment cannot maintain the invariant because the application is
// consuming bytes from this segment, decreasing its level.
//
// The tail segment cannot maintain the invariant because the application is
// producing bytes, which may require new nearly-empty tail segments to be
// appended.
//
//
// Moving segments between buffers
//
// When writing one buffer to another, we prefer to reassign entire segments
// over copying bytes into their most compact form. Suppose we have a buffer
// with these segment levels [91%, 61%]. If we append a buffer with a
// single [72%] segment, that yields [91%, 61%, 72%]. No bytes are copied.
//
// Or suppose we have a buffer with these segment levels: [100%, 2%], and we
// want to append it to a buffer with these segment levels [99%, 3%]. This
// operation will yield the following segments: [100%, 2%, 99%, 3%]. That
// is, we do not spend time copying bytes around to achieve more efficient
// memory use like [100%, 100%, 4%].
//
// When combining buffers, we will compact adjacent buffers when their
// combined level doesn't exceed 100%. For example, when we start with
// [100%, 40%] and append [30%, 80%], the result is [100%, 70%, 80%].
//
//
// Splitting segments
//
// Occasionally we write only part of a source buffer to a sink buffer. For
// example, given a sink [51%, 91%], we may want to write the first 30% of
// a source [92%, 82%] to it. To simplify, we first transform the source to
// an equivalent buffer [30%, 62%, 82%] and then move the head segment,
// yielding sink [51%, 91%, 30%] and source [62%, 82%].
if (source == null) throw new IllegalArgumentException("source == null");
if (source == this) throw new IllegalArgumentException("source == this");
//检查操作,读取的byteCount大小不能超过source缓存Buffer的大小。
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
// Is a prefix of the source's head segment all that we need to move?
//要复制的数据size小于source中Segment有效数据的size
if (byteCount < (source.head.limit - source.head.pos)) {
//获取写入Buffer中最后一个可写的Segment
Segment tail = head != null ? head.prev : null;
if (tail != null && tail.owner
&& (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {
// Our existing segments are sufficient. Move bytes from source's head to our tail.
//如果最后一个Segment存在,并且它是独立的(不是共享其他Segment的),要写的byteCount+有效数据大小<Segment的大小
//也就是说整个Segment的可以容易现存有效数据和要写入的byteCount个数据。
//则将数据byteCount个数据复制过去。
//这里首先会将sink的数组数据整体前移offset,然后在复制byteCount个数据到sink中,意思就是丢弃sink前面offset的数据,腾出空间来放更多的数据,所以是验证byteCount+有效数据大小<Segment的大小。
source.head.writeTo(tail, (int) byteCount);
source.size -= byteCount;
size += byteCount;
return;
} else {
// We're going to need another segment. Split the source's head
// segment in two, then move the first of those two to this buffer.
//如果要写的byteCount数据不能全部写到最后一个Segment中,那就要考虑将source中的Segment进行拆分了
//拆分之后,就可以将这个byteCount个数组组成的Segment移动到新的Buffer中,不用复制数据
source.head = source.head.split((int) byteCount);
}
}
// Remove the source's head segment and append it to our tail.
//将这个source的Segment从头部移除,添加到自己Buffer的尾部
Segment segmentToMove = source.head;
long movedByteCount = segmentToMove.limit - segmentToMove.pos;
//将这个source的Segment从头部移除
source.head = segmentToMove.pop();
if (head == null) {
head = segmentToMove;
head.next = head.prev = head;
} else {
Segment tail = head.prev;
tail = tail.push(segmentToMove);
//添加了新的Segment之后,看看这个Segment能不能和前一个Segment进行数据合并,节省出一个Segment空间
tail.compact();
}
source.size -= movedByteCount;
size += movedByteCount;
byteCount -= movedByteCount;
}
}
}
这里根据不同情况进行处理。
- 首先判断操作的byteCount是否是在source的当前Segment范围内。如果是在范围内,判断操作的byteCount数据在要写入的Segment中是否容纳的下,如果容纳的下,则将数据复制过去(copy),返回完成操作。如果容纳不下,就考虑进行split拆分,拆分之后,就不考虑复制数据了,而是后面的直接将整个Segment移动到目标Buffer中(move)。
- 接下来就是移动Segment了,直接从soucrce中移除头部的Segment,然后添加到目标Buffer的Segment尾部,添加之后,判断是否需要和前一个Segment进行数据合并,以腾出一个Segment空间。
所以我们可以发现,这个操作非常精彩,对于目标Segment容纳的下的小段数据,采用直接复制的方法,而大段的Segment数据,则是直接移动,而不是复制,只是一个引用指向的变化,那效率超级高啊,这个设计很绝妙,所以说Okio为什么要设计成一小段的Segment,因为段小好操作啊,你要复制多少个数据,我可以根据情况来考虑大段的整个移动,小段的采用复制,而如果像原生IO那一大段的数组,就只能乖乖的采用复制的方法了。
接下来我们分析小段数据的复制,slit分割,compact合并的实现。
先看小段数据的复制
final class Segment {
/** Moves {@code byteCount} bytes from this segment to {@code sink}. */
public void writeTo(Segment sink, int byteCount) {
if (!sink.owner) throw new IllegalArgumentException();
if (sink.limit + byteCount > SIZE) {
// We can't fit byteCount bytes at the sink's current position. Shift sink first.
if (sink.shared) throw new IllegalArgumentException();
if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
//其实就是将sink数组的数据整体前移pos个位置,丢弃pos之前的数据
System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
sink.limit -= sink.pos;
sink.pos = 0;
}
//将数据复制到sink数组中
System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
sink.limit += byteCount;
pos += byteCount;
}
}
很简单,就是处理了sink数组的整体前移,然后将数据复制到sink中。接下来看split分割的实现
final class Segment {
/**
* Splits this head of a circularly-linked list into two segments. The first
* segment contains the data in {@code [pos..pos+byteCount)}. The second
* segment contains the data in {@code [pos+byteCount..limit)}. This can be
* useful when moving partial segments from one buffer to another.
*
* <p>Returns the new head of the circularly-linked list.
*/
public Segment split(int byteCount) {
if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
//创建一个新的Segment,并且声明它是共享的,即共享另外一个Segment的数据
Segment prefix = new Segment(this);
//这里声明了新Segment的有效数据范围[pos,pos+byteCount],它作为前置的Segment
prefix.limit = prefix.pos + byteCount;
//这里声明了原来Segment的有效数据范围[pos+byteCount,limit],它作为后置的Segment
pos += byteCount;
//将新Segment添加到原来Segment的前面,作为前置Segment
prev.push(prefix);
return prefix;
}
Segment(Segment shareFrom) {
this(shareFrom.data, shareFrom.pos, shareFrom.limit);
shareFrom.shared = true;
}
}
以上将一个Segment拆分为相连的两个Segment,新Segment共享原来Segment的数据,新Segment作为前置Segment,有效范围[pos,pos+byteCount],原来Segment作为后置,有效范围[pos+byteCount,limit]。为什么要split分割?就是为了能将单个Segment逻辑分为两个Segment,以便完成byteCount的独立操作,比如整体移动,而不用进行耗时的复制操作。因为是共享一个数组数据的,所以没有多占用什么内存空间,只是逻辑上分离,有了各自独立的有效区域标识而已,数据还是公用的,这个想法很秒。接下来再看compact合并的实现
final class Segment {
/**
* Call this when the tail and its predecessor may both be less than half
* full. This will copy data so that segments can be recycled.
*/
public void compact() {
if (prev == this) throw new IllegalStateException();
if (!prev.owner) return; // Cannot compact: prev isn't writable.
//当前Segment的有效数据size
int byteCount = limit - pos;
//前一个Segment的可用空间,包括pos之前部分和limit之后的部分
int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
//若果前一个Segment的可用空间能容纳当前Segment的数据,则复制数据过去,然后移除当前Segment,交给SegmentPool回收
if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
writeTo(prev, byteCount);
pop();
SegmentPool.recycle(this);
}
}
以上判断前一个Segment的可用空间是否能容纳当前Segment的数据,如果能容纳,则复制数据到前一个中,移除当前的Segment,交给SegmentPool回收,这样可用腾出一个Segment空间,这个合并数据操作同样也是个优化操作。
5. Buffer缓存的总结
讲到这里,如果你都明白了的话,我想你已经能体会到Okio优化的精妙之处,构建在原生输入输出流的基础上,舍弃原生IO的缓存功能,自己实现一套流读取和写入的缓存机制,大大提高了缓存使用的效率。我们对比原生IO和Okio缓存处理时,数据从输入流到输出流的工程。
- 原生IO缓存处理过程
InputStream -> inBuffer -> 临时byte[] -> outBuffer -> OutpuStream- Okio缓存处理过程
InputStream -> inBuffer -> outBuffer -> OutpuStream,可以发现少了中转临时byte[]的过程,inBuffer -> outBuffer之间直接交互数据。同时inBuffer和outBuffer很多时候是可以共享Segment数据,这意味这个inBuffer -> outBuffer不是单纯的复制数据,而是可以以Segment为单位,直接从inBuffer去pop移除,然后push添加到outBuffer中,这意味着inBuffer和outBuffer之间的数据很多情况下就是共享的,也就是说会更加趋近与InputStream -> Buffer -> OutpuStream,那么现在是不是更能体会Okio的优化策略,它将之前的inBuffer -> 临时byte[] -> outBuffer优化成一体的了。
6. TimeOut超时机制
TimeOut超时机制有同步实现的TimeOut和异步实现的AsynTimeOut。我们来分析这两个
TimeOut同步超时
同步超时可以作用在sourc和sink中,这里我们以souce为例,sink原理相同
public final class Okio {
/** Returns a source that reads from {@code in}. */
public static Source source(final InputStream in) {
//传入一个默认的Timeout,默认是没有超时效果的
return source(in, new Timeout());
}
private static Source source(final InputStream in, final Timeout timeout) {
if (in == null) throw new IllegalArgumentException("in == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");
return new Source() {
@Override public long read(Buffer sink, long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (byteCount == 0) return 0;
//这里就是同步判断是否会超时的,它是阻塞的,如果这个read方法阻塞了,那么它无法进行正常判断了。
timeout.throwIfReached();
Segment tail = sink.writableSegment(1);
int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
if (bytesRead == -1) return -1;
tail.limit += bytesRead;
sink.size += bytesRead;
return bytesRead;
}
@Override public void close() throws IOException {
in.close();
}
@Override public Timeout timeout() {
return timeout;
}
@Override public String toString() {
return "source(" + in + ")";
}
};
}
}
我们进去看timeout.throwIfReached()的实现
public class Timeout {
/**
* Throws an {@link InterruptedIOException} if the deadline has been reached or if the current
* thread has been interrupted. This method doesn't detect timeouts; that should be implemented to
* asynchronously abort an in-progress operation.
*/
public void throwIfReached() throws IOException {
//如果线程标记为中断了,抛出线程中断异常
if (Thread.interrupted()) {
throw new InterruptedIOException("thread interrupted");
}
//如果设置了有超时限制,并且当前时间超过了超时时间,则抛出超时异常
if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
throw new InterruptedIOException("deadline reached");
}
}
}
同步超时异常还是很简单的,就是在read或者write方法中进行时间判断是否超时。它有缺陷就是如果read或者write发送阻塞了,就不能及时判断是否超时了。所以有了AsynTimeOut异步超时的机制。
AsyncTimeOut异步超时
AsyncTimeOut在Okio中只用作了处理Socket中,当然它也可以用到其他地方,由你实现。我们从这里分析
public final class Okio {
/**
* Returns a source that reads from {@code socket}. Prefer this over {@link
* #source(InputStream)} because this method honors timeouts. When the socket
* read times out, the socket is asynchronously closed by a watchdog thread.
*/
public static Source source(final Socket socket) throws IOException {
if (socket == null) throw new IllegalArgumentException("socket == null");
//获取一个针对Socket的AsyncTimeout
AsyncTimeout timeout = timeout(socket);
//根据socket获取source
Source source = source(socket.getInputStream(), timeout);
//这里是重点,包装一个新的Source来处理TimeOut
return timeout.source(source);
}
private static AsyncTimeout timeout(final Socket socket) {
return new AsyncTimeout() {
@Override protected IOException newTimeoutException(IOException cause) {
InterruptedIOException ioe = new SocketTimeoutException("timeout");
if (cause != null) {
ioe.initCause(cause);
}
return ioe;
}
@Override protected void timedOut() {
//发生超时,关闭Socket
try {
socket.close();
} catch (Exception e) {
logger.log(Level.WARNING, "Failed to close timed out socket " + socket, e);
} catch (AssertionError e) {
if (e.getCause() != null && e.getMessage() != null
&& e.getMessage().contains("getsockname failed")) {
// Catch this exception due to a Firmware issue up to android 4.2.2
// https://code.google.com/p/android/issues/detail?id=54072
logger.log(Level.WARNING, "Failed to close timed out socket " + socket, e);
} else {
throw e;
}
}
}
};
}
}
上面创建了一个AsycTimeOut对象,用于处理发生超时时关闭Socket。根据Socket得到一个输入流的Source,然后交给AsycTimeOut.source处理得到一个新的Source对象,那么新的Source有什么特别的呢,我们往下看
public class AsyncTimeout extends Timeout {
/**
* Returns a new source that delegates to {@code source}, using this to
* implement timeouts. This works best if {@link #timedOut} is overridden to
* interrupt {@code sink}'s current operation.
*/
public final Source source(final Source source) {
//新建了一个source对象
return new Source() {
@Override public long read(Buffer sink, long byteCount) throws IOException {
boolean throwOnTimeout = false;
//这里enter中调用了scheduleTimeout,进行TimeOut的调度
enter();
//以下是数据读取,没什么特别的
try {
long result = source.read(sink, byteCount);
throwOnTimeout = true;
return result;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
@Override public void close() throws IOException {
boolean throwOnTimeout = false;
try {
source.close();
throwOnTimeout = true;
} catch (IOException e) {
throw exit(e);
} finally {
//这里会从超时队列中移除超时
exit(throwOnTimeout);
}
}
@Override public Timeout timeout() {
return AsyncTimeout.this;
}
@Override public String toString() {
return "AsyncTimeout.source(" + source + ")";
}
};
}
public final void enter() {
if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
//超时时间
long timeoutNanos = timeoutNanos();
//是否有设置超时
boolean hasDeadline = hasDeadline();
if (timeoutNanos == 0 && !hasDeadline) {
return; // No timeout and no deadline? Don't bother with the queue.
}
inQueue = true;
//调度TimeOut
scheduleTimeout(this, timeoutNanos, hasDeadline);
}
/**
* Returns either {@code cause} or an IOException that's caused by
* {@code cause} if a timeout occurred. See
* {@link #newTimeoutException(java.io.IOException)} for the type of
* exception returned.
*/
final IOException exit(IOException cause) throws IOException {
if (!exit()) return cause;
return newTimeoutException(cause);
}
/** Returns true if the timeout occurred. */
public final boolean exit() {
if (!inQueue) return false;
inQueue = false;
//从超时队列移除超时
return cancelScheduledTimeout(this);
}
}
以上就是创建了一个新的Source,在Source每次在read的之前,调用enter,其实就是scheduleTimeout来调度AsyncTimeout,而完成read时,调用exit,其实就是cancelScheduledTimeout来取消AsyncTimeout超时的调度,我们来分析
public class AsyncTimeout extends Timeout {
//调度AsyncTimeout,将AsyncTimeout按顺序加入到队列中
private static synchronized void scheduleTimeout(
AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
// Start the watchdog thread and create the head node when the first timeout is scheduled.
//启动一个叫看门狗的线程
if (head == null) {
head = new AsyncTimeout();
new Watchdog().start();
}
//这里计算真正的超时到期时间
long now = System.nanoTime();
if (timeoutNanos != 0 && hasDeadline) {
// Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around,
// Math.min() is undefined for absolute values, but meaningful for relative ones.
node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
} else if (timeoutNanos != 0) {
node.timeoutAt = now + timeoutNanos;
} else if (hasDeadline) {
node.timeoutAt = node.deadlineNanoTime();
} else {
throw new AssertionError();
}
// Insert the node in sorted order.
//根据超时时间大小,插入这个超时AsyncTimeout到正确位置
long remainingNanos = node.remainingNanos(now);
for (AsyncTimeout prev = head; true; prev = prev.next) {
if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
node.next = prev.next;
prev.next = node;
//如果超时AsyncTimeout插入到最头部,则唤醒看门狗进行超时监听了
if (prev == head) {
AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
}
break;
}
}
}
/** Returns true if the timeout occurred. */
//取消调度AsyncTimeout,将AsyncTimeout从队列中移除
private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
// Remove the node from the linked list.
for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
if (prev.next == node) {
prev.next = node.next;
node.next = null;
return false;
}
}
// The node wasn't found in the linked list: it must have timed out!
return true;
}
}
以上就是将AsyncTimeout异步超时加入到超时队列中,同时可能需要开启看门狗进行监听(如果看门狗没有启动的话),我们在看看看门狗Watchdog的实现。
public class AsyncTimeout extends Timeout {
private static final class Watchdog extends Thread {
public Watchdog() {
super("Okio Watchdog");
//设置为守护线程
setDaemon(true);
}
public void run() {
while (true) {
try {
//这里是查找一个超时的AsyncTimeout
AsyncTimeout timedOut = awaitTimeout();
// Didn't find a node to interrupt. Try again.
//没找到超时AsyncTimeout,继续
if (timedOut == null) continue;
// Close the timed out node.
//找到超时AsyncTimeout,就通知超时了
timedOut.timedOut();
} catch (InterruptedException ignored) {
}
}
}
}
/**
* Removes and returns the node at the head of the list, waiting for it to
* time out if necessary. Returns null if the situation changes while waiting:
* either a newer node is inserted at the head, or the node being waited on
* has been removed.
*/
private static synchronized AsyncTimeout awaitTimeout() throws InterruptedException {
// Get the next eligible node.
AsyncTimeout node = head.next;
// The queue is empty. Wait for something to be enqueued.
//如果超时队列空的话,wait
if (node == null) {
AsyncTimeout.class.wait();
return null;
}
//当前AsyncTimeout离超时还剩多少时间
long waitNanos = node.remainingNanos(System.nanoTime());
// The head of the queue hasn't timed out yet. Await that.
//等待第一个AsyncTimeout离超时的时间
if (waitNanos > 0) {
// Waiting is made complicated by the fact that we work in nanoseconds,
// but the API wants (millis, nanos) in two arguments.
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
return null;
}
// The head of the queue has timed out. Remove it.
//到超时了,移除这个超时的AsyncTimeout,返回
head.next = node.next;
node.next = null;
return node;
}
}
以上就是在看门狗这个线程中,不断遍历超时队列,超时队列是根据超时时间排序的,第一个是里超时时间最近的,所以每次从第一个进行判断离超时还剩多少时间,然后wait等待,以让出CPU,当等待时间到了之后,继续判断,如果队列中还存在超时,则从超时队列移除,并通知超时。所以enter负责添加超时,exit负责移除超时,如果read,write方法发生超时了,那么exit不能正常移除超时,看门狗监听g该超时的时间到了,就能抛出超时通知了。
public class AsyncTimeout extends Timeout {
/**
* Invoked by the watchdog thread when the time between calls to {@link
* #enter()} and {@link #exit()} has exceeded the timeout.
*/
protected void timedOut() {
}
}
默认的timeOut没有实现,针对socket的超时实现是关闭Socket,关闭了Socket之后,IO阻塞就会抛出IO异常了,阻塞也就中断了。你可以实现你自己的超时发生的操作。
总体来说,默认情况下,超时机制没有开启的,但是有实现超时功能,开启的话需要指定超时的时间,需要用户自己去实现。使用同步超时还是异步超时,可以自己根据情况去使用。
7. Gzip压缩简要分析
Okio同时也是支持Gzip压缩的,当然Gzip压缩并不是自己实现的,而是使用Java zip包中的CRC32来进行数据的压缩。这里我们简单分析GzipSink来看看Gzip压缩是怎么实现的。
GzipSink 实现了Sink接口,是带有压缩功能的Sink,会将要写入的数据压缩之后再写入,内部有CRC32对象负责将原生sink的数据进行Gzip压缩,然后由DeflaterSink对象负责将压缩后的数据进行整理并执行写入。
GzipSource 实现了Source接口,是带有解压缩功能的Source,实现原理和GzipSink相反,由InflaterSource读取压缩的数据并整理,然后CRC32解压缩数据,得到原始的数据。
8. 总结
Okio还有还有诸如ByteString等,包括之前讲的缓存机制,超时机制,都是为了使IO流的操作更加简单高效,弥补原始IO操作效率不高的问题。Okio重点优化的就是缓存处理方面,目的就是优化原生IO不足的地方,所以说它不是Okhttp的附属库,是可以很方便我们日常进行IO操作时使用的,它有更简洁API使用方式和更高效的IO处理方式,有了Okio,我感觉再也不想用原生IO中复杂的使用方式啦。
我的博客
GitHub
我的简书
群号:194118438,欢迎入群
微信公众号 hesong ,微信扫一扫下方二维码即可关注: