1. 程式人生 > >java IO 包源碼解析

java IO 包源碼解析

bytes lte 分配 target 來看 等等 ase 大小限制 updater

本文參考連接:http://blog.csdn.net/class281/article/details/24849275

http://zhhphappy.iteye.com/blog/1562427

一、IO包簡要類圖

技術分享

Java I/O流部分分為兩個模塊,即Java1.0中就有的面向字節的流(Stream),以及Java1.1中大幅改動添加的面向字符的流(Reader & Writer)。添加面向字符的流主要是為了支持國際化,舊的I/O流僅支持8位的字節流,並不能很好的處理16位的Unicode字符(Java的基礎類型char也是16位的Unicode)。下面就針對這兩類流做一個簡要的分析。

二、面向字節的流

InputStream(OutputStream)是所有面向字節流的基類。它的子類分為兩大塊,一是諸如ByteArrayInputStream(ByteArrayOutputStream),FileInputStream(FileOutputStream)等等面向各種不同的輸入(輸出)源的子類,另一塊為了方便流操作的而進一步封裝的裝飾器系列FilterInputStream(FilterOutputStream)類及其子類。

BufferedInputStream

BufferedInputStream 是一個帶有內存緩沖的 InputStream.

1.首先來看類結構

技術分享

BufferedInputStream是繼承自FilterInputStream。
FilterInputStream繼承自InputStream屬於輸入流中的鏈接流,同時引用了InputStream,將InputStream封裝成一個內部變量,同時構造方法上需要傳入一個InputStream。這是一個典型的裝飾器模式,他的任何子類都可以對一個繼承自InputStream的原始流或其他鏈接流進行裝飾,如我們常用的使用BufferedInputStream對FileInputStream進行裝飾,使普通的文件輸入流具備了內存緩存的功能,通過內存緩沖減少磁盤io次數。

1 protected
volatile InputStream in; 2 protected FilterInputStream(InputStream in) { 3 this.in = in; 4 }

註意:成員變量in使用了volatile關鍵字修飾,保障了該成員變量多線程情況下的可見性。

2.內存緩沖的實現
概要的了解完BufferedInputStream的繼承關系,接下來詳細理解BufferedInputStream是如何實現內存緩沖。既是內存緩沖,就涉及到內存的分配,管理以及如何實現緩沖。
通過構造方法可以看到:初始化了一個byte數組作為內存緩沖區,大小可以由構造方法中的參數指定,也可以是默認的大小。

 1 protected volatile byte buf[];
 2 private static int defaultBufferSize = 8192;
 3 public BufferedInputStream(InputStream in, int size) {
 4     super(in);
 5     if (size <= 0) {
 6         throw new IllegalArgumentException("Buffer size <= 0");
 7     }
 8     buf = new byte[size];
 9 }
10 public BufferedInputStream(InputStream in) {
11     this(in, defaultBufferSize);
12 }

看完構造函數,大概可以了解其實現原理:通過初始化分配一個byte數組,一次性從輸入字節流中讀取多個字節的數據放入byte數組,程序讀取部分字節的時候直接從byte數組中獲取,直到內存中的數據用完再重新從流中讀取新的字節。那麽從api文檔中我們可以了解到BufferedStream大概具備如下的功能:

技術分享

從api可以了解到BufferedInputStream除了使用一個byte數組做緩沖外還具備打標記,重置當前位置到標記的位置重新讀取數據,忽略掉n個數據。這些功能都涉及到緩沖內存的管理,首先看下相關的幾個成員變量:

1 protected int count;
2 protected int pos;
3 protected int markpos = -1;
4 protected int marklimit;

count表示當前緩沖區內總共有多少有效數據;pos表示當前讀取到的位置(即byte數組的當前下標,下次讀取從該位置讀取);markpos:打上標記的位置;marklimit:最多能mark的字節長度,也就是從mark位置到當前pos的最大長度。

從最簡單的read()讀取一個字節的方法開始看:

