thinking in java (二十四) ----- IO之BufferedInputStream
BufferedInputStream介紹
BufferedInputStream是緩衝輸入流,作用是為另外一個輸入流新增緩衝功能,以及mark reset功能。
本質上,緩衝功能是通過一個內部緩衝區陣列實現的,例如在新建某輸入流對應的BufferedInputStream後,當我們通過read方法讀取輸入流的資料時,BufferedInputStream會將輸入流的資料分批地填入到緩衝區中,每當緩衝區中的資料被讀完以後,輸入流會再次填充資料緩衝區,如此反覆,直到我們讀取完畢輸出流資料。
原始碼分析
package java.io; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; public class BufferedInputStream extends FilterInputStream { // 預設的緩衝大小是8192位元組 // BufferedInputStream 會根據“緩衝區大小”來逐次的填充緩衝區; // 即,BufferedInputStream填充緩衝區,使用者讀取緩衝區,讀完之後,BufferedInputStream會再次填充緩衝區。如此迴圈,直到讀完資料... private static int defaultBufferSize = 8192; // 緩衝陣列 protected volatile byte buf[]; // 快取陣列的原子更新器。 // 該成員變數與buf陣列的volatile關鍵字共同組成了buf陣列的原子更新功能實現, // 即,在多執行緒中操作BufferedInputStream物件時,buf和bufUpdater都具有原子性(不同的執行緒訪問到的資料都是相同的) private static final AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater = AtomicReferenceFieldUpdater.newUpdater (BufferedInputStream.class, byte[].class, "buf"); // 當前緩衝區的有效位元組數。 // 注意,這裡是指緩衝區的有效位元組數,而不是輸入流中的有效位元組數。 protected int count; // 當前緩衝區的位置索引 // 注意,這裡是指緩衝區的位置索引,而不是輸入流中的位置索引。 protected int pos; // 當前緩衝區的標記位置 // markpos和reset()配合使用才有意義。操作步驟: // (01) 通過mark() 函式,儲存pos的值到markpos中。 // (02) 通過reset() 函式,會將pos的值重置為markpos。接著通過read()讀取資料時,就會從mark()儲存的位置開始讀取。 protected int markpos = -1; // marklimit是標記的最大值。 // 關於marklimit的原理,我們在後面的fill()函式分析中會詳細說明。這對理解BufferedInputStream相當重要。 protected int marklimit; // 獲取輸入流 private InputStream getInIfOpen() throws IOException { InputStream input = in; if (input == null) throw new IOException("Stream closed"); return input; } // 獲取緩衝 private byte[] getBufIfOpen() throws IOException { byte[] buffer = buf; if (buffer == null) throw new IOException("Stream closed"); return buffer; } // 建構函式:新建一個緩衝區大小為8192的BufferedInputStream public BufferedInputStream(InputStream in) { this(in, defaultBufferSize); } // 建構函式:新建指定緩衝區大小的BufferedInputStream public BufferedInputStream(InputStream in, int size) { super(in); if (size <= 0) { throw new IllegalArgumentException("Buffer size <= 0"); } buf = new byte[size]; } // 從“輸入流”中讀取資料,並填充到緩衝區中。 // 後面會對該函式進行詳細說明! private void fill() throws IOException { byte[] buffer = getBufIfOpen(); if (markpos < 0) pos = 0; /* no mark: throw away the buffer */ else if (pos >= buffer.length) /* no room left in buffer */ if (markpos > 0) { /* can throw away early part of the buffer */ int sz = pos - markpos; System.arraycopy(buffer, markpos, buffer, 0, sz); pos = sz; markpos = 0; } else if (buffer.length >= marklimit) { markpos = -1; /* buffer got too big, invalidate mark */ pos = 0; /* drop buffer contents */ } else { /* grow buffer */ int nsz = pos * 2; if (nsz > marklimit) nsz = marklimit; byte nbuf[] = new byte[nsz]; System.arraycopy(buffer, 0, nbuf, 0, pos); if (!bufUpdater.compareAndSet(this, buffer, nbuf)) { throw new IOException("Stream closed"); } buffer = nbuf; } count = pos; int n = getInIfOpen().read(buffer, pos, buffer.length - pos); if (n > 0) count = n + pos; } // 讀取下一個位元組 public synchronized int read() throws IOException { // 若已經讀完緩衝區中的資料,則呼叫fill()從輸入流讀取下一部分資料來填充緩衝區 if (pos >= count) { fill(); if (pos >= count) return -1; } // 從緩衝區中讀取指定的位元組 return getBufIfOpen()[pos++] & 0xff; } // 將緩衝區中的資料寫入到位元組陣列b中。off是位元組陣列b的起始位置,len是寫入長度 private int read1(byte[] b, int off, int len) throws IOException { int avail = count - pos; if (avail <= 0) { // 加速機制。 // 如果讀取的長度大於緩衝區的長度 並且沒有markpos, // 則直接從原始輸入流中進行讀取,從而避免無謂的COPY(從原始輸入流至緩衝區,讀取緩衝區全部資料,清空緩衝區, // 重新填入原始輸入流資料) if (len >= getBufIfOpen().length && markpos < 0) { return getInIfOpen().read(b, off, len); } // 若已經讀完緩衝區中的資料,則呼叫fill()從輸入流讀取下一部分資料來填充緩衝區 fill(); avail = count - pos; if (avail <= 0) return -1; } int cnt = (avail < len) ? avail : len; System.arraycopy(getBufIfOpen(), pos, b, off, cnt); pos += cnt; return cnt; } // 將緩衝區中的資料寫入到位元組陣列b中。off是位元組陣列b的起始位置,len是寫入長度 public synchronized int read(byte b[], int off, int len) throws IOException { getBufIfOpen(); // Check for closed stream if ((off | len | (off + len) | (b.length - (off + len))) < 0) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } // 讀取到指定長度的資料才返回 int n = 0; for (;;) { int nread = read1(b, off + n, len - n); if (nread <= 0) return (n == 0) ? nread : n; n += nread; if (n >= len) return n; // if not closed but no bytes available, return InputStream input = in; if (input != null && input.available() <= 0) return n; } } // 忽略n個位元組 public synchronized long skip(long n) throws IOException { getBufIfOpen(); // Check for closed stream if (n <= 0) { return 0; } long avail = count - pos; if (avail <= 0) { // If no mark position set then don't keep in buffer if (markpos <0) return getInIfOpen().skip(n); // Fill in buffer to save bytes for reset fill(); avail = count - pos; if (avail <= 0) return 0; } long skipped = (avail < n) ? avail : n; pos += skipped; return skipped; } // 下一個位元組是否存可讀 public synchronized int available() throws IOException { int n = count - pos; int avail = getInIfOpen().available(); return n > (Integer.MAX_VALUE - avail) ? Integer.MAX_VALUE : n + avail; } // 標記“緩衝區”中當前位置。 // readlimit是marklimit,關於marklimit的作用,參考後面的說明。 public synchronized void mark(int readlimit) { marklimit = readlimit; markpos = pos; } // 將“緩衝區”中當前位置重置到mark()所標記的位置 public synchronized void reset() throws IOException { getBufIfOpen(); // Cause exception if closed if (markpos < 0) throw new IOException("Resetting to invalid mark"); pos = markpos; } public boolean markSupported() { return true; } // 關閉輸入流 public void close() throws IOException { byte[] buffer; while ( (buffer = buf) != null) { if (bufUpdater.compareAndSet(this, buffer, null)) { InputStream input = in; in = null; if (input != null) input.close(); return; } // Else retry in case a new buf was CASed in fill() } } }
要想讀懂BufferedInputStream,就首先要理解其思想,BufferedInputStream的作用是為其他輸入流提供緩衝功能,建立BufferedInputStream時候,我們建構函式裡面會有一個輸入流作為引數,BufferedInputStream會將輸入資料分批次讀取,每次讀取一部分到快取中,操作完這部分緩衝資料以後,再從輸入流中讀取下一部分資料到快取中。
為什麼需要緩衝呢,因為快取中的資料實際上是儲存在記憶體中,而原始資料可能儲存在硬碟等儲存介質中,而我們知道從記憶體中讀取資料的速度是從硬碟中讀取資料速度的10倍以上。
那為什麼不一次性吧資料全部讀取到記憶體中呢?因為資料可能很大,讀取的時間會很長,還有就是記憶體價格昂貴
下面我們對BufferedInputStream中最重要的方法fill()進行說明:fill()原始碼如下
private void fill() throws IOException { byte[] buffer = getBufIfOpen(); if (markpos < 0) pos = 0; /* no mark: throw away the buffer */ else if (pos >= buffer.length) /* no room left in buffer */ if (markpos > 0) { /* can throw away early part of the buffer */ int sz = pos - markpos; System.arraycopy(buffer, markpos, buffer, 0, sz); pos = sz; markpos = 0; } else if (buffer.length >= marklimit) { markpos = -1; /* buffer got too big, invalidate mark */ pos = 0; /* drop buffer contents */ } else if (buffer.length >= MAX_BUFFER_SIZE) { throw new OutOfMemoryError("Required array size too large"); } else { /* grow buffer */ int nsz = (pos <= MAX_BUFFER_SIZE - pos) ? pos * 2 : MAX_BUFFER_SIZE; if (nsz > marklimit) nsz = marklimit; byte nbuf[] = new byte[nsz]; System.arraycopy(buffer, 0, nbuf, 0, pos); if (!bufUpdater.compareAndSet(this, buffer, nbuf)) { // Can't replace buf if there was an async close. // Note: This would need to be changed if fill() // is ever made accessible to multiple threads. // But for now, the only way CAS can fail is via close. // assert buf == null; throw new IOException("Stream closed"); } buffer = nbuf; } count = pos; int n = getInIfOpen().read(buffer, pos, buffer.length - pos); if (n > 0) count = n + pos; }
根據原始碼中的if條件,我們可以將fill的情況分為五種,
- 情況1:讀取完buffer中的資料,並且buffer沒有被標記
執行流程如下
(1)read()函式中呼叫fill()
(2)fill()中的if(markpos<0)...
為了方便分析,我們將這種情況下的fill等價於以下程式碼
private void fill() throws IOException {
byte[] buffer = getBufIfOpen();
if (markpos < 0)
pos = 0;
count = pos;
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
if (n > 0)
count = n + pos;
}
說明:這種情形發生的情況是-----輸入流中有很長的資料,我們每次從中取出一部分到buffer中,每次我們讀取完buffer中的資料之後,並且此時輸入流沒有被標記,那麼就接著從輸入流中讀取下一部分的資料到buffer中。
其中判斷是否讀完是通過if(pos>=count)判斷的,判斷輸入流有沒有被標記是通過if(pos<0)判斷
然後我們在捋一捋fill的程式碼
1,if(markpos < 0),它的作用是判斷“輸入流是否被標記”,如果被標記了markpoos>=0,沒否則markpos等於-1
2,在這種情況下,通過getInIfOpen()獲取輸入流,接著從輸入流中讀取buffer.length個位元組到buffer中
3,count=n+pos;這是根據從輸入流中讀取實際資料的多少,來更新buffer中資料的實際大小
- 情況2,讀取完buffer中的資料,buffer的標記位置>0,並且buffer中沒有多餘的空間
執行流程如下,
1,read()函式中呼叫fill()
2,fill()中的else if(pos >=buffer.length)...
3,fill()中的if(markpos > 0)...
這種情況下的fill等價於下面的程式碼
private void fill() throws IOException {
byte[] buffer = getBufIfOpen();
if (markpos >= 0 && pos >= buffer.length) {
if (markpos > 0) {
int sz = pos - markpos;
System.arraycopy(buffer, markpos, buffer, 0, sz);
pos = sz;
markpos = 0;
}
}
count = pos;
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
if (n > 0)
count = n + pos;
}
說明:這種情況發生的情況是-----輸入流中還有很長的資料,我們每次從中讀取一部分到buffer中進行操作,當我們讀取完buffer中的資料之後,並且此時輸入流存在標記時,那麼就發生情況2,此時我們要保留“被標記位置”到buffer末尾的資料,然後再從輸入流讀取下一部分資料到buffer中。
其中判斷是否讀完buffer中的資料,通過if(pos >= count)判斷,
判斷輸入流是否被標記,通過if(markpos<0)來判斷
判斷buffer中是否有多餘的空間,通過if(pos >=buffer.length)來判斷
理解這個思想以後,我們再分析程式碼
1, int sz = pos - markpos; 作用是“獲取‘被標記位置’到‘buffer末尾’”的資料長度。
2,System.arraycopy(buffer, markpos, buffer, 0, sz); 作用是“將buffer中從markpos開始的資料”拷貝到buffer中(從位置0開始填充,填充長度是sz)。接著,將sz賦值給pos,即pos就是“被標記位置”到“buffer末尾”的資料長度。、
3, int n = getInIfOpen().read(buffer, pos, buffer.length - pos); 從輸入流中讀取出“buffer.length - pos”的資料,然後填充到buffer中。
4,通過第(02)和(03)步組合起來的buffer,就是包含了“原始buffer被標記位置到buffer末尾”的資料,也包含了“從輸入流中新讀取的資料”。
注意:執行過2以後嗎,markpos的值由大於0變成了等於0
- 情況3,讀取完buffer中的資料,buffer被標記位置=0.buffer中沒有多餘的空間,並且buffer.length>=marklimit。執行流程如下
執行流程如下,
(01) read() 函式中呼叫 fill()
(02) fill() 中的 else if (pos >= buffer.length) ...
(03) fill() 中的 else if (buffer.length >= marklimit) ...
為了方便分析,我們將這種情況下fill()執行的操作等價於以下程式碼:
private void fill() throws IOException {
byte[] buffer = getBufIfOpen();
if (markpos >= 0 && pos >= buffer.length) {
if ( (markpos <= 0) && (buffer.length >= marklimit) ) {
markpos = -1; /* buffer got too big, invalidate mark */
pos = 0; /* drop buffer contents */
}
}
count = pos;
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
if (n > 0)
count = n + pos;
}
說明:這種情況處理非常簡單,首先就是“取消標記”,即markpos = -1,然後設定初始化位置為0,即pos=0,最後再從輸入流中讀取下一部分資料到buffer中
- 情況4,讀取完buffer中的資料,buffer被標記位置=0,buffer中沒有多餘的空間,並且buffer.length<marklimit
執行流程如下,
(01) read() 函式中呼叫 fill()
(02) fill() 中的 else if (pos >= buffer.length) ...
(03) fill() 中的 else { int nsz = pos * 2; ... }
為了方便分析,我們將這種情況下fill()執行的操作等價於以下程式碼:
private void fill() throws IOException {
byte[] buffer = getBufIfOpen();
if (markpos >= 0 && pos >= buffer.length) {
if ( (markpos <= 0) && (buffer.length < marklimit) ) {
int nsz = pos * 2;
if (nsz > marklimit)
nsz = marklimit;
byte nbuf[] = new byte[nsz];
System.arraycopy(buffer, 0, nbuf, 0, pos);
if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
throw new IOException("Stream closed");
}
buffer = nbuf;
}
}
count = pos;
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
if (n > 0)
count = n + pos;
}
說明:
這種情況的處理非常簡單。
(01) 新建一個位元組陣列nbuf。nbuf的大小是“pos*2”和“marklimit”中較小的那個數。
int nsz = pos * 2;
if (nsz > marklimit)
nsz = marklimit;
byte nbuf[] = new byte[nsz];
(02) 接著,將buffer中的資料拷貝到新陣列nbuf中。通過System.arraycopy(buffer, 0, nbuf, 0, pos)
(03) 最後,從輸入流讀取部分新資料到buffer中。通過getInIfOpen().read(buffer, pos, buffer.length - pos);
注意:在這裡,我們思考一個問題,“為什麼需要marklimit,它的存在到底有什麼意義?”我們結合“情況2”、“情況3”、“情況4”的情況來分析。
假設,marklimit是無限大的,而且我們設定了markpos。當我們從輸入流中每讀完一部分資料並讀取下一部分資料時,都需要儲存markpos所標記的資料;這就意味著,我們需要不斷執行情況4中的操作,要將buffer的容量擴大……隨著讀取次數的增多,buffer會越來越大;這會導致我們佔據的記憶體越來越大。所以,我們需要給出一個marklimit;當buffer>=marklimit時,就不再儲存markpos的值了
- 情況5:除了上面四種情況外的情況
-
執行流程如下,
(01) read() 函式中呼叫 fill()
(02) fill() 中的 count = pos...為了方便分析,我們將這種情況下fill()執行的操作等價於以下程式碼:
private void fill() throws IOException { byte[] buffer = getBufIfOpen(); count = pos; int n = getInIfOpen().read(buffer, pos, buffer.length - pos); if (n > 0) count = n + pos; }
說明:這種情況的處理非常簡單。直接從輸入流讀取部分新資料到buffer中。
示例程式碼
關於BufferedInputStream中API的詳細用法,參考示例程式碼(BufferedInputStreamTest.java):
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.lang.SecurityException;
/**
* BufferedInputStream 測試程式
*
* @author skywang
*/
public class BufferedInputStreamTest {
private static final int LEN = 5;
public static void main(String[] args) {
testBufferedInputStream() ;
}
/**
* BufferedInputStream的API測試函式
*/
private static void testBufferedInputStream() {
// 建立BufferedInputStream位元組流,內容是ArrayLetters陣列
try {
File file = new File("bufferedinputstream.txt");
InputStream in =
new BufferedInputStream(
new FileInputStream(file), 512);
// 從位元組流中讀取5個位元組。“abcde”,a對應0x61,b對應0x62,依次類推...
for (int i=0; i<LEN; i++) {
// 若能繼續讀取下一個位元組,則讀取下一個位元組
if (in.available() >= 0) {
// 讀取“位元組流的下一個位元組”
int tmp = in.read();
System.out.printf("%d : 0x%s\n", i, Integer.toHexString(tmp));
}
}
// 若“該位元組流”不支援標記功能,則直接退出
if (!in.markSupported()) {
System.out.println("make not supported!");
return ;
}
// 標記“當前索引位置”,即標記第6個位置的元素--“f”
// 1024對應marklimit
in.mark(1024);
// 跳過22個位元組。
in.skip(22);
// 讀取5個位元組
byte[] buf = new byte[LEN];
in.read(buf, 0, LEN);
// 將buf轉換為String字串。
String str1 = new String(buf);
System.out.printf("str1=%s\n", str1);
// 重置“輸入流的索引”為mark()所標記的位置,即重置到“f”處。
in.reset();
// 從“重置後的位元組流”中讀取5個位元組到buf中。即讀取“fghij”
in.read(buf, 0, LEN);
// 將buf轉換為String字串。
String str2 = new String(buf);
System.out.printf("str2=%s\n", str2);
in.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (SecurityException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
程式中讀取的bufferedinputstream.txt的內容如下:
abcdefghijklmnopqrstuvwxyz
0123456789
ABCDEFGHIJKLMNOPQRSTUVWXYZ
執行結果:
0 : 0x61
1 : 0x62
2 : 0x63
3 : 0x64
4 : 0x65
str1=01234
str2=fghij
原味:http://www.cnblogs.com/skywang12345/p/io_12.html