1. 程式人生 > >thinking in java (二十四) ----- IO之BufferedInputStream

thinking in java (二十四) ----- IO之BufferedInputStream

BufferedInputStream介紹

BufferedInputStream是緩衝輸入流,作用是為另外一個輸入流新增緩衝功能,以及mark reset功能。

本質上,緩衝功能是通過一個內部緩衝區陣列實現的,例如在新建某輸入流對應的BufferedInputStream後,當我們通過read方法讀取輸入流的資料時,BufferedInputStream會將輸入流的資料分批地填入到緩衝區中,每當緩衝區中的資料被讀完以後,輸入流會再次填充資料緩衝區,如此反覆,直到我們讀取完畢輸出流資料

原始碼分析

package java.io;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

public class BufferedInputStream extends FilterInputStream {

    // 預設的緩衝大小是8192位元組
    // BufferedInputStream 會根據“緩衝區大小”來逐次的填充緩衝區;
    // 即,BufferedInputStream填充緩衝區,使用者讀取緩衝區,讀完之後,BufferedInputStream會再次填充緩衝區。如此迴圈,直到讀完資料...
    private static int defaultBufferSize = 8192;

    // 緩衝陣列
    protected volatile byte buf[];

    // 快取陣列的原子更新器。
    // 該成員變數與buf陣列的volatile關鍵字共同組成了buf陣列的原子更新功能實現,
    // 即,在多執行緒中操作BufferedInputStream物件時,buf和bufUpdater都具有原子性(不同的執行緒訪問到的資料都是相同的)
    private static final
        AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =
        AtomicReferenceFieldUpdater.newUpdater
        (BufferedInputStream.class,  byte[].class, "buf");

    // 當前緩衝區的有效位元組數。
    // 注意,這裡是指緩衝區的有效位元組數,而不是輸入流中的有效位元組數。
    protected int count;

    // 當前緩衝區的位置索引
    // 注意,這裡是指緩衝區的位置索引,而不是輸入流中的位置索引。
    protected int pos;

    // 當前緩衝區的標記位置
    // markpos和reset()配合使用才有意義。操作步驟:
    // (01) 通過mark() 函式,儲存pos的值到markpos中。
    // (02) 通過reset() 函式,會將pos的值重置為markpos。接著通過read()讀取資料時,就會從mark()儲存的位置開始讀取。
    protected int markpos = -1;

    // marklimit是標記的最大值。
    // 關於marklimit的原理,我們在後面的fill()函式分析中會詳細說明。這對理解BufferedInputStream相當重要。
    protected int marklimit;

    // 獲取輸入流
    private InputStream getInIfOpen() throws IOException {
        InputStream input = in;
        if (input == null)
            throw new IOException("Stream closed");
        return input;
    }

    // 獲取緩衝
    private byte[] getBufIfOpen() throws IOException {
        byte[] buffer = buf;
        if (buffer == null)
            throw new IOException("Stream closed");
        return buffer;
    }

    // 建構函式:新建一個緩衝區大小為8192的BufferedInputStream
    public BufferedInputStream(InputStream in) {
        this(in, defaultBufferSize);
    }

    // 建構函式:新建指定緩衝區大小的BufferedInputStream
    public BufferedInputStream(InputStream in, int size) {
        super(in);
        if (size <= 0) {
            throw new IllegalArgumentException("Buffer size <= 0");
        }
        buf = new byte[size];
    }