1 public synchronized int read() throws IOException {
2     if (pos >= count) {
3         fill();
4         if (pos >= count)
5         return -1;
6     }
7     return getBufIfOpen()[pos++] & 0xff;
8 }
 1     /**
 2      * Fills the buffer with more data, taking into account
 3      * shuffling and other tricks for dealing with marks.
 4      * Assumes that it is being called by a synchronized method.
 5      * This method also assumes that all data has already been read in,
 6      * hence pos > count.
 7      */
 8     private void fill() throws IOException {
 9         byte[] buffer = getBufIfOpen();
10         if (markpos < 0)
11             pos = 0;            /* no mark: throw away the buffer */
12         else if (pos >= buffer.length)  /* no room left in buffer */
13             if (markpos > 0) {  /* can throw away early part of the buffer */
14                 int sz = pos - markpos;
15                 System.arraycopy(buffer, markpos, buffer, 0, sz);
16                 pos = sz;
17                 markpos = 0;
18             } else if (buffer.length >= marklimit) {
19                 markpos = -1;   /* buffer got too big, invalidate mark */
20                 pos = 0;        /* drop buffer contents */
21             } else if (buffer.length >= MAX_BUFFER_SIZE) {
22                 throw new OutOfMemoryError("Required array size too large");
23             } else {            /* grow buffer */
24                 int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
25                         pos * 2 : MAX_BUFFER_SIZE;
26                 if (nsz > marklimit)
27                     nsz = marklimit;
28                 byte nbuf[] = new byte[nsz];
29                 System.arraycopy(buffer, 0, nbuf, 0, pos);
30                 if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
31                     // Can‘t replace buf if there was an async close.
32                     // Note: This would need to be changed if fill()
33                     // is ever made accessible to multiple threads.
34                     // But for now, the only way CAS can fail is via close.
35                     // assert buf == null;
36                     throw new IOException("Stream closed");
37                 }
38                 buffer = nbuf;
39             }
40         count = pos;
41         int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
42         if (n > 0)
43             count = n + pos;
44     }
    /**
     * See the general contract of the <code>mark</code>
     * method of <code>InputStream</code>.
     *
     * @param   readlimit   the maximum limit of bytes that can be read before
     *                      the mark position becomes invalid.
     * @see     java.io.BufferedInputStream#reset()
     */
    public synchronized void mark(int readlimit) {
        marklimit = readlimit;
        markpos = pos;
    }

    /**
     * See the general contract of the <code>reset</code>
     * method of <code>InputStream</code>.
     * <p>
     * If <code>markpos</code> is <code>-1</code>
     * (no mark has been set or the mark has been
     * invalidated), an <code>IOException</code>
     * is thrown. Otherwise, <code>pos</code> is
     * set equal to <code>markpos</code>.
     *
     * @exception  IOException  if this stream has not been marked or,
     *                  if the mark has been invalidated, or the stream
     *                  has been closed by invoking its {@link #close()}
     *                  method, or an I/O error occurs.
     * @see        java.io.BufferedInputStream#mark(int)
     */
    public synchronized void reset() throws IOException {
        getBufIfOpen(); // Cause exception if closed
        if (markpos < 0)
            throw new IOException("Resetting to invalid mark");
        pos = markpos;
    }

當pos>=count的時候也就是表示當前的byte中的數據為空或已經被讀完,他調用了一個fill()方法,從字面理解就是填充的意思,實際上是從真正的輸入流中讀取一些新數據放入緩沖內存中,之後直到緩沖內存中的數據讀完前都不會再從真正的流中讀取數據。
看源碼中的fill()方法有很大一段是關於markpos的處理,其處理過程大致如下圖:

a.沒有markpos的情況很簡單:

技術分享

b.有mark的情況比較復雜:

技術分享

3.read()方法返回值
以上即為內存緩沖管理的完全過程,再回過頭看read()方法,當緩沖byte數組中有數據可以讀時,直接從數組中讀取一個字節,但最後的read方法返回的卻是int,而且還和0xff做了與運算。

1 return getBufIfOpen()[pos++] & 0xff;

