框架第三方库源码分析Android网络编程

OKHttp源码解析(五)--OKIO简介及FileSystem

2017-06-04  本文已影响1627人  隔壁老李头

本来我打算OKHttp源码解析(四) 是写OKHTTP的缓存,最后再单独写OKIO的,但是发现里面运用到了OKIO,而且后面讲连接的时候也要涉及到OKIO,所以我就把OKIO拿到前面来,这样大家在读缓存源码和连接的时候更清楚。
本篇文章的大纲如下:

  • 1.什么是OKIO
  • 2.如何使用OKIO
  • 3.Sink和Source及其实现
  • 4.Segment和SegmentPool解析
  • 5.不可变的ByteString
  • 6.最核心的Buffer解析
  • 7.okio中的超时机制
  • 8.okio的优雅之处
  • 9.FileSystem

一、okio

说道okio就不能不提JDK里面io,那么咱们先简单说下JDK里面的io。

(一)、javaNIO和阻塞I/O:

  • 1、阻塞I/O通信模式:调用InputStream.read()方法时是阻塞的,它会一直等到数据到来时才返回
  • 2、NIO通信模式:在JDK1.4开始,是一种非阻塞I/O,在Java NIO的服务端由一个专门的线程来处理所有I/O事件,并负责分发;线程之间通讯通过wait和notify等方式。

(二)、okio

okio是由square公司开发的,它补充了java.io和java.nio的不足,以便能够更加方便,快速的访问、存储和处理你的数据。OKHttp底层也是用该库作为支持。而且okio使用起来很简单,减少了很多io操作的基本代码,并且对内存和CPU使用做了优化,他的主要功能封装在ByteString和Buffer这两个类中。

okio的作者认为,javad的JDK对字节流和字符流进行分开定义这一世情,并不是那么优雅,所以在okio并不进行这样划分。具体的做法就是把比特数据都交给Buffer管理,然后Buff实现BufferedSource和BufferedSink这两个接口,最后通过调用buffer相应的方法对数据进行编码。

二、okio的使用

