1. 程式人生 > >Java IO完全總結(轉載) --- 重點在原始碼分析

Java IO完全總結(轉載) --- 重點在原始碼分析

InputStreamByteArrayInputStream

江蘇無錫繆小東

本篇主要分析:1.如何將byte陣列適配至ByteArrayInputStream,對應與IO部分的介面卡模式;2.BufferedInputStream的工作原理,對應於IO的裝飾器模式,會首先研究InputStreamFilterInputStream的原始碼,同時會將要談談軟體設計中的快取相關的知識。後面專門一章分析PipedInputStreamPipedOutStream,簡單談談管道相關的知識,以及軟體架構的想法。

1 InputStream

InputStream 是輸入位元組流部分,裝飾器模式的頂層類。主要規定了輸入位元組流的公共方法。

package java.io;

public abstract class InputStream implements Closeable {

    private static final int SKIP_BUFFER_SIZE = 2048;  //用於skip方法,和skipBuffer相關

    private static byte[] skipBuffer;    // skipBuffer is initialized in skip(long), if needed.

 
    public abstract int read() throws IOException;      //從輸入流中讀取下一個位元組,

                                                        //正常返回0-255,到達檔案的末尾返回-1

                                                        //在流中還有資料,但是沒有讀到時該方法會阻塞(block)

                                                        //Java IO和New IO的區別就是阻塞流和非阻塞流

                                                        //抽象方法哦!不同的子類不同的實現哦!

 

         //將流中的資料讀入放在byte陣列的第off個位置先後的len個位置中

         //放回值為放入位元組的個數。

    public int read(byte b[], int off, int len) throws IOException {         

         if (b == null) {

             throw new NullPointerException();

         } else if (off < 0 || len < 0 || len > b.length - off) {

             throw new IndexOutOfBoundsException();

         } else if (len == 0) {

             return 0;

         }  //檢查輸入是否正常。一般情況下,檢查輸入是方法設計的第一步
         int c = read();                         //讀取下一個位元組
         if (c == -1) {    return -1;   }        //到達檔案的末端返回-1
         b[off] = (byte)c;                       //放回的位元組downcast
         int i = 1;                              //已經讀取了一個位元組
         try {
             for (; i < len ; i++) {             //最多讀取len個位元組,所以要迴圈len次
                   c = read();                   //每次迴圈從流中讀取一個位元組

                                                 //由於read方法阻塞,
                                                 //所以read(byte[],int,int)也會阻塞
                   if (c == -1) {  break;  }     //到達末尾,理所當然放回-1
                   b[off + i] = (byte)c;         //讀到就放入byte陣列中
             }
         } catch (IOException ee) {     }
         return i;

         //上面這個部分其實還有一點比較重要,int i = 1;在迴圈的外圍,或許你經常見到,

         //或許你只會在迴圈是才宣告,為什麼呢?

         //宣告在外面,增大了變數的生存週期(在迴圈外面),所以後面可以return返回

         //極其一般的想法。在類成員變數生命週期中使用同樣的理念。
         //在軟體設計中,類和類的關係中也是一樣的。
    } //這個方法在利用抽象方法read,某種意義上簡單的Templete模式。
    public int read(byte b[]) throws IOException {
          return read(b, 0, b.length);
    } //利用上面的方法read(byte[] b)

    public long skip(long n) throws IOException {
         long remaining = n;                                  //方法內部使用的、表示要跳過的位元組數目,
//使用它完成一系列位元組讀取的迴圈
         int nr;
         if (skipBuffer == null)
             skipBuffer = new byte[SKIP_BUFFER_SIZE];         //初始化一個跳轉的快取
         byte[] localSkipBuffer = skipBuffer;                 //本地化的跳轉快取
         if (n <= 0) {    return 0;      }                    //檢查輸入引數,應該放在方法的開始
         while (remaining > 0) {                              //一共要跳過n個,每次跳過部分,迴圈
             nr = read(localSkipBuffer, 0, (int) Math.min(SKIP_BUFFER_SIZE, remaining));
                                                              //利用上面的read(byte[],int,int)方法儘量讀取n個位元組  
             if (nr < 0) {  break;    }                       //讀到流的末端,則返回
             remaining -= nr;                                 //沒有完全讀到需要的,則繼續迴圈
         }       
         return n - remaining;                                //返回時要麼全部讀完,要麼因為到達檔案末端,讀取了部分
    }

 