    // 從“輸入流”中讀取資料,並填充到緩衝區中。
    // 後面會對該函式進行詳細說明!
    private void fill() throws IOException {
        byte[] buffer = getBufIfOpen();
        if (markpos < 0)
            pos = 0;            /* no mark: throw away the buffer */
        else if (pos >= buffer.length)  /* no room left in buffer */
            if (markpos > 0) {  /* can throw away early part of the buffer */
                int sz = pos - markpos;
                System.arraycopy(buffer, markpos, buffer, 0, sz);
                pos = sz;
                markpos = 0;
            } else if (buffer.length >= marklimit) {
                markpos = -1;   /* buffer got too big, invalidate mark */
                pos = 0;        /* drop buffer contents */
            } else {            /* grow buffer */
                int nsz = pos * 2;
                if (nsz > marklimit)
                    nsz = marklimit;
                byte nbuf[] = new byte[nsz];
                System.arraycopy(buffer, 0, nbuf, 0, pos);
                if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
                    throw new IOException("Stream closed");
                }
                buffer = nbuf;
            }
        count = pos;
        int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
        if (n > 0)
            count = n + pos;
    }

    // 讀取下一個位元組
    public synchronized int read() throws IOException {
        // 若已經讀完緩衝區中的資料,則呼叫fill()從輸入流讀取下一部分資料來填充緩衝區
        if (pos >= count) {
            fill();
            if (pos >= count)
                return -1;
        }
        // 從緩衝區中讀取指定的位元組
        return getBufIfOpen()[pos++] & 0xff;
    }

    // 將緩衝區中的資料寫入到位元組陣列b中。off是位元組陣列b的起始位置,len是寫入長度
    private int read1(byte[] b, int off, int len) throws IOException {
        int avail = count - pos;
        if (avail <= 0) {
            // 加速機制。
            // 如果讀取的長度大於緩衝區的長度 並且沒有markpos,
            // 則直接從原始輸入流中進行讀取,從而避免無謂的COPY(從原始輸入流至緩衝區,讀取緩衝區全部資料,清空緩衝區, 
            //  重新填入原始輸入流資料)
            if (len >= getBufIfOpen().length && markpos < 0) {
                return getInIfOpen().read(b, off, len);
            }
            // 若已經讀完緩衝區中的資料,則呼叫fill()從輸入流讀取下一部分資料來填充緩衝區
            fill();
            avail = count - pos;
            if (avail <= 0) return -1;
        }
        int cnt = (avail < len) ? avail : len;
        System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
        pos += cnt;
        return cnt;
    }

    // 將緩衝區中的資料寫入到位元組陣列b中。off是位元組陣列b的起始位置,len是寫入長度
    public synchronized int read(byte b[], int off, int len)
        throws IOException
    {
        getBufIfOpen(); // Check for closed stream
        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }

        // 讀取到指定長度的資料才返回
        int n = 0;
        for (;;) {
            int nread = read1(b, off + n, len - n);
            if (nread <= 0)
                return (n == 0) ? nread : n;
            n += nread;
            if (n >= len)
                return n;
            // if not closed but no bytes available, return
            InputStream input = in;
            if (input != null && input.available() <= 0)
                return n;
        }
    }

    // 忽略n個位元組
    public synchronized long skip(long n) throws IOException {
        getBufIfOpen(); // Check for closed stream
        if (n <= 0) {
            return 0;
        }
        long avail = count - pos;

        if (avail <= 0) {
            // If no mark position set then don't keep in buffer
            if (markpos <0)
                return getInIfOpen().skip(n);

            // Fill in buffer to save bytes for reset
            fill();
            avail = count - pos;
            if (avail <= 0)
                return 0;
        }

        long skipped = (avail < n) ? avail : n;
        pos += skipped;
        return skipped;
    }

    // 下一個位元組是否存可讀
    public synchronized int available() throws IOException {
        int n = count - pos;
        int avail = getInIfOpen().available();
        return n > (Integer.MAX_VALUE - avail)
                    ? Integer.MAX_VALUE
                    : n + avail;
    }

    // 標記“緩衝區”中當前位置。
    // readlimit是marklimit,關於marklimit的作用,參考後面的說明。
    public synchronized void mark(int readlimit) {
        marklimit = readlimit;
        markpos = pos;
    }

    // 將“緩衝區”中當前位置重置到mark()所標記的位置
    public synchronized void reset() throws IOException {
        getBufIfOpen(); // Cause exception if closed
        if (markpos < 0)
            throw new IOException("Resetting to invalid mark");
        pos = markpos;
    }

    public boolean markSupported() {
        return true;
    }

    // 關閉輸入流
    public void close() throws IOException {
        byte[] buffer;
        while ( (buffer = buf) != null) {
            if (bufUpdater.compareAndSet(this, buffer, null)) {
                InputStream input = in;
                in = null;
                if (input != null)
                    input.close();
                return;
            }
            // Else retry in case a new buf was CASed in fill()
        }
    }
}

