Okio原始碼分析
okio
是Square開源框架之一,它對 java.io
和 java.nio
做了補充,使訪問,儲存和資料處理變得更加容易。它最早是 Okhttp
元件之一。

1、ByteString與Buffer
Okio
主要圍繞 ByteString
與 Buffer
這兩個類展開,其主要功能都封裝在這兩個類中:
-
ByteString
:是一個類似String
的不可變類,它可以很容易的在byte
與String
之間進行轉換。該類提供了編/解碼為hex,md5,base64及UTF-8等方法。 -
Buffer
:是一個可變的位元組序列。 與ArrayList
一樣,無需提前調整緩衝區大小。Buffer
內部維護了一個雙向連結串列,從連結串列尾部寫入資料,頭部讀取資料。
ByteString
和 Buffer
做了一些節省CPU和記憶體的操作。 如果將一個字串編碼為 ByteString
, ByteString
就會快取對該字串的引用(以空間換時間),這樣如果以後對其進行編/解碼等操作,則無需在 byte
與 String
之間進行轉換。
//字串對應的位元組資料,避免再一次轉換 final byte[] data; //字串 transient String utf8; // Lazily computed. 複製程式碼
Buffer
內部維護了一個以 Segment
為節點的雙向連結串列。 當資料從一個 Buffer
移動到另一個 Buffer
時,僅需要進行一次資料拷貝,且它會重新分配 Segment
的所有權,而不是重新建立 Segment
物件。
2、Source與Sink
Okio
包含自己的流型別,稱為 Source
和 Sink
,其工作方式雖然類似 InputStream
和 OutputStream
,但它與Java I/O相比具有以下優勢(參考自 Android學習筆記——Okio ):
-
Okio
實現了I/O讀寫的超時機制(Timeout
),防止讀寫出錯從而導致一直阻塞。 - N合一,
OKio
精簡了輸入輸出流的類個數 - 低的CPU和記憶體消耗,引入
Segment
和SegmentPool
複用機制 - 使用方便。
ByteString
處理不變byte
,Buffer
處理可變byte
。 - 提供了一系列的工具。
OKio
支援md5、sha、base64等資料處理
Source
、 Sink
可以與 InputStream
、 OutputStream
互相操作。我們可以將任何 Source
視為 InputStream
,也可以將任何 InputStream
視為 Source
。同樣適用於 Sink
和 InputStream
。
3、Okio資料讀寫流程
前面簡單介紹了 Okio
,下面就來看看如何使用。
//okio實現圖片複製 public void copyImage(File sinkFile, File sourceFile) throws IOException { //try裡面的程式碼是Okio的標準寫法,不能改變 try (Sink sink = Okio.sink(sinkFile); BufferedSink bufferedSink = Okio.buffer(sink); //從檔案讀取資料 Source source = Okio.source(sourceFile); BufferedSource bufferedSource = Okio.buffer(source)) { //圖片複製 bufferedSink.write(bufferedSource.readByteArray()); //設定超時時間為1秒中, sink.timeout().deadline(1, TimeUnit.SECONDS); //寫入資料,將字串以UTF-8格式寫入,Okio專門針對utf-8做了處理 bufferedSink.writeUtf8(entry.getKey()) .writeUtf8("=") .writeUtf8(entry.getValue()) .writeUtf8("\n"); //讀取資料 String str=bufferedSource.readUtf8(); //讀取資料並返回一個ByteString ByteStringstr=bufferedSource.readByteString(); } } 複製程式碼
正如前面所說的那樣, Okio
使用起來非常方便。由於Java字串採用的是UTF-16編碼,而一般開發中使用的都是UTF-8編碼,所以 Okio
對字串編碼做了特殊處理。
3.1、Okio讀資料原理分析
Source
的意思是水源,它對應著輸入流,在 Okio
中通過 Okio.source
方法來獲得一個 Source
物件。
//在Okio這個類中關於source過載的方法還是蠻多的,這裡以檔案為例 public static Source source(File file) throws FileNotFoundException { if (file == null) throw new IllegalArgumentException("file == null"); return source(new FileInputStream(file)); } public static Source source(InputStream in) { return source(in, new Timeout()); } private static Source source(final InputStream in, final Timeout timeout) { ... //這裡才是真正讀去資料的地方 return new Source() { @Override public long read(Buffer sink, long byteCount) throws IOException { ... try { //每次寫資料時都先檢查是否超時,預設未設定超時 timeout.throwIfReached(); //獲取連結串列的尾節點 Segment tail = sink.writableSegment(1); //由於每個Segment的SIZE為8KB,所以每一次拷貝不能超過這個值 int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit); //通過InputStream讀取資料 int bytesRead = in.read(tail.data, tail.limit, maxToCopy); //資料讀取完畢 if (bytesRead == -1) return -1; //可寫取位置往後移 tail.limit += bytesRead; //讀取的總位元組數 sink.size += bytesRead; //返回當前讀取的位元組數 return bytesRead; } catch (AssertionError e) { ... } } ... }; } 複製程式碼
可以發現,這個的 Source
是一個匿名物件。得到 Source
物件後,通過 Okio.buffer
方法將該物件傳遞給 BufferedSource
, BufferedSource
是一個介面,它的具體實現類是 RealBufferedSource
。 在上面例子中是呼叫 RealBufferedSource
的 readByteArray
方法來讀取資料,下面就來看這個方法的實現。
//RealBufferedSource對應的Buffer public final Buffer buffer = new Buffer(); @Override public byte[] readByteArray() throws IOException { //將資料寫入buffer buffer.writeAll(source); //將所有資料已位元組陣列形式返回 return buffer.readByteArray(); } 複製程式碼
在 readByteArray
方法中會首先將資料寫入到 Buffer
中,並生成一個雙向連結串列。
@Override public long writeAll(Source source) throws IOException { if (source == null) throw new IllegalArgumentException("source == null"); long totalBytesRead = 0; //這裡的source就是前面在Okio中建立的匿名Source物件 for (long readCount; (readCount = source.read(this, Segment.SIZE)) != -1; ) { totalBytesRead += readCount; } return totalBytesRead; } 複製程式碼
將資料寫入 Buffer
後,呼叫 Buffer
的 readByteArray
方法生成一個位元組陣列並返回。
@Override public byte[] readByteArray() { try { //在讀取資料時,就會得到size的大小 return readByteArray(size); } catch (EOFException e) { throw new AssertionError(e); } } @Override public byte[] readByteArray(long byteCount) throws EOFException { checkOffsetAndCount(size, 0, byteCount); ... //建立一個大小為size的byte陣列 byte[] result = new byte[(int) byteCount]; //將讀取的資料寫入這個陣列中 readFully(result); return result; } @Override public void readFully(byte[] sink) throws EOFException { int offset = 0; while (offset < sink.length) { //不斷的將資料寫入sink陣列中 int read = read(sink, offset, sink.length - offset); if (read == -1) throw new EOFException(); offset += read; } } @Override public int read(byte[] sink, int offset, int byteCount) { checkOffsetAndCount(sink.length, offset, byteCount); Segment s = head; if (s == null) return -1; int toCopy = Math.min(byteCount, s.limit - s.pos); //進行資料拷貝 System.arraycopy(s.data, s.pos, sink, offset, toCopy); s.pos += toCopy; size -= toCopy; //釋放Segment並將其放入緩衝池 if (s.pos == s.limit) { head = s.pop(); SegmentPool.recycle(s); } return toCopy; } 複製程式碼
這樣就將資料寫入到一個新的陣列中,並將連結串列中的所有 Segment
重新初始化並放入池中。
3.2、Okio寫資料原理分析
Sink
的意思是水槽,它對應著輸出流。通過 Okio.sink
來獲取一個 Sink
物件。
public static Sink sink(File file) throws FileNotFoundException { if (file == null) throw new IllegalArgumentException("file == null"); return sink(new FileOutputStream(file)); } public static Sink sink(OutputStream out) { return sink(out, new Timeout()); } private static Sink sink(final OutputStream out, final Timeout timeout) { ... //建立一個匿名Sink物件 return new Sink() { @Override public void write(Buffer source, long byteCount) throws IOException { checkOffsetAndCount(source.size, 0, byteCount); //寫入資料 while (byteCount > 0) { //每次寫資料時都先檢查是否超時,預設未設定超時 timeout.throwIfReached(); //獲取頭結點 Segment head = source.head; //能copy的最小位元組 int toCopy = (int) Math.min(byteCount, head.limit - head.pos); //通過OutputStream來寫入資料 out.write(head.data, head.pos, toCopy); //可讀取的位置向後移動 head.pos += toCopy; //減少可寫入的位元組數 byteCount -= toCopy; //減少buffer中位元組數 source.size -= toCopy; //達到最大可寫的位置 if (head.pos == head.limit) { //釋放節點 source.head = head.pop(); SegmentPool.recycle(head); } } } ... }; } 複製程式碼
獲得 Sink
物件後,將該物件傳遞給 BufferedSink
, BufferedSink
是一個介面,它的具體實現是 RealBufferedSink
。
public static BufferedSink buffer(Sink sink) { return new RealBufferedSink(sink); } 複製程式碼
在3.1節中講了通過 InputStream
讀取資料並返回一個位元組陣列。這裡就將這個陣列通過 RealBufferedSink
的 write
方法寫入到新的檔案中。
@Override public BufferedSink write(byte[] source) throws IOException { if (closed) throw new IllegalStateException("closed"); buffer.write(source); return emitCompleteSegments(); } 複製程式碼
寫入資料跟讀取資料流程基本上一樣,需要先將資料寫入到 Buffer
中。
@Override public Buffer write(byte[] source) { if (source == null) throw new IllegalArgumentException("source == null"); return write(source, 0, source.length); } @Override public Buffer write(byte[] source, int offset, int byteCount) { ... int limit = offset + byteCount; while (offset < limit) { Segment tail = writableSegment(1); int toCopy = Math.min(limit - offset, Segment.SIZE - tail.limit); //進行資料拷貝 System.arraycopy(source, offset, tail.data, tail.limit, toCopy); offset += toCopy; tail.limit += toCopy; } size += byteCount; return this; } 複製程式碼
前面說過 Buffer
維護的是一個連結串列,所以這裡也是將資料寫入一個連結串列中,由於在資料讀取完畢後會將 Segment
物件重新初始化並放入到池中,所以這裡就不用建立新的 Segment
物件,直接從池中獲取即可。在寫入 Buffer
成功後,再呼叫 emitCompleteSegments
方法,該方法就是將資料從 Buffer
寫入到新檔案。
@Override public BufferedSink emitCompleteSegments() throws IOException { if (closed) throw new IllegalStateException("closed"); long byteCount = buffer.completeSegmentByteCount(); if (byteCount > 0) sink.write(buffer, byteCount); return this; } 複製程式碼
這裡的 Sink
就是在 Okio
中建立的匿名物件,在 Sink
物件中通過 OutputStream
將資料寫入到新檔案。 總體流程如下。

4、Segment及SegmentPool
Segment
是 Okio
中非常重要的一環,它可以說是 Buffer
中資料的載體。容量是8kb,頭結點為head。
final class Segment { //Segment的容量,最大為8kb static final int SIZE = 8192; //如果Segment中位元組數 > SHARE_MINIMUM時(大Segment),就可以共享,不能新增到SegmentPool static final int SHARE_MINIMUM = 1024; //儲存的資料 final byte[] data; //下一次讀取的開始位置 int pos; //寫入的開始位置 int limit; //當前Segment是否可以共享 boolean shared; //data是否僅當前Segment獨有,不share boolean owner; //後繼節點 Segment next; //前驅節點 Segment prev; ... //移除當前Segment public final @Nullable Segment pop() { Segment result = next != this ? next : null; prev.next = next; next.prev = prev; next = null; prev = null; return result; } //在當前節點後新增一個新的節點 public final Segment push(Segment segment) { segment.prev = this; segment.next = next; next.prev = segment; next = segment; return segment; } //將當前Segment分裂成2個Segment結點。前面結點pos~limit資料範圍是[pos..pos+byteCount),後面結點pos~limit資料範圍是[pos+byteCount..limit) public final Segment split(int byteCount) { if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException(); Segment prefix; //如果位元組數大於SHARE_MINIMUM則拆分成共享節點 if (byteCount >= SHARE_MINIMUM) { prefix = sharedCopy(); } else { prefix = SegmentPool.take(); System.arraycopy(data, pos, prefix.data, 0, byteCount); } prefix.limit = prefix.pos + byteCount; pos += byteCount; prev.push(prefix); return prefix; } //當前Segment結點和prev前驅結點合併成一個Segment,統一合併到prev,然後當前Segment結點從雙向連結串列移除並新增到SegmentPool複用。當然合併的前提是:2個Segment的位元組總和不超過8K。合併後可能會移動pos、limit public final void compact() { if (prev == this) throw new IllegalStateException(); if (!prev.owner) return; // Cannot compact: prev isn't writable. int byteCount = limit - pos; int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos); if (byteCount > availableByteCount) return; // Cannot compact: not enough writable space. writeTo(prev, byteCount); pop(); SegmentPool.recycle(this); } //從當前節點移動byteCount個位元組到sink中 public final void writeTo(Segment sink, int byteCount) { if (!sink.owner) throw new IllegalArgumentException(); if (sink.limit + byteCount > SIZE) { // We can't fit byteCount bytes at the sink's current position. Shift sink first. if (sink.shared) throw new IllegalArgumentException(); if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException(); System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos); sink.limit -= sink.pos; sink.pos = 0; } System.arraycopy(data, pos, sink.data, sink.limit, byteCount); sink.limit += byteCount; pos += byteCount; } } 複製程式碼
SegmentPool
是一個 Segment
池,內部維護了一個 Segment
單向連結串列,容量為64kb(8個 Segment
),回收不用的 Segment
物件。
final class SegmentPool { //SegmentPool的最大容量 static final long MAX_SIZE = 64 * 1024; // 64 KiB. //後繼節點 static Segment next; //當前池內的總位元組數 static long byteCount; private SegmentPool() { } //從池中獲取一個Segment物件 static Segment take() { synchronized (SegmentPool.class) { if (next != null) { Segment result = next; next = result.next; result.next = null; byteCount -= Segment.SIZE; return result; } } return new Segment(); // Pool is empty. Don't zero-fill while holding a lock. } //將Segment狀態初始化並放入池中 static void recycle(Segment segment) { if (segment.next != null || segment.prev != null) throw new IllegalArgumentException(); if (segment.shared) return; // This segment cannot be recycled. synchronized (SegmentPool.class) { if (byteCount + Segment.SIZE > MAX_SIZE) return; // Pool is full. byteCount += Segment.SIZE; segment.next = next; segment.pos = segment.limit = 0; next = segment; } } } 複製程式碼
當從 InputStream
中讀資料時,讀取的資料會寫進以 Segment
為節點的雙向連結串列中。如果 Segment
容量不夠(容量大於8kb),就會從 SegmentPool
中 take
一個 Segment
物件並新增到雙向連結串列尾部。 當通過 OutputStrem
寫資料時,會從雙向連結串列的 head
節點開始讀取,當 Segment
中的資料讀取完畢後,就會將該 Segment
從雙向連結串列中移除,並回收到 SegmentPool
中,等待下次複用。
5、超時機制
Okio
的亮點之一就是增加了超時機制,防止因為意外導致I/O一直阻塞的問題,預設的超時機制是同步的。 AsyncTimeout
是 Okio
中非同步超時機制的實現,它是一個單鏈表,結點按等待時間從小到大排序,head是一個頭結點,起佔位作用。使用了一個 WatchDog
的後臺執行緒來不斷的遍歷所有節點,如果某個節點超時就會將該節點從連結串列中移除,並關閉 Socket
。 AsyncTimeout
提供了3個方法 enter
、 exit
、 timeout
,分別用於流操作開始、結束、超時三種情況呼叫。
public class AsyncTimeout extends Timeout { //頭結點,佔位使用 static AsyncTimeout head; //是否在連結串列中 private boolean inQueue; //後繼節點 private AsyncTimeout next; //超時時間 private long timeoutAt; //把當前AsyncTimeout物件加入節點 public final void enter() { ... scheduleTimeout(this, timeoutNanos, hasDeadline); } private static synchronized void scheduleTimeout( AsyncTimeout node, long timeoutNanos, boolean hasDeadline) { //建立佔位頭結點並開啟子執行緒 if (head == null) { head = new AsyncTimeout(); new Watchdog().start(); } ... //插入到連結串列中,按照時間長短進行排序,等待事件越長越靠後 for (AsyncTimeout prev = head; true; prev = prev.next) { if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) { node.next = prev.next; prev.next = node; if (prev == head) { AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front. } break; } } } //從連結串列中移除節點 public final boolean exit() { if (!inQueue) return false; inQueue = false; return cancelScheduledTimeout(this); } //執行真正的移除操作 private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) { // Remove the node from the linked list. for (AsyncTimeout prev = head; prev != null; prev = prev.next) { if (prev.next == node) { prev.next = node.next; node.next = null; return false; } } // The node wasn't found in the linked list: it must have timed out! return true; } //在子類中重寫了該方法,主要是進行socket的關閉 protected void timedOut() { } //監聽節點是否超時的子執行緒 private static final class Watchdog extends Thread { Watchdog() { super("Okio Watchdog"); setDaemon(true); } public void run() { while (true) { try { AsyncTimeout timedOut; synchronized (AsyncTimeout.class) { timedOut = awaitTimeout(); //代表頭結點的後繼節點已超時, if (timedOut == null) continue; //除頭結點外沒有任何其他節點 if (timedOut == head) { head = null; return; } } //關閉socket timedOut.timedOut(); } catch (InterruptedException ignored) { } } } } static AsyncTimeout awaitTimeout() throws InterruptedException { AsyncTimeout node = head.next; //除了頭結點外沒有任何其他節點 if (node == null) { long startNanos = System.nanoTime(); AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS); return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS ? head// The idle timeout elapsed. : null; // The situation has changed. } long waitNanos = node.remainingNanos(System.nanoTime()); //進行等待 if (waitNanos > 0) { //等待 long waitMillis = waitNanos / 1000000L; waitNanos -= (waitMillis * 1000000L); AsyncTimeout.class.wait(waitMillis, (int) waitNanos); return null; } //代表node節點已超時 head.next = node.next; node.next = null; return node; } } 複製程式碼
預設都是未設定超時時間的,需要我們自己來設定,同步及非同步的超時時間設定方式是一樣的,通過下面程式碼即可。
sink.timeout().deadline(1, TimeUnit.SECONDS); source.timeout().deadline(1,TimeUnit.MILLISECONDS); 複製程式碼
6、生產者/消費者模型
在 Okio
中可以使用 Pipe
來實現一個生產者/消費者模型。 Pipe
維護了一個一定大小 Buffer
。當該 Buffer
容量達到最大時,執行緒就會等待直到該 Buffer
有剩餘的空間。
public final class Pipe { //Pipe的最大容量 final long maxBufferSize; //Pipe對應的Buffer final Buffer buffer = new Buffer(); boolean sinkClosed; boolean sourceClosed; //寫入流,對應著生產者 private final Sink sink = new PipeSink(); //讀取流,對應著消費者 private final Source source = new PipeSource(); public Pipe(long maxBufferSize) { //最大容量不能小於1 if (maxBufferSize < 1L) { throw new IllegalArgumentException("maxBufferSize < 1: " + maxBufferSize); } this.maxBufferSize = maxBufferSize; } ... //寫入資料到Pipe中 final class PipeSink implements Sink { final Timeout timeout = new Timeout(); @Override public void write(Buffer source, long byteCount) throws IOException { synchronized (buffer) { ... while (byteCount > 0) { ... long bufferSpaceAvailable = maxBufferSize - buffer.size(); if (bufferSpaceAvailable == 0) { //buffer中,沒有剩餘空間,等待消費者消費 timeout.waitUntilNotified(buffer); // Wait until the source drains the buffer. continue; } long bytesToWrite = Math.min(bufferSpaceAvailable, byteCount); buffer.write(source, bytesToWrite); byteCount -= bytesToWrite; //通知buffer,有新的資料了, buffer.notifyAll(); // Notify the source that it can resume reading. } } } ... } //從Pipe中讀取資料 final class PipeSource implements Source { final Timeout timeout = new Timeout(); @Override public long read(Buffer sink, long byteCount) throws IOException { synchronized (buffer) { ... while (buffer.size() == 0) { if (sinkClosed) return -1L; //Pipe中沒有資料,等待生產者寫入 timeout.waitUntilNotified(buffer); // Wait until the sink fills the buffer. } long result = buffer.read(sink, byteCount); buffer.notifyAll(); // Notify the sink that it can resume writing. return result; } } ... } } 複製程式碼
Pipe
的程式碼還是比較少的。下面就來如何使用 Pipe
。
public void pipe() throws IOException { //設定Pipe的容量為1024位元組,即1kb Pipe pipe = new Pipe(1024); new Thread(new Runnable() { @Override public void run() { try (BufferedSource bufferedSource = Okio.buffer(pipe.source())) { //將Pipe中資料寫入env4.txt這個檔案中 bufferedSource.readAll(Okio.sink(new File("file/env4.txt"))); } catch (IOException e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try (BufferedSink bufferedSink = Okio.buffer(pipe.sink())) { //將env3.txt中資料寫入到Pipe中 bufferedSink.writeAll(Okio.source(new File("file/env3.txt"))); } catch (IOException e) { e.printStackTrace(); } } }).start(); } 複製程式碼