    public int available() throws IOException {               //查詢流中還有多少可以讀取的位元組
                   return 0;
    }

 //該方法不會block。在java中抽象類方法的實現一般有以下幾種方式:

//1.丟擲異常(java.util);2.“弱”實現。象上面這種。子類在必要的時候覆蓋它。

//3.“空”實現。下面有例子。

 

    public void close() throws IOException {}
         //關閉當前流、同時釋放與此流相關的資源

    public synchronized void mark(int readlimit) {}

         //在當前位置對流進行標記,必要的時候可以使用reset方法返回。

         //markSupport可以查詢當前流是否支援mark

 

    public synchronized void reset() throws IOException {

                   throw new IOException("mark/reset not supported");

    }

         //對mark過的流進行復位。只有當流支援mark時才可以使用此方法。

         //看看mark、available和reset方法。體會為什麼?!

 

    public boolean markSupported() {           //查詢是否支援mark

                   return false;

    }                 //絕大部分不支援,因此提供預設實現,返回false。子類有需要可以覆蓋。

        

}

2 FilterInputStream

這是位元組輸入流部分裝飾器模式的核心。是我們在裝飾器模式中的Decorator物件,主要完成對其它流裝飾的基本功能。下面是它的原始碼:

package java.io;

 

//該類對被裝飾的流進行基本的包裹。不增加額外的功能。

//客戶在需要的時候可以覆蓋相應的方法。具體覆蓋可以在ByteInputStream中看到!

public class FilterInputStream extends InputStream {

    protected volatile InputStream in;                       //將要被裝飾的位元組輸入流

 

    protected FilterInputStream(InputStream in) {   //通過構造方法傳入此被裝飾的流

                   this.in = in;

    }

         //裝飾器的程式碼特徵:被裝飾的物件一般是裝飾器的成員變數

         //上面幾行可以看出。

 

         //下面這些方法,完成最小的裝飾――0裝飾,只是呼叫被裝飾流的方法而已

 

    public int read() throws IOException {

                   return in.read();

    }

 

    public int read(byte b[]) throws IOException {

                   return read(b, 0, b.length);

    }

 

    public int read(byte b[], int off, int len) throws IOException {

                   return in.read(b, off, len);

    }

 

    public long skip(long n) throws IOException {

                   return in.skip(n);

    }

 

    public int available() throws IOException {

                   return in.available();

    }

 

    public void close() throws IOException {

                   in.close();

    }

 

    public synchronized void mark(int readlimit) {

                   in.mark(readlimit);

    }

 

    public synchronized void reset() throws IOException {

                   in.reset();

    }

 

    public boolean markSupported() {

                   return in.markSupported();

}

//以上的方法,都是通過呼叫被裝飾物件in完成的。沒有新增任何額外功能

//裝飾器模式中的Decorator物件,不增加被裝飾物件的功能。

//它是裝飾器模式中的核心。更多關於裝飾器模式的理論請閱讀部落格中的文章。

}

以上分析了所有位元組輸入流的公共父類InputStream和裝飾器類FilterInputStream類。他們是裝飾器模式中兩個重要的類。更多細節請閱讀部落格中裝飾器模式的文章。下面將講解一個具體的流ByteArrayInputStream,不過它是採用介面卡設計模式。

3 ByteArrayByteArrayInputStream的適配

// ByteArrayInputStream內部有一個byte型別的buffer。

//很典型的介面卡模式的應用――將byte陣列適配流的介面。

//下面是原始碼分析:

 

package java.io;

 

public class ByteArrayInputStream extends InputStream {