假设我有一个hello.txt文件,内容是hello jianshu,现在我用okio把它读出来。
那我们先理一下思路:读取文件的步骤是首先要拿到一个输入流,okio封装了许多输入流,统一使用的方法重载source方法转换成一个source,然后使用buffer方法包装成BufferedSource,这个里面提供了流的各种操作,读String,读byte,short等,甚至是16进制数。这里直接读出文件的内容,十分简单,代码如下:


    public static void main(String[] args) {
        File file = new File("hello.txt");
        try {
            readString(new FileInputStream(file));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void readString(InputStream in) throws IOException {
      BufferedSource source = Okio.buffer(Okio.source(in));  //创建BufferedSource
      String s = source.readUtf8();  //以UTF-8读
      System.out.println(s);     //打印
      source.close();
    }

--------------------------------------输出-----------------------------------------
hello jianshu
Okio是对Java底层io的封装,所以底层io能做的Okio都能做。

上面的大体流程如下:
第一步,首先是调用okio的source(InputStream in)方法获取Source对象
第二步,调用okio的buffer(Source source)方法获取BufferedSource对象
第三步,调用BufferedSource的readUtf8()方法读取String对象
第四步,关闭BufferedSource

总结下大体流程如下:

  • 1.构建缓冲池,缓冲源对象
  • 2.读写操作
  • 3.关闭缓冲池

三、SinkSource及其实现

(一)、Sink和Source

在JDK里面有InputStream和OutputStream两个接口,由于okio作者认为Inputsream和OutputStream中的available()和读写单字节的方法纯属鸡肋,所以怼出Source和Sink接口。Source和Sink类似于InputStream和OutputStream,是io操作的顶级接口类,这两个接口均实现了Closeable接口。所以可以把Source简单的看成InputStream,Sink简单看成OutputStream。结构图如下图:


okio1.png

其中Sink代表的是输出流,Source代表的是输入流,这两个基本上都是对称的。那我们先来分析下Sink,代码如下:

public interface Sink extends Closeable, Flushable {
  /** Removes {@code byteCount} bytes from {@code source} and appends them to this. */
  // 定义基础的write操作,该方法将字节写入Buffer
  void write(Buffer source, long byteCount) throws IOException;

  /** Pushes all buffered bytes to their final destination. */
  @Override void flush() throws IOException;

  /** Returns the timeout for this sink. */
  Timeout timeout();

  /**
   * Pushes all buffered bytes to their final destination and releases the
   * resources held by this sink. It is an error to write a closed sink. It is
   * safe to close a sink more than once.
   */
  @Override void close() throws IOException;
}
public interface Source extends Closeable {
  /**
   * Removes at least 1, and up to {@code byteCount} bytes from this and appends
   * them to {@code sink}. Returns the number of bytes read, or -1 if this
   * source is exhausted.
   */
    // 定义基础的read操作,该方法将字节写入Buffer
  long read(Buffer sink, long byteCount) throws IOException;

  /** Returns the timeout for this source. */
  Timeout timeout();

  /**
   * Closes this source and releases the resources held by this source. It is an
   * error to read a closed source. It is safe to close a source more than once.
   */
  @Override void close() throws IOException;
}

虽然Sink和Source只定义了很少的方法,这也是为何说它容易实现的原因,但是我们一般在使用过程中,不直接拿它使用,而是使用BufferedSink和BufferedSouce对接口的封装,因为在BufferedSinke和BufferedSource接口定义了一系列好用的方法。

(二)、 BufferedSinkeBufferedSource

看源码可知BufferedSink和BufferedSource定义了很多方便的方法如下图:


BufferedSinke.png

但是发现BufferedSink和BufferedSource两个都是接口 ,那么他的具体具体实现类是什么那?

(三)、 RealBufferedSinkRealBufferedSource

因为RealBufferedSink和RealBufferedSource是一一对应的,我就讲解RealBufferedSInk了,RealBufferedSource这里就不仔细讲解了。

final class RealBufferedSink implements BufferedSink {
    public final Buffer buffer = new Buffer();
    public final Sink sink;

    public RealBufferedSink(Sink sink){
        if (sink == null)
            throw new NullPointerException("sink == null");
        this.sink = sink;
    }
}

大家看源代码可知RealBufferedSink类中有两个主要参数,一个是新建的Buffer对象,一个是Sink对象。虽然这个类叫做RealBufferedSinke,但是实际上这个只是一个保存Buffer对象的代理而已,真生的实现都是在Buffer中实现的。

  @Override public BufferedSink write(byte[] source) throws IOException {
    if (closed) throw new IllegalStateException("closed");
    buffer.write(source);
    return emitCompleteSegments();
  }

  @Override public BufferedSink write(byte[] source, int offset, int byteCount) throws IOException {
    if (closed) throw new IllegalStateException("closed");
    buffer.write(source, offset, byteCount);
    return emitCompleteSegments();
  }

看上面代码可知,RealBufferedSink实现BufferedSink的方法实际上都是调用buffer对应的方法,对应的RealBufferedSource也是这样调用的buffer的read方法。

可以看到这个实现了BufferedSink接口的两个方法实际上都是调用了buffer的对应方法,对应的RealBufferedSource也是同样的调用buffer中的read方法,关于Buffer这个类会在下面详述,刚才我们看到Sink接口中有一个Timeout的类,这个就是Okio所实现的超时机制,保证了IO操作的稳定性。
所以关于读写操作这块的类图如下:


okio4.png

关于Buffer这个类,我们后面再说

(四)其它实现类

1、Sink和Source它们还有各自的支持gzip压缩的实现类GzipSink和GzipSource
2、具有委托功能的抽象类ForwardingSink和ForwardingSource,它们的具体实现类是InflaterSource和DeflaterSink,这两个类主要用于压缩,为GzipSink和GzipSource服务。
综上所述关于类的结构图如下:


okio2.png

刚刚我们看到在Sink接口中有一个Time类,这个就是okio中实现超时机制的接口,用于保证IO操作的稳定性。

四、Segment和SegmentPool解析

(一)、Segment

1、Segment字面的意思就是片段,okio将数据也就是Buffer分割成一块块的片段,内部维护者固定长度的byte[]数组,同时segment拥有前面节点和后面节点,构成一个双向循环链表,就像下图:

segment.png

这样采取分片使用链表连接,片中使用数组存储,兼具读的连续性,以及写的可插入性,对比单一使用链表或者数组,是一种折中的方案,读写更快,而且有个好处根据需求改动分片的大小来权衡读写的业务操作,另外,segment也有一些内置的优化操作,综合这些okio才能大放异彩,后面解析Buffer时会讲解什么时候形成双向循环链表。

      // 每一个segment所含数据的大小,固定的
    static final int SIZE = 8192;
     // Segments 用分享的方式避免复制数组
    static final int SHARE_MINIMUM = 1024;
  
    final byte[] data;
     // data[]中第一个可读的位置
    int pos;
     // data[]中第一个可写的位 
     //所以一个Segment的可读数据数量为pos~limit-1=limit-pos;limit和pos的有效值为0~SIZE-1
    int limit;
    //当前存储的data数据是其它对象共享的则为真  
    boolean shared;
    //是否是自己是操作者 
    boolean owner;
    ///前一个Segment
    Segment pre;
    ///下一个Segment
    Segment next;

总结一下就是:
SIZE就是一个segment的最大字节数,其中还有一个SHARE_MINIMUM,这个涉及到segment优化的另一个技巧,共享内存,然后data就是保存的字节数组,pos,limit就是开始和结束点的index,shared和owner用来设置状态判断是否可读写,一个有共享内存的sement是不能写入的,pre,next就是前置后置节点。

2、Segment的构造方法

Segment() {  
  this.data = new byte[SIZE];  
  this.owner = true;   
  this.shared = false; 
}  
Segment(Segment shareFrom) {  
  this(shareFrom.data, shareFrom.pos, shareFrom.limit);  
  shareFrom.shared = true;
}  
Segment(byte[] data, int pos, int limit) {  
  this.data = data;  
  this.pos = pos;  
  this.limit = limit;  
  this.owner = false; 
  this.shared = true;  
}  
  • 1、无参的构造函数,表明采用该构造器表明该数据data的所有者就是该Segment自己,所以owner为真,shared为假
  • 2、一个参数的构造函数,看代码我们知道这个数据来自其他Segment,所以表明这个Segment是被别人共享了,所以shared为真,owner为假
  • 3、三个参数构造函数表明数据直接使用外面的,所以share为真,owner为假

由上面代码可知,owner和shared这两个状态是互斥的,且赋值都是同步赋值的。

3、Segment的几个方法分析

既然是双向循环链表,其中也会有一些操作的方法:

(1)pop()方法:

将当前Segment从Segment链中移除去将,返回下一个Segment代码如下

    public Segment pop(){
        Segment result = next != this ? next : null;
        pre.next = next;
        next.pre = pre;
        next = null;
        pre = null;
        return result;
    }

pop方法移除自己,首先将自己的前后两个节点连接起来,然后将自己的前后置空,这昂就脱离了整个双向链表,然后返回next。说道pop就一定会联想到push。

(2)push方法:

Segment中的push表示将一个Segment压入该Segment节点的后面,返回刚刚压入的Segment

    public Segment push(Segment segment){
        segment.pre = this;
        segment.next = next;
        next.pre = segment;
        next = segment;
        return segment;
    }

push方法就是在当前和next引用中间插入一个segment进来,并且返回插入的segment,这两个都是寻常的双向链表的操作,我们再来看看如何写入数据。

(3)writeTo方法
  /** Moves {@code byteCount} bytes from this segment to {@code sink}. */
  public void writeTo(Segment sink, int byteCount) {
     //只能对自己操作
    if (!sink.owner) throw new IllegalArgumentException();
     //写的起始位置加上需要写的byteCount大于SIZE
    if (sink.limit + byteCount > SIZE) {
      // We can't fit byteCount bytes at the sink's current position. Shift sink first.
       //如果是共享内存,不能写,则抛异常
      if (sink.shared) throw new IllegalArgumentException();
      //如果不是共享内存,且写的起始位置加上byteCount减去头还大于SIZE则抛异常
      if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
      //上述条件都不满足,我们需要先触动要写文件的地址,把sink.data从sink.pos这个位置移动到 0这个位置,移动的长度是limit - sink.pos,这样的好处就是sink初始位置是0
      System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
      sink.limit -= sink.pos;
      sink.pos = 0;
    }
    //开始尾部写入 写完置limit地址
    System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
    sink.limit += byteCount;
    pos += byteCount;//索引后移
  }

PS:这里我们来复习一下arraycopy方法:
public static native void arraycopy(Object src, int srcPos, Object dest, int destPos, int length);
src:源数组; srcPos:源数组要复制的起始位置;dest:目的数组; destPos:目的数组放置的起始位置;length:复制的长度。
实现过程是这样的,先生成一个长度为length的临时数组,将圆数组中srcPos 到scrPos+lengh-1之间的数据拷贝到临时数组中。然后将这个临时数组复制到dest中。

读到上面的代码,我们发现owner和shared这两个状态是互斥的,赋值都是同步赋值的,这里我们有点明白这两个参数的意义了,如果是共享的就是无法写,以免污染数据,会抛出异常。
如果要写的字节加上原有的字节大于单个segment的最大值也会抛出异常。
还有一种情况就是,由于前面的pos索引可能因为read方法取出数据,pos所以后移这样导致可以容纳数据,这样可以先执行移动操作,使用JDK提供的System.arraycopy方法来移动到pos为0的状态,更改pos和limit索引后再在尾部写入byteCount数的数据,写完之后实际上原segment读了byteCount的数据,所以pos需要后移这么多。

(4) compact()方法(压缩机制)

除了写入数据之外,segment还有一个优化的技巧,因为每个segment的片段size是固定的,为了防止经过长时间的使用后,每个segment中的数据被分割的十分严重,可能一个很小的数据却占据了整个segment,所以有了一个压缩机制。

  public void compact() {
    //上一个节点就是自己,意味着就一个节点,无法压缩,抛异常
    if (prev == this) throw new IllegalStateException();
     //如果上一个节点不是自己的,所以你是没有权利压缩的
    if (!prev.owner) return; // Cannot compact: prev isn't writable.
    //能进来说明,存在上一个节点,且上一个节点是自己的,可以压缩
    //记录当前Segment具有的数据,数据大小为limit-pos
    int byteCount = limit - pos;
    统计前结点是否被共享,如果共享则只记录Size-limit大小,如果没有被共享,则加上pre.pos之前的空位置;
    //本行代码主要是获取前一个segment的可用空间。先判断prev是否是共享的,如果是共享的,则只记录SIZE-limit,如果没有共享则记录SIZE-limit加上prev.pos之前的空位置
    int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
   //判断prev的空余空间是否能够容纳Segment的全部数据,不能容纳则返回
    if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space.
    //能容纳则将自己的这个部分数据写入上一个Segment
    writeTo(prev, byteCount);
    //讲当前Segment从Segment链表中移除
    pop();
    //回收该Segment
    SegmentPool.recycle(this);
  }

总结下上述代码:如果前面的Segment是共享的,那么不可写,也不能压缩,然后判断前一个的剩余大小是否比当前打,如果有足够的空间来容纳数据,调用前面的writeTo方法写入数据,写完以后,移除当前segment,然后回收segment。

(5)split()方法(共享机制)

还有一种机制是共享机制,为了减少数据复制带来的性能开销。
本方法用于将Segment一分为二,将pos+1pos+btyeCount-1的内容给新的Segment,将pos+byteCountlimit-1的内容留给自己,然后将前面的新Segment插入到自己的前面。这里需要注意的是,虽然这里变成了两个Segment,但是实际上byte[]数据并没有被拷贝,两个Segment都引用了改Segment。

 public Segment split(int byteCount) {
    if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
    Segment prefix;

    // We have two competing performance goals:
    //  - Avoid copying data. We accomplish this by sharing segments.
    //  - Avoid short shared segments. These are bad for performance because they are readonly and
    //    may lead to long chains of short segments.
    // To balance these goals we only share segments when the copy will be large.
    if (byteCount >= SHARE_MINIMUM) {
      prefix = new Segment(this);
    } else {
      prefix = SegmentPool.take();
      System.arraycopy(data, pos, prefix.data, 0, byteCount);
    }

    prefix.limit = prefix.pos + byteCount;
    pos += byteCount;
    prev.push(prefix);
    return prefix;
  }

这个方法实际上经过很多次的改变,在回顾okio1.6时,发现有一个重要的差异就是多了一个SHARE_MININUM参数,同时也多了一个注释,为了防止一个很小的片段就进行共享,我们知道共享之后为了防止数据污染就无法写了,如果存在大量的共享片段,实际上是浪费资源的,所以通过这个对比可以看出这个最小数的的意义,而且这个方法在1.6的版本中检索实际上只有一个地方使用了这个方法,就是Buffer中的write方法,为了效率在移动大数据的时候直接移动了segment而不是data,这样在写数据上能达到很高的效率,具体的write细节会在后面讲解。
在上面的方法中出现的 SegmentPool ,这是什么东西,那我们下面就来讲解一下

(二) SegmentPool

SegmentPool是一个Segment池,由一个单项链表构成。该池负责Segment的回收和闲置Segment管理,也就是说Buffer使用的Segment是从Segment单项链表中取出的,这样有效的避免了GC频率
SegmentPool这个类比较简单,先来看下这个类的属性
先看下

//一个Segment记录的最大长度是8192,因此SegmentPool只能存储8个Segment
static final long MAX_SIZE = 64 * 1024;
//该SegmentPool存储了一个回收Segment的链表
static Segment next;
//该值记录了当前所有Segment的总大小,最大值是为MAX_SIZE
static long byteCount;  

上面代码表明了一个池子的上线也就是64K,相当于8个Segment,next这个属性表明这个SegmentPool是按照单链表的方式进行存储的,byteCount这个字段表明已经使用的大小。

SegmentPool的方法十分简单,就两个一个是"取",一个是"收"

  static Segment take() {
    synchronized (SegmentPool.class) {
      if (next != null) {
        Segment result = next;
        next = result.next;
        result.next = null;
        byteCount -= Segment.SIZE;
        return result;
      }
    }
    return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
  }

为了防止多线程同时操作出现问题,这里加了锁。注意:这里的next表示整个对象池的头,而不是"下一个"。如果next为null,则这个,池子里面没有Segment。否则就从里面拿出一个next出来,并将next的下一个节点赋值为next(即为头),然后设置下一下byteCount。如果链表为空,则创建一个Segment对象返回。
下面咱们来看下回收的方法

  static void recycle(Segment segment) {
    //如果这个要回收的Segment被前后引用,则无法回收
    if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
    //如果这个要回收的Segment的数据是分享的,则无法回收
    if (segment.shared) return; // This segment cannot be recycled.
    //加锁
    synchronized (SegmentPool.class) {
      //如果 这个空间已经不足以再放入一个空的Segment,则不回收
      if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
      //设置SegmentPool的池大小
      byteCount += Segment.SIZE;
      //segment的下一个指向头
      segment.next = next;
      //设置segment的可读写位置为0
      segment.pos = segment.limit = 0;
      //设置当前segment为头
      next = segment;
    }
  }

上面代码很简单,就是尝试将参数Segment对象加入到自身的Segment链表中。其中做了一些条件判断,具体看注释。
SegmentPool总结:
1.大家注意到没有SegmentPool的作用就是管理多余的Segment,不直接丢弃废弃的Segment,等客户需要Segment的时候直接从池中获取一个对象,避免了重复创建新兑现,提高资源利用率。
2.大家仔细读取源码会发现SegmentPool的所有操作都是基于对表头的操作。

五、ByteString

ByteString存储的是不可变比特序列,可能你不太理解这句话,如果给出final btye[] data 你是不是就秒懂了。官方文档说可以把它看成string的远方亲戚,且这个亲戚符合人工工学设计,逼格是不是很高。不过简单的讲,他就是一个byte序列(数组),以制定的编码格式进行解码。目前支持的解码规则有hex,base64和UTF-8等,机智如你可能会说String也是如此。是的,你说的没错,ByteString 只是把这些方法进行了封装。免去了我们直接输入类似的"utf-8"这样的错误,直接通过调用utf-8格式进行解码,还做了优化,在第一次调用uft8()方法的时候得到了一个该解码的String,同时在ByteString内部还保留了这个引用,当再次调用utf-8()的时候,则直接返回这个引用。
刚刚说了不可变对象。Effective Java 一书中有一条给了不可变对象的需要遵循的的几条原则:

  • 1.不要提供任何会修改对象状态的方法
  • 2.保证类不会被扩展
  • 3.使所有的域都是final的
  • 4.使所有的域都是private的
  • 5.确保对于任何可变组件的互斥访问

不可变对象有许多好处,首先本质是线程安全,不要求同步处理,也就是没有锁之类的性能问题,而且可以被自由的共享内部信息,当然坏处就是要创建大量类的对象,咱们看看ByteString的属性

1、ByteString属性

//明显给16进制数准备的
static final char[] HEX_DIGITS = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
  //一个空的对象 
  /** A singleton empty {@code ByteString}. */
  public static final ByteString EMPTY = ByteString.of();
  
  final byte[] data;
  //hashcode的值
  transient int hashCode; // Lazily computed; 0 if unknown.
  transient String utf8; // Lazily computed.

这里要重点说明下 byte[] data和transient String utf8,ByteString不仅是不可变的,同时在内部有两个filed,分别是byte数据,以及String数据,这样能够让这个类Byte和String转换上基本没有开销。同样需要保存这两份引用,这明显的空间换时间的方式,为了性能okio做了很多事情。这是这个String前面有transient关键字标记,也就是说不会进入序列化和反序列化,所以readObject和writeObject()中没有utf8属性

2、ByteString构造函数

  ByteString(byte[] data) {
    this.data = data; // Trusted internal constructor doesn't clone data.
  }

构造函数的参数为一个byte[]类型,不过构造函数只能被同包的类使用,因此我们创建ByteString对象并不是通过该方法。我们是如何构造一个ByteString对象?其实是通过 of()方法构造对象的

3、of()方法创建ByteString对象

  /**
   * Returns a new byte string containing a clone of the bytes of {@code data}.
   */
  public static ByteString of(byte... data) {
    if (data == null) throw new IllegalArgumentException("data == null");
    return new ByteString(data.clone());
  }

  /**
   * Returns a new byte string containing a copy of {@code byteCount} bytes of {@code data} starting
   * at {@code offset}.
   */
  public static ByteString of(byte[] data, int offset, int byteCount) {
    if (data == null) throw new IllegalArgumentException("data == null");
    checkOffsetAndCount(data.length, offset, byteCount);

    byte[] copy = new byte[byteCount];
    System.arraycopy(data, offset, copy, 0, byteCount);
    return new ByteString(copy);
  }

  public static ByteString of(ByteBuffer data) {
    if (data == null) throw new IllegalArgumentException("data == null");

    byte[] copy = new byte[data.remaining()];
    data.get(copy);
    return new ByteString(copy);

ByteString 有三个of方法,都是return一个ByteString,咱们就一个一个的分析

  • 1.第一个of方法,就是调用clone方法,重新创建一个byte数组。clone一个数组的原因很简单,我们确保ByteString的data指向byte[]没有被其他对象所引用,否则就容易破坏ByteString中存储的是一个不可变化的的比特流数据这一约束。
  • 2.第二个of方法,首先进行checkOffsetAndCount()方法进行边界检查,然后,将data中指定的数据拷贝到数组中去。
  • 3.第三个of方法,同理,和第二个of方法差不多,只不过一个是byte[],一个是ByteBuffer。

4、toString()

/**
   * Returns a human-readable string that describes the contents of this byte string. Typically this
   * is a string like {@code [text=Hello]} or {@code [hex=0000ffff]}.
   */
  @Override public String toString() {
    if (data.length == 0) {
      return "[size=0]";
    }

    String text = utf8();
    int i = codePointIndexToCharIndex(text, 64);

    if (i == -1) {
      return data.length <= 64
          ? "[hex=" + hex() + "]"
          : "[size=" + data.length + " hex=" + substring(0, 64).hex() + "…]";
    }

    String safeText = text.substring(0, i)
        .replace("\\", "\\\\")
        .replace("\n", "\\n")
        .replace("\r", "\\r");
    return i < text.length()
        ? "[size=" + data.length + " text=" + safeText + "…]"
        : "[text=" + safeText + "]";
  }

5、utf8()

utf8格式的String

  /** Constructs a new {@code String} by decoding the bytes as {@code UTF-8}. */
  public String utf8() {
    String result = utf8;
    // We don't care if we double-allocate in racy code.
    return result != null ? result : (utf8 = new String(data, Util.UTF_8));
  }

这里面的一个判断语句,实现ByteString性能优化,看来优化这个东西还是很容易实现的。第一次创建UTF-8对象的方法是调用new String(data,Util.UTF_8),后面就不再调用该方法而是直接返回result;发现uft8是对String 的方法进一步封装。

下面是ByteString的方法


image.png

六、Buffer

1、Buffer简介
  • 1、Buffer存储的是可变比特序列,需要注意的是Buffer内部对比特数据的存储不是直接使用一个byte数组那么简单,它使用了一种新的数据类型Segment进行存储。不过我们先不去管Segment是什么东西,可以先直接将Buffer想象成一个LinkedList集合就知道了,之所以做这样,因为Buffer的容量可以动态扩展,从序列的尾部存入数据,从序列的头部读取数据。其实Buffer的底层实现了远比LinkedList复杂的多,他使用双向链表的形式存储数组,链表节点的数据类型就是前面说的Segment,Segment存储有一个不可变比特序列,即final byte[] data。使用的Buffer的好处在于从一个Buffer移动到另一个Buffer的时候,实际上并没有进行拷贝,只是改变了它对应的Segment的所有者,其实这也采用链表存储数据的好处,这样的特点在多线程网络通信中带来很大的好处。最后使用Buffer还有另一个好处那就是它实现了BufferSource和BufferSink接口。
  • 2、前面讲到Buffer这个类实际上就是整个读写的核心,包括RealBufferSource和RealBufferedSink实际上都只是一个代理,里面操作全部都是通过Buffer来完成。

2、Buffer的属性及实现的接口

public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
  //明显是给16进制准备的
  private static final byte[] DIGITS =
      { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
  static final int REPLACEMENT_CHARACTER = '\ufffd';
  //Buffer存储了一个这样的head节点,这就是Buffer对数据的存储结构。字节数组都是交给Segment进行管理
  Segment head;
  //当前存储的数据大小
  long size;

因为Buffer持有一个Segment的引用,所以通过这个引用能拿到整个链表中的所有数据。
同时Buffer实现了三个接口,读,写以及clone。

(1)clone接口

那咱们先说clone吧,这个比较简答,大家都能看懂,Java拷贝,主要分为深拷贝还是浅拷贝。Buffer这里采用的是浅拷贝。

  //看到注释的第一句话,我就知道了是深拷贝,哈哈!
  /** Returns a deep copy of this buffer. */
  @Override public Buffer clone() {
    //先new了一个Buffer对象
    Buffer result = new Buffer();
    //如果size==0,说明这个Buffer是空的,所以直接返回即可
    if (size == 0) return result;
    //如果size!=0,说明这个Buffer是有数据的,然后吧head指向这个copy的head,PS大家回想下Segment的这个构造函数,里面是怎么操作的?
    result.head = new Segment(head);
    //然后设置copy的head的next和prev的值
    result.head.next = result.head.prev = result.head;
    //开始遍历这个Buffer持有的Segment链了
    for (Segment s = head.next; s != head; s = s.next) {
      result.head.prev.push(new Segment(s));
    }
    result.size = size;
    return result;
  }
  • 1对应了实现的clone方法,如果整个Buffer的size为0,也就是没有数据,那么返回一个新建的Buffer对象,如果不为空就是遍历所有segment并且都创建一个对应的segment,这样clone出来的对象就是一个全新的毫无关系的对象。
  • 2前面分析segment的时候有讲到是一个双向链表,但是segment自身构造的时候却没有形成闭环,其实就是在Buffer中产生的。
    result.head.next = result.head.prev = result.head;
  • 3关于for循环有同学表示看不懂,我这里就详细解释下:
    先取出head的next对象的引用,然后压入该节点,比如 目前顺序是 A B C,A是head,A的pre是C,所以在C节点的后面加入新的segmentD,压后的顺序就是 A B C D,然后执行s.next,如果s应是最后了,则跳出循环

2、BufferedSource, BufferedSink接口

Buffer实现了这两个接口的所有方法,既有读,也有写,由于篇幅的限制,我就不全部讲解,只挑几个讲解。其他的都大同小异

 @Override public int readInt() {
    //第一步
    if (size < 4) throw new IllegalStateException("size < 4: " + size);
    //第二步
    Segment segment = head;
    int pos = segment.pos;
    int limit = segment.limit;
    // If the int is split across multiple segments, delegate to readByte().
    if (limit - pos < 4) {
      return (readByte() & 0xff) << 24
          |  (readByte() & 0xff) << 16
          |  (readByte() & 0xff) <<  8
          |  (readByte() & 0xff);
    }
     //第三步
    byte[] data = segment.data;
    int i = (data[pos++] & 0xff) << 24
        |   (data[pos++] & 0xff) << 16
        |   (data[pos++] & 0xff) <<  8
        |   (data[pos++] & 0xff);
    size -= 4;
    //第四步
    if (pos == limit) {
      head = segment.pop();
      SegmentPool.recycle(segment);
    } else {
      segment.pos = pos;
    }
    //第五步
    return i;
  }

第一步,做了一次验证,因为一个int数据的字节数是4,所以必须保证当前Buffer的size大于4。
第二步,如果当前的Segement所包含的字节数小于4,因此还需要去一个Segment中获取一分部数据,因此通过调用readByte()方法一个字节一个字节的读取,该方法我们后介绍
第三步,如果当前Segment的数据够用,因此直接从pos位置开始读取4个字节的数据,然后将其转换为int数据,转换方法很简单就是位和或运算。
第四步,如果pos==limit证明当前head对应的Segment没有可读数据,因此将该Segment从双向链表移除,并回收该Segment。如果还有数据则设置新的pos值。
第五步,返回解析得到int值
这里面提到了readByte()那么咱们就来研究下这个readByte()方法

 @Override public byte readByte() {
     //第一步
    if (size == 0) throw new IllegalStateException("size == 0");
     //第二步
    Segment segment = head;
    int pos = segment.pos;
    int limit = segment.limit;
    //第三步
    byte[] data = segment.data;
    //第四步
    byte b = data[pos++];
    size -= 1;
    //第五步
    if (pos == limit) {
      head = segment.pop();
      SegmentPool.recycle(segment);
    } else {
      segment.pos = pos;
    }
    //第六步
    return b;
  }

第一步,做size判断,如果size==0表示没有什么东西好读取的。
第二步,获取Segment,并取得这个segment的pos位置和limit的位置
第三步,获取byte[] data
第四步,获取一个字节的byte
第五步,判断这个segment有没有可读数据了,如果没有可读数据则回收Segment。如果有数据则更新pos的值
第六步,返回b

那咱们再来看看写的方法,那换一个就不写int,选写short吧

  @Override public Buffer writeShort(int s) {
    //第一步
    Segment tail = writableSegment(2);
    //第二步
    byte[] data = tail.data;
    int limit = tail.limit;
    data[limit++] = (byte) ((s >>> 8) & 0xff);
    data[limit++] = (byte)  (s        & 0xff);
    tail.limit = limit;
    //第三步
    size += 2;
    //第四步
    return this;
  }

  /**
   * Returns a tail segment that we can write at least {@code minimumCapacity}
   * bytes to, creating it if necessary.
   */
  Segment writableSegment(int minimumCapacity) {
    if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();
    //如果hea=null表明Buffer里面一个Segment都没有
    if (head == null) {
      //从SegmentPool取出一个Segment 
      head = SegmentPool.take(); // Acquire a first segment.
      return head.next = head.prev = head;
    }
    //如果head不为null.
    Segment tail = head.prev;
    //如果已经写入的数据+最小可以写入的空间超过限制,则在SegmentPool里面取一个
    if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
      tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
    }
    return tail;
  }

writeShort用来给Buffer写入一个short数据
第一步,通过writeableSegment拿到一个能有2个字节的空间的segment,writeableSegment方法我里面写了注释,大家自己去看,我这里就不详细说明了。
第二步,tail中的data就是字节数组,limit则是数据的尾部索引,写数据就是在尾部继续写,直接设置在data通过limit自增后的index,然后重置尾部索引.
第三步,size+2
第四步,返回tail

七、Okio中的超时机制

1、TimeOut

okio的超时机制让I/O操作不会因为异常阻塞在某个未知的错误上,okio的基础超时机制是采取同步超时。
1以输出流Sink为例子,当我们用下面的方法包装流时:

//Okio.java

 //实际上调用的两个参数的sink方法,第二个参数是new的TimeOut对象,即同步超时
public static Sink sink(OutputStream out) {
    return sink(out, new Timeout());
  }

  private static Sink sink(final OutputStream out, final Timeout timeout) {
    if (out == null) throw new IllegalArgumentException("out == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Sink() {
        @Override 
        public void write(Buffer source, long byteCount) throws IOException {
        checkOffsetAndCount(source.size, 0, byteCount);
        while (byteCount > 0) {
          timeout.throwIfReached();
          Segment head = source.head;
          int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
          out.write(head.data, head.pos, toCopy);

          head.pos += toCopy;
          byteCount -= toCopy;
          source.size -= toCopy;

          if (head.pos == head.limit) {
            source.head = head.pop();
            SegmentPool.recycle(head);
          }
        }
      }

      @Override public void flush() throws IOException {
        out.flush();
      }

      @Override public void close() throws IOException {
        out.close();
      }

      @Override public Timeout timeout() {
        return timeout;
      }

      @Override public String toString() {
        return "sink(" + out + ")";
      }
    };
  }

可以看到write方法里中实际上有一个while循环,在每个开始写的时候就调用了timeout.throwIfReached()方法,我们推断这个方法里面做了时间是否超时的判断,如果超时了,应该throw一个exception出来。这很明显是一个同步超时机制,按序执行。同理Source也是一样,那么咱们看下他里面到底是怎么执行的

  /**
   * Throws an {@link InterruptedIOException} if the deadline has been reached or if the current
   * thread has been interrupted. This method doesn't detect timeouts; that should be implemented to
   * asynchronously abort an in-progress operation.
   */
  public void throwIfReached() throws IOException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException("thread interrupted");
    }

    if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
      throw new InterruptedIOException("deadline reached");
    }
  }