為什麽不直接返回一個byte,而是一個與運算後的int。首先宏觀的看InputStream和Reader兩個輸入流的抽象類都定義了read接口而且都返回int,一個是字節流,一個是字符流。我們知道字節用byte表示,字符用char表示。首先看java中基本類型的取值範圍:

技術分享

從取值範圍來看int包含了char和byte,這為使用int作為返回值類型提供了可能。
在應用中我們一般用read()接口的返回值是-1則表示已經讀到文件尾(EOF)。

char的取值範圍本身不包含負數,所有用int的-1表示文件讀完沒問題,但byte的取值範圍-128 ~ 127,包含了-1,讀取的有效數據範圍就是-128~127,沒辦法用這個取值範圍中的任何一個數字表示異常或者數據已經讀完,所以接口如果直接使用byte作為返回值不可行,直接將byte強制類型轉換成int也不行,因為如果讀到一個byte的-1,轉為int了也是-1,會被理解為文件已經讀完。所以這裏做了一個特殊處理return getBufIfOpen()[pos++] & 0xff。


0xff是int類型,二進制為0000 0000 0000 0000 0000 0000 1111 1111。

上述的與運算實際上讀取的byte先被強制轉換成了int,例如byte的-1(最高位表示符號位,以補碼的形式表示負數為:1111 1111)

轉換為int之後的二進制1111 1111 1111 1111 1111 1111 1111 1111

& 0xff之後高位去0

最後返回的結果是0000 0000 0000 0000 0000 0000 1111 1111, 為int值為256

其-128~-1被轉為int中128~256的正數表示。

這樣解決了可以用-1表示文件已經讀完。但關鍵是數據的值發生了變化,真正要用讀取的數據時是否還能拿到原始的byte。還拿上面那個例子來看,當讀取返回一個256時,將其強制類型轉換為byte,(byte)256得到byte的-1,因為byte只有8位,當int的高位被丟棄後就只剩下1111 1111,在byte中高位的1表示符號位為負數,最終的結果即是byte的-1;同樣byte的-128(1000 0000)被轉為int的128(0000 0000 0000 0000 0000 0000 1000 0000),強制類型轉換後還原byte的1000 0000。

4.線程安全

返回值中還有一個細節是getBufIfOpen()[pos++],直接將pos++來獲取下一個未讀取的數據,這裏涉及到的兩個元素:一個內存數組,一個當前讀取的數據下標都是全局變量,pos++也不是線程安全。那麽BufferedInputStream如何保證對內存緩沖數組的操作線程安全?源碼中有操作的public方法除了close方法之外,其他方法上都加上了synchronized關鍵字,以保障上面描述的整個內存緩存數組的操作是線程安全的。但為什麽close方法沒有synchronized,我們看這個方法做了些什麽事情:

byte[] buffer;
while ( (buffer = buf) != null) {
    if (bufUpdater.compareAndSet(this, buffer, null)) {
    InputStream input = in;
    in = null;
    if (input != null)
        input.close();
    return;
    }
    // Else retry in case a new buf was CASed in fill()
}

簡單來看做了兩個操作:把內存數組置為null,將引用的inputStream置為null,同時將引用的inputStream.close();
這兩個操作的核心都是關閉原始流,釋放資源,如果加了synchronized關鍵字,會導致當前線程正在執行read方法,而且系統消耗很大時,想釋放資源無法釋放。此時read方法還沒執行完,我們知道synchronized的鎖是加在整個對象上的,所以close方法就必須等到read結束後才能執行,這樣很明顯不能滿足close的需求,甚至會導致大量的io資源被阻塞不能關閉。
但該方法用一個while循環,而且只有當bufUpdater.compareAndSet(this, buffer, null)成功時,才執行上述的資源釋放。
先看bufUpdater這個全局變量

protected volatile byte buf[];
private static final 
        AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater = 
        AtomicReferenceFieldUpdater.newUpdater
        (BufferedInputStream.class,  byte[].class, "buf");