    protected byte buf[];                //內部的buffer,一般通過構造器輸入

protected int pos;                   //當前位置的cursor。從0至byte陣列的長度。

//byte[pos]就是read方法讀取的位元組

    protected int mark = 0;           //mark的位置。

    protected int count;                          //流中位元組的數目。不一定與byte[]的長度一致???

 

    public ByteArrayInputStream(byte buf[]) {//從一個byte[]建立一個ByteArrayInputStream

         this.buf = buf;                                                      //初始化流中的各個成員變數

        this.pos = 0;

         this.count = buf.length;                              //count就等於buf.length

    }

 

    public ByteArrayInputStream(byte buf[], int offset, int length) {                //構造器

         this.buf = buf;

        this.pos = offset;                                                                                      //與上面不同

         this.count = Math.min(offset + length, buf.length);

        this.mark = offset;                                                                                             //與上面不同

    }

 

    public synchronized int read() {                                           //從流中讀取下一個位元組

                   return (pos < count) ? (buf[pos++] & 0xff) : -1; //返回下一個位置的位元組

                                                                                                                //流中沒有資料則返回-1

    }

 

         //下面這個方法很有意思!從InputStream中可以看出其提供了該方法的實現。

         //為什麼ByteArrayInputStream要覆蓋此方法呢?

         //同樣的我們在Java Collections Framework中可以看到:

//AbstractCollection利用iterator實現了Collecion介面的很多方法。但是,

//在ArrayList中卻有很多被子類覆蓋了。為什麼如此呢??

 

    public synchronized int read(byte b[], int off, int len) {

         if (b == null) {                                                               //首先檢查輸入引數的狀態是否正確

             throw new NullPointerException();

         } else if (off < 0 || len < 0 || len > b.length - off) {

             throw new IndexOutOfBoundsException();

         }

         if (pos >= count) {             return -1;             }

         if (pos + len > count) {      len = count - pos;         }

         if (len <= 0) {           return 0;     }

         System.arraycopy(buf, pos, b, off, len);                     //java中提供資料複製的方法

         pos += len;

         return len;

    }

         //出於速度的原因!他們都用到System.arraycopy方法。想想為什麼?

         //某些時候,父類不能完全實現子類的功能,父類的實現一般比較通用。

//當子類有更有效的方法時,我們會覆蓋這些方法。這樣可是不太OO的哦!

 

         //下面這個方法,在InputStream中也已經實現了。

//但是當時是通過將位元組讀入一個buffer中實現的,好像效率低了一點。

//看看下面這段程式碼,是否極其簡單呢?!

    public synchronized long skip(long n) {

         if (pos + n > count) {    n = count - pos;       }        //當前位置,可以跳躍的位元組數目

         if (n < 0) {       return 0;     }                                    //小於0,則不可以跳躍

         pos += n;                                                                              //跳躍後,當前位置變化

         return n;

    }                                    //比InputStream中的方法簡單、高效吧!

 

    public synchronized int available() {

                   return count - pos;

    }

         //查詢流中還有多少位元組沒有讀取。

//在我們的ByteArrayInputStream中就是當前位置以後位元組的數目。  

 

    public boolean markSupported() {                   

                   return true;

    }        //ByteArrayInputStream支援mark所以返回true

 

    public void mark(int readAheadLimit) {            

                   mark = pos;

    }

//在流中當前位置mark。

//在我們的ByteArrayInputStream中就是將當前位置賦給mark變數。

//讀取流中的位元組就是讀取位元組陣列中當前位置向後的的位元組。

 

    public synchronized void reset() {

                   pos = mark;

    }

         //重置流。即回到mark的位置。

 

    public void close() throws IOException {   }

         //關閉ByteArrayInputStream不會產生任何動作。為什麼?仔細考慮吧!!

}

上面我們分3小節講了裝飾器模式中的公共父類(對應於輸入位元組流的InputStream)、Decorator(對應於輸入位元組流的FilterInputStream)和基本被裝飾物件(對應於輸入位元組流的媒體位元組流)。下面我們就要講述裝飾器模式中的具體的包裝器(對應於輸入位元組流的包裝器流)。

