1. 程式人生 > >OkHttp原始碼詳解之Okio原始碼詳解

OkHttp原始碼詳解之Okio原始碼詳解

請在電腦上閱讀,效果更佳

本文將從兩個技術點講解OkHttp
1. 講解Okio,因為Okhttp的IO操作都是基於Okio,拋開Okio的OkHttp講解是不完美的
2. 講解OkHttp原始碼

Okio

1. Okio簡介

引用官方的一段介紹

Okio是一個補充java.io和java.nio的庫,使訪問,儲存和處理資料變得更加容易。 它最初是作為Android中包含的功能強大的HTTP客戶端OkHttp的一個元件。 它運作良好,隨時準備解決新問題。

2. 從HelloWorld開始

我們知道,在java.io中InputStream和OutputStream分別代表了輸入流,和輸出流。相應的在Okio中Source和Sink分別代表了輸入流和輸出流。接下來我們分別用java.io和Okio實現列印文字內容功能

假設有個檔案helloworld.txt檔案內容如下

Hello World!
Hello World!
  • java.io實現列印功能
try {
    File file = new File("helloworld.txt");
    FileInputStream fileInputStream = new FileInputStream(file);//1
    byte[] buffer = new byte[(int) file.length()];//2
    fileInputStream.read(buffer);//3
    System.out
.write(buffer);//4 } catch (Exception e) { e.printStackTrace(); }
  • Okio實現列印功能
try {
    File file = new File("helloworld.txt");
    Source source = Okio.source(file);//a
    Buffer buffer = new Buffer();//b
    source.read(buffer, file.length());//c
    Sink sink = Okio.sink(System.out);
    sink.write(buffer, file
.length());//d } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }

上面兩段程式碼實現的功能都是一樣的,實現思路總結如下
1. 獲取檔案的輸入流 //1和//a處實現
2. 將檔案輸入流讀取到緩衝區 //3和//c處實現
3. 將緩衝區的資料寫入到Sytem.out流中 //4和//d處

image

3.Source Sink原始碼講解

1. Okio Source Sink

從上面的例子我們可以把Source想象成InputStream,把Sink想象成OutputStream。通過下面的圖片,我們來看下Source和Sink的定義
image

  • Source通過read(Buffer sink,long byteCount)方法,把磁碟,網路,或者記憶體中的輸入流的資料讀取到記憶體的Buffer中。
  • Sink剛好相反,它通過write(Buffer source,long byteCount)方法把記憶體Buffer中的資料寫入到輸出流中。
  • Okio中定義了生成Source的靜態方法,source(File file)、source(InputStream in)、source(InputStream in,Timeout timeout)、source(Socket socket)。其中source(Socket socket)在OkHttp中被用來操作網路請求的Response。這很重要是OkHttp IO操作的核心。這四個過載方法真正的實現是在source(InputStream in,Timeout timeout)中
  • Okio中定義了生成Sink的靜態方法,sink(File file)、sink(OutputStream out)、source(OutputStream out,Timeout timeout)、source(Socket socket)。其中source(Socket socket)在OkHttp中被用來操作網路請求的Request。同樣這也是OkHttp IO操作的核心。這四個過載方法真正的實現是在sink(OutputStream out,Timeout timeout)中

2. Source Sink的建立

2.1 Okio source(InputStream in,Timeout timeout)
private static Source source(final InputStream in, final Timeout timeout) {
    if (in == null) throw new IllegalArgumentException("in == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Source() {
      @Override public long read(Buffer sink, long byteCount) throws IOException {
        if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
        if (byteCount == 0) return 0;
        try {
          timeout.throwIfReached();
          Segment tail = sink.writableSegment(1);
          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
          if (bytesRead == -1) return -1;
          tail.limit += bytesRead;
          sink.size += bytesRead;
          return bytesRead;
        } catch (AssertionError e) {
          if (isAndroidGetsocknameError(e)) throw new IOException(e);
          throw e;
        }
      }

      @Override public void close() throws IOException {
        in.close();
      }

      @Override public Timeout timeout() {
        return timeout;
      }

      @Override public String toString() {
        return "source(" + in + ")";
      }
    };
  }

仔細看一眼程式碼,除了int bytesRead = in.read(tail.data, tail.limit, maxToCopy);看著眼熟,其它的程式碼如Segment tail = sink.writableSegment(1);初學者表示很懵逼呀。實話告訴各位,整個Okio的精髓就在這兩行程式碼裡。這才叫四兩撥千斤。好吧,讓我們來重溫一下InputStream的read(byte[] b, int off, int len)方法

read
public int read(byte[] b,
       int off,
       int len)
         throws IOException
Reads up to len bytes of data from the input stream into an array of bytes. An attempt is made to read as many as len bytes, but a smaller number may be read. The number of bytes actually read is returned as an integer.
This method blocks until input data is available, end of file is detected, or an exception is thrown.

翻譯如下:從輸入流中讀取len個位元組到位元組陣列b中。這個方法可能會被阻塞,直到輸入流資料可用。

如此說來tail.data應該是個byte[],tail.limit是讀取流的起始位置,maxToCopy是要讀取的位元組的長度了。沒錯是這樣的。

Segment tail = sink.writableSegment(1);這句程式碼目前我還是看不懂啊,Segment是什麼呀?它和Buffer之間的關係是什麼呀?上圖!一圖抵千言
image
image

雖然說一圖抵千言,還是做個簡單的講解吧。

  • Segment說白了就是byte[],每個綠色的或者白色的小方格代表一個byte。綠色表示已經有資料了,白色表示沒有資料。pos指向第一個綠色的格子,表示讀取資料的位置,limit指向第一個白色的格子,表示寫入資料的位置。
  • Buffer是一個由Segment組成的雙鏈表。每一個Segment最多可以容下8192個位元組。在向Buffer的Segment寫入資料時,如果超過了8192個位元組,那麼會從SegmentPool(一個物件池,最多可以容下8個Segment)拿一個Segment或者新建一個Segment(因為SegmentPool中的物件都被用光了)加入到雙鏈表的尾端

接下來我們來分析下Segment原始碼,畢竟Talk Is Cheap,Show Me The Code。由於Segment程式碼還是比較簡單的。所以我就在原始碼中加入註釋來講解

final class Segment {
  /** 每個Segment最大容量8KB */
  static final int SIZE = 8192;

  /** Segment分兩種,只讀和可寫。當Segment需要被拆分成兩個小的Segment的時候,如果被拆分
   * 出去的Segment的大小超過1024,那麼那個Segment會被定義成只讀的。(暫時不理解沒關係)
  */
  static final int SHARE_MINIMUM = 1024;

  /**真正儲存資料的byte陣列**/
  final byte[] data;

  /** 讀取資料的地方 參考前面的圖片解釋 */
  int pos;

  /** 寫資料的地方 參考前面的圖片解釋 */
  int limit;

  /** 只讀模式 */
  boolean shared;

  /** 可寫模式 */
  boolean owner;

  /** 雙鏈表的next指標 */
  Segment next;

  /** 雙鏈表的prev指標 */
  Segment prev;

  Segment() {
    this.data = new byte[SIZE];
    this.owner = true;//預設是可寫的
    this.shared = false;
  }

  /**當前Segment從雙鏈表中出隊**/
  public Segment pop() {
    Segment result = next != this ? next : null;
    prev.next = next;
    next.prev = prev;
    next = null;
    prev = null;
    return result;
  }

  /**插入一個新的Segment到當前的Segment的後面**/
   public Segment push(Segment segment) {
    segment.prev = this;
    segment.next = next;
    next.prev = segment;
    next = segment;
    return segment;
  }

    /**
    *把當前的Segment分成兩個Segment。
    *使用場景 把當前Segment A寫入到Segment B中。將設A中資料的大小是2KB(記得容量是8KB)
    *B中的資料是7KB(剩餘空間1KB),這樣A往B中寫資料,肯定是寫不完的,需要把A分成
    *A1(新建的Segment)和A(原來的A,需要更新pos)
    **/
   public Segment split(int byteCount) {
    if (byteCount <= 0 || byteCount > limit - pos) throw new IllegalArgumentException();
    Segment prefix;

    if (byteCount >= SHARE_MINIMUM) {
    //如果寫入的資料超過1kb 新建一個只讀的Segment,避免arrayCopy
      prefix = new Segment(this);
    } else {
    //從SegmentPool中拿一個Segment
      prefix = SegmentPool.take();
      System.arraycopy(data, pos, prefix.data, 0, byteCount);
    }

    prefix.limit = prefix.pos + byteCount;
    pos += byteCount;
    //插入到當前Segment A的前面 A1->A
    prev.push(prefix);
    return prefix;
  }

  /**對多個Segment的空間做壓縮,用來HashSource,HashSink,GzipSource,
  * GzipSink(還怕別人問你Gzip在OkHttp中的實現原理嗎)
  **/
  public void compact() {
    //如果只有一個Segment 不需要壓縮
    if (prev == this) throw new IllegalStateException();
    // 如果前面的Segment是隻讀的,沒法壓縮
    if (!prev.owner) return; 
    int byteCount = limit - pos;
    int availableByteCount = SIZE - prev.limit + (prev.shared ? 0 : prev.pos);
    //兩個Segment的總大小不大於8kb 可以合併成一個,否則返回
    if (byteCount > availableByteCount)
    //把當前的Segment的資料寫入到前面的Segment中
    writeTo(prev, byteCount);
    //當前的Segment出佇列
    pop();
    //回收Segment
    SegmentPool.recycle(this);
  }

  /**寫byteCount個數據到sink中**/
  public void writeTo(Segment sink, int byteCount) {
    if (!sink.owner) throw new IllegalArgumentException();
    if (sink.limit + byteCount > SIZE) {
      // We can't fit byteCount bytes at the sink's current position. Shift sink first.
      if (sink.shared) throw new IllegalArgumentException();
      if (sink.limit + byteCount - sink.pos > SIZE) throw new IllegalArgumentException();
      System.arraycopy(sink.data, sink.pos, sink.data, 0, sink.limit - sink.pos);
      sink.limit -= sink.pos;
      sink.pos = 0;
    }

    System.arraycopy(data, pos, sink.data, sink.limit, byteCount);
    sink.limit += byteCount;
    pos += byteCount;
  }
}

總結下Segment的知識

  • Segment其實就是個byte[]
  • Segemnt記錄了byte的讀寫指標pos和limit
  • Segment維護一個雙鏈表

接下來我們來分析 Segment tail = sink.writableSegment(1)

Buffer.java

Segment writableSegment(int minimumCapacity) {
    if (minimumCapacity < 1 || minimumCapacity > Segment.SIZE) throw new IllegalArgumentException();
    //如果buffer 還沒有初始化,從物件池拿一個Segment,同時初始化雙鏈表
    if (head == null) {
      head = SegmentPool.take(); // Acquire a first segment.
      return head.next = head.prev = head;
    }
    //拿到雙鏈表的最後一個Segment
    Segment tail = head.prev;
    //判斷最後tail的空間夠不夠,tail是不是隻讀的Segment
    if (tail.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
    //如果空間不夠,或者是隻讀的 重新拿一個Segment放入到連結串列尾部
      tail = tail.push(SegmentPool.take()); 
    }
    return tail;
  }

skink.writeableSegment(1)的功能就是,從Buffer 的Segment連結串列中取到連結串列最後一個Segment,這個Segment需要滿足兩個條件1.可寫 2.可寫空間大於1個位元組

到這裡咱們基本上把int bytesRead = in.read(tail.data, tail.limit, maxToCopy)和Segment tail = sink.writableSegment(1)講解清楚了。那麼我們再重新看下Okio.source(InputStream in,Timeout timeOut) return new Source()程式碼塊的read方法

@Override public long read(Buffer sink, long byteCount) throws IOException {
        if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
        if (byteCount == 0) return 0;
        try {
        //支援超時讀取
          timeout.throwIfReached();
          //拿到buffer中連結串列的最後一個可寫的Segment
          Segment tail = sink.writableSegment(1);
          //獲取最大能往tail中寫多少個位元組
          int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
          //計算往Segment寫了多少資料(為什麼是寫,對buffer來說就是寫)
          int bytesRead = in.read(tail.data, tail.limit, maxToCopy);
          if (bytesRead == -1) return -1;
          //更新寫的位置
          tail.limit += bytesRead;
          //增加buffer的資料總量
          sink.size += bytesRead;
          return bytesRead;
        } catch (AssertionError e) {
          if (isAndroidGetsocknameError(e)) throw new IOException(e);
          throw e;
        }
      }

總結下Okio.source(InputStream in, Timeout timeout)

  • 從Buffer(sink)中找到連結串列中最後一個可寫的並且還有寫入空間的Segment記做tail
  • 判斷最多能寫多少資料到Buffer(sink)中記做maxToCopy
  • 從InputStream(in)中讀取maxToCopy個數據到tail中

根據第三條的結論來看,比如你呼叫了soure.read(buffer,10*1024),那其實返回的肯定是比 10*1024少。舉例說明,拿前面的helloworld舉例。現在我從網路貼上了 老羅android開發之旅的一篇文章到helloword.txt裡並重復了3遍。檔案大小為35243個位元組

try {
            File file = new File("helloworld.txt");
            System.out.println("file.length "+file.length());
            Source source = Okio.source(file);
            Sink sink = Okio.sink(System.out);
            Buffer buffer = new Buffer();
            source.read(buffer, file.length());
            System.out.println(buffer.size());
//            sink.write(buffer, file.length());
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

執行輸出結果如下

file.length 35243
buffer size 8192

如果需要正確的把所有資料都寫入到buffer中就需要用while迴圈了

 try {
            File file = new File("helloworld.txt");
            System.out.println("file.length "+file.length());
            long fileLength  = file.length();
            Source source = Okio.source(file);
            Sink sink = Okio.sink(System.out);
            Buffer buffer = new Buffer();
            while (fileLength!=0) {
                long hasRead = source.read(buffer, file.length());
                fileLength-=hasRead;
            }
            System.out.println("buffer size "+buffer.size());
//            sink.write(buffer, file.length());
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

執行輸出結果如下

file.length 35243
buffer size 35243

好了,至此Source基本上講解完畢,接下來講解Sink,老規矩還是從Okio.sink(OutputStream out,Timeout timeout)講起

2.2 Okio.sink(final OutputStream out, final Timeout timeout)
private static Sink sink(final OutputStream out, final Timeout timeout) {
    if (out == null) throw new IllegalArgumentException("out == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");
    return new Sink() {
    //把Buffer中的資料寫入到sink中
      @Override public void write(Buffer source, long byteCount) throws IOException {
      //檢查buffer中的資料數量是否合法(如果buffer數量<byteCount就不合法)
        checkOffsetAndCount(source.size, 0, byteCount);
        //自帶while迴圈,直到把buffer中的資料耗盡
        while (byteCount > 0) {
          timeout.throwIfReached();
          //從buffer的第一個Segment開始
          Segment head = source.head;
          int toCopy = (int) Math.min(byteCount, head.limit - head.pos);
          out.write(head.data, head.pos, toCopy);

          head.pos += toCopy;
          byteCount -= toCopy;
          source.size -= toCopy;
        //如果當前Segment寫完了出佇列
          if (head.pos == head.limit) {
            source.head = head.pop();
            SegmentPool.recycle(head);
          }
        }
      }

      @Override public void flush() throws IOException {
        out.flush();
      }

      @Override public void close() throws IOException {
        out.close();
      }

      @Override public Timeout timeout() {
        return timeout;
      }

      @Override public String toString() {
        return "sink(" + out + ")";
      }
    };

總結下Okio.sink(OutputStream out,Timeout timeout)

  • 檢查Buffer的size 和readCount是否合法
  • 獲取Buffer中的Head Segment,把Segment中的資料寫入到OutputStream,如果當前Segment資料寫完了,Segment出佇列,並放回物件池
  • 判斷資料是否寫完,如果沒寫完,重複第二部

4. BufferedSource BufferedSink原始碼

image

BufferedSource、BufferedSink 與Source和Sink的區別如下

  • BufferedSource、BufferedSink內部維護了一個Buffer物件
  • BufferedSource、BufferedSink內部分別引用了Source、Sink物件
  • BufferedSource的read(Buffer sink,long byteCount)使用內部的Source的read(Buffer sink,long byteCount)方法
  • BufferedSink的write(Buffer source,long byteCount)使用內部的Sink的write(Buffer source,long byteCount)方法
  • BufferedSource、BufferedSink內部擴充套件了很多readXX方法如 readByte/writeByte、readInt/writeInt等等

關於BufferedXX系列的原始碼可能需要再寫一篇文章詳細講解。不過也是挺簡單的。如果你看懂了本文自行分析BufferedXX應該是不在話下

5. 擴充套件Source Sink

Okio內部有不少已實現的Source和Sink。例如GzipSource/GzipSink、HashingSource/HashingSink。至於原始碼分析,請讀者自行分析。

6. 接下來要做的事情

Okio是OkHttp IO操作的基石。接下來我們將帶著Okio的學習成果進入OkHttp原始碼分析