OKHttp源码解析(五)--OKIO简介及FileSystem
- 1.OkHttp源码解析(一):OKHttp初阶
- 2 OkHttp源码解析(二):OkHttp连接的"前戏"——HTTP的那些事
- 3 OkHttp源码解析(三):OKHttp中阶之线程池和消息队列
- 4 OkHttp源码解析(四):OKHttp中阶之拦截器及调用链
- 5 OkHttp源码解析(五):OKHttp中阶之OKio简介
- 6 OkHttp源码解析(六):OKHttp中阶之缓存基础
- 7 OkHttp源码解析(七):OKHttp中阶之缓存机制
- 8 OkHttp源码解析(八):OKHttp中阶之连接与请求值前奏
- 9 OkHttp源码解析(九):OKHTTP连接中三个"核心"RealConnection、ConnectionPool、StreamAllocation
- 10 OkHttp源码解析(十) OKHTTP中连接与请求
- 11 OkHttp的感谢
本来我打算OKHttp源码解析(四) 是写OKHTTP的缓存,最后再单独写OKIO的,但是发现里面运用到了OKIO,而且后面讲连接的时候也要涉及到OKIO,所以我就把OKIO拿到前面来,这样大家在读缓存源码和连接的时候更清楚。
本篇文章的大纲如下:
- 1.什么是OKIO
- 2.如何使用OKIO
- 3.Sink和Source及其实现
- 4.Segment和SegmentPool解析
- 5.不可变的ByteString
- 6.最核心的Buffer解析
- 7.okio中的超时机制
- 8.okio的优雅之处
- 9.FileSystem
一、okio
说道okio就不能不提JDK里面io,那么咱们先简单说下JDK里面的io。
(一)、javaNIO和阻塞I/O:
- 1、阻塞I/O通信模式:调用InputStream.read()方法时是阻塞的,它会一直等到数据到来时才返回
- 2、NIO通信模式:在JDK1.4开始,是一种非阻塞I/O,在Java NIO的服务端由一个专门的线程来处理所有I/O事件,并负责分发;线程之间通讯通过wait和notify等方式。
(二)、okio
okio是由square公司开发的,它补充了java.io和java.nio的不足,以便能够更加方便,快速的访问、存储和处理你的数据。OKHttp底层也是用该库作为支持。而且okio使用起来很简单,减少了很多io操作的基本代码,并且对内存和CPU使用做了优化,他的主要功能封装在ByteString和Buffer这两个类中。
okio的作者认为,javad的JDK对字节流和字符流进行分开定义这一世情,并不是那么优雅,所以在okio并不进行这样划分。具体的做法就是把比特数据都交给Buffer管理,然后Buff实现BufferedSource和BufferedSink这两个接口,最后通过调用buffer相应的方法对数据进行编码。
二、okio的使用
假设我有一个hello.txt文件,内容是hello jianshu,现在我用okio把它读出来。
那我们先理一下思路:读取文件的步骤是首先要拿到一个输入流,okio封装了许多输入流,统一使用的方法重载source方法转换成一个source,然后使用buffer方法包装成BufferedSource,这个里面提供了流的各种操作,读String,读byte,short等,甚至是16进制数。这里直接读出文件的内容,十分简单,代码如下:
public static void main(String[] args) {
File file = new File("hello.txt");
try {
readString(new FileInputStream(file));
} catch (IOException e) {
e.printStackTrace();
}
}
public static void readString(InputStream in) throws IOException {
BufferedSource source = Okio.buffer(Okio.source(in)); //创建BufferedSource
String s = source.readUtf8(); //以UTF-8读
System.out.println(s); //打印
source.close();
}
--------------------------------------输出-----------------------------------------
hello jianshu
Okio是对Java底层io的封装,所以底层io能做的Okio都能做。
上面的大体流程如下:
第一步,首先是调用okio的source(InputStream in)方法获取Source对象
第二步,调用okio的buffer(Source source)方法获取BufferedSource对象
第三步,调用BufferedSource的readUtf8()方法读取String对象
第四步,关闭BufferedSource
总结下大体流程如下:
- 1.构建缓冲池,缓冲源对象
- 2.读写操作
- 3.关闭缓冲池
三、Sink和Source及其实现
(一)、Sink和Source
在JDK里面有InputStream和OutputStream两个接口,由于okio作者认为Inputsream和OutputStream中的available()和读写单字节的方法纯属鸡肋,所以怼出Source和Sink接口。Source和Sink类似于InputStream和OutputStream,是io操作的顶级接口类,这两个接口均实现了Closeable接口。所以可以把Source简单的看成InputStream,Sink简单看成OutputStream。结构图如下图:
okio1.png
其中Sink代表的是输出流,Source代表的是输入流,这两个基本上都是对称的。那我们先来分析下Sink,代码如下:
public interface Sink extends Closeable, Flushable {
/** Removes {@code byteCount} bytes from {@code source} and appends them to this. */
// 定义基础的write操作,该方法将字节写入Buffer
void write(Buffer source, long byteCount) throws IOException;
/** Pushes all buffered bytes to their final destination. */
@Override void flush() throws IOException;
/** Returns the timeout for this sink. */
Timeout timeout();
/**
* Pushes all buffered bytes to their final destination and releases the
* resources held by this sink. It is an error to write a closed sink. It is
* safe to close a sink more than once.
*/
@Override void close() throws IOException;
}
public interface Source extends Closeable {
/**
* Removes at least 1, and up to {@code byteCount} bytes from this and appends
* them to {@code sink}. Returns the number of bytes read, or -1 if this
* source is exhausted.
*/
// 定义基础的read操作,该方法将字节写入Buffer
long read(Buffer sink, long byteCount) throws IOException;
/** Returns the timeout for this source. */
Timeout timeout();
/**
* Closes this source and releases the resources held by this source. It is an
* error to read a closed source. It is safe to close a source more than once.
*/
@Override void close() throws IOException;
}
虽然Sink和Source只定义了很少的方法,这也是为何说它容易实现的原因,但是我们一般在使用过程中,不直接拿它使用,而是使用BufferedSink和BufferedSouce对接口的封装,因为在BufferedSinke和BufferedSource接口定义了一系列好用的方法。
(二)、 BufferedSinke和 BufferedSource
看源码可知BufferedSink和BufferedSource定义了很多方便的方法如下图:
BufferedSinke.png
但是发现BufferedSink和BufferedSource两个都是接口 ,那么他的具体具体实现类是什么那?
(三)、 RealBufferedSink和 RealBufferedSource
因为RealBufferedSink和RealBufferedSource是一一对应的,我就讲解RealBufferedSInk了,RealBufferedSource这里就不仔细讲解了。
final class RealBufferedSink implements BufferedSink {
public final Buffer buffer = new Buffer();
public final Sink sink;
public RealBufferedSink(Sink sink){
if (sink == null)
throw new NullPointerException("sink == null");
this.sink = sink;
}
}
大家看源代码可知RealBufferedSink类中有两个主要参数,一个是新建的Buffer对象,一个是Sink对象。虽然这个类叫做RealBufferedSinke,但是实际上这个只是一个保存Buffer对象的代理而已,真生的实现都是在Buffer中实现的。
@Override public BufferedSink write(byte[] source) throws IOException {
if (closed) throw new IllegalStateException("closed");
buffer.write(source);
return emitCompleteSegments();
}
@Override public BufferedSink write(byte[] source, int offset, int byteCount) throws IOException {
if (closed) throw new IllegalStateException("closed");
buffer.write(source, offset, byteCount);
return emitCompleteSegments();
}
看上面代码可知,RealBufferedSink实现BufferedSink的方法实际上都是调用buffer对应的方法,对应的RealBufferedSource也是这样调用的buffer的read方法。
可以看到这个实现了BufferedSink接口的两个方法实际上都是调用了buffer的对应方法,对应的RealBufferedSource也是同样的调用buffer中的read方法,关于Buffer这个类会在下面详述,刚才我们看到Sink接口中有一个Timeout的类,这个就是Okio所实现的超时机制,保证了IO操作的稳定性。
所以关于读写操作这块的类图如下:
okio4.png
关于Buffer这个类,我们后面再说
(四)其它实现类
1、Sink和Source它们还有各自的支持gzip压缩的实现类GzipSink和GzipSource
2、具有委托功能的抽象类ForwardingSink和ForwardingSource,它们的具体实现类是InflaterSource和DeflaterSink,这两个类主要用于压缩,为GzipSink和GzipSource服务。
综上所述关于类的结构图如下:
okio2.png
刚刚我们看到在Sink接口中有一个Time类,这个就是okio中实现超时机制的接口,用于保证IO操作的稳定性。
四、Segment和SegmentPool解析
(一)、Segment
1、Segment字面的意思就是片段,okio将数据也就是Buffer分割成一块块的片段,内部维护者固定长度的byte[]数组,同时segment拥有前面节点和后面节点,构成一个双向循环链表,就像下图:
segment.png这样采取分片使用链表连接,片中使用数组存储,兼具读的连续性,以及写的可插入性,对比单一使用链表或者数组,是一种折中的方案,读写更快,而且有个好处根据需求改动分片的大小来权衡读写的业务操作,另外,segment也有一些内置的优化操作,综合这些okio才能大放异彩,后面解析Buffer时会讲解什么时候形成双向循环链表。
// 每一个segment所含数据的大小,固定的
static final int SIZE = 8192;
// Segments 用分享的方式避免复制数组
static final int SHARE_MINIMUM = 1024;
final byte[] data;
// data[]中第一个可读的位置
int pos;
// data[]中第一个可写的位
//所以一个Segment的可读数据数量为pos~limit-1=limit-pos;limit和pos的有效值为0~SIZE-1
int limit;
//当前存储的data数据是其它对象共享的则为真
boolean shared;
//是否是自己是操作者
boolean owner;
///前一个Segment
Segment pre;
///下一个Segment
Segment next;
总结一下就是:
SIZE就是一个segment的最大字节数,其中还有一个SHARE_MINIMUM,这个涉及到segment优化的另一个技巧,共享内存,然后data就是保存的字节数组,pos,limit就是开始和结束点的index,shared和owner用来设置状态判断是否可读写,一个有共享内存的sement是不能写入的,pre,next就是前置后置节点。
2、Segment的构造方法
Segment() {
this.data = new byte[SIZE];
this.owner = true;
this.shared = false;
}
Segment(Segment shareFrom) {
this(shareFrom.data, shareFrom.pos, shareFrom.limit);
shareFrom.shared = true;
}
Segment(byte[] data, int pos, int limit) {
this.data = data;
this.pos = pos;
this.limit = limit;
this.owner = false;
this.shared = true;
}
- 1、无参的构造函数,表明采用该构造器表明该数据data的所有者就是该Segment自己,所以owner为真,shared为假
- 2、一个参数的构造函数,看代码我们知道这个数据来自其他Segment,所以表明这个Segment是被别人共享了,所以shared为真,owner为假
- 3、三个参数构造函数表明数据直接使用外面的,所以share为真,owner为假
由上面代码可知,owner和shared这两个状态是互斥的,且赋值都是同步赋值的。
3、Segment的几个方法分析
既然是双向循环链表,其中也会有一些操作的方法:
(1)pop()方法:
将当前Segment从Segment链中移除去将,返回下一个Segment代码如下
public Segment pop(){
Segment result = next != this ? next : null;
pre.next = next;
next.pre = pre;
next = null;
pre = null;
return result;
}
pop方法移除自己,首先将自己的前后两个节点连接起来,然后将自己的前后置空,这昂就脱离了整个双向链表,然后返回next。说道pop就一定会联想到push。
(2)push方法:
Segment中的push表示将一个Segment压入该Segment节点的后面,返回刚刚压入的Segment
public Segment push(Segment segment){
segment.pre = this;
segment.next = next;
next.pre = segment;
next = segment;
return segment;
}
push方法就是在当前和next引用中间插入一个segment进来,并且返回插入的segment,这两个都是寻常的双向链表的操作,我们再来看看如何写入数据。
(3)writeTo方法
/** Moves {@code byteCount} bytes from this segment to {@code sink}. */
public void writeTo(Segment sink, int byteCount) {
//只能对自己操作
if (!sink.owner) throw new IllegalArgumentException();
//写的起始位置加上需要写的byteCount大于SIZE
if (sink.limit + byteCount > SIZE) {
// We can't fit byteCount bytes at the sink's current position. Shift sink first.
//如果是共享内存,不能写,则抛异常
if (sink.shared) throw new IllegalArgumentException();
//如果不是共享内存,且写的起始位置加上byteCount减去头还大于SIZE则抛异常
if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
//上述条件都不满足,我们需要先触动要写文件的地址,把sink.data从sink.pos这个位置移动到 0这个位置,移动的长度是limit - sink.pos,这样的好处就是sink初始位置是0
System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
sink.limit -= sink.pos;
sink.pos = 0;
}
//开始尾部写入 写完置limit地址
System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
sink.limit += byteCount;
pos += byteCount;//索引后移
}
PS:这里我们来复习一下arraycopy方法:
public static native void arraycopy(Object src, int srcPos, Object dest, int destPos, int length);
src:源数组; srcPos:源数组要复制的起始位置;dest:目的数组; destPos:目的数组放置的起始位置;length:复制的长度。
实现过程是这样的,先生成一个长度为length的临时数组,将圆数组中srcPos 到scrPos+lengh-1之间的数据拷贝到临时数组中。然后将这个临时数组复制到dest中。
读到上面的代码,我们发现owner和shared这两个状态是互斥的,赋值都是同步赋值的,这里我们有点明白这两个参数的意义了,如果是共享的就是无法写,以免污染数据,会抛出异常。
如果要写的字节加上原有的字节大于单个segment的最大值也会抛出异常。
还有一种情况就是,由于前面的pos索引可能因为read方法取出数据,pos所以后移这样导致可以容纳数据,这样可以先执行移动操作,使用JDK提供的System.arraycopy方法来移动到pos为0的状态,更改pos和limit索引后再在尾部写入byteCount数的数据,写完之后实际上原segment读了byteCount的数据,所以pos需要后移这么多。
(4) compact()方法(压缩机制)
除了写入数据之外,segment还有一个优化的技巧,因为每个segment的片段size是固定的,为了防止经过长时间的使用后,每个segment中的数据被分割的十分严重,可能一个很小的数据却占据了整个segment,所以有了一个压缩机制。
public void compact() {
//上一个节点就是自己,意味着就一个节点,无法压缩,抛异常
if (prev == this) throw new IllegalStateException();
//如果上一个节点不是自己的,所以你是没有权利压缩的
if (!prev.owner) return; // Cannot compact: prev isn't writable.
//能进来说明,存在上一个节点,且上一个节点是自己的,可以压缩
//记录当前Segment具有的数据,数据大小为limit-pos
int byteCount = limit - pos;
统计前结点是否被共享,如果共享则只记录Size-limit大小,如果没有被共享,则加上pre.pos之前的空位置;
//本行代码主要是获取前一个segment的可用空间。先判断prev是否是共享的,如果是共享的,则只记录SIZE-limit,如果没有共享则记录SIZE-limit加上prev.pos之前的空位置
int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
//判断prev的空余空间是否能够容纳Segment的全部数据,不能容纳则返回
if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
//能容纳则将自己的这个部分数据写入上一个Segment
writeTo(prev, byteCount);
//讲当前Segment从Segment链表中移除
pop();
//回收该Segment
SegmentPool.recycle(this);
}
总结下上述代码:如果前面的Segment是共享的,那么不可写,也不能压缩,然后判断前一个的剩余大小是否比当前打,如果有足够的空间来容纳数据,调用前面的writeTo方法写入数据,写完以后,移除当前segment,然后回收segment。
(5)split()方法(共享机制)
还有一种机制是共享机制,为了减少数据复制带来的性能开销。
本方法用于将Segment一分为二,将pos+1pos+btyeCount-1的内容给新的Segment,将pos+byteCountlimit-1的内容留给自己,然后将前面的新Segment插入到自己的前面。这里需要注意的是,虽然这里变成了两个Segment,但是实际上byte[]数据并没有被拷贝,两个Segment都引用了改Segment。
public Segment split(int byteCount) {
if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
Segment prefix;
// We have two competing performance goals:
// - Avoid copying data. We accomplish this by sharing segments.
// - Avoid short shared segments. These are bad for performance because they are readonly and
// may lead to long chains of short segments.
// To balance these goals we only share segments when the copy will be large.
if (byteCount >= SHARE_MINIMUM) {
prefix = new Segment(this);
} else {
prefix = SegmentPool.take();
System.arraycopy(data, pos, prefix.data, 0, byteCount);
}
prefix.limit = prefix.pos + byteCount;
pos += byteCount;
prev.push(prefix);
return prefix;
}
这个方法实际上经过很多次的改变,在回顾okio1.6时,发现有一个重要的差异就是多了一个SHARE_MININUM参数,同时也多了一个注释,为了防止一个很小的片段就进行共享,我们知道共享之后为了防止数据污染就无法写了,如果存在大量的共享片段,实际上是浪费资源的,所以通过这个对比可以看出这个最小数的的意义,而且这个方法在1.6的版本中检索实际上只有一个地方使用了这个方法,就是Buffer中的write方法,为了效率在移动大数据的时候直接移动了segment而不是data,这样在写数据上能达到很高的效率,具体的write细节会在后面讲解。
在上面的方法中出现的 SegmentPool ,这是什么东西,那我们下面就来讲解一下
(二) SegmentPool
SegmentPool是一个Segment池,由一个单项链表构成。该池负责Segment的回收和闲置Segment管理,也就是说Buffer使用的Segment是从Segment单项链表中取出的,这样有效的避免了GC频率
SegmentPool这个类比较简单,先来看下这个类的属性
先看下
//一个Segment记录的最大长度是8192,因此SegmentPool只能存储8个Segment
static final long MAX_SIZE = 64 * 1024;
//该SegmentPool存储了一个回收Segment的链表
static Segment next;
//该值记录了当前所有Segment的总大小,最大值是为MAX_SIZE
static long byteCount;
上面代码表明了一个池子的上线也就是64K,相当于8个Segment,next这个属性表明这个SegmentPool是按照单链表的方式进行存储的,byteCount这个字段表明已经使用的大小。
SegmentPool的方法十分简单,就两个一个是"取",一个是"收"
static Segment take() {
synchronized (SegmentPool.class) {
if (next != null) {
Segment result = next;
next = result.next;
result.next = null;
byteCount -= Segment.SIZE;
return result;
}
}
return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
}
为了防止多线程同时操作出现问题,这里加了锁。注意:这里的next表示整个对象池的头,而不是"下一个"。如果next为null,则这个,池子里面没有Segment。否则就从里面拿出一个next出来,并将next的下一个节点赋值为next(即为头),然后设置下一下byteCount。如果链表为空,则创建一个Segment对象返回。
下面咱们来看下回收的方法
static void recycle(Segment segment) {
//如果这个要回收的Segment被前后引用,则无法回收
if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
//如果这个要回收的Segment的数据是分享的,则无法回收
if (segment.shared) return; // This segment cannot be recycled.
//加锁
synchronized (SegmentPool.class) {
//如果 这个空间已经不足以再放入一个空的Segment,则不回收
if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
//设置SegmentPool的池大小
byteCount += Segment.SIZE;
//segment的下一个指向头
segment.next = next;
//设置segment的可读写位置为0
segment.pos = segment.limit = 0;
//设置当前segment为头
next = segment;
}
}
上面代码很简单,就是尝试将参数Segment对象加入到自身的Segment链表中。其中做了一些条件判断,具体看注释。
SegmentPool总结:
1.大家注意到没有SegmentPool的作用就是管理多余的Segment,不直接丢弃废弃的Segment,等客户需要Segment的时候直接从池中获取一个对象,避免了重复创建新兑现,提高资源利用率。
2.大家仔细读取源码会发现SegmentPool的所有操作都是基于对表头的操作。
五、ByteString
ByteString存储的是不可变比特序列,可能你不太理解这句话,如果给出final btye[] data 你是不是就秒懂了。官方文档说可以把它看成string的远方亲戚,且这个亲戚符合人工工学设计,逼格是不是很高。不过简单的讲,他就是一个byte序列(数组),以制定的编码格式进行解码。目前支持的解码规则有hex,base64和UTF-8等,机智如你可能会说String也是如此。是的,你说的没错,ByteString 只是把这些方法进行了封装。免去了我们直接输入类似的"utf-8"这样的错误,直接通过调用utf-8格式进行解码,还做了优化,在第一次调用uft8()方法的时候得到了一个该解码的String,同时在ByteString内部还保留了这个引用,当再次调用utf-8()的时候,则直接返回这个引用。
刚刚说了不可变对象。Effective Java 一书中有一条给了不可变对象的需要遵循的的几条原则:
- 1.不要提供任何会修改对象状态的方法
- 2.保证类不会被扩展
- 3.使所有的域都是final的
- 4.使所有的域都是private的
- 5.确保对于任何可变组件的互斥访问
不可变对象有许多好处,首先本质是线程安全,不要求同步处理,也就是没有锁之类的性能问题,而且可以被自由的共享内部信息,当然坏处就是要创建大量类的对象,咱们看看ByteString的属性
1、ByteString属性
//明显给16进制数准备的
static final char[] HEX_DIGITS = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
//一个空的对象
/** A singleton empty {@code ByteString}. */
public static final ByteString EMPTY = ByteString.of();
final byte[] data;
//hashcode的值
transient int hashCode; // Lazily computed; 0 if unknown.
transient String utf8; // Lazily computed.
这里要重点说明下 byte[] data和transient String utf8,ByteString不仅是不可变的,同时在内部有两个filed,分别是byte数据,以及String数据,这样能够让这个类Byte和String转换上基本没有开销。同样需要保存这两份引用,这明显的空间换时间的方式,为了性能okio做了很多事情。这是这个String前面有transient关键字标记,也就是说不会进入序列化和反序列化,所以readObject和writeObject()中没有utf8属性
2、ByteString构造函数
ByteString(byte[] data) {
this.data = data; // Trusted internal constructor doesn't clone data.
}
构造函数的参数为一个byte[]类型,不过构造函数只能被同包的类使用,因此我们创建ByteString对象并不是通过该方法。我们是如何构造一个ByteString对象?其实是通过 of()方法构造对象的
3、of()方法创建ByteString对象
/**
* Returns a new byte string containing a clone of the bytes of {@code data}.
*/
public static ByteString of(byte... data) {
if (data == null) throw new IllegalArgumentException("data == null");
return new ByteString(data.clone());
}
/**
* Returns a new byte string containing a copy of {@code byteCount} bytes of {@code data} starting
* at {@code offset}.
*/
public static ByteString of(byte[] data, int offset, int byteCount) {
if (data == null) throw new IllegalArgumentException("data == null");
checkOffsetAndCount(data.length, offset, byteCount);
byte[] copy = new byte[byteCount];
System.arraycopy(data, offset, copy, 0, byteCount);
return new ByteString(copy);
}
public static ByteString of(ByteBuffer data) {
if (data == null) throw new IllegalArgumentException("data == null");
byte[] copy = new byte[data.remaining()];
data.get(copy);
return new ByteString(copy);
ByteString 有三个of方法,都是return一个ByteString,咱们就一个一个的分析
- 1.第一个of方法,就是调用clone方法,重新创建一个byte数组。clone一个数组的原因很简单,我们确保ByteString的data指向byte[]没有被其他对象所引用,否则就容易破坏ByteString中存储的是一个不可变化的的比特流数据这一约束。
- 2.第二个of方法,首先进行checkOffsetAndCount()方法进行边界检查,然后,将data中指定的数据拷贝到数组中去。
- 3.第三个of方法,同理,和第二个of方法差不多,只不过一个是byte[],一个是ByteBuffer。
4、toString()
/**
* Returns a human-readable string that describes the contents of this byte string. Typically this
* is a string like {@code [text=Hello]} or {@code [hex=0000ffff]}.
*/
@Override public String toString() {
if (data.length == 0) {
return "[size=0]";
}
String text = utf8();
int i = codePointIndexToCharIndex(text, 64);
if (i == -1) {
return data.length <= 64
? "[hex=" + hex() + "]"
: "[size=" + data.length + " hex=" + substring(0, 64).hex() + "…]";
}
String safeText = text.substring(0, i)
.replace("\\", "\\\\")
.replace("\n", "\\n")
.replace("\r", "\\r");
return i < text.length()
? "[size=" + data.length + " text=" + safeText + "…]"
: "[text=" + safeText + "]";
}
5、utf8()
utf8格式的String
/** Constructs a new {@code String} by decoding the bytes as {@code UTF-8}. */
public String utf8() {
String result = utf8;
// We don't care if we double-allocate in racy code.
return result != null ? result : (utf8 = new String(data, Util.UTF_8));
}
这里面的一个判断语句,实现ByteString性能优化,看来优化这个东西还是很容易实现的。第一次创建UTF-8对象的方法是调用new String(data,Util.UTF_8),后面就不再调用该方法而是直接返回result;发现uft8是对String 的方法进一步封装。
下面是ByteString的方法
image.png
六、Buffer
1、Buffer简介
- 1、Buffer存储的是可变比特序列,需要注意的是Buffer内部对比特数据的存储不是直接使用一个byte数组那么简单,它使用了一种新的数据类型Segment进行存储。不过我们先不去管Segment是什么东西,可以先直接将Buffer想象成一个LinkedList集合就知道了,之所以做这样,因为Buffer的容量可以动态扩展,从序列的尾部存入数据,从序列的头部读取数据。其实Buffer的底层实现了远比LinkedList复杂的多,他使用双向链表的形式存储数组,链表节点的数据类型就是前面说的Segment,Segment存储有一个不可变比特序列,即final byte[] data。使用的Buffer的好处在于从一个Buffer移动到另一个Buffer的时候,实际上并没有进行拷贝,只是改变了它对应的Segment的所有者,其实这也采用链表存储数据的好处,这样的特点在多线程网络通信中带来很大的好处。最后使用Buffer还有另一个好处那就是它实现了BufferSource和BufferSink接口。
- 2、前面讲到Buffer这个类实际上就是整个读写的核心,包括RealBufferSource和RealBufferedSink实际上都只是一个代理,里面操作全部都是通过Buffer来完成。
2、Buffer的属性及实现的接口
public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
//明显是给16进制准备的
private static final byte[] DIGITS =
{ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
static final int REPLACEMENT_CHARACTER = '\ufffd';
//Buffer存储了一个这样的head节点,这就是Buffer对数据的存储结构。字节数组都是交给Segment进行管理
Segment head;
//当前存储的数据大小
long size;
因为Buffer持有一个Segment的引用,所以通过这个引用能拿到整个链表中的所有数据。
同时Buffer实现了三个接口,读,写以及clone。
(1)clone接口
那咱们先说clone吧,这个比较简答,大家都能看懂,Java拷贝,主要分为深拷贝还是浅拷贝。Buffer这里采用的是浅拷贝。
//看到注释的第一句话,我就知道了是深拷贝,哈哈!
/** Returns a deep copy of this buffer. */
@Override public Buffer clone() {
//先new了一个Buffer对象
Buffer result = new Buffer();
//如果size==0,说明这个Buffer是空的,所以直接返回即可
if (size == 0) return result;
//如果size!=0,说明这个Buffer是有数据的,然后吧head指向这个copy的head,PS大家回想下Segment的这个构造函数,里面是怎么操作的?
result.head = new Segment(head);
//然后设置copy的head的next和prev的值
result.head.next = result.head.prev = result.head;
//开始遍历这个Buffer持有的Segment链了
for (Segment s = head.next; s != head; s = s.next) {
result.head.prev.push(new Segment(s));
}
result.size = size;
return result;
}
- 1对应了实现的clone方法,如果整个Buffer的size为0,也就是没有数据,那么返回一个新建的Buffer对象,如果不为空就是遍历所有segment并且都创建一个对应的segment,这样clone出来的对象就是一个全新的毫无关系的对象。
- 2前面分析segment的时候有讲到是一个双向链表,但是segment自身构造的时候却没有形成闭环,其实就是在Buffer中产生的。
result.head.next = result.head.prev = result.head;- 3关于for循环有同学表示看不懂,我这里就详细解释下:
先取出head的next对象的引用,然后压入该节点,比如 目前顺序是 A B C,A是head,A的pre是C,所以在C节点的后面加入新的segmentD,压后的顺序就是 A B C D,然后执行s.next,如果s应是最后了,则跳出循环
2、BufferedSource, BufferedSink接口
Buffer实现了这两个接口的所有方法,既有读,也有写,由于篇幅的限制,我就不全部讲解,只挑几个讲解。其他的都大同小异
@Override public int readInt() {
//第一步
if (size < 4) throw new IllegalStateException("size < 4: " + size);
//第二步
Segment segment = head;
int pos = segment.pos;
int limit = segment.limit;
// If the int is split across multiple segments, delegate to readByte().
if (limit - pos < 4) {
return (readByte() & 0xff) << 24
| (readByte() & 0xff) << 16
| (readByte() & 0xff) << 8
| (readByte() & 0xff);
}
//第三步
byte[] data = segment.data;
int i = (data[pos++] & 0xff) << 24
| (data[pos++] & 0xff) << 16
| (data[pos++] & 0xff) << 8
| (data[pos++] & 0xff);
size -= 4;
//第四步
if (pos == limit) {
head = segment.pop();
SegmentPool.recycle(segment);
} else {
segment.pos = pos;
}
//第五步
return i;
}
第一步,做了一次验证,因为一个int数据的字节数是4,所以必须保证当前Buffer的size大于4。
第二步,如果当前的Segement所包含的字节数小于4,因此还需要去一个Segment中获取一分部数据,因此通过调用readByte()方法一个字节一个字节的读取,该方法我们后介绍
第三步,如果当前Segment的数据够用,因此直接从pos位置开始读取4个字节的数据,然后将其转换为int数据,转换方法很简单就是位和或运算。
第四步,如果pos==limit证明当前head对应的Segment没有可读数据,因此将该Segment从双向链表移除,并回收该Segment。如果还有数据则设置新的pos值。
第五步,返回解析得到int值
这里面提到了readByte()那么咱们就来研究下这个readByte()方法
@Override public byte readByte() {
//第一步
if (size == 0) throw new IllegalStateException("size == 0");
//第二步
Segment segment = head;
int pos = segment.pos;
int limit = segment.limit;
//第三步
byte[] data = segment.data;
//第四步
byte b = data[pos++];
size -= 1;
//第五步
if (pos == limit) {
head = segment.pop();
SegmentPool.recycle(segment);
} else {
segment.pos = pos;
}
//第六步
return b;
}
第一步,做size判断,如果size==0表示没有什么东西好读取的。
第二步,获取Segment,并取得这个segment的pos位置和limit的位置
第三步,获取byte[] data
第四步,获取一个字节的byte
第五步,判断这个segment有没有可读数据了,如果没有可读数据则回收Segment。如果有数据则更新pos的值
第六步,返回b
那咱们再来看看写的方法,那换一个就不写int,选写short吧
@Override public Buffer writeShort(int s) {
//第一步
Segment tail = writableSegment(2);
//第二步
byte[] data = tail.data;
int limit = tail.limit;
data[limit++] = (byte) ((s >>> 8) & 0xff);
data[limit++] = (byte) (s & 0xff);
tail.limit = limit;
//第三步
size += 2;
//第四步
return this;
}
/**
* Returns a tail segment that we can write at least {@code minimumCapacity}
* bytes to, creating it if necessary.
*/
Segment writableSegment(int minimumCapacity) {
if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();
//如果hea=null表明Buffer里面一个Segment都没有
if (head == null) {
//从SegmentPool取出一个Segment
head = SegmentPool.take(); // Acquire a first segment.
return head.next = head.prev = head;
}
//如果head不为null.
Segment tail = head.prev;
//如果已经写入的数据+最小可以写入的空间超过限制,则在SegmentPool里面取一个
if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
}
return tail;
}
writeShort用来给Buffer写入一个short数据
第一步,通过writeableSegment拿到一个能有2个字节的空间的segment,writeableSegment方法我里面写了注释,大家自己去看,我这里就不详细说明了。
第二步,tail中的data就是字节数组,limit则是数据的尾部索引,写数据就是在尾部继续写,直接设置在data通过limit自增后的index,然后重置尾部索引.
第三步,size+2
第四步,返回tail
七、Okio中的超时机制
1、TimeOut
okio的超时机制让I/O操作不会因为异常阻塞在某个未知的错误上,okio的基础超时机制是采取同步超时。
1以输出流Sink为例子,当我们用下面的方法包装流时:
//Okio.java
//实际上调用的两个参数的sink方法,第二个参数是new的TimeOut对象,即同步超时
public static Sink sink(OutputStream out) {
return sink(out, new Timeout());
}
private static Sink sink(final OutputStream out, final Timeout timeout) {
if (out == null) throw new IllegalArgumentException("out == null");
if (timeout == null) throw new IllegalArgumentException("timeout == null");
return new Sink() {
@Override
public void write(Buffer source, long byteCount) throws IOException {
checkOffsetAndCount(source.size, 0, byteCount);
while (byteCount > 0) {
timeout.throwIfReached();
Segment head = source.head;
int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
out.write(head.data, head.pos, toCopy);
head.pos += toCopy;
byteCount -= toCopy;
source.size -= toCopy;
if (head.pos == head.limit) {
source.head = head.pop();
SegmentPool.recycle(head);
}
}
}
@Override public void flush() throws IOException {
out.flush();
}
@Override public void close() throws IOException {
out.close();
}
@Override public Timeout timeout() {
return timeout;
}
@Override public String toString() {
return "sink(" + out + ")";
}
};
}
可以看到write方法里中实际上有一个while循环,在每个开始写的时候就调用了timeout.throwIfReached()方法,我们推断这个方法里面做了时间是否超时的判断,如果超时了,应该throw一个exception出来。这很明显是一个同步超时机制,按序执行。同理Source也是一样,那么咱们看下他里面到底是怎么执行的
/**
* Throws an {@link InterruptedIOException} if the deadline has been reached or if the current
* thread has been interrupted. This method doesn't detect timeouts; that should be implemented to
* asynchronously abort an in-progress operation.
*/
public void throwIfReached() throws IOException {
if (Thread.interrupted()) {
throw new InterruptedIOException("thread interrupted");
}
if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
throw new InterruptedIOException("deadline reached");
}
}
大家先看下上面的注释,我英语不是很好,大家如果有更好的翻译,请提示我,我翻译如下:
如果到达了最后的时间,会抛出InterruptedIOException,或者线程被interrupted了也会抛出InterruptedIOException。该方法是不会检查超时的,应该是是一个异步进度操作单元来实现这个类,进行检查超时。
但是当我们看okio对于socket的封装时
/**
* Returns a sink that writes to {@code socket}. Prefer this over {@link
* #sink(OutputStream)} because this method honors timeouts. When the socket
* write times out, the socket is asynchronously closed by a watchdog thread.
*/
public static Sink sink(Socket socket) throws IOException {
if (socket == null) throw new IllegalArgumentException("socket == null");
AsyncTimeout timeout = timeout(socket);
Sink sink = sink(socket.getOutputStream(), timeout);
return timeout.sink(sink);
}
2、AsyncTimeout
这里出现一个AsyncTimeout类,这个实际上继承于TimeOut所实现的一个异步超时类,这个异步类比同步要复杂的多,它使用了人一个WatchDog线程在后台进行监听超时。
这里面用到了Watchdog,Watchdog是AsyncTimeout的静态内部类,那么我们来下看Watchdog是个什么东西
private static final class Watchdog extends Thread {
public Watchdog() {
super("Okio Watchdog");
setDaemon(true);
}
public void run() {
while (true) {
try {
AsyncTimeout timedOut;
synchronized (AsyncTimeout.class) {
timedOut = awaitTimeout();
// Didn't find a node to interrupt. Try again.
if (timedOut == null) continue;
// The queue is completely empty. Let this thread exit and let another watchdog thread
// get created on the next call to scheduleTimeout().
if (timedOut == head) {
head = null;
return;
}
}
// Close the timed out node.
timedOut.timedOut();
} catch (InterruptedException ignored) {
}
}
}
}
这里的WatchDog并不是Linux中的那个,只是一个继承于Thread的一类,里面的run方法执行的就是超时的判断,之所以在socket写时采取异步超时,这完全是由socket自身的性质决定的,socket经常会阻塞自己,导致下面的事情执行不了。AsyncTimeout继承Timeout类,可以复写里面的timeout方法,这个方法会在watchdao的线程中调用,所以不能执行长时间的操作,否则就会引起其他的超时。
下面详细分析AsyncTimeout这个类
//不要一次写超过64k的数据否则可能会在慢连接中导致超时
private static final int TIMEOUT_WRITE_SIZE = 64 * 1024;
private static AsyncTimeout head;
private boolean inQueue;
private AsyncTimeout next;
private long timeoutAt;
首先就是一个最大的写值,定义为64K,刚好和一个Buffer大小一样。官方的解释是如果连续读写超过这个数字的字节,那么及其容易导致超时,所以为了限制这个操作,直接给出了一个能写的最大数。
下面两个参数head和next,很明显表明这是一个单链表,timeoutAt则是超时时间。使用者在操作之前首先要调用enter()方法,这样相当于注册了这个超时监听,然后配对的实现exit()方法。这样exit()有一个返回值会表明超时是否出发,注意:这个timeout是异步的,可能会在exit()后才调用
public final void enter() {
if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
long timeoutNanos = timeoutNanos();
boolean hasDeadline = hasDeadline();
if (timeoutNanos == 0 && !hasDeadline) {
return; // No timeout and no deadline? Don't bother with the queue.
}
inQueue = true;
scheduleTimeout(this, timeoutNanos, hasDeadline);
}
这里只是判断了inQueue的状态,然后设置inQueue的状态,真正的调用是在scheduleTimeout()里面,那我们进去看下
private static synchronized void scheduleTimeout(
AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
// Start the watchdog thread and create the head node when the first timeout is scheduled.
//head==null,表明之前没有,本次是第一次操作,开启Watchdog守护线程
if (head == null) {
head = new AsyncTimeout();
new Watchdog().start();
}
long now = System.nanoTime();
//如果有最长限制(hasDeadline我翻译为最长限制),并且超时时长不为0
if (timeoutNanos != 0 && hasDeadline) {
// Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around,
// Math.min() is undefined for absolute values, but meaningful for relative ones.
//对比最长限制和超时时长,选择最小的那个值
node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
} else if (timeoutNanos != 0) {
//如果没有最长限制,但是超时时长不为0,则使用超时时长
node.timeoutAt = now + timeoutNanos;
} else if (hasDeadline) {
//如果有最长限制,但是超时时长为0,则使用最长限制
node.timeoutAt = node.deadlineNanoTime();
} else {
//如果既没有最长限制,和超时时长,则抛异常
throw new AssertionError();
}
// Insert the node in sorted order.
long remainingNanos = node.remainingNanos(now);
for (AsyncTimeout prev = head; true; prev = prev.next) {
//如果下一个为null或者剩余时间比下一个短 就插入node
if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
node.next = prev.next;
prev.next = node;
if (prev == head) {
AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
}
break;
}
}
}
上面可以看出这个链表实际上是按照剩余的超时时间来进行排序的,快到超时的节点排在表头,一次往后递增。我们以一个read的代码来砍整个超时的绑定过程。
@Override
public long read(Buffer sink, long byteCount) throws IOException {
boolean throwOnTimeout = false;
enter();
try {
long result = source.read(sink,byteCount);
throwOnTimeout = true;
return result;
}catch (IOException e){
throw exit(e);
}finally {
exit(throwOnTimeout);
}
}
首先调用enterf方法,然后去做读的操作,这里可以砍到不仅在catch上而且是在finally中也做了操作,这样一场和正常的情况都考虑到了,在exit中调用了真正的exit方法,exit中会判断这个异步超时对象是否在链表中
final void exit(boolean throwOnTimeout) throws IOException {
boolean timeOut = exit();
if (timeOut && throwOnTimeout)
throw newTimeoutException(null);
}
public final boolean exit(){
if (!inQueue)
return false;
inQueue = false;
return cancelScheduledTimeout(this);
}
回到前面说的的WatchDog,内部的run方法是一个while(true)的一个死循环,由于在while(true)里面锁住了内部的awaitTimeout的操作,这个await正是判断是否超时的真正地方。
static AsyncTimeout awaitTimeout() throws InterruptedException {
//拿到下一个节点
AsyncTimeout node = head.next;
//如果queue为空,等待直到有node进队,或者触发IDLE_TIMEOUT_MILLS
if (node == null) {
long startNanos = System.nanoTime();
AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLS);
return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS ? head : null;
}
long waitNanos = node.remainingNanos(System.nanoTime());
//这个head依然还没有超时,继续等待
if (waitNanos > 0) {
long waitMills = waitNanos / 1000000L;
waitNanos -= (waitMills * 1000000L);
AsyncTimeout.class.wait(waitMills, (int) waitNanos);
return null;
}
head.next = node.next;
node.next = null;
return node;
}
这里就比较清晰了,主要就是通过这个remainNanos来判断预定的超时时间减去当前时间是否大于0,如果比0大就说明还没超时,于是wait剩余的时间,然后表示没有超时,如果小于0,就会把这个从链表中移除,根据前面的exit方法中的判断就能触发整个超时的方法。
所以Buffer的写操作,实际上就是不断增加Segment的一个过程,读操作,就是不断消耗Segment中的数据,如果数据读取完,则使用SegmentPool进行回收。Buffer更多的逻辑主要是跨Segment读取数据,需要把前一个Segment的前端拼接在一起,因此看起来代码相对很多,但是其实开销非常低
八、okio的优雅之处
(一)、okio类
在说okio的设计模式之前,先说下okio这这个类,该类是一个大工厂,为我们创建出各种各样的Sink、Source 对象,提供了三种数据源InputStream/OutputStream、Socket、File,我们可以把本该对这个三类数据源的IO操作通过okio库来实现,更方便,更高效。
(二) 图演示
1、okio的类图
image.png
2、okio读写流程图
okio操作图.png(三) okio高效方便之处
- 1、它对数据进行了分块处理(Segment),这样在大数据IO的时候可以以块为单位进行IO,这可以提高IO的吞吐率
- 2、它对这些数据块使用链表来进行管理,这可以仅通过移动指针就进行数据的管理,而不用真正的处理数据,而且对扩容来说十分方便.
- 3、闲置的块进行管理,通过一个块池(SegmentPool)的管理,避免系统GC和申请byte时的zero-fill。其他的还有一些小细节上的优化,比如如果你把一个UTF-8的String转化为ByteString,ByteString会保留一份对原来String的引用,这样当你下次需要decode这个String时,程序通过保留的引用直接返回对应的String,从而避免了转码过程。
- 4、他为所有的Source、Sink提供了超时操作,这是在Java原生IO操作是没有的。
- 5、okio它对数据的读写都进行了封装,调用者可以十分方便的进行各种值(Stringg,short,int,hex,utf-8,base64等)的转化。
九、FileSystem
public interface FileSystem {
/** The host machine's local file system. */
FileSystem SYSTEM = new FileSystem() {
@Override public Source source(File file) throws FileNotFoundException {
return Okio.source(file);
}
@Override public Sink sink(File file) throws FileNotFoundException {
try {
return Okio.sink(file);
} catch (FileNotFoundException e) {
// Maybe the parent directory doesn't exist? Try creating it first.
file.getParentFile().mkdirs();
return Okio.sink(file);
}
}
@Override public Sink appendingSink(File file) throws FileNotFoundException {
try {
return Okio.appendingSink(file);
} catch (FileNotFoundException e) {
// Maybe the parent directory doesn't exist? Try creating it first.
file.getParentFile().mkdirs();
return Okio.appendingSink(file);
}
}
@Override public void delete(File file) throws IOException {
// If delete() fails, make sure it's because the file didn't exist!
if (!file.delete() && file.exists()) {
throw new IOException("failed to delete " + file);
}
}
@Override public boolean exists(File file) {
return file.exists();
}
@Override public long size(File file) {
return file.length();
}
@Override public void rename(File from, File to) throws IOException {
delete(to);
if (!from.renameTo(to)) {
throw new IOException("failed to rename " + from + " to " + to);
}
}
@Override public void deleteContents(File directory) throws IOException {
File[] files = directory.listFiles();
if (files == null) {
throw new IOException("not a readable directory: " + directory);
}
for (File file : files) {
if (file.isDirectory()) {
deleteContents(file);
}
if (!file.delete()) {
throw new IOException("failed to delete " + file);
}
}
}
};
/** Reads from {@code file}. */
Source source(File file) throws FileNotFoundException;
/**
* Writes to {@code file}, discarding any data already present. Creates parent directories if
* necessary.
*/
Sink sink(File file) throws FileNotFoundException;
/**
* Writes to {@code file}, appending if data is already present. Creates parent directories if
* necessary.
*/
Sink appendingSink(File file) throws FileNotFoundException;
/** Deletes {@code file} if it exists. Throws if the file exists and cannot be deleted. */
void delete(File file) throws IOException;
/** Returns true if {@code file} exists on the file system. */
boolean exists(File file);
/** Returns the number of bytes stored in {@code file}, or 0 if it does not exist. */
long size(File file);
/** Renames {@code from} to {@code to}. Throws if the file cannot be renamed. */
void rename(File from, File to) throws IOException;
/**
* Recursively delete the contents of {@code directory}. Throws an IOException if any file could
* not be deleted, or if {@code dir} is not a readable directory.
*/
void deleteContents(File directory) throws IOException;
}
看完这段代码大家就会知道,FileSystem是一个接口,里面有一个他的实现类SYSTEM.所以可以FileSystem看成okhttp中文件系统对okio的桥接管理类。
看下他的所有方法:
- 1、Source source(File): 获取File的source (用于读)
- 2、Sinke sink(File):获取File的Sink (用于写)
- 3、Sink appending(File): 获取File的Sink,拼接用的(用于写)
- 4、delete(File): void 删除文件
- 5、exists(File):文件是否存在
- 6、size(Flie):获取文件的大小
- 7、rename(File,File): 文件改名
- 8、deleteContents(File):删除文件夹