4 BufferedInputStream

4.1原理及其在軟體硬體中的應用

1.read――read(byte[] ,int , int)

2.BufferedInputStream

3.《由一個簡單的程式談起》

4. Cache

5.Pool

6.Spling Printer

(最近比較忙,不講了!)

4.2 BufferedInputStream原始碼分析

bufferedInputStream  的  fill() 函式 旨在擴容並且從in.read()中讀入新的內容,進行一次磁碟IO,(快取大小) 。

bufferinputStream 是一種特殊的機制,它一次向buf中寫入8192長度內容,這樣相當於一次磁碟IO就可以寫入足夠內容,不同想其他一樣多次進行磁碟IO浪費效率,畢竟read() 也是 阻塞的 極大地浪費效率。 

    所以在bufferinputstream中的read() 也會呼叫fill() 因為buf內容中  pos > count .read1()只是有多少讀多少,read()是貪婪讀,直至in中內容available為false;

package java.io;

 

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

 

//該類主要完成對被包裝流,加上一個快取的功能

public class BufferedInputStream extends FilterInputStream {

    private static int defaultBufferSize = 8192;       //預設快取的大小
    protected volatile byte buf[];                     //內部的快取
    protected int count;                               //buffer的大小
    protected int pos;                                 //buffer中cursor的位置
    protected int markpos = -1;                        //mark的位置
    protected int marklimit;                           //mark的範圍

 

//原子性更新。和一致性程式設計相關

    private static final
        AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =
        AtomicReferenceFieldUpdater.newUpdater (BufferedInputStream.class,  byte[].class, "buf");

 

    private InputStream getInIfOpen() throws IOException {  //檢查輸入流是否關閉,同時返回被包裝流
        InputStream input = in;
         if (input == null)    throw new IOException("Stream closed");
        return input;
    }

 

    private byte[] getBufIfOpen() throws IOException {                      //檢查buffer的狀態,同時返回快取
        byte[] buffer = buf;
         if (buffer == null)   throw new IOException("Stream closed");      //不太可能發生的狀態
        return buffer;
    }

 

    public BufferedInputStream(InputStream in) {                    //構造器
         this(in, defaultBufferSize);                     //指定預設長度的buffer
    }

 

    public BufferedInputStream(InputStream in, int size) {          //構造器

        super(in);

        if (size <= 0) {                                            //檢查輸入引數
            throw new IllegalArgumentException("Buffer size <= 0");
        }
        buf = new byte[size];                            //建立指定長度的buffer

    }