要想讀懂BufferedInputStream,就首先要理解其思想,BufferedInputStream的作用是為其他輸入流提供緩衝功能,建立BufferedInputStream時候,我們建構函式裡面會有一個輸入流作為引數,BufferedInputStream會將輸入資料分批次讀取,每次讀取一部分到快取中,操作完這部分緩衝資料以後,再從輸入流中讀取下一部分資料到快取中

為什麼需要緩衝呢,因為快取中的資料實際上是儲存在記憶體中,而原始資料可能儲存在硬碟等儲存介質中,而我們知道從記憶體中讀取資料的速度是從硬碟中讀取資料速度的10倍以上。

那為什麼不一次性吧資料全部讀取到記憶體中呢?因為資料可能很大,讀取的時間會很長,還有就是記憶體價格昂貴

下面我們對BufferedInputStream中最重要的方法fill()進行說明:fill()原始碼如下

 private void fill() throws IOException {
        byte[] buffer = getBufIfOpen();
        if (markpos < 0)
            pos = 0;            /* no mark: throw away the buffer */
        else if (pos >= buffer.length)  /* no room left in buffer */
            if (markpos > 0) {  /* can throw away early part of the buffer */
                int sz = pos - markpos;
                System.arraycopy(buffer, markpos, buffer, 0, sz);
                pos = sz;
                markpos = 0;
            } else if (buffer.length >= marklimit) {
                markpos = -1;   /* buffer got too big, invalidate mark */
                pos = 0;        /* drop buffer contents */
            } else if (buffer.length >= MAX_BUFFER_SIZE) {
                throw new OutOfMemoryError("Required array size too large");
            } else {            /* grow buffer */
                int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
                        pos * 2 : MAX_BUFFER_SIZE;
                if (nsz > marklimit)
                    nsz = marklimit;
                byte nbuf[] = new byte[nsz];
                System.arraycopy(buffer, 0, nbuf, 0, pos);
                if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
                    // Can't replace buf if there was an async close.
                    // Note: This would need to be changed if fill()
                    // is ever made accessible to multiple threads.
                    // But for now, the only way CAS can fail is via close.
                    // assert buf == null;
                    throw new IOException("Stream closed");
                }
                buffer = nbuf;
            }
        count = pos;
        int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
        if (n > 0)
            count = n + pos;
    }

根據原始碼中的if條件,我們可以將fill的情況分為五種

  • 情況1:讀取完buffer中的資料,並且buffer沒有被標記

執行流程如下

(1)read()函式中呼叫fill()

 (2)fill()中的if(markpos<0)...

為了方便分析,我們將這種情況下的fill等價於以下程式碼

private void fill() throws IOException {
    byte[] buffer = getBufIfOpen();
    if (markpos < 0)
        pos = 0;

    count = pos;
    int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
    if (n > 0)
        count = n + pos;
}

說明:這種情形發生的情況是-----輸入流中有很長的資料,我們每次從中取出一部分到buffer中,每次我們讀取完buffer中的資料之後,並且此時輸入流沒有被標記,那麼就接著從輸入流中讀取下一部分的資料到buffer中。

其中判斷是否讀完是通過if(pos>=count)判斷的,判斷輸入流有沒有被標記是通過if(pos<0)判斷

然後我們在捋一捋fill的程式碼

1,if(markpos  < 0),它的作用是判斷“輸入流是否被標記”,如果被標記了markpoos>=0,沒否則markpos等於-1

2,在這種情況下,通過getInIfOpen()獲取輸入流,接著從輸入流中讀取buffer.length個位元組到buffer中

3,count=n+pos;這是根據從輸入流中讀取實際資料的多少,來更新buffer中資料的實際大小

  • 情況2,讀取完buffer中的資料,buffer的標記位置>0,並且buffer中沒有多餘的空間

執行流程如下,

1,read()函式中呼叫fill()

2,fill()中的else if(pos >=buffer.length)...

3,fill()中的if(markpos > 0)...

這種情況下的fill等價於下面的程式碼