大家先看下上面的注释,我英语不是很好,大家如果有更好的翻译,请提示我,我翻译如下:
如果到达了最后的时间,会抛出InterruptedIOException,或者线程被interrupted了也会抛出InterruptedIOException。该方法是不会检查超时的,应该是是一个异步进度操作单元来实现这个类,进行检查超时。
但是当我们看okio对于socket的封装时

  /**
   * Returns a sink that writes to {@code socket}. Prefer this over {@link
   * #sink(OutputStream)} because this method honors timeouts. When the socket
   * write times out, the socket is asynchronously closed by a watchdog thread.
   */
  public static Sink sink(Socket socket) throws IOException {
    if (socket == null) throw new IllegalArgumentException("socket == null");
    AsyncTimeout timeout = timeout(socket);
    Sink sink = sink(socket.getOutputStream(), timeout);
    return timeout.sink(sink);
  }

2、AsyncTimeout

这里出现一个AsyncTimeout类,这个实际上继承于TimeOut所实现的一个异步超时类,这个异步类比同步要复杂的多,它使用了人一个WatchDog线程在后台进行监听超时。
这里面用到了Watchdog,Watchdog是AsyncTimeout的静态内部类,那么我们来下看Watchdog是个什么东西

  private static final class Watchdog extends Thread {
    public Watchdog() {
      super("Okio Watchdog");
      setDaemon(true);
    }

    public void run() {
      while (true) {
        try {
          AsyncTimeout timedOut;
          synchronized (AsyncTimeout.class) {
            timedOut = awaitTimeout();

            // Didn't find a node to interrupt. Try again.
            if (timedOut == null) continue;

            // The queue is completely empty. Let this thread exit and let another watchdog thread
            // get created on the next call to scheduleTimeout().
            if (timedOut == head) {
              head = null;
              return;
            }
          }

          // Close the timed out node.
          timedOut.timedOut();
        } catch (InterruptedException ignored) {
        }
      }
    }
  }

