Java之IO(八)PipedIutputStream和PipedOutputStream
轉載請註明源出處:http://www.cnblogs.com/lighten/p/7056278.html
1.前言
本章介紹Java的IO體系中最後一對字節流--管道流。之前在字節數組流的時候就說過,其可以充當輸入輸出流的轉換作用,Java中還有一個管道流可以完成相似的功能,但是其主要作用是用於不同線程間的通訊,下面就具體講一下管道流是如何實現的,以及相關例子。
值得註意的是,在JDK源碼註釋中提到了,通常使用一個管道輸出流關聯一個管道輸入流形成管道會話。通常輸入流和輸出流是不在一個線程中的,如果在同一個線程中使用,可能會造成死鎖。當無法從輸入流中讀取數據的時候,輸出流會被中斷。
2.PipedOutputStream
管道輸出流十分簡單,其繼承OutputStream,具體類結構如下:
其接受一個管道輸入流,調用connect方法,判斷通過後會重置相關數據並賦值。當然也可以直接new一個管道輸出流,再通過connect方法關聯。
兩個write方法都和輸出流沒有關系,是通過輸入流來進行操作的。
flush方法和close方法也是通過管道輸入流進行操作的。
整個輸入流的操作,都是通過輸入流完成的,所以接下來我們主要關註輸入流做了什麽。
3.PipedInputStream
這個管道輸入流的內容也不多:
上面就是主要的內容了,輸入流有一個buffer字節數組,構造函數有輸出流的時候做了兩件事情,一個是initPipe初始化這個buffer,默認大小1024個字節,另一個就是調用connect方法,connect方法實際上就是調用輸出流的connect,反過來修改了輸入流的相關字段。如果輸入流構造時沒有輸出流,就需要使用connect方法進行關聯。但是切記,不管是輸入流初始化和輸出流初始化,connect方法無論是通過什麽途徑,都只能調用一次,也就是兩個對象相互關聯只能發生一次。
下面我們關註一下輸出流write所調用輸入流的receive都做了些什麽:
管道流輸出的時候就會調用輸入管道流的receive方法,最終寫入buffer中。註意buffer寫完了就會重置寫入下標in。receive一個數組也是判斷buffer剩余空間是否足夠而已。read方法就是讀取這個數組中的內容了:
3.1 死鎖探究
之前說了,如果寫入和讀取這兩個操作是在同一線程可能會發生死鎖,這裏具體看下是如何死鎖的,測試代碼如下:
@Test public void test() throws IOException { PipedInputStream pis = new PipedInputStream(); PipedOutputStream pos = new PipedOutputStream(pis); byte[] read = new byte[10]; byte[] write = new byte[100]; int count = 0; while(true) { pos.write(write); pis.read(read); System.out.println("完成寫入讀取次數:"+(++count)); } }
結果如下:
計數到11的時候就不進行下去了,很顯然死鎖了。這個是怎麽產生的呢?回顧上面源碼:
寫入數據時,調用了receive方法,獲得pis的對象鎖。寫完了數據就會釋放鎖了。read方法也需要獲取鎖,然後讀完了就釋放鎖。乍一看好像沒什麽問題啊,不應該產生死鎖的啊。但是問題出在write方法調用receive的一個方法awaitSpace上。因為寫入比讀取速度快,按照源代碼的做法就會造成in的值追上out,然後就一直等待輸入流使用,由於輸入流和輸出流在同一個線程,這裏就變成了一個死循環了,wait之後依舊是在等寫入消耗。寫入比讀取快也會造成wait。註釋上說的死鎖不知是否是這個樣子,如果是那麽說死鎖就感覺有點不恰當了。
3.2 寫入讀取操作說明
讀寫操作在不同線程的時候,數據從管道輸出流中寫入,調用管道輸入流的receive方法。裏面有一個緩存數組buffer,用於接收write方法寫入的字節。輸入流有兩個下標,一個in用於標記當前緩存到的字節數,一個out用於標記read方法讀取buffer的位置。寫入的時候,如果in==out,則暫停寫入,因為此時判斷寫入快過讀取,防止數據被覆蓋,這個時候寫入線程就會掛起,等待讀取線程讀取數據。讀取線程如果讀到in==out則認為寫入完成,讀取也就完成了。最初in是等於-1的,所以第一次讀取的時候,其也是一個掛起讀取線程等待寫入的過程。之後in只有在讀取線程讀完了所有寫入的時候才會為-1。下面這段在read()方法中,只有第一次讀取可能進入,因為如果有寫入的時候in就不會小於0。
問題來了,如果寫入線程掛起了,讀取線程讀到目前寫入位置,即in==out時,是判斷讀取完成的,這個時候會出現什麽現象呢?
@Test public void test2() throws IOException, InterruptedException { PipedInputStream pis = new PipedInputStream(); PipedOutputStream pos = new PipedOutputStream(pis); CountDownLatch latch = new CountDownLatch(2); new Thread(new Runnable() { @Override public void run() { byte[] b = new byte[1025]; b[1024] = 1; try { pos.write(b , 0, 1022); Thread.sleep(2000); pos.write(b, 1022, b.length); pos.close(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); } } }).start(); new Thread(new Runnable() { @Override public void run() { ByteArrayOutputStream baos = new ByteArrayOutputStream(); byte[] buf = new byte[1024]; int len = 0; try { byte b; while((b = (byte) pis.read()) != -1) { baos.write(b); } } catch (IOException e) { e.printStackTrace(); } byte[] result = baos.toByteArray(); System.out.println(result.length + ":" + Arrays.toString(result)); latch.countDown(); } }).start(); latch.await(); }
結果是:
會出現異常,且只能讀取到第一次寫入的1022個字節。這就會產生一個問題,在實際使用的時候就會發生先寫入一批數據,再寫入一批數據,但是可能讀取判斷第一批結束了的尷尬情況。有沒有什麽好的方法解決這個問題呢?至少我沒有找到什麽好辦法。使用管道流的時候最好一次寫入,並且初始化管道流的buffer大小最好是寫入的大小。其它的方法並不能完全防止這種現象發生,只是概率小一點而已,比如控制讀寫速度。如果真的要實現多次寫入並且要可靠,我能想到的辦法就只有,讀寫線程共用一個鎖,再加一個流結束標誌符了。但是這種方法也要萬分小心,在開始很容易產生死鎖,如果讀線程先獲取公共鎖,內部又獲取了管道流的鎖,即便釋放了管道流的鎖,寫線程也拿到外層的公共鎖。下面給一個demo,僅供參考,可能會有問題:
static boolean end = false; @Test public void test3() throws IOException, InterruptedException { PipedInputStream pis = new PipedInputStream(); PipedOutputStream pos = new PipedOutputStream(pis); Object monitor = new Object(); CountDownLatch latch = new CountDownLatch(2); new Thread(new Runnable() { @Override public void run() { int count = 3; while(count-->0) { synchronized (monitor) { System.out.println(count); byte[] b = new byte[1022]; try { pos.write(b , 0, b.length); Thread.sleep(2000); pos.write(1); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } monitor.notifyAll(); try { monitor.wait(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } end = true; try { pos.close(); } catch (IOException e) { e.printStackTrace(); } latch.countDown(); } }).start(); new Thread(new Runnable() { @Override public void run() { ByteArrayOutputStream baos = new ByteArrayOutputStream(); byte[] buf = new byte[1024]; int len = 0; try { System.out.println("reading"); synchronized (monitor) { while (!end) { len = pis.read(buf); System.out.println("reading..."); baos.write(buf, 0, len); monitor.notifyAll(); try { monitor.wait(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } catch (IOException e) { e.printStackTrace(); } byte[] result = baos.toByteArray(); System.out.println(result.length + ":" + Arrays.toString(result)); latch.countDown(); } }).start(); latch.await(); }
再次聲明,此代碼沒有經過仔細思考,只是提供一個思路,出現問題概不負責,使用管道流最好還是一次性寫入所有數據比較好。
Java之IO(八)PipedIutputStream和PipedOutputStream