1. 程式人生 > >Java之IO(八)PipedIutputStream和PipedOutputStream

Java之IO(八)PipedIutputStream和PipedOutputStream

gin lose 讀取數據 interrupt src link fin 字節 zed

  轉載請註明源出處:http://www.cnblogs.com/lighten/p/7056278.html

1.前言

  本章介紹Java的IO體系中最後一對字節流--管道流。之前在字節數組流的時候就說過,其可以充當輸入輸出流的轉換作用,Java中還有一個管道流可以完成相似的功能,但是其主要作用是用於不同線程間的通訊,下面就具體講一下管道流是如何實現的,以及相關例子。

  值得註意的是,在JDK源碼註釋中提到了,通常使用一個管道輸出流關聯一個管道輸入流形成管道會話。通常輸入流和輸出流是不在一個線程中的,如果在同一個線程中使用,可能會造成死鎖。當無法從輸入流中讀取數據的時候,輸出流會被中斷。

2.PipedOutputStream

  管道輸出流十分簡單,其繼承OutputStream,具體類結構如下:

技術分享

  其接受一個管道輸入流,調用connect方法,判斷通過後會重置相關數據並賦值。當然也可以直接new一個管道輸出流,再通過connect方法關聯。

技術分享

  兩個write方法都和輸出流沒有關系,是通過輸入流來進行操作的。

技術分享

技術分享

  flush方法和close方法也是通過管道輸入流進行操作的。

技術分享

技術分享

  整個輸入流的操作,都是通過輸入流完成的,所以接下來我們主要關註輸入流做了什麽。

3.PipedInputStream

  這個管道輸入流的內容也不多:

技術分享

  上面就是主要的內容了,輸入流有一個buffer字節數組,構造函數有輸出流的時候做了兩件事情,一個是initPipe初始化這個buffer,默認大小1024個字節,另一個就是調用connect方法,connect方法實際上就是調用輸出流的connect,反過來修改了輸入流的相關字段。如果輸入流構造時沒有輸出流,就需要使用connect方法進行關聯。但是切記,不管是輸入流初始化和輸出流初始化,connect方法無論是通過什麽途徑,都只能調用一次,也就是兩個對象相互關聯只能發生一次。

  下面我們關註一下輸出流write所調用輸入流的receive都做了些什麽:

技術分享

  管道流輸出的時候就會調用輸入管道流的receive方法,最終寫入buffer中。註意buffer寫完了就會重置寫入下標in。receive一個數組也是判斷buffer剩余空間是否足夠而已。read方法就是讀取這個數組中的內容了:

技術分享

3.1 死鎖探究

   之前說了,如果寫入和讀取這兩個操作是在同一線程可能會發生死鎖,這裏具體看下是如何死鎖的,測試代碼如下:

    @Test
	public void test() throws IOException {
		PipedInputStream pis = new PipedInputStream();
		PipedOutputStream pos = new PipedOutputStream(pis);
		byte[] read = new byte[10];
		byte[] write = new byte[100];
		int count = 0;
		while(true) {
			pos.write(write);
			pis.read(read);
			System.out.println("完成寫入讀取次數:"+(++count));
		}
	}

  結果如下:

技術分享

  計數到11的時候就不進行下去了,很顯然死鎖了。這個是怎麽產生的呢?回顧上面源碼:

技術分享

  寫入數據時,調用了receive方法,獲得pis的對象鎖。寫完了數據就會釋放鎖了。read方法也需要獲取鎖,然後讀完了就釋放鎖。乍一看好像沒什麽問題啊,不應該產生死鎖的啊。但是問題出在write方法調用receive的一個方法awaitSpace上。因為寫入比讀取速度快,按照源代碼的做法就會造成in的值追上out,然後就一直等待輸入流使用,由於輸入流和輸出流在同一個線程,這裏就變成了一個死循環了,wait之後依舊是在等寫入消耗。寫入比讀取快也會造成wait。註釋上說的死鎖不知是否是這個樣子,如果是那麽說死鎖就感覺有點不恰當了。

3.2 寫入讀取操作說明

  讀寫操作在不同線程的時候,數據從管道輸出流中寫入,調用管道輸入流的receive方法。裏面有一個緩存數組buffer,用於接收write方法寫入的字節。輸入流有兩個下標,一個in用於標記當前緩存到的字節數,一個out用於標記read方法讀取buffer的位置。寫入的時候,如果in==out,則暫停寫入,因為此時判斷寫入快過讀取,防止數據被覆蓋,這個時候寫入線程就會掛起,等待讀取線程讀取數據。讀取線程如果讀到in==out則認為寫入完成,讀取也就完成了。最初in是等於-1的,所以第一次讀取的時候,其也是一個掛起讀取線程等待寫入的過程。之後in只有在讀取線程讀完了所有寫入的時候才會為-1。下面這段在read()方法中,只有第一次讀取可能進入,因為如果有寫入的時候in就不會小於0。