private void fill() throws IOException {
    byte[] buffer = getBufIfOpen();
    if (markpos >= 0 && pos >= buffer.length) {
        if (markpos > 0) {
            int sz = pos - markpos;
            System.arraycopy(buffer, markpos, buffer, 0, sz);
            pos = sz;
            markpos = 0;
        }
    }

    count = pos;
    int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
    if (n > 0)
        count = n + pos;
}

說明:這種情況發生的情況是-----輸入流中還有很長的資料,我們每次從中讀取一部分到buffer中進行操作,當我們讀取完buffer中的資料之後,並且此時輸入流存在標記時,那麼就發生情況2,此時我們要保留“被標記位置”到buffer末尾的資料,然後再從輸入流讀取下一部分資料到buffer中。

其中判斷是否讀完buffer中的資料,通過if(pos >= count)判斷,

判斷輸入流是否被標記,通過if(markpos<0)來判斷

判斷buffer中是否有多餘的空間,通過if(pos >=buffer.length)來判斷

理解這個思想以後,我們再分析程式碼

1, int sz = pos - markpos; 作用是“獲取‘被標記位置’到‘buffer末尾’”的資料長度。

2,System.arraycopy(buffer, markpos, buffer, 0, sz); 作用是“將buffer中從markpos開始的資料”拷貝到buffer中(從位置0開始填充,填充長度是sz)。接著,將sz賦值給pos,即pos就是“被標記位置”到“buffer末尾”的資料長度。、

3, int n = getInIfOpen().read(buffer, pos, buffer.length - pos); 從輸入流中讀取出“buffer.length - pos”的資料,然後填充到buffer中。

4,通過第(02)和(03)步組合起來的buffer,就是包含了“原始buffer被標記位置到buffer末尾”的資料,也包含了“從輸入流中新讀取的資料”

注意:執行過2以後嗎,markpos的值由大於0變成了等於0

  • 情況3,讀取完buffer中的資料,buffer被標記位置=0.buffer中沒有多餘的空間,並且buffer.length>=marklimit。執行流程如下

執行流程如下,
(01) read() 函式中呼叫 fill()
(02) fill() 中的 else if (pos >= buffer.length) ...
(03) fill() 中的 else if (buffer.length >= marklimit) ...

為了方便分析,我們將這種情況下fill()執行的操作等價於以下程式碼:

private void fill() throws IOException {
    byte[] buffer = getBufIfOpen();
    if (markpos >= 0 && pos >= buffer.length) {
        if ( (markpos <= 0) && (buffer.length >= marklimit) ) {
            markpos = -1;   /* buffer got too big, invalidate mark */
            pos = 0;        /* drop buffer contents */
        }
    }

    count = pos;
    int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
    if (n > 0)
        count = n + pos;
}

說明:這種情況處理非常簡單,首先就是“取消標記”,即markpos = -1,然後設定初始化位置為0,即pos=0,最後再從輸入流中讀取下一部分資料到buffer中

  • 情況4,讀取完buffer中的資料,buffer被標記位置=0,buffer中沒有多餘的空間,並且buffer.length<marklimit

執行流程如下,
(01) read() 函式中呼叫 fill()
(02) fill() 中的 else if (pos >= buffer.length) ...
(03) fill() 中的 else { int nsz = pos * 2; ... }

為了方便分析,我們將這種情況下fill()執行的操作等價於以下程式碼:

private void fill() throws IOException {
    byte[] buffer = getBufIfOpen();
    if (markpos >= 0 && pos >= buffer.length) {
        if ( (markpos <= 0) && (buffer.length < marklimit) ) {
            int nsz = pos * 2;
            if (nsz > marklimit)
                nsz = marklimit;
            byte nbuf[] = new byte[nsz];
            System.arraycopy(buffer, 0, nbuf, 0, pos);
            if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
                throw new IOException("Stream closed");
            }
            buffer = nbuf;
        }
    }

    count = pos;
    int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
    if (n > 0)
        count = n + pos;
}

說明:

這種情況的處理非常簡單。
(01) 新建一個位元組陣列nbuf。nbuf的大小是“pos*2”和“marklimit”中較小的那個數。

int nsz = pos * 2;
if (nsz > marklimit)
    nsz = marklimit;
byte nbuf[] = new byte[nsz];