这里的WatchDog并不是Linux中的那个,只是一个继承于Thread的一类,里面的run方法执行的就是超时的判断,之所以在socket写时采取异步超时,这完全是由socket自身的性质决定的,socket经常会阻塞自己,导致下面的事情执行不了。AsyncTimeout继承Timeout类,可以复写里面的timeout方法,这个方法会在watchdao的线程中调用,所以不能执行长时间的操作,否则就会引起其他的超时。
下面详细分析AsyncTimeout这个类

    //不要一次写超过64k的数据否则可能会在慢连接中导致超时
    private static final int TIMEOUT_WRITE_SIZE = 64 * 1024;
    private static AsyncTimeout head;
    private boolean inQueue;
    private AsyncTimeout next;
    private long timeoutAt;

首先就是一个最大的写值,定义为64K,刚好和一个Buffer大小一样。官方的解释是如果连续读写超过这个数字的字节,那么及其容易导致超时,所以为了限制这个操作,直接给出了一个能写的最大数。
下面两个参数head和next,很明显表明这是一个单链表,timeoutAt则是超时时间。使用者在操作之前首先要调用enter()方法,这样相当于注册了这个超时监听,然后配对的实现exit()方法。这样exit()有一个返回值会表明超时是否出发,注意:这个timeout是异步的,可能会在exit()后才调用

   public final void enter() {
    if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
    long timeoutNanos = timeoutNanos();
    boolean hasDeadline = hasDeadline();
    if (timeoutNanos == 0 && !hasDeadline) {
      return; // No timeout and no deadline? Don't bother with the queue.
    }
    inQueue = true;
    scheduleTimeout(this, timeoutNanos, hasDeadline);
  }