  //   fill() 函式 旨在擴容並且從in.read()中讀入新的內容,進行一次磁碟IO。(快取大小)
//private void fill() throws IOException {
//
//byte[] buffer = getBufIfOpen();                                 //得到buffer
//
//if (markpos < 0)
//    pos = 0;                                                   //mark位置小於0,此時pos為0
//else if (pos >= buffer.length)                                 //pos大於buffer的長度
//    if (markpos > 0) {        
//          int sz = pos - markpos;                           
//          System.arraycopy(buffer, markpos, buffer, 0, sz);    //System.arraycopy()使用native方法,所以效率極高
//
//          pos = sz;
//
//          markpos = 0;
//
//    } else if (buffer.length >= marklimit) {                   //buffer的長度大於marklimit時,mark失效
//          markpos = -1;                                        
//          pos = 0;                                             //丟棄buffer中的內容
//
//    } else {                                                   //buffer的長度小於marklimit時對buffer擴容
//
//          int nsz = pos * 2;
//
//          if (nsz > marklimit)           nsz = marklimit;      //擴容為原來的2倍,太大則為marklimit大小
//
//          byte nbuf[] = new byte[nsz];                    
//
//          System.arraycopy(buffer, 0, nbuf, 0, pos);           //將buffer中的位元組拷貝如擴容後的buf中
//
//          if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
//
//                                                               //在buffer在被操作時,不能取代此buffer
//              throw new IOException("Stream closed");
//
//         }
//
//         buffer = nbuf;                                         //將新buf賦值給buffer
//
//    }
//
//count = pos;
//
//int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
//
//if (n > 0)     count = n + pos;
//
//}

/**
     * Fills the buffer with more data, taking into account
     * shuffling and other tricks for dealing with marks.
     * Assumes that it is being called by a synchronized method.
     * This method also assumes that all data has already been read in,
     * hence pos > count.
     */
    private void fill() throws IOException {
        byte[] buffer = getBufIfOpen();
        if (markpos < 0)
            pos = 0;            /* no mark: throw away the buffer */
        else if (pos >= buffer.length)  /* no room left in buffer */
            if (markpos > 0) {  /* can throw away early part of the buffer */
                int sz = pos - markpos;
                System.arraycopy(buffer, markpos, buffer, 0, sz);
                pos = sz;
                markpos = 0;
            } else if (buffer.length >= marklimit) {
                markpos = -1;   /* buffer got too big, invalidate mark */
                pos = 0;        /* drop buffer contents */
            } else if (buffer.length >= MAX_BUFFER_SIZE) {
                throw new OutOfMemoryError("Required array size too large");
            } else {            /* grow buffer */
                int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
                        pos * 2 : MAX_BUFFER_SIZE;
                if (nsz > marklimit)
                    nsz = marklimit;
                byte nbuf[] = new byte[nsz];
                System.arraycopy(buffer, 0, nbuf, 0, pos);
                if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
                    // Can't replace buf if there was an async close.
                    // Note: This would need to be changed if fill()
                    // is ever made accessible to multiple threads.
                    // But for now, the only way CAS can fail is via close.
                    // assert buf == null;
                    throw new IOException("Stream closed");
                }
                buffer = nbuf;
            }
        count = pos;
        int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
        if (n > 0)
            count = n + pos;
    }

PipedInputStream/PipedOutputStream談起

江蘇無錫繆小東

本篇主要從分析PipeInputStremPipedOutputStream談起。談及軟體設計的變化,以及如何將軟體拆分、組合,適配……

原始碼分析

下面將詳細分析PipedInputStreamPipedOutputStream的原始碼。

1.1 PipedInputStream

package java.io;

//PipedInputStream必須和PipedOutputStream聯合使用。即必須連線輸入部分。

//其原理為:PipedInputStream內部有一個Buffer,

//PipedInputStream可以使用InputStream的方法讀取其Buffer中的位元組。

//PipedInputStream中Buffer中的位元組是PipedOutputStream呼叫PipedInputStream的方法放入的。

 

public class PipedInputStream extends InputStream {

    boolean closedByWriter = false;                                                             //標識有讀取方或寫入方關閉

    volatile boolean closedByReader = false;

    boolean connected = false;                                                                     //是否建立連線

    Thread readSide;                                                                                             //標識哪個執行緒

    Thread writeSide;

 

    protected static final int PIPE_SIZE = 1024;                         //緩衝區的預設大小

    protected byte buffer[] = new byte[PIPE_SIZE];                  //緩衝區

    protected int in = -1;               //下一個寫入位元組的位置。0代表空,in==out代表滿

    protected int out = 0;               //下一個讀取位元組的位置

 

    public PipedInputStream(PipedOutputStream src) throws IOException {         //給定源的輸入流

                   connect(src);

    }

 

    public PipedInputStream() {    }                                             //預設構造器,下部一定要connect源

 

    public void connect(PipedOutputStream src) throws IOException {              //連線輸入源

                   src.connect(this);                                            //呼叫源的connect方法連線當前物件

    }

 

    protected synchronized void receive(int b) throws IOException {             //只被PipedOuputStream呼叫

        checkStateForReceive();                                                 //檢查狀態,寫入

        writeSide = Thread.currentThread();                                     //永遠是PipedOuputStream

        if (in == out)     awaitSpace();                                        //輸入和輸出相等,等待空間

         if (in < 0) {

             in = 0;

             out = 0;

         }

         buffer[in++] = (byte)(b & 0xFF);                                        //放入buffer相應的位置

         if (in >= buffer.length) {      in = 0;         }                       //in為0表示buffer已空

    }

 

