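While studying OkHttp recently, I found that it uses the open-source library Okio internally to read and write data. Out of curiosity I dug into Okio and found plenty of worthwhile takeaways, which I am sharing here as a source-code walkthrough.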
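public void writeFile() throws Exception {
  File file = new File("./text.txt");
  BufferedSink sink = Okio.buffer(Okio.sink(file));
  sink.writeUtf8("Hello, java.io file!");
  sink.close(); // Flushes the buffered bytes to the file and releases it.
}

public void readFile() throws Exception {
  File file = new File("./text.txt");
  BufferedSource source = Okio.buffer(Okio.source(file));
  System.out.println(source.readUtf8());
  source.close();
}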
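Segment: the unit of buffer memory in Okio; each segment wraps a byte array of SIZE = 8192 bytes.
SegmentPool: hands out segments and keeps an internal linked list of recycled segments so that they can be reused the next time one is needed.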
/**
 * A collection of unused segments, necessary to avoid GC churn and zero-fill.
 * This pool is a thread-safe static singleton.
 */
final class SegmentPool {
  /** The maximum number of bytes to pool. */
  static final long MAX_SIZE = 64 * 1024; // 64 KiB.

  /** Singly-linked list of segments. */
  static Segment next;

  /** Total bytes in this pool. */
  static long byteCount;

  private SegmentPool() {}

  static Segment take() {
    synchronized (SegmentPool.class) { // Exclusive lock for multi-threaded use.
      if (next != null) { // Pop one node off the list; the pooled byte count shrinks.
        Segment result = next;
        next = result.next;
        result.next = null;
        byteCount -= Segment.SIZE;
        return result;
      }
    }
    return new Segment(); // Pool is empty. Don't zero-fill while holding a lock.
  }

  static void recycle(Segment segment) {
    if (segment.next != null || segment.prev != null) throw new IllegalArgumentException();
    if (segment.shared) return; // This segment cannot be recycled.
    synchronized (SegmentPool.class) {
      if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full.
      byteCount += Segment.SIZE;
      segment.next = next;
      segment.pos = segment.limit = 0;
      next = segment;
    }
  }
}
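To try the pooling idea outside of Okio, here is a minimal sketch of a bounded free list in plain Java. PooledChunk and ChunkPool are hypothetical names made up for this illustration, not Okio classes, and the sketch leaves out the shared/prev checks that the real recycle() performs.

```java
/** Hypothetical stand-in for Okio's Segment: a fixed-size byte array plus a link. */
final class PooledChunk {
  static final int SIZE = 8192;
  final byte[] data = new byte[SIZE];
  PooledChunk next;
}

/** Minimal bounded free-list pool; names and limits are illustrative assumptions. */
final class ChunkPool {
  static final long MAX_SIZE = 64 * 1024; // Room for 8 chunks of 8 KiB.
  private static PooledChunk head;
  private static long byteCount;

  private ChunkPool() {}

  static PooledChunk take() {
    synchronized (ChunkPool.class) {
      if (head != null) { // Reuse the most recently recycled chunk.
        PooledChunk result = head;
        head = result.next;
        result.next = null;
        byteCount -= PooledChunk.SIZE;
        return result;
      }
    }
    return new PooledChunk(); // Pool is empty: allocate outside the lock.
  }

  static void recycle(PooledChunk chunk) {
    synchronized (ChunkPool.class) {
      if (byteCount + PooledChunk.SIZE > MAX_SIZE) return; // Pool is full; leave it to the GC.
      byteCount += PooledChunk.SIZE;
      chunk.next = head;
      head = chunk;
    }
  }

  static long pooledBytes() {
    synchronized (ChunkPool.class) {
      return byteCount;
    }
  }
}
```

Recycling a chunk and then calling take() hands back the same object, and recycling more than eight chunks simply drops the extras, so the pool never holds more than MAX_SIZE bytes.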
public interface Sink extends Closeable, Flushable {
  /** Removes {@code byteCount} bytes from {@code source} and appends them to this. */
  void write(Buffer source, long byteCount) throws IOException;

  /** Pushes all buffered bytes to their final destination. */
  @Override void flush() throws IOException;

  /** Returns the timeout for this sink. */
  Timeout timeout();

  /**
   * Pushes all buffered bytes to their final destination and releases the
   * resources held by this sink. It is an error to write a closed sink. It is
   * safe to close a sink more than once.
   */
  @Override void close() throws IOException;
}

public interface Source extends Closeable {
  /**
   * Removes at least 1, and up to {@code byteCount} bytes from this and appends
   * them to {@code sink}. Returns the number of bytes read, or -1 if this
   * source is exhausted.
   */
  long read(Buffer sink, long byteCount) throws IOException;

  /** Returns the timeout for this source. */
  Timeout timeout();

  /**
   * Closes this source and releases the resources held by this source. It is an
   * error to read a closed source. It is safe to close a source more than once.
   */
  @Override void close() throws IOException;
}
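Buffer: the class that implements both the BufferedSink and BufferedSource interfaces; it writes data into the in-memory buffer and reads it back out. (In the flow traced here it works behind the scenes, though; the objects that callers actually hold are other classes.)

Those two classes, RealBufferedSink and RealBufferedSource, each hold a Buffer plus a Sink/Source object that performs the actual I/O. The basic flow of the framework is: data is first staged in the Buffer, and then moved between the Buffer and the underlying Sink/Source.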
/** Returns a sink that writes to {@code out}. */
public static Sink sink(OutputStream out) {
  return sink(out, new Timeout());
}

private static Sink sink(final OutputStream out, final Timeout timeout) {
  if (out == null) throw new IllegalArgumentException("out == null");
  if (timeout == null) throw new IllegalArgumentException("timeout == null");

  return new Sink() {
    @Override public void write(Buffer source, long byteCount) throws IOException {
      checkOffsetAndCount(source.size, 0, byteCount);
      while (byteCount > 0) {
        timeout.throwIfReached(); // Check for a timeout before each write.
        Segment head = source.head; // Take the head of the buffer's segment list.
        int toCopy = (int) Math.min(byteCount, head.limit - head.pos); // Bytes available in this segment.
        out.write(head.data, head.pos, toCopy); // Write them to the real stream.

        head.pos += toCopy; // Advance the segment's read position.
        byteCount -= toCopy; // Fewer bytes left to write.
        source.size -= toCopy; // The buffer now holds fewer bytes.

        if (head.pos == head.limit) { // Segment fully consumed: unlink it and recycle it.
          source.head = head.pop();
          SegmentPool.recycle(head);
        }
      }
    }

    @Override public void flush() throws IOException {
      out.flush();
    }

    @Override public void close() throws IOException {
      out.close();
    }

    @Override public Timeout timeout() {
      return timeout;
    }

    @Override public String toString() {
      return "sink(" + out + ")";
    }
  };
}
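As you can see, Okio.sink returns the object that performs the actual stream I/O, implemented inline as an anonymous class. Its write method takes two parameters: Buffer source, the linked list of segments that holds the data, and byteCount, the number of bytes to take from that buffer.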
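The drain loop is easier to follow with the pooling and timeout details removed. The sketch below is a simplified stand-in written for this article, not Okio code: Chunk, ChunkBuffer and ChunkSink are hypothetical names, and an ArrayDeque replaces Okio's circular segment list.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;

/** Hypothetical, simplified stand-ins for Okio's Segment, Buffer and Sink. */
final class DrainSketch {
  /** A chunk of bytes with a read position (pos) and an end position (limit). */
  static final class Chunk {
    final byte[] data;
    int pos;
    final int limit;
    Chunk(byte[] bytes) { data = bytes; limit = bytes.length; }
  }

  /** A queue of chunks plus the total number of readable bytes. */
  static final class ChunkBuffer {
    final ArrayDeque<Chunk> chunks = new ArrayDeque<>();
    long size;

    void append(String text) {
      byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
      if (bytes.length == 0) return; // Never queue an empty chunk.
      chunks.addLast(new Chunk(bytes));
      size += bytes.length;
    }
  }

  interface ChunkSink {
    void write(ChunkBuffer source, long byteCount) throws IOException;
  }

  /** Adapts an OutputStream to ChunkSink with the same loop shape as Okio.sink. */
  static ChunkSink sink(final OutputStream out) {
    return new ChunkSink() {
      @Override public void write(ChunkBuffer source, long byteCount) throws IOException {
        if (byteCount < 0 || byteCount > source.size) throw new IllegalArgumentException();
        while (byteCount > 0) {
          Chunk head = source.chunks.peekFirst();
          int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
          out.write(head.data, head.pos, toCopy);
          head.pos += toCopy;
          byteCount -= toCopy;
          source.size -= toCopy;
          if (head.pos == head.limit) source.chunks.removeFirst(); // Fully consumed.
        }
      }
    };
  }

  /** Drains byteCount bytes of a two-chunk buffer; returns the output and the bytes left. */
  static String demo(long byteCount) {
    ChunkBuffer buffer = new ChunkBuffer();
    buffer.append("Hello, ");
    buffer.append("Okio!");
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try {
      sink(out).write(buffer, byteCount);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return new String(out.toByteArray(), StandardCharsets.UTF_8) + "|" + buffer.size;
  }
}
```

Calling DrainSketch.demo(9) consumes the whole first chunk and two bytes of the second, so a write may stop in the middle of a chunk and leave the rest buffered for the next call.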
BufferedSink sink = Okio.buffer(Okio.sink(file));
sink.writeUtf8("Hello, java.io file!");
/**
 * Returns a new sink that buffers writes to {@code sink}. The returned sink
 * will batch writes to {@code sink}. Use this wherever you write to a sink to
 * get an ergonomic and efficient access to data.
 */
public static BufferedSink buffer(Sink sink) {
  return new RealBufferedSink(sink);
}

final class RealBufferedSink implements BufferedSink {
  public final Buffer buffer = new Buffer();
  public final Sink sink;
  boolean closed;

  RealBufferedSink(Sink sink) {
    if (sink == null) throw new NullPointerException("sink == null");
    this.sink = sink;
  }

  @Override public BufferedSink writeUtf8(String string) throws IOException {
    if (closed) throw new IllegalStateException("closed");
    buffer.writeUtf8(string);
    return emitCompleteSegments();
  }

  @Override public BufferedSink emitCompleteSegments() throws IOException {
    if (closed) throw new IllegalStateException("closed");
    long byteCount = buffer.completeSegmentByteCount();
    // Hand the complete segments to the wrapped sink, i.e. the object created by Okio.sink above.
    if (byteCount > 0) sink.write(buffer, byteCount);
    return this;
  }

  // Other BufferedSink methods omitted.
}
public final class Buffer implements BufferedSource, BufferedSink, Cloneable {
  @Override public Buffer writeUtf8(String string) {
    return writeUtf8(string, 0, string.length());
  }

  @Override public Buffer writeUtf8(String string, int beginIndex, int endIndex) {
    if (string == null) throw new IllegalArgumentException("string == null");
    if (beginIndex < 0) throw new IllegalAccessError("beginIndex < 0: " + beginIndex);
    if (endIndex < beginIndex) {
      throw new IllegalArgumentException("endIndex < beginIndex: " + endIndex + " < " + beginIndex);
    }
    if (endIndex > string.length()) {
      throw new IllegalArgumentException(
          "endIndex > string.length: " + endIndex + " > " + string.length());
    }

    // Transcode a UTF-16 Java String to UTF-8 bytes.
    for (int i = beginIndex; i < endIndex;) {
      int c = string.charAt(i);

      if (c < 0x80) {
        Segment tail = writableSegment(1);
        byte[] data = tail.data;
        int segmentOffset = tail.limit - i;
        int runLimit = Math.min(endIndex, Segment.SIZE - segmentOffset);

        // Emit a 7-bit character with 1 byte.
        data[segmentOffset + i++] = (byte) c; // 0xxxxxxx

        // Fast-path contiguous runs of ASCII characters. This is ugly, but yields a ~4x performance
        // improvement over independent calls to writeByte().
        while (i < runLimit) {
          c = string.charAt(i);
          if (c >= 0x80) break;
          data[segmentOffset + i++] = (byte) c; // 0xxxxxxx
        }

        int runSize = i + segmentOffset - tail.limit; // Equivalent to i - (previous i).
        tail.limit += runSize;
        size += runSize;

      } else if (c < 0x800) {
        // Emit a 11-bit character with 2 bytes.
        writeByte(c >> 6 | 0xc0); // 110xxxxx
        writeByte(c & 0x3f | 0x80); // 10xxxxxx
        i++;

      } else if (c < 0xd800 || c > 0xdfff) {
        // Emit a 16-bit character with 3 bytes.
        writeByte(c >> 12 | 0xe0); // 1110xxxx
        writeByte(c >> 6 & 0x3f | 0x80); // 10xxxxxx
        writeByte(c & 0x3f | 0x80); // 10xxxxxx
        i++;

      } else {
        // c is a surrogate. Make sure it is a high surrogate & that its successor is a low
        // surrogate. If not, the UTF-16 is invalid, in which case we emit a replacement character.
        int low = i + 1 < endIndex ? string.charAt(i + 1) : 0;
        if (c > 0xdbff || low < 0xdc00 || low > 0xdfff) {
          writeByte('?');
          i++;
          continue;
        }

        // UTF-16 high surrogate: 110110xxxxxxxxxx (10 bits)
        // UTF-16 low surrogate: 110111yyyyyyyyyy (10 bits)
        // Unicode code point: 00010000000000000000 + xxxxxxxxxxyyyyyyyyyy (21 bits)
        int codePoint = 0x010000 + ((c & ~0xd800) << 10 | low & ~0xdc00);

        // Emit a 21-bit character with 4 bytes.
        writeByte(codePoint >> 18 | 0xf0); // 11110xxx
        writeByte(codePoint >> 12 & 0x3f | 0x80); // 10xxxxxx
        writeByte(codePoint >> 6 & 0x3f | 0x80); // 10xxyyyy
        writeByte(codePoint & 0x3f | 0x80); // 10yyyyyy
        i += 2;
      }
    }

    return this;
  }

  // Other methods omitted.
}
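The four branches of the transcoding loop can be checked in isolation. The sketch below repeats them in a standalone class that writes to a ByteArrayOutputStream instead of segments and compares the result with the JDK's own encoder. Utf8Sketch and its method names are made up for this illustration, and the ASCII fast path is left out on purpose.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Standalone sketch of the UTF-16 to UTF-8 branches; not Okio's actual class. */
final class Utf8Sketch {
  static byte[] encode(String string) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (int i = 0; i < string.length();) {
      int c = string.charAt(i);
      if (c < 0x80) {
        out.write(c); // 1 byte: 0xxxxxxx
        i++;
      } else if (c < 0x800) {
        out.write(c >> 6 | 0xc0); // 2 bytes: 110xxxxx
        out.write(c & 0x3f | 0x80); // 10xxxxxx
        i++;
      } else if (c < 0xd800 || c > 0xdfff) {
        out.write(c >> 12 | 0xe0); // 3 bytes: 1110xxxx
        out.write(c >> 6 & 0x3f | 0x80); // 10xxxxxx
        out.write(c & 0x3f | 0x80); // 10xxxxxx
        i++;
      } else {
        // Surrogate: needs a valid high/low pair, otherwise emit '?'.
        int low = i + 1 < string.length() ? string.charAt(i + 1) : 0;
        if (c > 0xdbff || low < 0xdc00 || low > 0xdfff) {
          out.write('?');
          i++;
          continue;
        }
        int codePoint = 0x010000 + ((c & ~0xd800) << 10 | low & ~0xdc00);
        out.write(codePoint >> 18 | 0xf0); // 4 bytes: 11110xxx
        out.write(codePoint >> 12 & 0x3f | 0x80); // 10xxxxxx
        out.write(codePoint >> 6 & 0x3f | 0x80); // 10xxxxxx
        out.write(codePoint & 0x3f | 0x80); // 10xxxxxx
        i += 2;
      }
    }
    return out.toByteArray();
  }

  /** Compares the sketch with String.getBytes(UTF_8) for the same input. */
  static boolean matchesJdk(String s) {
    return Arrays.equals(encode(s), s.getBytes(StandardCharsets.UTF_8));
  }
}
```

For example, Utf8Sketch.encode("\u00e9") produces the two bytes 0xC3 0xA9, and a surrogate pair such as "\uD83D\uDE00" produces four bytes.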
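The source shows that writeUtf8 walks the string one character at a time and stores the encoded bytes in segment memory. That memory is obtained by calling: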
Segment tail = writableSegment(1);
/**
 * Returns a tail segment that we can write at least {@code minimumCapacity}
 * bytes to, creating it if necessary.
 */
Segment writableSegment(int minimumCapacity) {
  if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();

  if (head == null) {
    head = SegmentPool.take(); // Acquire a first segment.
    return head.next = head.prev = head;
  }

  Segment tail = head.prev; // Take the last segment; if it lacks room, append an empty one.
  if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
    tail = tail.push(SegmentPool.take()); // Append a new empty segment to fill up.
  }
  return tail;
}
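To watch the tail-growing behaviour, here is a miniature version with 4-byte segments so that a handful of writes spill into new segments. TailSketch and Seg are hypothetical names for this illustration only; the sketch allocates segments directly instead of going through a pool and leaves out the owner flag.

```java
/** Hypothetical miniature of Buffer.writableSegment, using 4-byte segments for easy tracing. */
final class TailSketch {
  static final int SIZE = 4; // Okio uses 8192; tiny here so a few writes span segments.

  static final class Seg {
    final byte[] data = new byte[SIZE];
    int limit; // Next writable index.
    Seg next;
    Seg prev;
  }

  Seg head; // Circular doubly-linked list, so head.prev is always the tail.
  int segmentCount;

  Seg writableSegment(int minimumCapacity) {
    if (minimumCapacity < 1 || minimumCapacity > SIZE) throw new IllegalArgumentException();

    if (head == null) {
      head = new Seg(); // First segment points at itself in both directions.
      segmentCount = 1;
      return head.next = head.prev = head;
    }

    Seg tail = head.prev;
    if (tail.limit + minimumCapacity > SIZE) {
      Seg added = new Seg(); // Okio would take this one from SegmentPool.
      added.prev = tail;
      added.next = tail.next;
      tail.next.prev = added;
      tail.next = added;
      segmentCount++;
      tail = added;
    }
    return tail;
  }

  void writeByte(int b) {
    Seg tail = writableSegment(1);
    tail.data[tail.limit++] = (byte) b;
  }

  /** Writes byteCount bytes into a fresh buffer and reports how many segments it needed. */
  static int segmentsFor(int byteCount) {
    TailSketch buffer = new TailSketch();
    for (int i = 0; i < byteCount; i++) buffer.writeByte(i);
    return buffer.segmentCount;
  }
}
```

With 4-byte segments, four bytes fit in one segment and the fifth byte forces a second one; the list stays circular, so head.prev always reaches the tail in one step.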

public static Sink sink(Socket socket) throws IOException {
  if (socket == null) throw new IllegalArgumentException("socket == null");
  AsyncTimeout timeout = timeout(socket);
  Sink sink = sink(socket.getOutputStream(), timeout);
  return timeout.sink(sink);
}

public final Sink sink(final Sink sink) {
  return new Sink() {
    @Override public void write(Buffer source, long byteCount) throws IOException {
      checkOffsetAndCount(source.size, 0, byteCount);

      while (byteCount > 0L) {
        // Count how many bytes to write. This loop guarantees we split on a segment boundary.
        long toWrite = 0L;
        for (Segment s = source.head; toWrite < TIMEOUT_WRITE_SIZE; s = s.next) {
          int segmentSize = source.head.limit - source.head.pos;
          toWrite += segmentSize;
          if (toWrite >= byteCount) {
            toWrite = byteCount;
            break;
          }
        }

        // Emit one write. Only this section is subject to the timeout.
        boolean throwOnTimeout = false;
        enter();
        try {
          sink.write(source, toWrite);
          byteCount -= toWrite;
          throwOnTimeout = true;
        } catch (IOException e) {
          throw exit(e);
        } finally {
          exit(throwOnTimeout);
        }
      }
    }

    @Override public void flush() throws IOException {
      boolean throwOnTimeout = false;
      enter();
      try {
        sink.flush();
        throwOnTimeout = true;
      } catch (IOException e) {
        throw exit(e);
      } finally {
        exit(throwOnTimeout);
      }
    }

    @Override public void close() throws IOException {
      boolean throwOnTimeout = false;
      enter();
      try {
        sink.close();
        throwOnTimeout = true;
      } catch (IOException e) {
        throw exit(e);
      } finally {
        exit(throwOnTimeout);
      }
    }

    @Override public Timeout timeout() {
      return AsyncTimeout.this;
    }

    @Override public String toString() {
      return "AsyncTimeout.sink(" + sink + ")";
    }
  };
}
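The decorator pattern is used here as well: every I/O operation is wrapped so that it runs under an asynchronous timeout, which is armed by the call to enter(). Let's step into that method.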
public final void enter() {
  if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
  long timeoutNanos = timeoutNanos();
  boolean hasDeadline = hasDeadline();
  if (timeoutNanos == 0 && !hasDeadline) {
    return; // No timeout and no deadline? Don't bother with the queue.
  }
  inQueue = true;
  scheduleTimeout(this, timeoutNanos, hasDeadline);
}

private static synchronized void scheduleTimeout(
    AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
  // Start the watchdog thread and create the head node when the first timeout is scheduled.
  if (head == null) {
    head = new AsyncTimeout();
    new Watchdog().start();
  }

  long now = System.nanoTime();
  if (timeoutNanos != 0 && hasDeadline) {
    // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around,
    // Math.min() is undefined for absolute values, but meaningful for relative ones.
    node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
  } else if (timeoutNanos != 0) {
    node.timeoutAt = now + timeoutNanos;
  } else if (hasDeadline) {
    node.timeoutAt = node.deadlineNanoTime();
  } else {
    throw new AssertionError();
  }

  // Insert the node in sorted order.
  long remainingNanos = node.remainingNanos(now);
  for (AsyncTimeout prev = head; true; prev = prev.next) {
    if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
      node.next = prev.next;
      prev.next = node;
      if (prev == head) {
        AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
      }
      break;
    }
  }
}

new Watchdog().start();
private static final class Watchdog extends Thread {
  public Watchdog() {
    super("Okio Watchdog");
    setDaemon(true);
  }

  public void run() {
    while (true) {
      try {
        AsyncTimeout timedOut;
        synchronized (AsyncTimeout.class) {
          timedOut = awaitTimeout();

          // Didn't find a node to interrupt. Try again.
          if (timedOut == null) continue;

          // The queue is completely empty. Let this thread exit and let another watchdog thread
          // get created on the next call to scheduleTimeout().
          if (timedOut == head) {
            head = null;
            return;
          }
        }

        // Close the timed out node.
        timedOut.timedOut();
      } catch (InterruptedException ignored) {
      }
    }
  }
}

static AsyncTimeout awaitTimeout() throws InterruptedException {
  // Get the next eligible node.
  AsyncTimeout node = head.next;

  // The queue is empty. Wait until either something is enqueued or the idle timeout elapses.
  if (node == null) {
    long startNanos = System.nanoTime();
    AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS);
    return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS
        ? head // The idle timeout elapsed.
        : null; // The situation has changed.
  }

  long waitNanos = node.remainingNanos(System.nanoTime());

  // The head of the queue hasn't timed out yet. Await that.
  if (waitNanos > 0) {
    // Waiting is made complicated by the fact that we work in nanoseconds,
    // but the API wants (millis, nanos) in two arguments.
    long waitMillis = waitNanos / 1000000L;
    waitNanos -= (waitMillis * 1000000L);
    AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
    return null;
  }

  // The head of the queue has timed out. Remove it.
  head.next = node.next;
  node.next = null;
  return node;
}
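awaitTimeout computes how much time the node at the front of the queue has left and blocks the watchdog thread for that long; once the node has timed out, it is removed from the queue and notified that it timed out. Finally, the exit() call in timeout.sink decides whether to throw an exception based on two things: whether the node is still in the queue (if it is gone, the watchdog has already timed it out) and whether the I/O operation completed. The takeaway is a way to add timeout monitoring with the decorator pattern: wrap the original operation in one more layer, start a timeout watch queue, and let it decide whether the operation has timed out and react, for example by updating the UI. This sorted, linked-list style of timeout monitoring is worth borrowing in your own code.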
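The ordering logic of such a queue can be tried without any threads. The following is a minimal sketch, not Okio's implementation: TimeoutQueueSketch, Node, schedule, expire and cancel are hypothetical names, time is passed in as a plain number instead of being read from System.nanoTime(), and cancel() returns true when the node was still queued (the opposite sense of the check Okio performs internally).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a sorted timeout list; no watchdog thread, only the ordering. */
final class TimeoutQueueSketch {
  static final class Node {
    final String name;
    final long timeoutAt; // Absolute deadline on the caller's clock.
    Node next;
    Node(String name, long timeoutAt) {
      this.name = name;
      this.timeoutAt = timeoutAt;
    }
  }

  private final Node head = new Node("head", 0); // Sentinel; never times out.

  /** Inserts in deadline order; returns true if the node became the new front. */
  synchronized boolean schedule(Node node, long now) {
    long remaining = node.timeoutAt - now;
    for (Node prev = head; true; prev = prev.next) {
      if (prev.next == null || remaining < prev.next.timeoutAt - now) {
        node.next = prev.next;
        prev.next = node;
        return prev == head; // A real watchdog would be woken up in this case.
      }
    }
  }

  /** Removes every node whose deadline has passed at time now and returns their names. */
  synchronized List<String> expire(long now) {
    List<String> expired = new ArrayList<>();
    while (head.next != null && head.next.timeoutAt - now <= 0) {
      Node node = head.next;
      head.next = node.next;
      node.next = null;
      expired.add(node.name);
    }
    return expired;
  }

  /** Unlinks the node; returns true if it was still queued, i.e. it had not timed out. */
  synchronized boolean cancel(Node node) {
    for (Node prev = head; prev != null; prev = prev.next) {
      if (prev.next == node) {
        prev.next = node.next;
        node.next = null;
        return true;
      }
    }
    return false;
  }
}
```

Because the list is kept sorted, only the front node ever needs to be checked, which is why a single watchdog thread can serve any number of pending operations.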
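Overall, Okio's design trades space for time. Both memory use and CPU work are carefully optimized, for example by recycling segments to avoid GC churn and zero-filling, and ideas such as its linked segment pool are worth borrowing in our own code. Second, on the design side, combining the decorator pattern with composition keeps the code logic concise and clear and makes it highly extensible: besides implementations such as RealBufferedSink, the framework provides many other sinks and sources for different kinds of I/O. Third is the timeout mechanism: managing timeouts for a large number of objects is usually complicated, yet Okio again handles it with a sorted linked-list queue and notifies the operating class through a callback. These techniques can help in our own programming as well.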