这里只是判断了inQueue的状态,然后设置inQueue的状态,真正的调用是在scheduleTimeout()里面,那我们进去看下

private static synchronized void scheduleTimeout(
      AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
    // Start the watchdog thread and create the head node when the first timeout is scheduled.
    //head==null,表明之前没有,本次是第一次操作,开启Watchdog守护线程
    if (head == null) {
      head = new AsyncTimeout();
      new Watchdog().start();
    }

    long now = System.nanoTime();
    //如果有最长限制(hasDeadline我翻译为最长限制),并且超时时长不为0
    if (timeoutNanos != 0 && hasDeadline) {
      // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around,
      // Math.min() is undefined for absolute values, but meaningful for relative ones.
      //对比最长限制和超时时长,选择最小的那个值
      node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
    } else if (timeoutNanos != 0) {
      //如果没有最长限制,但是超时时长不为0,则使用超时时长
      node.timeoutAt = now + timeoutNanos;
    } else if (hasDeadline) {
      //如果有最长限制,但是超时时长为0,则使用最长限制
      node.timeoutAt = node.deadlineNanoTime();
    } else {
     //如果既没有最长限制,和超时时长,则抛异常
      throw new AssertionError();
    }

    // Insert the node in sorted order.
    long remainingNanos = node.remainingNanos(now);
    for (AsyncTimeout prev = head; true; prev = prev.next) {
      //如果下一个为null或者剩余时间比下一个短 就插入node
      if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
        node.next = prev.next;
        prev.next = node;
        if (prev == head) {
          AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
        }
        break;
      }
    }
  }