    synchronized void receive(byte b[], int off, int len)  throws IOException {

        checkStateForReceive();

        writeSide = Thread.currentThread();                                     //從PipedOutputStream可以看出

        int bytesToTransfer = len;

        while (bytesToTransfer > 0) {

            if (in == out)    awaitSpace();                                 //滿了,會通知讀取的;空會通知寫入

            int nextTransferAmount = 0;

            if (out < in) {

                nextTransferAmount = buffer.length - in;

            } else if (in < out) {

                if (in == -1) {

                    in = out = 0;

                    nextTransferAmount = buffer.length - in;

                } else {

                    nextTransferAmount = out - in;

                }

            }

            if (nextTransferAmount > bytesToTransfer)     nextTransferAmount = bytesToTransfer;

            assert(nextTransferAmount > 0);

            System.arraycopy(b, off, buffer, in, nextTransferAmount);

            bytesToTransfer -= nextTransferAmount;

            off += nextTransferAmount;

            in += nextTransferAmount;

            if (in >= buffer.length) {     in = 0;      }

        }

    }

 

    private void checkStateForReceive() throws IOException {                           //檢查當前狀態,等待輸入

        if (!connected) {

            throw new IOException("Pipe not connected");

        } else if (closedByWriter || closedByReader) {

             throw new IOException("Pipe closed");

         } else if (readSide != null && !readSide.isAlive()) {

            throw new IOException("Read end dead");

        }

    }

 

    private void awaitSpace() throws IOException {                            //Buffer已滿,等待一段時間

         while (in == out) {                                                  //in==out表示滿了,沒有空間

             checkStateForReceive();                                          //檢查接受端的狀態

             notifyAll();                                                     //通知讀取端

             try {

                 wait(1000);

             } catch (InterruptedException ex) {

                   throw new java.io.InterruptedIOException();

             }

         }

    }

 

    synchronized void receivedLast() {                  //通知所有等待的執行緒()已經接受到最後的位元組

         closedByWriter = true;                            

         notifyAll();

    }

 

    public synchronized int read()  throws IOException {

        if (!connected) {                                                         //檢查一些內部狀態

            throw new IOException("Pipe not connected");

        } else if (closedByReader) {

             throw new IOException("Pipe closed");

         } else if (writeSide != null && !writeSide.isAlive()&& !closedByWriter && (in < 0)) {

            throw new IOException("Write end dead");

         }

        readSide = Thread.currentThread();                                          //當前執行緒讀取

         int trials = 2;                                                            //重複兩次????

         while (in < 0) {

             if (closedByWriter) {              return -1;        }                 //輸入斷關閉返回-1

             if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { //狀態錯誤

                   throw new IOException("Pipe broken");

             }

             notifyAll();                                                             // 空了,通知寫入端可以寫入

             try {

                 wait(1000);

             } catch (InterruptedException ex) {

                   throw new java.io.InterruptedIOException();

             }

        }

         int ret = buffer[out++] & 0xFF;                                                        //

         if (out >= buffer.length) {             out = 0;                }

         if (in == out) {           in = -1;                 }                             //沒有任何位元組

         return ret;

    }

 

    public synchronized int read(byte b[], int off, int len)  throws IOException {

     if (b == null) {                                                                       //檢查輸入引數的正確性

         throw new NullPointerException();

     } else if (off < 0 || len < 0 || len > b.length - off) {

         throw new IndexOutOfBoundsException();

     } else if (len == 0) {

         return 0;

     }

     int c = read();                                                                //讀取下一個

     if (c < 0) {    return -1;       }                                             //已經到達末尾了,返回-1

     b[off] = (byte) c;                                                             //放入外部buffer中

     int rlen = 1;                                                                  //return-len

     while ((in >= 0) && (--len > 0)) {                                             //下一個in存在,且沒有到達len

         b[off + rlen] = buffer[out++];                                            //依次放入外部buffer

         rlen++;

         if (out >= buffer.length) {         out = 0;           }                 //讀到buffer的末尾,返回頭部

         if (in == out) {     in = -1;      }               //讀、寫位置一致時,表示沒有資料

     }

     return rlen;                                                                            //返回填充的長度

    }

 