(02) 接著,將buffer中的資料拷貝到新陣列nbuf中。通過System.arraycopy(buffer, 0, nbuf, 0, pos)
(03) 最後,從輸入流讀取部分新資料到buffer中。通過getInIfOpen().read(buffer, pos, buffer.length - pos);
注意:在這裡,我們思考一個問題,“為什麼需要marklimit,它的存在到底有什麼意義?”我們結合“情況2”、“情況3”、“情況4”的情況來分析。

假設,marklimit是無限大的,而且我們設定了markpos。當我們從輸入流中每讀完一部分資料並讀取下一部分資料時,都需要儲存markpos所標記的資料;這就意味著,我們需要不斷執行情況4中的操作,要將buffer的容量擴大……隨著讀取次數的增多,buffer會越來越大;這會導致我們佔據的記憶體越來越大。所以,我們需要給出一個marklimit;當buffer>=marklimit時,就不再儲存markpos的值了

  • 情況5:除了上面四種情況外的情況
  • 執行流程如下,
    (01) read() 函式中呼叫 fill()
    (02) fill() 中的 count = pos...

    為了方便分析,我們將這種情況下fill()執行的操作等價於以下程式碼:

    private void fill() throws IOException {
        byte[] buffer = getBufIfOpen();
    
        count = pos;
        int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
        if (n > 0)
            count = n + pos;
    }

    說明:這種情況的處理非常簡單。直接從輸入流讀取部分新資料到buffer中。

示例程式碼

關於BufferedInputStream中API的詳細用法,參考示例程式碼(BufferedInputStreamTest.java):

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.lang.SecurityException;

/**
 * BufferedInputStream 測試程式
 *
 * @author skywang
 */
public class BufferedInputStreamTest {

    private static final int LEN = 5;

    public static void main(String[] args) {
        testBufferedInputStream() ;
    }

    /**
     * BufferedInputStream的API測試函式
     */
    private static void testBufferedInputStream() {

        // 建立BufferedInputStream位元組流,內容是ArrayLetters陣列
        try {
            File file = new File("bufferedinputstream.txt");
            InputStream in =
                  new BufferedInputStream(
                      new FileInputStream(file), 512);

            // 從位元組流中讀取5個位元組。“abcde”,a對應0x61,b對應0x62,依次類推...
            for (int i=0; i<LEN; i++) {
                // 若能繼續讀取下一個位元組,則讀取下一個位元組
                if (in.available() >= 0) {
                    // 讀取“位元組流的下一個位元組”
                    int tmp = in.read();
                    System.out.printf("%d : 0x%s\n", i, Integer.toHexString(tmp));
                }
            }

            // 若“該位元組流”不支援標記功能,則直接退出
            if (!in.markSupported()) {
                System.out.println("make not supported!");
                return ;
            }
              
            // 標記“當前索引位置”,即標記第6個位置的元素--“f”
            // 1024對應marklimit
            in.mark(1024);

            // 跳過22個位元組。
            in.skip(22);

            // 讀取5個位元組
            byte[] buf = new byte[LEN];
            in.read(buf, 0, LEN);
            // 將buf轉換為String字串。
            String str1 = new String(buf);
            System.out.printf("str1=%s\n", str1);

            // 重置“輸入流的索引”為mark()所標記的位置,即重置到“f”處。
            in.reset();
            // 從“重置後的位元組流”中讀取5個位元組到buf中。即讀取“fghij”
            in.read(buf, 0, LEN);
            // 將buf轉換為String字串。
            String str2 = new String(buf);
            System.out.printf("str2=%s\n", str2);

            in.close();
       } catch (FileNotFoundException e) {
           e.printStackTrace();
       } catch (SecurityException e) {
           e.printStackTrace();
       } catch (IOException e) {
           e.printStackTrace();
       }
    }
}

程式中讀取的bufferedinputstream.txt的內容如下:

abcdefghijklmnopqrstuvwxyz
0123456789
ABCDEFGHIJKLMNOPQRSTUVWXYZ

執行結果

0 : 0x61
1 : 0x62
2 : 0x63
3 : 0x64
4 : 0x65
str1=01234
str2=fghij

原味:http://www.cnblogs.com/skywang12345/p/io_12.html