1. 程式人生 > >Java多執行緒學習筆記13之執行緒間通訊

Java多執行緒學習筆記13之執行緒間通訊

詳細程式碼見:github程式碼地址

 

本節內容:

1)  生產者消費者模型

    多個生產者和多個消費者: 操作值假死及解決

    多個生產者和多個消費者: 操作棧假死及解決

2) 通過管道進行執行緒間通訊

    執行緒間常用的通訊方式介紹

    PipedInputStream和PipedOutputStream位元組流/PipedReader和PipedWriter字元流

    JDK英文文件翻譯及方法翻譯介紹

    PipedInputStream/PipedOutputStream和PipedReader/PipedWriter使用

 

(2) 多個生產者和多個消費者: 操作值-假死

假死:

"假死"現象就是執行緒進入WAITING等待狀態。如果全部執行緒都進入WAITING狀態,則程式就
不在執行任何業務功能了,整個專案呈停止狀態。這在使用生產者和消費者模式中經常遇到的。


舉個例子:

package chapter03.section1.thread_3_1_11.project_2_p_c_allWait;

public class ValueObject {
	public static String value = "";
}


package chapter03.section1.thread_3_1_11.project_2_p_c_allWait;

//生產者
public class P {
	
	private String lock;
	
	public P(String lock) {
		super();
		this.lock = lock;
	}
	
