Java IO完全總結(轉載) --- 重點在原始碼分析
從InputStream到ByteArrayInputStream
江蘇無錫繆小東
本篇主要分析:1.如何將byte陣列適配至ByteArrayInputStream,對應與IO部分的介面卡模式;2.BufferedInputStream的工作原理,對應於IO的裝飾器模式,會首先研究InputStream和FilterInputStream的原始碼,同時會將要談談軟體設計中的快取相關的知識。後面專門一章分析PipedInputStream和PipedOutStream,簡單談談管道相關的知識,以及軟體架構的想法。
1 InputStream
InputStream 是輸入位元組流部分,裝飾器模式的頂層類。主要規定了輸入位元組流的公共方法。
package java.io; public abstract class InputStream implements Closeable { private static final int SKIP_BUFFER_SIZE = 2048; //用於skip方法,和skipBuffer相關 private static byte[] skipBuffer; // skipBuffer is initialized in skip(long), if needed. public abstract int read() throws IOException; //從輸入流中讀取下一個位元組, //正常返回0-255,到達檔案的末尾返回-1 //在流中還有資料,但是沒有讀到時該方法會阻塞(block) //Java IO和New IO的區別就是阻塞流和非阻塞流 //抽象方法哦!不同的子類不同的實現哦! //將流中的資料讀入放在byte陣列的第off個位置先後的len個位置中 //放回值為放入位元組的個數。 public int read(byte b[], int off, int len) throws IOException { if (b == null) { throw new NullPointerException(); } else if (off < 0 || len < 0 || len > b.length - off) { throw new IndexOutOfBoundsException(); } else if (len == 0) { return 0; } //檢查輸入是否正常。一般情況下,檢查輸入是方法設計的第一步 int c = read(); //讀取下一個位元組 if (c == -1) { return -1; } //到達檔案的末端返回-1 b[off] = (byte)c; //放回的位元組downcast int i = 1; //已經讀取了一個位元組 try { for (; i < len ; i++) { //最多讀取len個位元組,所以要迴圈len次 c = read(); //每次迴圈從流中讀取一個位元組 //由於read方法阻塞, //所以read(byte[],int,int)也會阻塞 if (c == -1) { break; } //到達末尾,理所當然放回-1 b[off + i] = (byte)c; //讀到就放入byte陣列中 } } catch (IOException ee) { } return i; //上面這個部分其實還有一點比較重要,int i = 1;在迴圈的外圍,或許你經常見到, //或許你只會在迴圈是才宣告,為什麼呢? //宣告在外面,增大了變數的生存週期(在迴圈外面),所以後面可以return返回 //極其一般的想法。在類成員變數生命週期中使用同樣的理念。 //在軟體設計中,類和類的關係中也是一樣的。 } //這個方法在利用抽象方法read,某種意義上簡單的Templete模式。 public int read(byte b[]) throws IOException { return read(b, 0, b.length); } //利用上面的方法read(byte[] b) public long skip(long n) throws IOException { long remaining = n; //方法內部使用的、表示要跳過的位元組數目, //使用它完成一系列位元組讀取的迴圈 int nr; if (skipBuffer == null) skipBuffer = new byte[SKIP_BUFFER_SIZE]; //初始化一個跳轉的快取 byte[] localSkipBuffer = skipBuffer; //本地化的跳轉快取 if (n <= 0) { return 0; } //檢查輸入引數,應該放在方法的開始 while (remaining > 0) { //一共要跳過n個,每次跳過部分,迴圈 nr = read(localSkipBuffer, 0, (int) Math.min(SKIP_BUFFER_SIZE, remaining)); //利用上面的read(byte[],int,int)方法儘量讀取n個位元組 if (nr < 0) { break; } //讀到流的末端,則返回 remaining -= nr; //沒有完全讀到需要的,則繼續迴圈 } return n - remaining; //返回時要麼全部讀完,要麼因為到達檔案末端,讀取了部分 } public int available() throws IOException { //查詢流中還有多少可以讀取的位元組 return 0; } //該方法不會block。在java中抽象類方法的實現一般有以下幾種方式: //1.丟擲異常(java.util);2.“弱”實現。象上面這種。子類在必要的時候覆蓋它。 //3.“空”實現。下面有例子。 public void close() throws IOException {} //關閉當前流、同時釋放與此流相關的資源 public synchronized void mark(int readlimit) {} //在當前位置對流進行標記,必要的時候可以使用reset方法返回。 //markSupport可以查詢當前流是否支援mark public synchronized void reset() throws IOException { throw new IOException("mark/reset not supported"); } //對mark過的流進行復位。只有當流支援mark時才可以使用此方法。 //看看mark、available和reset方法。體會為什麼?! public boolean markSupported() { //查詢是否支援mark return false; } //絕大部分不支援,因此提供預設實現,返回false。子類有需要可以覆蓋。 }
2 FilterInputStream
這是位元組輸入流部分裝飾器模式的核心。是我們在裝飾器模式中的Decorator物件,主要完成對其它流裝飾的基本功能。下面是它的原始碼:
package java.io; //該類對被裝飾的流進行基本的包裹。不增加額外的功能。 //客戶在需要的時候可以覆蓋相應的方法。具體覆蓋可以在ByteInputStream中看到! public class FilterInputStream extends InputStream { protected volatile InputStream in; //將要被裝飾的位元組輸入流 protected FilterInputStream(InputStream in) { //通過構造方法傳入此被裝飾的流 this.in = in; } //裝飾器的程式碼特徵:被裝飾的物件一般是裝飾器的成員變數 //上面幾行可以看出。 //下面這些方法,完成最小的裝飾――0裝飾,只是呼叫被裝飾流的方法而已 public int read() throws IOException { return in.read(); } public int read(byte b[]) throws IOException { return read(b, 0, b.length); } public int read(byte b[], int off, int len) throws IOException { return in.read(b, off, len); } public long skip(long n) throws IOException { return in.skip(n); } public int available() throws IOException { return in.available(); } public void close() throws IOException { in.close(); } public synchronized void mark(int readlimit) { in.mark(readlimit); } public synchronized void reset() throws IOException { in.reset(); } public boolean markSupported() { return in.markSupported(); } //以上的方法,都是通過呼叫被裝飾物件in完成的。沒有新增任何額外功能 //裝飾器模式中的Decorator物件,不增加被裝飾物件的功能。 //它是裝飾器模式中的核心。更多關於裝飾器模式的理論請閱讀部落格中的文章。 }
以上分析了所有位元組輸入流的公共父類InputStream和裝飾器類FilterInputStream類。他們是裝飾器模式中兩個重要的類。更多細節請閱讀部落格中裝飾器模式的文章。下面將講解一個具體的流ByteArrayInputStream,不過它是採用介面卡設計模式。
3 ByteArray到ByteArrayInputStream的適配
// ByteArrayInputStream內部有一個byte型別的buffer。
//很典型的介面卡模式的應用――將byte陣列適配流的介面。
//下面是原始碼分析:
package java.io;
public class ByteArrayInputStream extends InputStream {
protected byte buf[]; //內部的buffer,一般通過構造器輸入
protected int pos; //當前位置的cursor。從0至byte陣列的長度。
//byte[pos]就是read方法讀取的位元組
protected int mark = 0; //mark的位置。
protected int count; //流中位元組的數目。不一定與byte[]的長度一致???
public ByteArrayInputStream(byte buf[]) {//從一個byte[]建立一個ByteArrayInputStream
this.buf = buf; //初始化流中的各個成員變數
this.pos = 0;
this.count = buf.length; //count就等於buf.length
}
public ByteArrayInputStream(byte buf[], int offset, int length) { //構造器
this.buf = buf;
this.pos = offset; //與上面不同
this.count = Math.min(offset + length, buf.length);
this.mark = offset; //與上面不同
}
public synchronized int read() { //從流中讀取下一個位元組
return (pos < count) ? (buf[pos++] & 0xff) : -1; //返回下一個位置的位元組
//流中沒有資料則返回-1
}
//下面這個方法很有意思!從InputStream中可以看出其提供了該方法的實現。
//為什麼ByteArrayInputStream要覆蓋此方法呢?
//同樣的我們在Java Collections Framework中可以看到:
//AbstractCollection利用iterator實現了Collecion介面的很多方法。但是,
//在ArrayList中卻有很多被子類覆蓋了。為什麼如此呢??
public synchronized int read(byte b[], int off, int len) {
if (b == null) { //首先檢查輸入引數的狀態是否正確
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
}
if (pos >= count) { return -1; }
if (pos + len > count) { len = count - pos; }
if (len <= 0) { return 0; }
System.arraycopy(buf, pos, b, off, len); //java中提供資料複製的方法
pos += len;
return len;
}
//出於速度的原因!他們都用到System.arraycopy方法。想想為什麼?
//某些時候,父類不能完全實現子類的功能,父類的實現一般比較通用。
//當子類有更有效的方法時,我們會覆蓋這些方法。這樣可是不太OO的哦!
//下面這個方法,在InputStream中也已經實現了。
//但是當時是通過將位元組讀入一個buffer中實現的,好像效率低了一點。
//看看下面這段程式碼,是否極其簡單呢?!
public synchronized long skip(long n) {
if (pos + n > count) { n = count - pos; } //當前位置,可以跳躍的位元組數目
if (n < 0) { return 0; } //小於0,則不可以跳躍
pos += n; //跳躍後,當前位置變化
return n;
} //比InputStream中的方法簡單、高效吧!
public synchronized int available() {
return count - pos;
}
//查詢流中還有多少位元組沒有讀取。
//在我們的ByteArrayInputStream中就是當前位置以後位元組的數目。
public boolean markSupported() {
return true;
} //ByteArrayInputStream支援mark所以返回true
public void mark(int readAheadLimit) {
mark = pos;
}
//在流中當前位置mark。
//在我們的ByteArrayInputStream中就是將當前位置賦給mark變數。
//讀取流中的位元組就是讀取位元組陣列中當前位置向後的的位元組。
public synchronized void reset() {
pos = mark;
}
//重置流。即回到mark的位置。
public void close() throws IOException { }
//關閉ByteArrayInputStream不會產生任何動作。為什麼?仔細考慮吧!!
}
上面我們分3小節講了裝飾器模式中的公共父類(對應於輸入位元組流的InputStream)、Decorator(對應於輸入位元組流的FilterInputStream)和基本被裝飾物件(對應於輸入位元組流的媒體位元組流)。下面我們就要講述裝飾器模式中的具體的包裝器(對應於輸入位元組流的包裝器流)。
4 BufferedInputStream
4.1原理及其在軟體硬體中的應用
1.read――read(byte[] ,int , int)
2.BufferedInputStream
3.《由一個簡單的程式談起》
4. Cache
5.Pool
6.Spling Printer
(最近比較忙,不講了!)
4.2 BufferedInputStream原始碼分析
bufferedInputStream 的 fill() 函式 旨在擴容並且從in.read()中讀入新的內容,進行一次磁碟IO,(快取大小) 。
bufferinputStream 是一種特殊的機制,它一次向buf中寫入8192長度內容,這樣相當於一次磁碟IO就可以寫入足夠內容,不同想其他一樣多次進行磁碟IO浪費效率,畢竟read() 也是 阻塞的 極大地浪費效率。
所以在bufferinputstream中的read() 也會呼叫fill() 因為buf內容中 pos > count .read1()只是有多少讀多少,read()是貪婪讀,直至in中內容available為false;
package java.io;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
//該類主要完成對被包裝流,加上一個快取的功能
public class BufferedInputStream extends FilterInputStream {
private static int defaultBufferSize = 8192; //預設快取的大小
protected volatile byte buf[]; //內部的快取
protected int count; //buffer的大小
protected int pos; //buffer中cursor的位置
protected int markpos = -1; //mark的位置
protected int marklimit; //mark的範圍
//原子性更新。和一致性程式設計相關
private static final
AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =
AtomicReferenceFieldUpdater.newUpdater (BufferedInputStream.class, byte[].class, "buf");
private InputStream getInIfOpen() throws IOException { //檢查輸入流是否關閉,同時返回被包裝流
InputStream input = in;
if (input == null) throw new IOException("Stream closed");
return input;
}
private byte[] getBufIfOpen() throws IOException { //檢查buffer的狀態,同時返回快取
byte[] buffer = buf;
if (buffer == null) throw new IOException("Stream closed"); //不太可能發生的狀態
return buffer;
}
public BufferedInputStream(InputStream in) { //構造器
this(in, defaultBufferSize); //指定預設長度的buffer
}
public BufferedInputStream(InputStream in, int size) { //構造器
super(in);
if (size <= 0) { //檢查輸入引數
throw new IllegalArgumentException("Buffer size <= 0");
}
buf = new byte[size]; //建立指定長度的buffer
}
// fill() 函式 旨在擴容並且從in.read()中讀入新的內容,進行一次磁碟IO。(快取大小)
//private void fill() throws IOException {
//
//byte[] buffer = getBufIfOpen(); //得到buffer
//
//if (markpos < 0)
// pos = 0; //mark位置小於0,此時pos為0
//else if (pos >= buffer.length) //pos大於buffer的長度
// if (markpos > 0) {
// int sz = pos - markpos;
// System.arraycopy(buffer, markpos, buffer, 0, sz); //System.arraycopy()使用native方法,所以效率極高
//
// pos = sz;
//
// markpos = 0;
//
// } else if (buffer.length >= marklimit) { //buffer的長度大於marklimit時,mark失效
// markpos = -1;
// pos = 0; //丟棄buffer中的內容
//
// } else { //buffer的長度小於marklimit時對buffer擴容
//
// int nsz = pos * 2;
//
// if (nsz > marklimit) nsz = marklimit; //擴容為原來的2倍,太大則為marklimit大小
//
// byte nbuf[] = new byte[nsz];
//
// System.arraycopy(buffer, 0, nbuf, 0, pos); //將buffer中的位元組拷貝如擴容後的buf中
//
// if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
//
// //在buffer在被操作時,不能取代此buffer
// throw new IOException("Stream closed");
//
// }
//
// buffer = nbuf; //將新buf賦值給buffer
//
// }
//
//count = pos;
//
//int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
//
//if (n > 0) count = n + pos;
//
//}
/**
* Fills the buffer with more data, taking into account
* shuffling and other tricks for dealing with marks.
* Assumes that it is being called by a synchronized method.
* This method also assumes that all data has already been read in,
* hence pos > count.
*/
private void fill() throws IOException {
byte[] buffer = getBufIfOpen();
if (markpos < 0)
pos = 0; /* no mark: throw away the buffer */
else if (pos >= buffer.length) /* no room left in buffer */
if (markpos > 0) { /* can throw away early part of the buffer */
int sz = pos - markpos;
System.arraycopy(buffer, markpos, buffer, 0, sz);
pos = sz;
markpos = 0;
} else if (buffer.length >= marklimit) {
markpos = -1; /* buffer got too big, invalidate mark */
pos = 0; /* drop buffer contents */
} else if (buffer.length >= MAX_BUFFER_SIZE) {
throw new OutOfMemoryError("Required array size too large");
} else { /* grow buffer */
int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
pos * 2 : MAX_BUFFER_SIZE;
if (nsz > marklimit)
nsz = marklimit;
byte nbuf[] = new byte[nsz];
System.arraycopy(buffer, 0, nbuf, 0, pos);
if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
// Can't replace buf if there was an async close.
// Note: This would need to be changed if fill()
// is ever made accessible to multiple threads.
// But for now, the only way CAS can fail is via close.
// assert buf == null;
throw new IOException("Stream closed");
}
buffer = nbuf;
}
count = pos;
int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
if (n > 0)
count = n + pos;
}
從PipedInputStream/PipedOutputStream談起
江蘇無錫繆小東
本篇主要從分析PipeInputStrem和PipedOutputStream談起。談及軟體設計的變化,以及如何將軟體拆分、組合,適配……
1 原始碼分析
下面將詳細分析PipedInputStream和PipedOutputStream的原始碼。
1.1 PipedInputStream
package java.io;
//PipedInputStream必須和PipedOutputStream聯合使用。即必須連線輸入部分。
//其原理為:PipedInputStream內部有一個Buffer,
//PipedInputStream可以使用InputStream的方法讀取其Buffer中的位元組。
//PipedInputStream中Buffer中的位元組是PipedOutputStream呼叫PipedInputStream的方法放入的。
public class PipedInputStream extends InputStream {
boolean closedByWriter = false; //標識有讀取方或寫入方關閉
volatile boolean closedByReader = false;
boolean connected = false; //是否建立連線
Thread readSide; //標識哪個執行緒
Thread writeSide;
protected static final int PIPE_SIZE = 1024; //緩衝區的預設大小
protected byte buffer[] = new byte[PIPE_SIZE]; //緩衝區
protected int in = -1; //下一個寫入位元組的位置。0代表空,in==out代表滿
protected int out = 0; //下一個讀取位元組的位置
public PipedInputStream(PipedOutputStream src) throws IOException { //給定源的輸入流
connect(src);
}
public PipedInputStream() { } //預設構造器,下部一定要connect源
public void connect(PipedOutputStream src) throws IOException { //連線輸入源
src.connect(this); //呼叫源的connect方法連線當前物件
}
protected synchronized void receive(int b) throws IOException { //只被PipedOuputStream呼叫
checkStateForReceive(); //檢查狀態,寫入
writeSide = Thread.currentThread(); //永遠是PipedOuputStream
if (in == out) awaitSpace(); //輸入和輸出相等,等待空間
if (in < 0) {
in = 0;
out = 0;
}
buffer[in++] = (byte)(b & 0xFF); //放入buffer相應的位置
if (in >= buffer.length) { in = 0; } //in為0表示buffer已空
}
synchronized void receive(byte b[], int off, int len) throws IOException {
checkStateForReceive();
writeSide = Thread.currentThread(); //從PipedOutputStream可以看出
int bytesToTransfer = len;
while (bytesToTransfer > 0) {
if (in == out) awaitSpace(); //滿了,會通知讀取的;空會通知寫入
int nextTransferAmount = 0;
if (out < in) {
nextTransferAmount = buffer.length - in;
} else if (in < out) {
if (in == -1) {
in = out = 0;
nextTransferAmount = buffer.length - in;
} else {
nextTransferAmount = out - in;
}
}
if (nextTransferAmount > bytesToTransfer) nextTransferAmount = bytesToTransfer;
assert(nextTransferAmount > 0);
System.arraycopy(b, off, buffer, in, nextTransferAmount);
bytesToTransfer -= nextTransferAmount;
off += nextTransferAmount;
in += nextTransferAmount;
if (in >= buffer.length) { in = 0; }
}
}
private void checkStateForReceive() throws IOException { //檢查當前狀態,等待輸入
if (!connected) {
throw new IOException("Pipe not connected");
} else if (closedByWriter || closedByReader) {
throw new IOException("Pipe closed");
} else if (readSide != null && !readSide.isAlive()) {
throw new IOException("Read end dead");
}
}
private void awaitSpace() throws IOException { //Buffer已滿,等待一段時間
while (in == out) { //in==out表示滿了,沒有空間
checkStateForReceive(); //檢查接受端的狀態
notifyAll(); //通知讀取端
try {
wait(1000);
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
}
synchronized void receivedLast() { //通知所有等待的執行緒()已經接受到最後的位元組
closedByWriter = true;
notifyAll();
}
public synchronized int read() throws IOException {
if (!connected) { //檢查一些內部狀態
throw new IOException("Pipe not connected");
} else if (closedByReader) {
throw new IOException("Pipe closed");
} else if (writeSide != null && !writeSide.isAlive()&& !closedByWriter && (in < 0)) {
throw new IOException("Write end dead");
}
readSide = Thread.currentThread(); //當前執行緒讀取
int trials = 2; //重複兩次????
while (in < 0) {
if (closedByWriter) { return -1; } //輸入斷關閉返回-1
if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { //狀態錯誤
throw new IOException("Pipe broken");
}
notifyAll(); // 空了,通知寫入端可以寫入
try {
wait(1000);
} catch (InterruptedException ex) {
throw new java.io.InterruptedIOException();
}
}
int ret = buffer[out++] & 0xFF; //
if (out >= buffer.length) { out = 0; }
if (in == out) { in = -1; } //沒有任何位元組
return ret;
}
public synchronized int read(byte b[], int off, int len) throws IOException {
if (b == null) { //檢查輸入引數的正確性
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
int c = read(); //讀取下一個
if (c < 0) { return -1; } //已經到達末尾了,返回-1
b[off] = (byte) c; //放入外部buffer中
int rlen = 1; //return-len
while ((in >= 0) && (--len > 0)) { //下一個in存在,且沒有到達len
b[off + rlen] = buffer[out++]; //依次放入外部buffer
rlen++;
if (out >= buffer.length) { out = 0; } //讀到buffer的末尾,返回頭部
if (in == out) { in = -1; } //讀、寫位置一致時,表示沒有資料
}
return rlen; //返回填充的長度
}
public synchronized int available() throws IOException { //返回還有多少位元組可以讀取
if(in < 0)
return 0; //到達末端,沒有位元組
else if(in == out)
return buffer.length; //寫入的和讀出的一致,表示滿
else if (in > out)
return in - out; //寫入的大於讀出
else
return in + buffer.length - out; //寫入的小於讀出的
}
public void close() throws IOException { //關閉當前流,同時釋放與其相關的資源
closedByReader = true; //表示由輸入流關閉
synchronized (this) { in = -1; } //同步化當前物件,in為-1
}
}
1.2 PipedOutputStream
// PipedOutputStream一般必須和一個PipedInputStream連線。共同構成一個pipe。
//它們的職能是:
package java.io;
import java.io.*;
public class PipedOutputStream extends OutputStream {
private PipedInputStream sink; //包含一個PipedInputStream
public PipedOutputStream(PipedInputStream snk)throws IOException { //帶有目的地的構造器
connect(snk);
}
public PipedOutputStream() { } //預設構造器,必須使用下面的connect方法連線
public synchronized void connect(PipedInputStream snk) throws IOException {
if (snk == null) { //檢查輸入引數的正確性
throw new NullPointerException();
} else if (sink != null || snk.connected) {
throw new IOException("Already connected");
}
sink = snk; //一系列初始化工作
snk.in = -1;
snk.out = 0;
snk.connected = true;
}
public void write(int b) throws IOException { //向流中寫入資料
if (sink == null) { throw new IOException("Pipe not connected"); }
sink.receive(b); //本質上是,呼叫PipedInputStream的receive方法接受此位元組
}
public void write(byte b[], int off, int len) throws IOException {
if (sink == null) { //首先檢查輸入引數的正確性
throw new IOException("Pipe not connected");
} else if (b == null) {
throw new NullPointerException();
} else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return;
}
sink.receive(b, off, len); //呼叫PipedInputStream的receive方法接受
}
public synchronized void flush() throws IOException { //flush輸出流
if (sink != null) {
synchronized (sink) { sink.notifyAll(); } //本質是通知輸入流,可以讀取
}
}
public void close() throws IOException { //關閉流同時釋放相關資源
if (sink != null) { sink.receivedLast(); }
}
}
2 Buffer的狀態
上圖是PipedInputStream中快取的狀態圖。在程式中我們利用了byte陣列,迴圈地向其中寫入資料,寫入有一個cursor(in),讀出也有一個cursor(out)。上圖表示in和out不同位置時,buffer中的各個位置的狀態。藍色的代表可以讀取的位元組。白色的表示此位置沒有位元組,或者此位置已經被PipedInputStream讀取了。
3 互動簡圖
下圖是從原始碼部分轉換過來的關於PipedInputStream和PipedOutputStream的互動圖。
從圖中可以看出:
1.整個PipedInputStream是這對管道的核心。管道本身是一個byte的陣列。
2.PipedOutputStream物件通過Delegate方法複用PipedInputStream,同時遮蔽了其中的讀取的方法,我們僅僅可以構造PipedOutputStream物件。(從這一點可以看出Delegate複用比繼承複用的優越性了!)從設計模式的角度更象Adapter――PipedInputStream本身提供讀取和寫入的功能,將寫入的功能適配到OutputStream,就成為一個PipedOutputStream。這樣就形成一個類,適配後形成兩種功能的類。
3.呼叫PipedOutputStream的連線方法實際就是呼叫PipedInputStream的連線方法。
4.呼叫PipedOutputStream的寫相關的方法實際就是呼叫PipedInputStream的對應方法。
以上也是一種適配,將管道的概念適配到流的概念,同時將兩者的職能分開。
4 將Chanel放入PipedOutputStream
上面的例子中,Chanel放在PipedInputStream中,我們仔細思考後可以順理成章地將其Chanel放入PipedOutputStream中。請注意synchronized方法是得到哪個位元組流的鎖!!
5 Chanel移出的一個例子
在上面兩個例子中Buffer要麼在寫入物件的內部,要麼在讀取物件的內部。主要通過適配該物件的方法,達到自己的需求而已。下面是一個一般的例子――將Chanel移出,Chanel提供了寫入與讀取的功能。這也完全合乎OO的“Single Responsibility Protocol――SRP”。輸入部分使用Delegate複用此Chanel,將其適配至InputStream和OutputStream。下面是簡單的Source code。
//PipedChanel.java
import java.io.IOException ;
public class PipedChanel {
protected static final int PIPE_SIZE = 1024;
protected byte buffer[] = new byte[PIPE_SIZE];
protected int in = -1;
protected int out = 0;
public PipedChanel(){ }
public PipedChanel(int size){
buffer = new byte[size] ;
}
public synchronized int read() throws IOException { }
public synchronized int read(byte b[], int off, int len) throws IOException { }
public synchronized int available() throws IOException {}
public synchronized void close() throws IOException {}
public synchronized void write(int b) throws IOException {}
public synchronized void write(byte b[]) throws IOException {}
public synchronized void write(byte b[], int off, int len) throws IOException {}
public synchronized void flush() throws IOException {}
public void waitWhileFull(){ } //當Chanel已經滿了,寫執行緒等待
public void waitWhileEmpty{ } //當Chanel為空,讀取執行緒等待
//以上是兩個操作Chanel時的狀態相關的方法。
//是一致性程式設計部分,典型的設計模式。
//這兩個方法,包含在對應讀或寫方法的最前面。
}
// PipedChanelInputStream.java
import java.io.*;
public class PipedChanelInputStream extends InputStream {
private PipedChanel chanel ;
public PipedChanelInputStream(PipedChanel chanel){
this.chanel = chanel ;
}
public int read() throws IOException {
return chanel.read();
}
public int read(byte b[], int off, int len) throws IOException {
return chanel.read(b,off,len);
}
public int available() throws IOException {
return chanel.available();
}
public void close() throws IOException {
chanel.close();
}
}
// PipedChanelOutputStream.java
import java.io.*;
public class PipedChanelOutputStream extends OutputStream {
private PipedChanel chanel ;
public PipedChanelOutputStream(PipedChanel chanel){
this.chanel = chanel ;
}
public synchronized void write(int b) throws IOException {
chanel.write(b);
}
public synchronized void write(byte b[]) throws IOException {
chanel.write(b);
}
public synchronized void write(byte b[], int off, int len) throws IOException {
chanel.write(b,off,len);
}
public synchronized void flush() throws IOException {
chanel.flush();
}
public synchronized void close() throws IOException {
chanel.close();
}
}
很簡單的例子。我們可以體會介面卡模式,可以體會軟體設計的靈活性……
上面的關於PipedInputStream和PipedOutputStream的例子,本質上是對一個Chanel的幾個不同的適配。Chanel作為一種程式設計模式,在軟體設計中有極其廣泛的應用。下面一節是JMS的簡潔闡述!
以上的例子其實是一個典型的使用介面卡。
6 JMS的架構
JMS為