Ok I/O 简析
java 的NIO和阻塞I/O
- 阻塞I/O通信模式:调用InputStream.read()方法时是阻塞的,它会一直等到数据到来时才返回
- NIO通信模式:在JDK1.4开始,是一种非阻塞I/O,在Java NIO的服务端由一个专门的线程来处理所有I/O事件,并负责分发;线程之间通讯通过wait和notify等方式。
输入与输出
Sink 与 Source在JDK里面有InputStream和OutputStream两个接口,Source和Sink类似于InputStream和OutputStream,是io操作的顶级接口类,这两个接口均实现了Closeable接口。所以可以把Source简单的看成InputStream,Sink简单看成OutputStream。
子类
其中 BufferedXXX 对
write()
方法扩展了许多参数,是一个接口,而 RealBufferedXXX 则是真正的具体实现其他子类
- Sink 和Source 它们还有各自的支持gzip压缩的实现类GzipSink和GzipSource
- 具有委托功能的抽象类ForwardingSink和ForwardingSource,它们的具体实现类是InflaterSource和DeflaterSink,这两个类主要用于压缩,为GzipSink和GzipSource服务。
Segment
okio将数据分割成一块块的片段,内部维护者固定长度的byte[]数组,同时segment拥有前面节点和后面节点,构成一个双向循环链表。
分片中使用数组存储,兼具读的连续性,以及写的可插入性,对比单一使用链表或者数组,是一种折中的方案,读写更快,而且有个好处根据需求改动分片的大小来权衡读写的业务操作,另外,segment也有一些内置的优化操作
segment
compact()
方法(压缩机制)
除了写入数据之外,segment还有一个优化的技巧,因为每个segment的片段size是固定的,为了防止经过长时间的使用后,每个segment中的数据被分割的十分严重,可能一个很小的数据却占据了整个segment,所以有了一个压缩机制。
public void compact() {
//上一个节点就是自己,意味着就一个节点,无法压缩,抛异常
if (prev == this) throw new IllegalStateException();
//如果上一个节点不是自己的,所以你是没有权利压缩的
if (!prev.owner) return; // Cannot compact: prev isn't writable.
//能进来说明,存在上一个节点,且上一个节点是自己的,可以压缩
//记录当前Segment具有的数据,数据大小为limit-pos
int byteCount = limit - pos;
// 统计前结点是否被共享,如果共享则只记录Size-limit大小,
// 如果没有被共享,则加上pre.pos之前的空位置;
//本行代码主要是获取前一个segment的可用空间。
// 先判断prev是否是共享的,如果是共享的,则只记录SIZE-limit,
// 如果没有共享则记录SIZE-limit加上prev.pos之前的空位置
int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
//判断prev的空余空间是否能够容纳Segment的全部数据,不能容纳则返回
if (byteCount > availableByteCount) return;
//能容纳则将自己的这个部分数据写入上一个Segment
writeTo(prev, byteCount);
//讲当前Segment从Segment链表中移除
pop();
//回收该Segment
SegmentPool.recycle(this);
}
总结下上述代码:如果前面的Segment是共享的,那么不可写,也不能压缩,接着判断前一个的剩余大小是否比当前空间大,如果有足够的空间来容纳数据,调用前面的writeTo方法写入数据,写完以后,移除当前segment,并回收segment。
split()
方法(共享机制)
为了减少数据复制带来的性能开销。
先把Segment一分为二,将(pos + 1, pos + btyeCount - 1)的内容给新的Segment,将(pos + byteCount, limit - 1)的内容留给自己.
SegemtnPool
SegmentPool是一个Segment池,由一个单项链表构成。该池负责Segment的回收和闲置Segment管理,也就是说Buffer使用的Segment是从Segment单项链表中取出的,这样有效的避免了GC频率.
//一个Segment记录的最大长度是8192,因此SegmentPool只能存储8个Segment
static final long MAX_SIZE = 64 * 1024;
//该SegmentPool存储了一个回收Segment的链表
static Segment next;
//该值记录了当前所有Segment的总大小,最大值是为MAX_SIZE
static long byteCount;
它的两个重要的方法 take()
, recycle()
static Segment take() {
synchronized (SegmentPool.class) {
if (next != null) {
Segment result = next;
next = result.next;
result.next = null;
byteCount -= Segment.SIZE;
return result;
}
}
return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
}
static void recycle(Segment segment) {
//如果这个要回收的Segment被前后引用,则无法回收
if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
//如果这个要回收的Segment的数据是分享的,则无法回收
if (segment.shared) return; // This segment cannot be recycled.
//加锁
synchronized (SegmentPool.class) {
//如果 这个空间已经不足以再放入一个空的Segment,则不回收
if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
//设置SegmentPool的池大小
byteCount += Segment.SIZE;
//segment的下一个指向头
segment.next = next;
//设置segment的可读写位置为0
segment.pos = segment.limit = 0;
//设置当前segment为头
next = segment;
}
}
ByteString
ByteString存储的是不可变比特序列.final btye[] data
本质就是一个byte序列(数组),以制定的编码格式进行解码。目前支持的解码规则有hex,base64和UTF-8等。
Buffer
Buffer存储的是可变比特序列,需要注意的是Buffer内部对比特数据的存储不是直接使用一个byte数组那么简单,它使用了一种新的数据类型Segment进行存储。
Buffer持有一个Segment的引用,所以通过这个引用能拿到整个链表中的所有数据。
同时Buffer实现了三个接口,读,写以及clone。
//看到注释的第一句话,我就知道了是深拷贝,哈哈!
/** Returns a deep copy of this buffer. */
@Override public Buffer clone() {
//先new了一个Buffer对象
Buffer result = new Buffer();
//如果size==0,说明这个Buffer是空的,所以直接返回即可
if (size == 0) return result;
//如果size!=0,说明这个Buffer是有数据的,然后吧head指向这个copy的head,PS大家回想下Segment的这个构造函数,里面是怎么操作的?
result.head = new Segment(head);
//然后设置copy的head的next和prev的值
result.head.next = result.head.prev = result.head;
//开始遍历这个Buffer持有的Segment链了
for (Segment s = head.next; s != head; s = s.next) {
result.head.prev.push(new Segment(s));
}
result.size = size;
return result;
}
超时
private static final class Watchdog extends Thread {
public Watchdog() {
super("Okio Watchdog");
setDaemon(true);
}
public void run() {
while (true) {
try {
AsyncTimeout timedOut;
synchronized (AsyncTimeout.class) {
timedOut = awaitTimeout();
// Didn't find a node to interrupt. Try again.
if (timedOut == null) continue;
// The queue is completely empty. Let this thread exit and let another watchdog thread
// get created on the next call to scheduleTimeout().
if (timedOut == head) {
head = null;
return;
}
}
// Close the timed out node.
timedOut.timedOut();
} catch (InterruptedException ignored) {
}
}
}
}
这里的WatchDog只是一个继承于Thread的一类,里面的run方法执行的就是超时的判断,之所以在socket写时采取异步超时,这完全是由socket自身的性质决定的,socket经常会阻塞自己,导致下面的事情执行不了。
enter() 和 exit()
public final Source source(final Source source) {
return new Source() {
@Override public long read(Buffer sink, long byteCount) throws IOException {
boolean throwOnTimeout = false;
enter();
try {
long result = source.read(sink, byteCount);
throwOnTimeout = true;
return result;
} catch (IOException e) {
throw exit(e);
} finally {
exit(throwOnTimeout);
}
}
}
private static synchronized void scheduleTimeout(
AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
//head==null,表明之前没有,本次是第一次操作,开启Watchdog守护线程
if (head == null) {
head = new AsyncTimeout();
new Watchdog().start();
}
long now = System.nanoTime();
//如果有 deadLine,并且超时时长不为0
if (timeoutNanos != 0 && hasDeadline) {
//对比最长限制和超时时长,选择最小的那个值
node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
} else if (timeoutNanos != 0) {
//如果没有最长限制,但是超时时长不为0,则使用超时时长
node.timeoutAt = now + timeoutNanos;
} else if (hasDeadline) {
//如果有最长限制,但是超时时长为0,则使用最长限制
node.timeoutAt = node.deadlineNanoTime();
} else {
//如果既没有最长限制,和超时时长,则抛异常
throw new AssertionError();
}
// 按照排序顺序插入
long remainingNanos = node.remainingNanos(now);
for (AsyncTimeout prev = head; true; prev = prev.next) {
//如果下一个为null或者剩余时间比下一个短 就插入node
if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
node.next = prev.next;
prev.next = node;
if (prev == head) {
// 唤醒 watchdog
AsyncTimeout.class.notify();
}
break;
}
}
}
Okio的特点
- 它对数据进行了分块处理(Segment),这样在大数据IO的时候可以以块为单位进行IO,这可以提高IO的吞吐率
- 它对这些数据块使用链表来进行管理,这可以仅通过移动指针就进行数据的管理,而不用真正的处理数据,而且对扩容来说十分方便.
- 闲置的块进行管理,通过一个块池(SegmentPool)的管理,避免系统GC和申请byte时的zero-fill。
- 为所有的Source、Sink提供了超时操作,这是在Java原生IO操作是没有的。
- okio它对数据的读写都进行了封装,调用者可以十分方便的进行各种值(Stringg,short,int,hex,utf-8,base64等)的转化。