AtomicReferenceFieldUpdater是一個抽象類,但該類的內部已經給出了包訪問控制級別的一個實現AtomicReferenceFieldUpdaterImpl,原理是利用反射將一個 被聲明成volatile 的屬性通過JNI調用,使用cpu指令級的命令將一個變量進行更新,保障該操作是原子的。也就是通過上面定義的bufUpdater將buf這個byte數組的跟新變為原子操作,其作用是保障其原子更新。
BufferedInputStream源代碼中總共有兩個地方用到了這個bufUpdater,一個是我們上面看到的close方法中,另外一個是再前面說道的fill()方法中。既然BufferedInputStream的所有操作上都用了synchronized來做同步,那為什麽這裏還需要用這個原子更新器呢?帶著問題上面提到過fill()方法中的最後一個步驟:當有mark,而且markLimit的長度又大於初始數組的長度時,需要對內存數組擴容,即創建一個尺寸更大的數組,將原來數組中的數據拷貝到新數組中,再將指向原數組的應用指向新的數組。bufUpdater正是用在了將原數組引用指向新數組的操作上,同樣close的方法使用的bufUpdater也是用在對數組引用的改變上,這樣看來就比較清晰了,主要是為了防止一個線程在執行close方法時,將buffer賦值為null這個時候另外一個線程正在執行fill()方法的最後一個步驟又將buffer賦值給了一個新的數組,從而導致資源沒有釋放掉。

BufferedOutputStream

下面看一下 bufferedOutputStream的源碼,

相比之下緩沖輸出流就簡單很多,基本上write方法可以說明一切了:

  • 在緩沖區(也是字節數組)寫滿之前調用write方法只是將數據復制到內部緩沖區;

  • 在緩沖區寫滿之後,現將舊的數據寫入到底層輸出流,然後將新的數據暫存到緩沖區(新的數據並未同時寫入)

於是flush方法也找到了存在的意義:將現有數據全部寫入。

 1 package java.io;
 2 
 3 public class BufferedOutputStream extends FilterOutputStream {
 4     // 保存“緩沖輸出流”數據的字節數組
 5     protected byte buf[];
 6 
 7     // 緩沖中數據的大小
 8     protected int count;
 9 
10     // 構造函數:新建字節數組大小為8192的“緩沖輸出流”
11     public BufferedOutputStream(OutputStream out) {
12         this(out, 8192);
13     }
14 
15     // 構造函數:新建字節數組大小為size的“緩沖輸出流”
16     public BufferedOutputStream(OutputStream out, int size) {
17         super(out);
18         if (size <= 0) {
19             throw new IllegalArgumentException("Buffer size <= 0");
20         }
21         buf = new byte[size];
22     }
23 
24     // 將緩沖數據都寫入到輸出流中
25     private void flushBuffer() throws IOException {
26         if (count > 0) {
27             out.write(buf, 0, count);
28             count = 0;
29         }
30     }
31 
32     // 將“數據b(轉換成字節類型)”寫入到輸出流中
33     public synchronized void write(int b) throws IOException {
34         // 若緩沖已滿,則先將緩沖數據寫入到輸出流中。
35         if (count >= buf.length) {
36             flushBuffer();
37         }
38         // 將“數據b”寫入到緩沖中
39         buf[count++] = (byte)b;
40     }
41 
42     public synchronized void write(byte b[], int off, int len) throws IOException {
43         // 若“寫入長度”大於“緩沖區大小”,則先將緩沖中的數據寫入到輸出流,然後直接將數組b寫入到輸出流中
44         if (len >= buf.length) {
45             flushBuffer();
46             out.write(b, off, len);
47             return;
48         }
49         // 若“剩余的緩沖空間 不足以 存儲即將寫入的數據”,則先將緩沖中的數據寫入到輸出流中
50         if (len > buf.length - count) {
51             flushBuffer();
52         }
53         System.arraycopy(b, off, buf, count, len);
54         count += len;
55     }
56 
57     // 將“緩沖數據”寫入到輸出流中
58     public synchronized void flush() throws IOException {
59         flushBuffer();
60         out.flush();
61     }
62 }

FileInputStream & FileOutputStream