	public void setValue() {
		try {
			synchronized(lock) {
				while(!ValueObject.value.equals("")) {
					System.out.println("生產者 "  
							+ Thread.currentThread().getName() + " WAITING了*");
					lock.wait();
				} 
				System.out.println("生產者 " + Thread.currentThread().getName()
						+ " RUNNABLE了");
				String value = System.currentTimeMillis() + "_"
						+ System.nanoTime();
				ValueObject.value = value;
				lock.notify();
//				lock.notifyAll();
			}
		} catch (InterruptedException e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
}


package chapter03.section1.thread_3_1_11.project_2_p_c_allWait;

//消費者
public class C {
	private String lock;
	
	public C(String lock) {
		super();
		this.lock = lock;
	}
	
	public void getValue() {
		try {
			synchronized(lock) {
				while(ValueObject.value.equals("")) {
					//消費者等待生產者生產
					System.out.println("消費者 " 
							+ Thread.currentThread().getName() + " WAITING了#");
					lock.wait();
				}
				System.out.println("消費者 " + Thread.currentThread().getName()
						+ " RUNNABLE了");
				ValueObject.value = "";
				lock.notify();
//				lock.notifyAll();
			}
		} catch (InterruptedException e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
}


package chapter03.section1.thread_3_1_11.project_2_p_c_allWait;

public class ThreadP extends Thread{
	
	private P p;
	
	public ThreadP(P p) {
		super();
		this.p = p;
	}
	
	@Override
	public void run() {
		while(true) {
			p.setValue();
		}
	}
}


package chapter03.section1.thread_3_1_11.project_2_p_c_allWait;

public class ThreadC extends Thread{
	private C r;
	
	public ThreadC(C r) {
		super();
		this.r = r;
	}
	
	@Override
	public void run() {
		while(true) { 
			r.getValue();
		}
	}
}


package chapter03.section1.thread_3_1_11.project_2_p_c_allWait;

public class Run {
	public static void main(String[] args) throws InterruptedException{
		
		String lock = new String("");
		P p =new P(lock);
		C r = new C(lock);
		
		ThreadP[] pThread = new ThreadP[2];
		ThreadC[] rThread = new ThreadC[2];
		
		for(int i = 0; i < 2; i++) {
			pThread[i] = new ThreadP(p);
			pThread[i].setName("生產者" + (i + 1));
			
			rThread[i] = new ThreadC(r);
			rThread[i].setName("消費者" + (i + 1));
			
			pThread[i].start();
			rThread[i].start();
		}
		
		Thread.sleep(5000);
		/**
		 * Thread.currentThread().getThreadGroup()返回當前執行緒的所屬執行緒組
		 * 當前所有執行緒包括main執行緒都屬於main執行緒組,main執行緒組屬於system執行緒組
		 * ThreadGroup類的activeCount()方法返回當前執行緒組的子執行緒組和active執行緒數量
		 * public int enumerate(Thread list[])方法用來將執行緒組中的內容拷貝進去
		 */
//		System.out.println(Thread.currentThread().getThreadGroup().getParent().getName());
		Thread[] threadArray = new Thread[Thread.currentThread()
		                                  .getThreadGroup().activeCount()];
		Thread.currentThread().getThreadGroup().enumerate(threadArray);
		
		for(Thread thread : threadArray) {
			System.out.println(thread.getName() + " " 
					+ thread.getState());
		}
		
	}
}
/*
result:
.......................
消費者 消費者1 RUNNABLE了
消費者 消費者1 WAITING了#
消費者 消費者2 WAITING了#
生產者 生產者2 RUNNABLE了
生產者 生產者2 WAITING了*
生產者 生產者1 WAITING了*
system
main RUNNABLE
生產者1 WAITING
消費者1 WAITING
生產者2 WAITING
消費者2 WAITING
*/

結果分析:
可以看到最後所有的執行緒都呈假死狀態。雖然使用了wait/notify通訊,但不保證nofity喚醒
的是異類,也許是同類。
比如"生產者"喚醒"生產者",或"消費者"喚醒"消費者"這樣的情況。最後會導致所有的程式都
執行不下去。notify()喚醒是由執行緒排程器隨機挑選出一個呈wait狀態的執行緒,對其發出通
知notify,並使它等待獲取該物件的物件鎖。


如何解決假死問題?
假死的原因是因為喚醒的是同類的執行緒,只要不光喚醒同類執行緒,也包括異類就可以了,因此
將P.java和C.java中的notify()改成notifyAll()方法

 

(3) 一個生產者一個消費者: 操作棧


使生產者向堆疊List物件中放入資料,使消費者從List堆疊中取出資料。List最大容量是1
舉例如下:

package chapter03.section1.thread_3_1_11.project_3_stack_1;

import java.util.ArrayList;
import java.util.List;

public class MyStack {
	private List<String> list = new ArrayList<>();
	
	synchronized public void push() {
		try {
			if(list.size() == 1) {
				this.wait();
			}
			list.add("anyString=" + Math.random());
			this.notify(); 
			System.out.println("push=" + list.size());
		} catch (InterruptedException e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
	
	synchronized public String pop(){
		String returnValue = "";
		try {
			if(list.size() == 0) {
				System.out.println("pop操作中的: "
						+ Thread.currentThread().getName() + " 執行緒呈wait狀態");
				this.wait();
			}
			returnValue = "" + list.get(0);
			list.remove(0);
			this.notify();
			System.out.println("pop=" + list.size());
		} catch (InterruptedException e) {
			// TODO: handle exception
			e.printStackTrace();
		}
		return returnValue;
	}
}


package chapter03.section1.thread_3_1_11.project_3_stack_1;

public class P {

	private MyStack myStack;

	public P(MyStack myStack) {
		super();
		this.myStack = myStack;
	}

	public void pushService() {
		myStack.push();
	}
}


package chapter03.section1.thread_3_1_11.project_3_stack_1;

public class C {

	private MyStack myStack;

	public C(MyStack myStack) {
		super();
		this.myStack = myStack;
	}

	public void popService() {
		System.out.println("pop=" + myStack.pop());
	}
}


package chapter03.section1.thread_3_1_11.project_3_stack_1;

public class P_Thread extends Thread {

	private P p;

	public P_Thread(P p) {
		super();
		this.p = p;
	}

	@Override
	public void run() {
		while (true) {
			p.pushService();
		}
	}

}


package chapter03.section1.thread_3_1_11.project_3_stack_1;

public class C_Thread extends Thread {

	private C r;

	public C_Thread(C r) {
		super();
		this.r = r;
	}

	@Override
	public void run() {
		while (true) {
			r.popService();
		}
	}
}


package chapter03.section1.thread_3_1_11.project_3_stack_1;

public class Run {
	public static void main(String[] args) {
		MyStack myStack = new MyStack();

		P p = new P(myStack);
		C r = new C(myStack);

		P_Thread pThread = new P_Thread(p);
		C_Thread rThread = new C_Thread(r);
		pThread.start();
		rThread.start();
	}
}
/*
result:
.................................
pop=anyString=0.28538502891582784
push=1
pop=0
pop=anyString=0.7583543613696966
push=1
pop=0
pop=anyString=0.1512771979278016
push=1
pop=0
 */

 

(4) 一個生產者多個消費者-操作棧: 解決wait條件改變與假死
使用一個生產者向堆疊List物件中放入資料,而多個消費者從List堆疊中取出資料
舉個例子:
我們直接使用上方的例子,只需改變Run類即可

package chapter03.section1.thread_3_1_11.project_4_stack_2_old;

public class Run {
	public static void main(String[] args) throws InterruptedException {
		MyStack myStack = new MyStack();

		P p = new P(myStack);

		C r1 = new C(myStack);
		C r2 = new C(myStack);
		C r3 = new C(myStack);
		C r4 = new C(myStack);
		C r5 = new C(myStack);

		P_Thread pThread = new P_Thread(p);
		pThread.start();

		C_Thread cThread1 = new C_Thread(r1);
		C_Thread cThread2 = new C_Thread(r2);
		C_Thread cThread3 = new C_Thread(r3);
		C_Thread cThread4 = new C_Thread(r4);
		C_Thread cThread5 = new C_Thread(r5);
		cThread1.start();
		cThread2.start();
		cThread3.start();
		cThread4.start();
		cThread5.start();
	}
}
/*
result:
push=1
pop=0
pop=anyString=0.2277061445557783
pop操作中的: Thread-4 執行緒呈wait狀態
pop操作中的: Thread-1 執行緒呈wait狀態
pop操作中的: Thread-3 執行緒呈wait狀態
pop操作中的: Thread-2 執行緒呈wait狀態
pop操作中的: Thread-5 執行緒呈wait狀態
push=1
pop=0
pop=anyString=0.919935358446316
pop操作中的: Thread-4 執行緒呈wait狀態
Exception in thread "Thread-1" java.lang.IndexOutOfBoundsException: Index 0 out-of-bounds for length 0
	at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
	at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
	at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
	at java.base/java.util.Objects.checkIndex(Unknown Source)
	at java.base/java.util.ArrayList.get(Unknown Source)
	at chapter03.section1.thread_3_1_11.project_4_stack_2_old.MyStack.pop(MyStack.java:31)
	at chapter03.section1.thread_3_1_11.project_4_stack_2_old.C.popService(C.java:13)
	at chapter03.section1.thread_3_1_11.project_4_stack_2_old.C_Thread.run(C_Thread.java:15)
可以看到還沒有進行push操作,Thread-1被喚醒繼續執行remove(0)方法,因此而陣列越界異常
 */

結果分析:
為了解決如上問題,我們可以將if(list.size() == 1)換成while(list.size()==1),並
且為了避免假死,將notify()方法改成notifyAll()方法 


(5) 多個生產者多個消費者-操作棧
只需將Run類作如下改變即可
 

package chapter03.section1.thread_3_1_11.project_8_stack_4;

public class Run {
	public static void main(String[] args) throws InterruptedException {
		MyStack myStack = new MyStack();

		P p1 = new P(myStack);
		P p2 = new P(myStack);
		P p3 = new P(myStack);
		P p4 = new P(myStack);
		P p5 = new P(myStack);
		P p6 = new P(myStack);

		P_Thread pThread1 = new P_Thread(p1);
		P_Thread pThread2 = new P_Thread(p2);
		P_Thread pThread3 = new P_Thread(p3);
		P_Thread pThread4 = new P_Thread(p4);
		P_Thread pThread5 = new P_Thread(p5);
		P_Thread pThread6 = new P_Thread(p6);
		pThread1.start();
		pThread2.start();
		pThread3.start();
		pThread4.start();
		pThread5.start();
		pThread6.start();

		C r1 = new C(myStack);
		C r2 = new C(myStack);
		C r3 = new C(myStack);
		C r4 = new C(myStack);
		C r5 = new C(myStack);
		C r6 = new C(myStack);
		C r7 = new C(myStack);
		C r8 = new C(myStack);

		C_Thread cThread1 = new C_Thread(r1);
		C_Thread cThread2 = new C_Thread(r2);
		C_Thread cThread3 = new C_Thread(r3);
		C_Thread cThread4 = new C_Thread(r4);
		C_Thread cThread5 = new C_Thread(r5);
		C_Thread cThread6 = new C_Thread(r6);
		C_Thread cThread7 = new C_Thread(r7);
		C_Thread cThread8 = new C_Thread(r8);

		cThread1.start();
		cThread2.start();
		cThread3.start();
		cThread4.start();
		cThread5.start();
		cThread6.start();
		cThread7.start();
		cThread8.start();
	}
}
/*
result:
..................................
pop操作中的: Thread-8 執行緒呈wait狀態
pop操作中的: Thread-6 執行緒呈wait狀態
push=1
pop=0
pop=anyString=0.030452886723291828
push=1
pop=0
pop=anyString=0.8499823296867753
pop操作中的: Thread-6 執行緒呈wait狀態
pop操作中的: Thread-8 執行緒呈wait狀態
..................................
*/

 

3. 通過管道進行執行緒間通訊
我們知道執行緒程序之間通訊一般都是利用共享記憶體(全域性變數等)、訊息佇列,redis訂閱釋出
機制快取等,以及管道等方式
。下面我們來介紹Java語言中執行緒利用管道進行通訊
Java語言中提供了各種各樣的輸入輸出流,讓我們可以很方便地對資料進行操作,其中的
pipeStream管道流是一種特殊的流,用於在不同執行緒間直接傳輸資料。一個執行緒傳送資料到
輸出管道,另一個執行緒從輸入管道中讀資料,通過使用管道,實現不同執行緒間的通訊,而無
須藉助於類似臨時檔案之類的東西

在Java的JDK中提供了4個類來使執行緒間可以進行通訊:

1) PipedInputStream和PipedOutputStream 位元組流 
    PipedInputStream繼承自抽象類InputStream,而PipedOutputStream繼承自抽象類
OutputStream
2) PipedReader和PipedWriter 字元流
     PipedReader和PipedWriter類分別繼承抽象類Reader和Writer.

(1) 字元流
PipedInputStream/PipedOutputStream文件翻譯:
1)PipedInputStream類翻譯:

PipedInputStream類介紹
public class PipedInputStream
extends InputStream
A piped input stream should be connected to a piped output stream; the piped input stream then 
provides whatever data bytes are written to the piped output stream. Typically, data is read 
from a PipedInputStream object by one thread and data is written to the corresponding 
PipedOutputStream by some other thread. Attempting to use both objects from a single thread 
is not recommended, as it may deadlock the thread. The piped input stream contains a buffer, 
decoupling read operations from write operations, within limits. A pipe is said to be  broken 
if a thread that was providing data bytes to the connected piped output stream is no longer alive.
PipedInpustream繼承InputStream抽象類
一個管道輸入流應該連線到一個管道輸出流;管道輸入流提供寫入管道輸出流的任何資料位元組。通常,一個執行緒從PipedInput
Stream物件讀取資料,另一個執行緒將資料寫入相應的PipedOutputStream。不建議在單個執行緒使用這兩個物件,因為這會導致
執行緒死鎖。管道輸入流包含一個緩衝區,在一定範圍內將讀操作與寫操作分離。如果向連線的管道輸出流提供資料位元組的執行緒
不再是活動的,則稱管道斷開。



constructors建構函式:
PipedInputStream():
	Creates a PipedInputStream so that it is not yet connected. It must be connected to a 
PipedOutputStream before being used.
	建立一個還未連線的PipedInputStream物件。在使用之前,必須連線到PipedOutputStream.

PipedInputStream(int pipeSize):
	Creates a PipedInputStream so that it is not yet connected and uses the specified pipe 
size for the pipe's buffer. It must be connected to a PipedOutputStream before being used.
	建立一個尚未連線的PipedInputStream物件,併為管道的緩衝區指定大小。



read():
public int read​()
         throws IOException
Reads the next byte of data from this piped input stream. The value byte is returned as an int 
in the range 0 to 255. This method blocks until input data is available, the end of the stream
is detected, or an exception is thrown.
從管道輸入流中讀取下一個位元組的資料。值位元組以int形式返回,範圍是0-255.此方法將阻塞,知道輸入
資料可用,檢測到流的結束或丟擲異常為止。
Specified by:
read in class InputStream
Returns:
the next byte of data, or -1 if the end of the stream is reached.
資料的下一個位元組,如果結束為-1
Throws:
IOException - if the pipe is unconnected, broken, closed, or if an I/O error occurs.
IOException - 如果管道沒有連線、破損、關閉、或者I/O錯誤發生了



read(byte[] b, int off, int len):
public int read​(byte[] b,
                int off,
                int len)
         throws IOException
Reads up to len bytes of data from this piped input stream into an array of bytes. Less than len 
bytes will be read if the end of the data stream is reached or if len exceeds the pipe's buffer 
size. If len is zero, then no bytes are read and 0 is returned; otherwise, the method blocks 
until at least 1 byte of input is available, end of the stream has been detected, or an exception 
is thrown.
從這個管道輸入流中讀取len位元組資料,然後賦值到一個位元組陣列中。如果到達資料流的末尾或者len長度超過管道緩衝區的
大小,那麼將讀取小於len位元組資料。如果len是0,則不讀取位元組,返回0;否則,該方法將阻塞,直到至少1位元組的輸入可用,
流的末尾被檢測到,或者丟擲異常。
Overrides:
read in class InputStream
Parameters:
b - the buffer into which the data is read.
b 位元組緩衝區,讀取接收到的資料
off - the start offset in the destination array b
b中開始賦值的偏移量
len - the maximum number of bytes read.
讀取的最大位元組數
Returns:
the total number of bytes read into the buffer, or -1 if there is no more data because the end of 
the stream has been reached.
從緩衝區讀取的總位元組數,如果到達資料流末尾沒有更多的資料,返回-1
Throws:
NullPointerException - If b is null.
b是null,丟擲NullPointerException空指標異常
IndexOutOfBoundsException - If off is negative, len is negative, or len is greater than b.length - off
陣列越界異常- 如果off是負數,len是負數,或者len比b大
IOException - if the pipe is broken, unconnected, closed, or if an I/O error occurs.



connect(PipedOutputStream src):
public void connect​(PipedOutputStream src)
             throws IOException
Causes this piped input stream to be connected to the piped output stream src. If this object is 
already connected to some other piped output stream, an IOException is thrown.
使管道輸入流連線到管道輸出流src.如果這個物件已經連線到其他的輸出流,則丟擲IOException
If src is an unconnected piped output stream and snk is an unconnected piped input stream, they 
may be connected by either the call:
如果src是一個未連線的管道輸出流,而snk是一個未連線的管道輸入流,則它們可通過以下任一方式連線

snk.connect(src) 
or the call:

src.connect(snk) 
The two calls have the same effect. 兩個呼叫作用一樣

Parameters:
src - The piped output stream to connect to.
src是連線的管道輸出流
Throws:
IOException - if an I/O error occurs.



close():
public void close​()
           throws IOException
Closes this piped input stream and releases any system resources associated with the stream.
關閉這個管道輸入流並且釋放與這個流有關的任何系統資源
Throws:
IOException - if an I/O error occurs.

 

2) PipedOutputStream類翻譯

PipedOutputStream類介紹
public class PipedOutputStream
extends OutputStream
A piped output stream can be connected to a piped input stream to create a communications pipe. 
The piped output stream is the sending end of the pipe. Typically, data is written to a 
PipedOutputStream object by one thread and data is read from the connected PipedInputStream 
by some other thread. Attempting to use both objects from a single thread is not recommended 
as it may deadlock the thread. The pipe is said to be  broken if a thread that was reading 
data bytes from the connected piped input stream is no longer alive.
一個管道輸出流可以連線到一個管道輸入流以建立通訊管道。管道輸出流是管道的傳送端。通常,資料由一個執行緒寫入
到PipedOutputStream物件中,資料通過其它執行緒從連線的PipedOutputStream物件中讀取。不建議在單執行緒中使用
這兩個物件,因為它可能會導致執行緒死鎖。如果從連線的管道輸入流中讀取資料位元組的執行緒不在活動,則管道
將被斷開。

constructors建構函式:
PipedOutputStream():
	Creates a piped output stream that is not yet connected to a piped input stream. It must
be connected to a piped input stream, either by the receiver or the sender, before being used.
建立尚未連線到管道輸入流的管道輸出流物件。在使用之前,它必須通過接收方或傳送方連線到管道輸入流。

PipedOutputStream(PipedInputStream snk):
	Creates a piped output stream connected to the specified piped input stream. Data bytes 
written to this stream will then be available as input from snk.
建立連線到特定管道輸入流的管道輸出流物件。資料位元組將被寫入這個流以便從snk輸入
Parameters:
snk - The piped input stream to connect to.
Throws:
IOException - if an I/O error occurs.



write(int b):
public void write​(int b)
           throws IOException
Writes the specified byte to the piped output stream.
將指定的位元組寫入管道輸出流

Implements the write method of OutputStream.

Specified by:
write in class OutputStream
Parameters:
b - the byte to be written.
Throws:
IOException - if the pipe is broken, unconnected, closed, or if an I/O error occurs.



write(byte[] b, int off, int len):
public void write​(byte[] b,
                  int off,
                  int len)
           throws IOException
Writes len bytes from the specified byte array starting at offset off to this piped output 
stream. This method blocks until all the bytes are written to the output stream.
從偏移位置開始的指定位元組陣列中寫入len位元組到這個管道輸出流。此方法將阻塞,直到所有位元組都寫入輸出流。
Overrides:
write in class OutputStream
Parameters:
b - the data.
off - the start offset in the data.
len - the number of bytes to write.
Throws:
IOException - if the pipe is broken, unconnected, closed, or if an I/O error occurs.



connect(PipedInputStream snk):
public void connect​(PipedInputStream snk)
             throws IOException
Connects this piped output stream to a receiver. If this object is already connected to some 
other piped input stream, an IOException is thrown.
If snk is an unconnected piped input stream and src is an unconnected piped output stream, 
they may be connected by either the call:
此用法和PipedInputStream類似

 src.connect(snk)
or the call:
 snk.connect(src)
The two calls have the same effect.
Parameters:
snk - the piped input stream to connect to.
Throws:
IOException - if an I/O error occurs.



close():
public void close​()
           throws IOException
Closes this piped output stream and releases any system resources associated with this stream. 
This stream may no longer be used for writing bytes.
關閉管道輸出流並釋放與此流關聯的任何系統資源。
此流可能不再用於寫入位元組。
Throws:
IOException - if an I/O error occurs.

 

使用例子:

package chapter03.section1.thread_3_1_12.project_1_pipeInputOutput;

import java.io.IOException;
import java.io.PipedOutputStream;

public class WriteData {
	
	public void writeMethod(PipedOutputStream out) {
		try {
			System.out.println("write :");
			for(int i = 0; i < 50; i++) {
				String outData = "" + (i + 1);
				out.write(outData.getBytes());
				System.out.print(outData);
			}
			System.out.println();
			out.close();
		} catch (IOException e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
}


package chapter03.section1.thread_3_1_12.project_1_pipeInputOutput;

import java.io.IOException;
import java.io.PipedInputStream;

public class ReadData {
	public void readMethod(PipedInputStream input) {
		try {
			System.out.println("read : ");
			byte[] byteArray = new byte[20];
			int readLength = input.read(byteArray);
			//返回從緩衝區讀取的位元組數
			while(readLength != -1) {
				String newData = new String(byteArray, 0, readLength);
				System.out.print(newData);
//				readLength = input.read(byteArray, 0, byteArray.length);
				readLength = input.read(byteArray);//一次最多讀取20個位元組的資料
			}
			System.out.println();
			input.close();
		} catch (IOException e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
}


package chapter03.section1.thread_3_1_12.project_1_pipeInputOutput;

import java.io.PipedOutputStream;

public class ThreadWrite extends Thread{
	
	private WriteData write;
	private PipedOutputStream out;
	
	public ThreadWrite(WriteData write, PipedOutputStream out) {
		this.write = write;
		this.out = out;
	}
	
	@Override
	public void run() {
		write.writeMethod(out);
	}
}


package chapter03.section1.thread_3_1_12.project_1_pipeInputOutput;

import java.io.PipedInputStream;

public class ThreadRead extends Thread{
	private ReadData read;
	private PipedInputStream input;
	
	public ThreadRead(ReadData read, PipedInputStream input) {
		this.read = read;
		this.input = input;
	}
	
	@Override
	public void run() {
		read.readMethod(input);
	}
}


package chapter03.section1.thread_3_1_12.project_1_pipeInputOutput;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class Run {
	public static void main(String[] args) {
		try {
			WriteData writeData = new WriteData();
			ReadData readData = new ReadData();
			
			PipedInputStream inputStream = new PipedInputStream();
			PipedOutputStream outputStream = new PipedOutputStream();
			
			//inputStream.connect(outputStream); 兩者作用一樣
			outputStream.connect(inputStream);
			
			ThreadRead threadRead = new ThreadRead(readData, inputStream);
			threadRead.start();
			
			Thread.sleep(2000);
			
			ThreadWrite threadWrite = new ThreadWrite(writeData, outputStream);
			threadWrite.start();	
		} catch (IOException e) {
			// TODO: handle exception
			e.printStackTrace();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

結果分析:
讀取執行緒先啟動,由於當時沒有資料被寫入,所以執行緒阻塞在int readLength = in.read
(byteArray);直到有資料被寫入,才繼續向下執行。


(2) 字元流

PipedWriter/PipedReader文件翻譯:
1)PidedWrite翻譯

PidedWriter類介紹:
public class PipedWriter
extends Writer
Piped character-output streams.
管道字元輸出流


constructors:
PipedWriter():
Creates a piped writer that is not yet connected to a piped reader. It must be connected to 
a piped reader, either by the receiver or the sender, before being used.
建立一個尚未連線到管道輸入流的管道輸出流物件。在使用之前,它必須連線到一個管道輸入流,從接收端或者傳送端
連線均可

PipedWriter(PipedReader snk) throws IOException:
Connects this piped writer to a receiver. If this object is already connected to some other
piped reader, an IOException is thrown.
If snk is an unconnected piped reader and src is an unconnected piped writer, they may be 
connected by either the call:
將這個管道輸出流物件連線到接收端。如果這個物件已經連線到了其他的管道輸出流物件,丟擲IOException異常
實際上建構函式主要工作就是對snk的一些屬性做了初始化與合法判斷
如果snk與src都沒有連線,他們可以通過下面任一方法連線

 src.connect(snk)
or the call:
 snk.connect(src)
The two calls have the same effect.
Parameters:
snk - the piped reader to connect to.
Throws:
IOException - if an I/O error occurs.



connect(PipedReader snk):
public void connect​(PipedReader snk)
             throws IOException
Connects this piped writer to a receiver. If this object is already connected to some other 
piped reader, an IOException is thrown.
If snk is an unconnected piped reader and src is an unconnected piped writer, they may be 
connected by either the call:
將管道輸出物件與接收端連線。如果此物件已經連線了其他的管道輸入物件,丟擲IOException異常

 src.connect(snk)
or the call:
 snk.connect(src)
The two calls have the same effect.
Parameters:
snk - the piped reader to connect to.
Throws:
IOException - if an I/O error occurs.



write(int c):
public void write​(int c)
           throws IOException
Writes the specified char to the piped output stream. If a thread was reading data characters 
from the connected piped input stream, but the thread is no longer alive, then an IOException 
is thrown.
將指定的字元寫入管道輸出流。如果執行緒正在從連線的管道輸出流讀取資料字元,但執行緒不在存活isAlive()方法返回
false,那麼丟擲IOException異常
Implements the write method of Writer.

Overrides:
write in class Writer 
Parameters:
c - the char to be written. 寫入的字元
Throws:
IOException - if the pipe is broken, unconnected, closed or an I/O error occurs.



write(char[] cbuf, int off, int len):
public void write​(char[] cbuf,
                  int off,
                  int len)
           throws IOException
Writes len characters from the specified character array starting at offset off to this piped 
output stream. This method blocks until all the characters are written to the output stream. 
If a thread was reading data characters from the connected piped input stream, but the thread 
is no longer alive, then an IOException is thrown.
從偏移位置開始的指定字元陣列中寫入len字元到管道輸出流。此方法將阻塞,直到所有的字元都寫入輸出流。如果
一個執行緒正在從連線的管道輸入流中讀取資料字元,但該執行緒不再是活動的,則丟擲IOException
Specified by:
write in class Writer
Parameters:
cbuf - the data.
off - the start offset in the data.
len - the number of characters to write.
Throws:
IndexOutOfBoundsException - If off is negative, or len is negative, or off + len is negative 
or greater than the length of the given array
IOException - if the pipe is broken, unconnected, closed or an I/O error occurs.



close():
public void close​()
           throws IOException
Closes this piped output stream and releases any system resources associated with this stream. 
This stream may no longer be used for writing characters.
關閉此管道輸出流並釋放於此流關聯的任何系統資源。此流可能不再用於寫入字元。
Throws:
IOException - if an I/O error occurs.

 

2)PipedReader翻譯

PipedReader類介紹
extends Reader
Piped character-input streams.
管道字元輸入流

constructors:
PipedReader():
public PipedReader​()
Creates a PipedReader so that it is not yet connected. It must be connected to a PipedWriter 
before being used.


PipedReader(int pipeSize):
public PipedReader​(int pipeSize)
Creates a PipedReader so that it is not yet connected and uses the specified pipe size for 
the pipe's buffer. It must be connected to a PipedWriter before being used.
建立尚未連線的PipedReader物件,使用指定緩衝區大小。在使用之前必須要連線到一個PipedWriter物件
Parameters:
pipeSize - the size of the pipe's buffer.
Throws:
IllegalArgumentException - if pipeSize <= 0.


PipedReader(PipedWriter src):
public PipedReader​(PipedWriter src)
            throws IOException
Creates a PipedReader so that it is connected to the piped writer src. Data written to src 
will then be available as input from this stream.
建立一個PipedReader物件,使其連線到管道輸出流物件src.寫入資料的src將會作為此流的輸入提供者

Parameters:
src - the stream to connect to.
Throws:
IOException - if an I/O error occurs.


connect(PipedWriter src):
public void connect​(PipedWriter src)
             throws IOException
Causes this piped reader to be connected to the piped writer src. If this object is already 
connected to some other piped writer, an IOException is thrown.
If src is an unconnected piped writer and snk is an unconnected piped reader, they may be 
connected by either the call:

snk.connect(src) 
or the call:

src.connect(snk) 
The two calls have the same effect.

Parameters:
src - The piped writer to connect to.
Throws:
IOException - if an I/O error occurs.


read():
public int read​()
         throws IOException
Reads the next character of data from this piped stream. If no character is available 
because the end of the stream has been reached, the value -1 is returned. This method 
blocks until input data is available, the end of the stream is detected, or an exception is thrown.
讀取此管道流中的資料的下一個字元。如果沒有字元可用,因為已到達流的末端,則返回值-1.此番發將阻塞,直到
輸入資料可用、檢測到流的末尾或引發異常。

Overrides:
read in class Reader
Returns:
the next character of data, or -1 if the end of the stream is reached.
Throws:
IOException - if the pipe is broken, unconnected, closed, or an I/O error occurs.


read(char[] cbuf, int off, int len):
public int read​(char[] cbuf,
                int off,
                int len)
         throws IOException
Reads up to len characters of data from this piped stream into an array of characters. 
Less than len characters will be read if the end of the data stream is reached or if len 
exceeds the pipe's buffer size. This method blocks until at least one character of input 
is available.
Specified by:
read in class Reader
Parameters:
cbuf - the buffer into which the data is read.
off - the start offset of the data.
len - the maximum number of characters read.
Returns:
the total number of characters read into the buffer, or -1 if there is no more data 
because the end of the stream has been reached.
Throws:
IOException - if the pipe is broken, unconnected, closed, or an I/O error occurs.
IndexOutOfBoundsException - If an I/O error occurs


close():
public void close​()
           throws IOException
Closes this piped stream and releases any system resources associated with the stream.
Throws:
IOException - if an I/O error occurs.

 

兩個執行緒中通過管道流進行字元資料的傳輸
舉個例子:
 

package chapter03.section1.thread_3_1_13.project_1_pipeReaderWriter;

import java.io.IOException;
import java.io.PipedReader;

public class ReadData {
	public void readMethod(PipedReader input) {
		try {
			System.out.println("read :");
			char[] byteArray = new char[20];
			int readLength = input.read(byteArray);
			while(readLength != -1) {
				String newData = new String(byteArray, 0, readLength);
				System.out.print(newData);
				readLength = input.read(byteArray);
			}
			System.out.println();
			input.close();
		} catch (IOException e) {
			// TODO: handle exceptione
			e.printStackTrace();
		}
	}
}


package chapter03.section1.thread_3_1_13.project_1_pipeReaderWriter;

import java.io.IOException;
import java.io.PipedWriter;

public class WriteData {
	
	public void writeMethod(PipedWriter out) {
		try {
			System.out.println("write :");
			for(int i = 0; i < 50; i++) {
				String outData = "" + (i + 1);
				out.write(outData);
				System.out.print(outData);
			}
			System.out.println();
			out.close();
		} catch (IOException e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
}


package chapter03.section1.thread_3_1_13.project_1_pipeReaderWriter;

import java.io.PipedReader;

public class ThreadRead extends Thread{
	
	private ReadData read;
	private PipedReader in;
	
	public ThreadRead(ReadData read, PipedReader in) {
		super();
		this.read = read;
		this.in = in;
	}
	
	@Override
	public void run() {
		read.readMethod(in);
	}
}


package chapter03.section1.thread_3_1_13.project_1_pipeReaderWriter;

import java.io.PipedWriter;

public class ThreadWrite extends Thread {
	
	private WriteData write;
	private PipedWriter out;
	
	public ThreadWrite(WriteData write, PipedWriter out) {
		super();
		this.write = write;
		this.out = out;
	}
	
	@Override
	public void run() {
		write.writeMethod(out);
	}
	
}


package chapter03.section1.thread_3_1_13.project_1_pipeReaderWriter;

import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;

public class Run {

	public static void main(String[] args) {

		try {
			WriteData writeData = new WriteData();
			ReadData readData = new ReadData();

			PipedReader inputStream = new PipedReader();
			PipedWriter outputStream = new PipedWriter();

			// inputStream.connect(outputStream);
			outputStream.connect(inputStream);

			ThreadRead threadRead = new ThreadRead(readData, inputStream);
			threadRead.start();

			Thread.sleep(2000);

			ThreadWrite threadWrite = new ThreadWrite(writeData, outputStream);
			threadWrite.start();

		} catch (IOException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}