    public synchronized int available() throws IOException {             //返回還有多少位元組可以讀取

         if(in < 0)

             return 0;                                                   //到達末端,沒有位元組

         else if(in == out)

             return buffer.length;                                       //寫入的和讀出的一致,表示滿

         else if (in > out)

             return in - out;                                             //寫入的大於讀出

         else

             return in + buffer.length - out;                             //寫入的小於讀出的

    }

 

    public void close()  throws IOException {                //關閉當前流,同時釋放與其相關的資源

         closedByReader = true;                                             //表示由輸入流關閉

        synchronized (this) {     in = -1;    }        //同步化當前物件,in為-1

    }

}

 

1.2 PipedOutputStream
// PipedOutputStream一般必須和一個PipedInputStream連線。共同構成一個pipe。

//它們的職能是:

 

package java.io;

import java.io.*;

 

public class PipedOutputStream extends OutputStream {

    private PipedInputStream sink;                //包含一個PipedInputStream

 

    public PipedOutputStream(PipedInputStream snk)throws IOException {       //帶有目的地的構造器

                   connect(snk);

    }

   

    public PipedOutputStream() {  }                      //預設構造器,必須使用下面的connect方法連線

   

    public synchronized void connect(PipedInputStream snk) throws IOException {

        if (snk == null) {                                                  //檢查輸入引數的正確性

            throw new NullPointerException();

        } else if (sink != null || snk.connected) {

             throw new IOException("Already connected");

         }

         sink = snk;                                                       //一系列初始化工作

         snk.in = -1;

         snk.out = 0;

        snk.connected = true;

    }

 

    public void write(int b) throws IOException {                        //向流中寫入資料

        if (sink == null) {    throw new IOException("Pipe not connected");      }

         sink.receive(b);            //本質上是,呼叫PipedInputStream的receive方法接受此位元組

    }

 

    public void write(byte b[], int off, int len) throws IOException {

        if (sink == null) {                                                  //首先檢查輸入引數的正確性

            throw new IOException("Pipe not connected");

        } else if (b == null) {

             throw new NullPointerException();

         } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {

             throw new IndexOutOfBoundsException();

         } else if (len == 0) {

             return;

         }

         sink.receive(b, off, len);                                       //呼叫PipedInputStream的receive方法接受

    }

 

    public synchronized void flush() throws IOException {                 //flush輸出流

         if (sink != null) {

            synchronized (sink) {     sink.notifyAll();     } //本質是通知輸入流,可以讀取

         }

    }

 

    public void close()  throws IOException {                         //關閉流同時釋放相關資源

         if (sink != null) {    sink.receivedLast();         }

    }

}

2 Buffer的狀態

上圖是PipedInputStream中快取的狀態圖。在程式中我們利用了byte陣列,迴圈地向其中寫入資料,寫入有一個cursorin),讀出也有一個cursorout)。上圖表示inout不同位置時,buffer中的各個位置的狀態。藍色的代表可以讀取的位元組。白色的表示此位置沒有位元組,或者此位置已經被PipedInputStream讀取了。

互動簡圖

下圖是從原始碼部分轉換過來的關於PipedInputStreamPipedOutputStream的互動圖。

 

從圖中可以看出:

1.整個PipedInputStream是這對管道的核心。管道本身是一個byte的陣列。

2.PipedOutputStream物件通過Delegate方法複用PipedInputStream,同時遮蔽了其中的讀取的方法,我們僅僅可以構造PipedOutputStream物件。(從這一點可以看出Delegate複用比繼承複用的優越性了!)從設計模式的角度更象Adapter――PipedInputStream本身提供讀取和寫入的功能,將寫入的功能適配到OutputStream,就成為一個PipedOutputStream。這樣就形成一個類,適配後形成兩種功能的類。