上面可以看出这个链表实际上是按照剩余的超时时间来进行排序的,快到超时的节点排在表头,一次往后递增。我们以一个read的代码来砍整个超时的绑定过程。

     @Override
      public long read(Buffer sink, long byteCount) throws IOException {
            boolean throwOnTimeout = false;
            enter();
             try {
                 long result = source.read(sink,byteCount);
                 throwOnTimeout = true;
                 return result;
             }catch (IOException e){
                 throw exit(e);
             }finally {
                 exit(throwOnTimeout);
             }
      }

首先调用enterf方法,然后去做读的操作,这里可以砍到不仅在catch上而且是在finally中也做了操作,这样一场和正常的情况都考虑到了,在exit中调用了真正的exit方法,exit中会判断这个异步超时对象是否在链表中

    final void exit(boolean throwOnTimeout) throws IOException {
        boolean timeOut =  exit();
        if (timeOut && throwOnTimeout)
            throw newTimeoutException(null);
    }

    public final boolean exit(){
        if (!inQueue)
            return false;
        inQueue = false;
        return cancelScheduledTimeout(this);
    }

回到前面说的的WatchDog,内部的run方法是一个while(true)的一个死循环,由于在while(true)里面锁住了内部的awaitTimeout的操作,这个await正是判断是否超时的真正地方。