以文件為輸入輸出目標的類,其實也可以想象得到,讀寫本地文件的類追溯上去肯定是本地方法。所以當然它的一系列read(write)方法都是native的。這個以後如果以機會的話再研究。

目前能看到的輔助功能有:

  • FileInputStream(FileOutputStream)利用ThreadLocal類來判斷打開這個流的線程數(ThreadLocal中定義了一個two-sized ThreadLocalMap,具體原理待我看完HashMap再回來 /_\)。
  • 都可以調用getChannel()方法,利用sun.nio.ch.FileChannelImpl類返回一個FileChannel對象,即可以將文件流轉為通道操作。

讀寫文件的這一溜方法大都在sun.nio.ch和sun.misc包裏,有興趣的可以去看openjdk提供的源碼,不過Java裏面也是調用native方法,而且考慮到跨平臺特性估計設計上也會更加復雜,所以推薦先去了解C的文件讀寫。

ByteArrayInputStream & ByteArrayOutputStream

這兩個類在很多地方被翻譯成內存輸入(輸出)流,當時俺就被這高大上的名字深深的折服了。
其實它們的功能、實現都非常簡單,先把所有的數據全存到它內部的字節數組裏,然後用這個數組來繼續讀寫,這個時候底層流你就可以不用管了,愛關就關沒有影響。

插播:ByteArrayInputStream vs BufferedInputStream

經常會有人把這兩個類混在一起,於是特地在此比劃一番。說到它們的區別,但實際上從類的組織結構上可以看出來,這兩個類其實沒有什麽聯系:一個是以內存中的字節數組為輸入目標的類,一個是為了更好的操作字節輸入流而提供的帶有緩沖區的裝飾器類。它們的使用目的本就不一樣,只不過由於名字似曾相識,打扮得(實現方式)也差不多,所以經常被誤認為兩兄弟。這兩兄弟的差距還是蠻大的:

  • ByteArrayInputStream需要在內部的保存流的所有數據,所以需要一個足夠大的字節數組,但數組的容量是受JVM中堆空間大小限制的,更極端的情況,即使你為JVM分配了一個很大的空間,由於Java數組使用的是int型索引,所以你也猜到了,它還是會被限制在INT_MAX範圍以內。在底層流數據全部保存到ByteArrayInputStream後,你就可以不用再管流,轉而去從ByteArrayInputStream讀取數據了。

  • 而BufferedInputStream只會用一個有限大小的緩存數組保存底層流的一小部分數據,你在讀取數據的時候其實還是在和底層流打交道,只不過BufferedInputStream為了滿足你變幻莫測的讀取要求提供了緩沖區,讓你的讀取操作更加犀利流暢。

所以總結起來,它們除了同樣從InputStream派生而來,同樣使用了字節數組(這是個經常發生的巧合)以外,沒有任何聯系。

DataInputStream & DataOutputStream

這兩個類允許我們以基本類型的形式操作字節流,可以理解為一個基本數據類型到字節之間的映射轉換。

例如我想要從輸入流中讀取一個int類型數據(4字節),那麽就需要先讀4個字節,然後按照每個字節在int中的位置作相應移位處理,就得到這4個字節所代表的int型數據了。

PushbackInputStream

這個類的功能其實比較隱晦(至少我一開始是理解錯了)。按照字面意思理解,它應該是為了在讀取一定量字節之後,允許我們調用unread方法,重讀這部分字節。實現上,它的內部也有一個字節數組作緩沖,恩,看起來一切正常。

可是測試它的unread(byte[])方法的時候發現被坑了,調用unread(byte[])之後再讀取,讀到的其實是你push進去的這個字節數組。它其實沒有想象的那麽聰明,在調用unread(byte[])方法的時候,只是很萌的把你傳給它的這個字節數組當成你之前讀取的數據,把它直接復制到內部緩沖區裏。

也就是說,完全是 push what, get what…

其他

ObjectInputStream(ObjectOutputStream)與Serializable接口等一起構成了Java的序列化機制,其中牽涉到對象數據的描述、保存與恢復,在此暫不討論。

java IO 包源碼解析