3.呼叫PipedOutputStream的連線方法實際就是呼叫PipedInputStream的連線方法。

4.呼叫PipedOutputStream的寫相關的方法實際就是呼叫PipedInputStream的對應方法。

以上也是一種適配,將管道的概念適配到流的概念,同時將兩者的職能分開。

Chanel放入PipedOutputStream

上面的例子中,Chanel放在PipedInputStream中,我們仔細思考後可以順理成章地將其Chanel放入PipedOutputStream中。請注意synchronized方法是得到哪個位元組流的鎖!!

5 Chanel移出的一個例子

在上面兩個例子中Buffer要麼在寫入物件的內部,要麼在讀取物件的內部。主要通過適配該物件的方法,達到自己的需求而已。下面是一個一般的例子――將Chanel移出,Chanel提供了寫入與讀取的功能。這也完全合乎OO的“Single Responsibility Protocol――SRP”。輸入部分使用Delegate複用此Chanel,將其適配至InputStreamOutputStream。下面是簡單的Source code

//PipedChanel.java

       import java.io.IOException ;

 

public class PipedChanel {

    protected static final int PIPE_SIZE = 1024;

    protected byte buffer[] = new byte[PIPE_SIZE];   

    protected int in = -1;

protected int out = 0;

   

    public PipedChanel(){  }       

    public PipedChanel(int size){

           buffer = new byte[size]  ; 

    }   

   

    public synchronized int read() throws IOException {    }       

    public synchronized int read(byte b[], int off, int len)  throws IOException {    }   

    public synchronized int available() throws IOException {}   

    public synchronized void close()  throws IOException {}       

   

public synchronized void write(int b)  throws IOException {}

public synchronized void write(byte b[]) throws IOException {}

    public synchronized void write(byte b[], int off, int len) throws IOException {}         

public synchronized void flush() throws IOException {}       

 

    public void waitWhileFull(){    }            //當Chanel已經滿了,寫執行緒等待

public void waitWhileEmpty{    }        //當Chanel為空,讀取執行緒等待

//以上是兩個操作Chanel時的狀態相關的方法。

//是一致性程式設計部分,典型的設計模式。

//這兩個方法,包含在對應讀或寫方法的最前面。

}

 

 

       // PipedChanelInputStream.java

import java.io.*;

 

public class PipedChanelInputStream extends InputStream {

       private PipedChanel chanel ;

      

       public PipedChanelInputStream(PipedChanel chanel){

              this.chanel = chanel ;

       }

      

       public int read() throws IOException {

              return chanel.read();

       }   

   

    public  int read(byte b[], int off, int len)  throws IOException {

           return chanel.read(b,off,len);

    }

   

    public  int available() throws IOException {

           return chanel.available();

    }

   

    public  void close()  throws IOException {

           chanel.close();      

    }        

      

}

 

 

       // PipedChanelOutputStream.java

import java.io.*;

 

public class PipedChanelOutputStream extends OutputStream {

       private PipedChanel chanel ;

      

       public PipedChanelOutputStream(PipedChanel chanel){

              this.chanel = chanel ;

       }

   

    public synchronized void write(int b)  throws IOException {

           chanel.write(b);     

    }

    public synchronized void write(byte b[]) throws IOException {

           chanel.write(b);     

    }

    public synchronized void write(byte b[], int off, int len) throws IOException {

           chanel.write(b,off,len); 

    }    

    public synchronized void flush() throws IOException {

           chanel.flush();

    }

    public synchronized void close()  throws IOException {

           chanel.close();      

    }   

}

很簡單的例子。我們可以體會介面卡模式,可以體會軟體設計的靈活性……

上面的關於PipedInputStreamPipedOutputStream的例子,本質上是對一個Chanel的幾個不同的適配。Chanel作為一種程式設計模式,在軟體設計中有極其廣泛的應用。下面一節是JMS的簡潔闡述!

以上的例子其實是一個典型的使用介面卡。

6 JMS的架構

JMS