static AsyncTimeout awaitTimeout() throws InterruptedException {
        //拿到下一个节点
        AsyncTimeout node = head.next;
        //如果queue为空,等待直到有node进队,或者触发IDLE_TIMEOUT_MILLS
        if (node == null) {
            long startNanos = System.nanoTime();
            AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLS);
            return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS ? head : null;
        }
        long waitNanos = node.remainingNanos(System.nanoTime());
        //这个head依然还没有超时,继续等待
        if (waitNanos > 0) {
            long waitMills = waitNanos / 1000000L;
            waitNanos -= (waitMills * 1000000L);
            AsyncTimeout.class.wait(waitMills, (int) waitNanos);
            return null;
        }
        head.next = node.next;
        node.next = null;
        return node;
    }

这里就比较清晰了,主要就是通过这个remainNanos来判断预定的超时时间减去当前时间是否大于0,如果比0大就说明还没超时,于是wait剩余的时间,然后表示没有超时,如果小于0,就会把这个从链表中移除,根据前面的exit方法中的判断就能触发整个超时的方法。

所以Buffer的写操作,实际上就是不断增加Segment的一个过程,读操作,就是不断消耗Segment中的数据,如果数据读取完,则使用SegmentPool进行回收。Buffer更多的逻辑主要是跨Segment读取数据,需要把前一个Segment的前端拼接在一起,因此看起来代码相对很多,但是其实开销非常低