技術分享

  問題來了,如果寫入線程掛起了,讀取線程讀到目前寫入位置,即in==out時,是判斷讀取完成的,這個時候會出現什麽現象呢?

    @Test
	public void test2() throws IOException, InterruptedException {
		PipedInputStream pis = new PipedInputStream();
		PipedOutputStream pos = new PipedOutputStream(pis);
		CountDownLatch latch = new CountDownLatch(2);
		new Thread(new Runnable() {
			@Override
			public void run() {
				byte[] b = new byte[1025];
				b[1024] = 1;
				try {
					pos.write(b , 0, 1022);
					Thread.sleep(2000);
					pos.write(b, 1022, b.length);
					pos.close();
				} catch (IOException e) {
					e.printStackTrace();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} finally {
					latch.countDown();
				}
			}
		}).start();
		
		new Thread(new Runnable() {
			@Override
			public void run() {
				ByteArrayOutputStream baos = new ByteArrayOutputStream();
                byte[] buf = new byte[1024];
                int len = 0;
                try {
                	byte b;
                	while((b = (byte) pis.read()) != -1) {
                		baos.write(b);
                	}
		} catch (IOException e) {
			e.printStackTrace();
		}
                byte[] result = baos.toByteArray();
                System.out.println(result.length + ":" + Arrays.toString(result));
                latch.countDown();
			}
		}).start();
		latch.await();
	}

  結果是:

技術分享

  會出現異常,且只能讀取到第一次寫入的1022個字節。這就會產生一個問題,在實際使用的時候就會發生先寫入一批數據,再寫入一批數據,但是可能讀取判斷第一批結束了的尷尬情況。有沒有什麽好的方法解決這個問題呢?至少我沒有找到什麽好辦法。使用管道流的時候最好一次寫入,並且初始化管道流的buffer大小最好是寫入的大小。其它的方法並不能完全防止這種現象發生,只是概率小一點而已,比如控制讀寫速度。如果真的要實現多次寫入並且要可靠,我能想到的辦法就只有,讀寫線程共用一個鎖,再加一個流結束標誌符了。但是這種方法也要萬分小心,在開始很容易產生死鎖,如果讀線程先獲取公共鎖,內部又獲取了管道流的鎖,即便釋放了管道流的鎖,寫線程也拿到外層的公共鎖。下面給一個demo,僅供參考,可能會有問題:

        static boolean end = false;
	@Test
	public void test3() throws IOException, InterruptedException {
		PipedInputStream pis = new PipedInputStream();
		PipedOutputStream pos = new PipedOutputStream(pis);
		Object monitor = new Object();
		CountDownLatch latch = new CountDownLatch(2);
		new Thread(new Runnable() {
			@Override
			public void run() {
				int count = 3;
				while(count-->0) {
					synchronized (monitor) {
						System.out.println(count);
						byte[] b = new byte[1022];
						try {
							pos.write(b , 0, b.length);
							Thread.sleep(2000);
							pos.write(1);
						} catch (IOException e) {
							e.printStackTrace();
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
						monitor.notifyAll();
						try {
							monitor.wait(1000);
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
				}
				end = true;
				try {
					pos.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
				latch.countDown();
			}
		}).start();
		
		new Thread(new Runnable() {
			@Override
			public void run() {
				ByteArrayOutputStream baos = new ByteArrayOutputStream();
                byte[] buf = new byte[1024];
                int len = 0;
                try {
                	System.out.println("reading");
                	synchronized (monitor) {
                		while (!end) {
                			len = pis.read(buf);
    						System.out.println("reading...");
    					    baos.write(buf, 0, len);
    					    monitor.notifyAll();
							try {
								monitor.wait(1000);
							} catch (InterruptedException e) {
								e.printStackTrace();
							}
    					}
					}
				} catch (IOException e) {
					e.printStackTrace();
				}
                byte[] result = baos.toByteArray();
                System.out.println(result.length + ":" + Arrays.toString(result));
                latch.countDown();
			}
		}).start();
		latch.await();
	}

技術分享

  再次聲明,此代碼沒有經過仔細思考,只是提供一個思路,出現問題概不負責,使用管道流最好還是一次性寫入所有數據比較好。

  

Java之IO(八)PipedIutputStream和PipedOutputStream