Okio源码分析
1. 简单关系结构
在okio中,使用Source/Sink接口替代in/outputstream;
先看一下简单的继承关系,Source和Sink差不多,这里用Sink举例
final class RealBufferedSink implements BufferedSink{}
public interface BufferedSink extends Sink, WritableByteChannel{}
我们简单看一下Sink的接口
public interface Sink extends Closeable, Flushable {
void write(Buffer source, long byteCount) throws IOException;
@Override void flush() throws IOException;
Timeout timeout();
@Override void close() throws IOException;
}
Sink接口声明了基本的write/flush/close方法,以及一个超时方法timeout。这个超时方法我们最后会详细解说一下。
在看看子接口BufferedSink,也就是RealBufferedSink直接实现的接口
public interface BufferedSink extends Sink, WritableByteChannel {
/** Returns this sink's internal buffer. */
Buffer buffer();
BufferedSink write(ByteString byteString) throws IOException;
BufferedSink write(byte[] source) throws IOException;
BufferedSink write(byte[] source, int offset, int byteCount) throws IOException;
long writeAll(Source source) throws IOException;
BufferedSink write(Source source, long byteCount) throws IOException;
BufferedSink writeUtf8(String string) throws IOException;
BufferedSink writeUtf8(String string, int beginIndex, int endIndex) throws IOException;
···
BufferedSink emit() throws IOException;
BufferedSink emitCompleteSegments() throws IOException;
OutputStream outputStream();
}
可以看到,他在Sink的基础上声明了更多的write重载方法,类似flush的方法emit,以及转换成OutputStream的方法
下面我们看一下实现上面各个方法的类 RealBufferedSink。
2.RealBufferedSink
先看一下属性和构造方法
final class RealBufferedSink implements BufferedSink {
public final Buffer buffer = new Buffer();
public final Sink sink;
boolean closed;
RealBufferedSink(Sink sink) {
if (sink == null) throw new NullPointerException("sink == null");
this.sink = sink;
}
```
}
持有一个Buffer(缓冲区)和又一个Sink。实例化的时候需要Sink
我们看一下简单的写入方法,其他重载方法大同小异
@Override public BufferedSink write(ByteString byteString) throws IOException {
if (closed) throw new IllegalStateException("closed");
buffer.write(byteString);//首先将byteString写入buffer中
return emitCompleteSegments();
}
@Override public BufferedSink emitCompleteSegments() throws IOException {
if (closed) throw new IllegalStateException("closed");
long byteCount = buffer.completeSegmentByteCount();//获取写入完毕的segment中的字节数
if (byteCount > 0) sink.write(buffer, byteCount);//写入底层的sink
return this;//return
}
由此可以看到,对于写入数据,实际上先把数据写入buffer,再把buffer中的数据写入内部持有的实际底层sink中。
@Override public BufferedSink emit() throws IOException {//和上面不同,这个方法是把所有的字节都写入底层sink
if (closed) throw new IllegalStateException("closed");
long byteCount = buffer.size();
if (byteCount > 0) sink.write(buffer, byteCount);
return this;
}
其他方法大同小异,暂时先不提及了。我们接下来看看Buffer。数据是先进入Buffer,再交给底层处理的。
3.Buffer
先看一下类注释是怎么说的。
内存中的字节集合。
此类通过更改底层字节数组的所有权而不是对字节的复制实现了在Buffer之间字节的快速移动。
类似于ArrayList,Buffer刚开始很小,会随数据而增长,并只会占用需要的内存空间。
在java中当你请求一个bytearray时,虚拟机一定会返回给你一个填充满0的数组,即使你本来就打算覆盖数组,不需要他清空。Buffer通过存储bytearray缓存池的方式避免零填充和GC引起的性能流失。
再看看属性和构造方法
public final class Buffer implements BufferedSource, BufferedSink, Cloneable, ByteChannel {
private static final byte[] DIGITS =
{ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
static final int REPLACEMENT_CHARACTER = '\ufffd';
@Nullable Segment head;
long size;
public Buffer() {
}
}
buffer同时实现了BufferedSink,BufferedSource。其中有一个Segment ,就是用于存储数据的了。head,应该是头节点。那我们首先看看这个Segment的结构。
3.1 Segment
先简单看一下注释:
Buffer的一个片段。
在Buffer中的Segments是一个双向环状链表。
在缓存池中的Segments是个单向链表。
首先是属性字段
static final int SIZE = 8192;//大小
static final int SHARE_MINIMUM = 1024;//最小分享段
final byte[] data; //用于存储数据的byte数组
int pos;//读取位置指针
int limit;//写入的位置指针
boolean shared;//data是不是处于共享状态
boolean owner;//是不是data数组的owner
Segment next;
Segment prev;//前后指针可以得出是个环形链表
构造方法及创建:
Segment() {
this.data = new byte[SIZE];
this.owner = true;
this.shared = false;
}
Segment(byte[] data, int pos, int limit, boolean shared, boolean owner) {
this.data = data;
this.pos = pos;
this.limit = limit;
this.shared = shared;
this.owner = owner;
}
Segment sharedCopy() {
shared = true;//设置共享状态
return new Segment(data, pos, limit, true, false);//通过共享byte数组生成一个新的copy
}
Segment unsharedCopy() {
return new Segment(data.clone(), pos, limit, false, true);//通过clone生成一个
}
下面看一下写入的方法
//这个方法是把本segment中的数据写入sink里面
public void writeTo(Segment sink, int byteCount) {
if (!sink.owner) throw new IllegalArgumentException();//如果不是owner是不允许写入的
if (sink.limit + byteCount > SIZE) {//如果写入之后会超出最大容量
// We can't fit byteCount bytes at the sink's current position. Shift sink first.
if (sink.shared) throw new IllegalArgumentException();//共享状态,不可左移
if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);//左移sink数据
sink.limit -= sink.pos;//修改开始和结束的指针
sink.pos = 0;
}
System.arraycopy(data, pos, sink.data, sink.limit, byteCount);//复制数据过去
sink.limit += byteCount;//修改sink数组尾指针
pos += byteCount;//修改本segment的头指针
}
/**压缩优化
* 尾部和前一个节点的数据都不到一半时,调用此方法把这个节点的数据压缩到上个节点,并回收此节点
*/
public void compact() {
if (prev == this) throw new IllegalStateException();//只有一个节点,无法压缩
if (!prev.owner) return; // 上一个节点不可写入
int byteCount = limit - pos;//本节点数据长度
int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);//可压缩长度,
//上一个节点的剩余和索引之前的空间 如果上一个节点是共享状态,那么其数据不可左移,可利用的就只有剩余空间
if (byteCount > availableByteCount) return; //数据长度大于上一个节点的剩余空间,无法压缩
writeTo(prev, byteCount);//写入上一个节点
pop();//出队
SegmentPool.recycle(this);//回收
}
//split方法 把一个节点根据byteCount分割成两个
//这里使用了共享机制
public Segment split(int byteCount) {
if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
Segment prefix;
if (byteCount >= SHARE_MINIMUM) {//如果要分割的内容>SHARE_MINIMUM
prefix = new Segment(this);//使用共享机制创建新的segment
} else {
prefix = SegmentPool.take();//从缓存池中取出一个
System.arraycopy(data, pos, prefix.data, 0, byteCount);//复制要分隔的数据过去
}
prefix.limit = prefix.pos + byteCount;//修改新节点尾指针
pos += byteCount;//修改本节点的头指针
prev.push(prefix);//把新节点插入到本节点的前面
return prefix;
}
由上面几个方法可知,在一个segment中有头尾两个指针,指针之间的byte才是有效数据。这样就避免了很多数组位移的情况。
到这里segment的结构我们分析的就差不多了,其余的方法比如push是链表相关的,我们就不分析了。下面简单看一下segment缓存池。
3.2 SegmentPool
static final long MAX_SIZE = 64 * 1024; // 64 KiB.最大容量
/** Singly-linked list of segments. */
static @Nullable Segment next; //单链表next 用于回收
//该值记录了当前所有Segment的总大小,最大值是为MAX_SIZE
static long byteCount;//
static Segment take() {
synchronized (SegmentPool.class) {
if (next != null) { //池中有segment,取出
Segment result = next;
next = result.next;
result.next = null;
byteCount -= Segment.SIZE;
return result;
}
}
return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
}
static void recycle(Segment segment) {
if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
if (segment.shared) return; // This segment cannot be recycled.//共享的segment不能被回收
synchronized (SegmentPool.class) {
if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
byteCount += Segment.SIZE;
segment.next = next;//插入回收链表
segment.pos = segment.limit = 0;//置空
next = segment;
}
}
没啥好说的,很多缓存池都是怎么实现的。下面我们在看一下之前有见到过的参数类型
3.3 ByteString
是一个不可变比特序列
实际就是一个 final byte[],提供utf-8等处理方法
final byte[] data;
transient String utf8; // Lazily computed.
内部同时保存byte和utf8 string,节省互相转换性能
最后就是本节的重点,缓冲Buffer了
3.4 Buffer (okio读写核心)
首先我们看一下Buffer中写入数据的方法
首先是根据字节数组写入
@Override public Buffer write(byte[] source, int offset, int byteCount) {
if (source == null) throw new IllegalArgumentException("source == null");
checkOffsetAndCount(source.length, offset, byteCount);//验证参数数据合法
int limit = offset + byteCount;//计算要写入的数据范围
while (offset < limit) {//循环
Segment tail = writableSegment(1);//申请一个容量至少为1的尾部节点
int toCopy = Math.min(limit - offset, Segment.SIZE - tail.limit);//计算本次写入的大小
System.arraycopy(source, offset, tail.data, tail.limit, toCopy);//写入尾节点
offset += toCopy;
tail.limit += toCopy;
}
size += byteCount;
return this;
}
逻辑比较简单,把要写的数据按照segment大小分成多个碎片写入。
其他一些重载方法大同小异,这里先不介绍了。
然后我们看这个方法,根据其他Buffer写入。
@Override public void write(Buffer source, long byteCount) {
if (source == null) throw new IllegalArgumentException("source == null");
if (source == this) throw new IllegalArgumentException("source == this");
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {//循环
// Is a prefix of the source's head segment all that we need to move?
if (byteCount < (source.head.limit - source.head.pos)) {//如果只是头部要写过来
Segment tail = head != null ? head.prev : null;//获取尾节点
if (tail != null && tail.owner
&& (byteCount + tail.limit - (tail.shared ? 0 : tail.pos) <= Segment.SIZE)) {
//是否可以直接写入尾节点
// Our existing segments are sufficient. Move bytes from source's head to our tail.
source.head.writeTo(tail, (int) byteCount);//把头部写入
source.size -= byteCount;
size += byteCount;
return;
} else {//不能直接写入尾节点
// We're going to need another segment. Split the source's head
// segment in two, then move the first of those two to this buffer.
source.head = source.head.split((int) byteCount);//把其头节点按照byteCount分成两个
}
}
// Remove the source's head segment and append it to our tail.
Segment segmentToMove = source.head;//
long movedByteCount = segmentToMove.limit - segmentToMove.pos;//要移动的碎片
source.head = segmentToMove.pop();//移除source的头节点
if (head == null) {
head = segmentToMove;//设置给本buffer
head.next = head.prev = head;
} else {
Segment tail = head.prev;//设置成我们的为节点
tail = tail.push(segmentToMove);
tail.compact();//尝试压缩
}
source.size -= movedByteCount;//修改指针
size += movedByteCount;
byteCount -= movedByteCount;
}
}
4 TimeOut
最后我们看一下OKIO的timeout机制
我们先看一下普通的TimeOut类
4.1 Timeout
首先是三个属性
private boolean hasDeadline;//是否设置整个io的截止时间
private long deadlineNanoTime;//截止时间
private long timeoutNanos;//一次操作的最大时间
这三个属性网上很多地方说的很清楚了,这里先不写了。我们看看主要的几个方法
首先是这个
public void throwIfReached() throws IOException {
if (Thread.interrupted()) {
throw new InterruptedIOException("thread interrupted");
}
if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
throw new InterruptedIOException("deadline reached");
}
}
方法看起来很简单,每次调用此方法会判读当前线程是否中断或者是否设置了Deadline且超时了,如果是就抛出异常。
那么在流的输入输出中什么时候调用此方法呢?
是在Okio把流包装成Sink或者Source的时候
private static Sink sink(final OutputStream out, final Timeout timeout) {
if (out == null) throw new IllegalArgumentException("out == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");
return new Sink() {
@Override public void write(Buffer source, long byteCount) throws IOException {
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
timeout.throwIfReached();
Segment head = source.head;
int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
out.write(head.data, head.pos, toCopy);
head.pos += toCopy;
byteCount -= toCopy;
source.size -= toCopy;
if (head.pos == head.limit) {
source.head = head.pop();
SegmentPool.recycle(head);
}
}
}
···
};
}
可以看到,在循环写入或者读取的时候就会调用此方法判断是不是超时了。相对于一会儿要看的AsyncTimeout,这个方法是同步进行的。所以如果要重写Timeout的此判断方法的话,最好不要做太大的耗时操作~
下面看另一个方法
public final void waitUntilNotified(Object monitor) throws InterruptedIOException {
try {
boolean hasDeadline = hasDeadline();
long timeoutNanos = timeoutNanos();
if (!hasDeadline && timeoutNanos == 0L) {//没有需要检测的超时设定
monitor.wait(); //直接wait
return;
}
// 计算我们要wait多久
long waitNanos;
long start = System.nanoTime();//当前时间
//下面的判断会根据设置选出最近的判断超时时间
if (hasDeadline && timeoutNanos != 0) {
long deadlineNanos = deadlineNanoTime() - start;
waitNanos = Math.min(timeoutNanos, deadlineNanos);
} else if (hasDeadline) {
waitNanos = deadlineNanoTime() - start;
} else {
waitNanos = timeoutNanos;
}
// Attempt to wait that long. This will break out early if the monitor is notified.
long elapsedNanos = 0L;
if (waitNanos > 0L) {
long waitMillis = waitNanos / 1000000L;
monitor.wait(waitMillis, (int) (waitNanos - waitMillis * 1000000L));//开始等待
elapsedNanos = System.nanoTime() - start;//wait结束,计算这次的耗时时间
}
// Throw if the timeout elapsed before the monitor was notified.
if (elapsedNanos >= waitNanos) {//如果耗时时间>=最近的超时时间
throw new InterruptedIOException("timeout");//报错
}
} catch (InterruptedException e) {
throw new InterruptedIOException("interrupted");
}
}
此方法会对传入的obj参数进行wait,直到被唤醒或者wait时间到达了超时判定的时间,然后根据wait的时间来判断本次操作是否超时了。注释中也说明了调用者必须对obj参数添加内置锁。我们找一个调用的地方看一下。这里提到了Pipe类。
简单来说,Pipe的作用就是:生产者把数据通过他的Sink写入数据,消费者在其他线程可以通过Source读取数据
public final class Pipe {
final long maxBufferSize;
final Buffer buffer = new Buffer();
boolean sinkClosed;
boolean sourceClosed;
private final Sink sink = new PipeSink();
private final Source source = new PipeSource();
···
final class PipeSink implements Sink {
final Timeout timeout = new Timeout();
@Override public void write(Buffer source, long byteCount) throws IOException {
synchronized (buffer) {
if (sinkClosed) throw new IllegalStateException("closed");
while (byteCount > 0) {
if (sourceClosed) throw new IOException("source is closed");
long bufferSpaceAvailable = maxBufferSize - buffer.size();//剩余的缓存空间
if (bufferSpaceAvailable == 0) {//没有剩余空间了
timeout.waitUntilNotified(buffer); // wait在这里等待并计算是否超时
continue;
}
long bytesToWrite = Math.min(bufferSpaceAvailable, byteCount);
buffer.write(source, bytesToWrite);
byteCount -= bytesToWrite;
buffer.notifyAll(); // 有新数据进来了,通知读取数据的线程notify不再等待
}
}
}
···
}
final class PipeSource implements Source {
final Timeout timeout = new Timeout();
@Override public long read(Buffer sink, long byteCount) throws IOException {
synchronized (buffer) {
if (sourceClosed) throw new IllegalStateException("closed");
while (buffer.size() == 0) {
if (sinkClosed) return -1L;
timeout.waitUntilNotified(buffer); // 没有剩余数据了,wait等待sink写入数据并计算超时
}
long result = buffer.read(sink, byteCount);
buffer.notifyAll(); //读取消费了数据,通知写入数据的线程有新的空间可用了,可以不再等待了~
return result;
}
}
···
}
}
同步的Timeout看完了,我们再看看他的其他子类
4.2 AsyncTimeout
先看一下注释
使用后台线程在超时发生时准确地采取措施。 使用它可以实现原声不支持的超时,例如写入时被阻塞的套接字。
子类应该重写{@link #timedOut}以便在发生超时时采取措施。 这个方法将由共享的监视程序线程调用,因此它不应执行任何长时间运行的操作。
使用{@link #sink}和{@link #source}将此超时应用于流。 返回值将Timeout应用于包装流上的每个操作。
在进行工作之前和之后,应调用{@link #enter}和{@link #exit}。 {@link #exit}的返回值指示是否触发了超时。
请注意,对{@link #timedOut}的调用是异步的,在{@link #exit}之后调用。
根据注释提到的,我们在Okio的对Socket的包装方法中找到了对AsyncTimeout的使用。
public static Source source(Socket socket) throws IOException {
if (socket == null) throw new IllegalArgumentException("socket == null");
if (socket.getInputStream() == null) throw new IOException("socket's input stream == null");
AsyncTimeout timeout = timeout(socket);
Source source = source(socket.getInputStream(), timeout);
return timeout.source(source);
}
在 Source source = source(socket.getInputStream(), timeout); 这一行代码之前看到过了,实际上就是把socket的stream转换成了source并添加了基本的超时判断。而最后这里又通过timeout.source(source)对source又包了一层,我们看看这一层都干了啥。
public final Source source(final Source source) {
return new Source() {
@Override public long read(Buffer sink, long byteCount) throws IOException {
boolean throwOnTimeout = false;
enter();
try {
long result = source.read(sink, byteCount);
throwOnTimeout = true;
return result;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
···
}
正如注释所说,在写之前调用了enter方法,在写之后调用了exit方法。
public final void enter() {
if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
long timeoutNanos = timeoutNanos();
boolean hasDeadline = hasDeadline();
if (timeoutNanos == 0 && !hasDeadline) {
return; // No timeout and no deadline? Don't bother with the queue.
}
inQueue = true;
scheduleTimeout(this, timeoutNanos, hasDeadline);
}
public final boolean exit() {
if (!inQueue) return false;
inQueue = false;
return cancelScheduledTimeout(this);
}
通过更改inQueue状态并调用scheduleTimeout,cancelScheduledTimeout方法。
private static synchronized void scheduleTimeout(
AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
// Start the watchdog thread and create the head node when the first timeout is scheduled.
if (head == null) {
head = new AsyncTimeout();//生成一个新节点设置成头节点
new Watchdog().start();//开启一个新的Watchdog线程并运行
}
long now = System.nanoTime();
if (timeoutNanos != 0 && hasDeadline) {
node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
} else if (timeoutNanos != 0) {
node.timeoutAt = now + timeoutNanos;
} else if (hasDeadline) {
node.timeoutAt = node.deadlineNanoTime();
} else {
throw new AssertionError();
}
//上面的方法和我们之前看的同步的计算超时时间的一样,都是找出一个最近的超时时限。
long remainingNanos = node.remainingNanos(now);//计算并设置超时时间间隔给node节点
for (AsyncTimeout prev = head; true; prev = prev.next) {//循环遍历当前所有的Async节点
if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {//按照超时时限从小到大的顺序插入此节点
node.next = prev.next;//
prev.next = node;
if (prev == head) {//如果是第一个插入的
AsyncTimeout.class.notify(); //notify 这里要唤醒的是啥呢?等下再解释
}
break;
}
}
}
下面看一下exit时调用的方法
private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
// Remove the node from the linked list.
for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
if (prev.next == node) {//移除此节点
prev.next = node.next;
node.next = null;
return false;//返回false
}
}
// The node wasn't found in the linked list: it must have timed out!
return true;
}
上面提到如果再队列中找不到此节点,那一定是已经超时了。为什么这么说呢,这就要看看WatchDog线程究竟干了啥。
public void run() {
while (true) {//死循环
try {
AsyncTimeout timedOut;
synchronized (AsyncTimeout.class) {
timedOut = awaitTimeout();//获取一个超时的节点
// Didn't find a node to interrupt. Try again.
if (timedOut == null) continue; //没有超时的节点
// The queue is completely empty. Let this thread exit and let another watchdog thread
// get created on the next call to scheduleTimeout().
if (timedOut == head) {//队列彻底空了,没有节点了,只剩下头节点了
head = null;
return;//直接退出
}
}
// Close the timed out node.
timedOut.timedOut();//调用超时节点的timedOut方法。
} catch (InterruptedException ignored) {
}
}
}
static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException {
// Get the next eligible node.
AsyncTimeout node = head.next; //获取下一个有效节点(头节点不算的)
// The queue is empty. Wait until either something is enqueued or the idle timeout elapses.
if (node == null) {//当前队列是空的 等待
long startNanos = System.nanoTime();
AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);//等待直到有节点插入时被唤醒或者时间到了
return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
? head // 闲置时间到了还没有新节点插入,直接返回head 在上面run方法中就会置空head并关闭watchdog线程了
: null; // 有新的节点插入,此时返回null,run方法会进行下一次循环继续调用此方法来获取next节点
}
long waitNanos = node.remainingNanos(System.nanoTime());//node节点还有多久超时
// The head of the queue hasn't timed out yet. Await that.
if (waitNanos > 0) {//还没有超时,则在这里等待。因为队列中的节点是按照剩余时间排序的,所以next没有超时则剩下的节点肯定还早呢。
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
return null;//返回null,进行下一次循环
}
// The head of the queue has timed out. Remove it.//节点超时了,移除此节点并返回node watchdog会调用其timeout方法
head.next = node.next;
node.next = null;
return node;
}
到这里,异步的超时类我们基本就看完了,最后在整理一下逻辑:
首先在每次写入之前向队列中插入一个带有当前各种超时信息的节点,按照超时先后时限排列。而Watchdog线程会获取n头节点的下一个节点做判断,如果节点没有超时,在这里等待直到时间耗尽,再重新获取next节点做判断。在这段时间内如果写入完毕了,exit方法将此节点移除了,watchdog第二次获取next节点时获取的就是下一个节点了,会继续判断。而如果写入时间过长,watchdog等待完了再循环发现这个节点还在,waitNanos 等于0了,说明已经超时了,调用超时方法并移除此节点。cancelScheduledTimeout方法再运行的时候自然就找不到此节点了。