八、okio的优雅之处

(一)、okio类

在说okio的设计模式之前,先说下okio这这个类,该类是一个大工厂,为我们创建出各种各样的Sink、Source 对象,提供了三种数据源InputStream/OutputStream、Socket、File,我们可以把本该对这个三类数据源的IO操作通过okio库来实现,更方便,更高效。

(二) 图演示

1、okio的类图


image.png

2、okio读写流程图

okio操作图.png

(三) okio高效方便之处

  • 1、它对数据进行了分块处理(Segment),这样在大数据IO的时候可以以块为单位进行IO,这可以提高IO的吞吐率
  • 2、它对这些数据块使用链表来进行管理,这可以仅通过移动指针就进行数据的管理,而不用真正的处理数据,而且对扩容来说十分方便.
  • 3、闲置的块进行管理,通过一个块池(SegmentPool)的管理,避免系统GC和申请byte时的zero-fill。其他的还有一些小细节上的优化,比如如果你把一个UTF-8的String转化为ByteString,ByteString会保留一份对原来String的引用,这样当你下次需要decode这个String时,程序通过保留的引用直接返回对应的String,从而避免了转码过程。
  • 4、他为所有的Source、Sink提供了超时操作,这是在Java原生IO操作是没有的。
  • 5、okio它对数据的读写都进行了封装,调用者可以十分方便的进行各种值(Stringg,short,int,hex,utf-8,base64等)的转化。

九、FileSystem

public interface FileSystem {
  /** The host machine's local file system. */
  FileSystem SYSTEM = new FileSystem() {
    @Override public Source source(File file) throws FileNotFoundException {
      return Okio.source(file);
    }

    @Override public Sink sink(File file) throws FileNotFoundException {
      try {
        return Okio.sink(file);
      } catch (FileNotFoundException e) {
        // Maybe the parent directory doesn't exist? Try creating it first.
        file.getParentFile().mkdirs();
        return Okio.sink(file);
      }
    }

    @Override public Sink appendingSink(File file) throws FileNotFoundException {
      try {
        return Okio.appendingSink(file);
      } catch (FileNotFoundException e) {
        // Maybe the parent directory doesn't exist? Try creating it first.
        file.getParentFile().mkdirs();
        return Okio.appendingSink(file);
      }
    }

    @Override public void delete(File file) throws IOException {
      // If delete() fails, make sure it's because the file didn't exist!
      if (!file.delete() && file.exists()) {
        throw new IOException("failed to delete " + file);
      }
    }

    @Override public boolean exists(File file) {
      return file.exists();
    }

    @Override public long size(File file) {
      return file.length();
    }

    @Override public void rename(File from, File to) throws IOException {
      delete(to);
      if (!from.renameTo(to)) {
        throw new IOException("failed to rename " + from + " to " + to);
      }
    }

    @Override public void deleteContents(File directory) throws IOException {
      File[] files = directory.listFiles();
      if (files == null) {
        throw new IOException("not a readable directory: " + directory);
      }
      for (File file : files) {
        if (file.isDirectory()) {
          deleteContents(file);
        }
        if (!file.delete()) {
          throw new IOException("failed to delete " + file);
        }
      }
    }
  };

  /** Reads from {@code file}. */
  Source source(File file) throws FileNotFoundException;

  /**
   * Writes to {@code file}, discarding any data already present. Creates parent directories if
   * necessary.
   */
  Sink sink(File file) throws FileNotFoundException;

  /**
   * Writes to {@code file}, appending if data is already present. Creates parent directories if
   * necessary.
   */
  Sink appendingSink(File file) throws FileNotFoundException;

  /** Deletes {@code file} if it exists. Throws if the file exists and cannot be deleted. */
  void delete(File file) throws IOException;

  /** Returns true if {@code file} exists on the file system. */
  boolean exists(File file);

  /** Returns the number of bytes stored in {@code file}, or 0 if it does not exist. */
  long size(File file);

  /** Renames {@code from} to {@code to}. Throws if the file cannot be renamed. */
  void rename(File from, File to) throws IOException;

  /**
   * Recursively delete the contents of {@code directory}. Throws an IOException if any file could
   * not be deleted, or if {@code dir} is not a readable directory.
   */
  void deleteContents(File directory) throws IOException;
}

看完这段代码大家就会知道,FileSystem是一个接口,里面有一个他的实现类SYSTEM.所以可以FileSystem看成okhttp中文件系统对okio的桥接管理类。
看下他的所有方法:

image.png
  • 1、Source source(File): 获取File的source (用于读)
  • 2、Sinke sink(File):获取File的Sink (用于写)
  • 3、Sink appending(File): 获取File的Sink,拼接用的(用于写)
  • 4、delete(File): void 删除文件
  • 5、exists(File):文件是否存在
  • 6、size(Flie):获取文件的大小
  • 7、rename(File,File): 文件改名
  • 8、deleteContents(File):删除文件夹
上一篇下一篇

猜你喜欢

热点阅读