java實現執行緒間通訊的四種方式
本文主要針對JAVA多執行緒中執行緒之間的通訊方式進行分析解釋,主要以程式碼結合文字的方式來討論執行緒間的通訊。
synchronized同步
public class MyObject {
synchronized public void methodA() {
//do something....
}
synchronized public void methodB() {
//do some other thing
}
}
public class ThreadA extends Thread {
private MyObject object;
//省略構造方法
@Override
public void run() {
super.run();
object.methodA();
}
}
public class ThreadB extends Thread {
private MyObject object;
//省略構造方法
@Override
public void run() {
super.run();
object.methodB();
}
}
public class Run {
public static void main(String[] args) {
MyObject object = new MyObject();
//執行緒A與執行緒B 持有的是同一個物件:object
ThreadA a = new ThreadA(object);
ThreadB b = new ThreadB(object);
a.start();
b.start();
}
}
由於執行緒A和執行緒B持有同一個MyObject類的物件object,儘管這兩個執行緒需要呼叫不同的方法,但是它們是同步執行的,比如:執行緒B需要等待執行緒A執行完了methodA()方法之後,它才能執行methodB()方法。這樣,執行緒A和執行緒B就實現了通訊。
這種方式,本質上就是“共享記憶體”式的通訊。多個執行緒需要訪問同一個共享變數,誰拿到了鎖(獲得了訪問許可權),誰就可以執行。
while輪詢
其實就是多執行緒同時執行,會犧牲部分CPU效能。
在這種方式下,執行緒A不斷地改變條件,執行緒ThreadB不停地通過while語句檢測這個條件(list.size()==5)是否成立 ,從而實現了執行緒間的通訊。但是這種方式會浪費CPU資源。之所以說它浪費資源,是因為JVM排程器將CPU交給執行緒B執行時,它沒做啥“有用”的工作,只是在不斷地測試 某個條件是否成立。就類似於現實生活中,某個人一直看著手機螢幕是否有電話來了,而不是: 在幹別的事情,當有電話來時,響鈴通知TA電話來了。
import java.util.ArrayList;
import java.util.List;
public class MyList {
private List<String> list = new ArrayList<String>();
public void add() {
list.add("elements");
}
public int size() {
return list.size();
}
}
import mylist.MyList;
public class ThreadA extends Thread {
private MyList list;
public ThreadA(MyList list) {
super();
this.list = list;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
list.add();
System.out.println("添加了" + (i + 1) + "個元素");
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
import mylist.MyList;
public class ThreadB extends Thread {
private MyList list;
public ThreadB(MyList list) {
super();
this.list = list;
}
@Override
public void run() {
try {
while (true) {
if (list.size() == 5) {
System.out.println("==5, 執行緒b準備退出了");
throw new InterruptedException();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
import mylist.MyList;
import extthread.ThreadA;
import extthread.ThreadB;
public class Test {
public static void main(String[] args) {
MyList service = new MyList();
ThreadA a = new ThreadA(service);
a.setName("A");
a.start();
ThreadB b = new ThreadB(service);
b.setName("B");
b.start();
}
}
wait/notify機制
public class MyList {
private static List<String> list = new ArrayList<String>();
public static void add() {
list.add("anyString");
}
public static int size() {
return list.size();
}
}
public class ThreadA extends Thread {
private Object lock;
public ThreadA(Object lock) {
super();
this.lock = lock;
}
@Override
public void run() {
try {
synchronized (lock) {
if (MyList.size() != 5) {
System.out.println("wait begin " + System.currentTimeMillis());
lock.wait();
System.out.println("Interruption!!!");
//lock.wait();
lock.notify();
lock.wait();
System.out.println("wait end " + System.currentTimeMillis());
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ThreadB extends Thread {
private Object lock;
public ThreadB(Object lock) {
super();
this.lock = lock;
}
@Override
public void run() {
try {
synchronized (lock) {
for (int i = 0; i < 10; i++) {
MyList.add();
if (MyList.size() == 5) {
lock.notify();
System.out.println("已經發出了通知");
lock.wait();
}
System.out.println("添加了" + (i + 1) + "個元素!");
Thread.sleep(1000);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Test {
public static void main(String[] args) {
try {
Object lock = new Object();
ThreadA a = new ThreadA(lock);
a.start();
Thread.sleep(50);
ThreadB b = new ThreadB(lock);
b.start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
wait begin 1498007974397
添加了1個元素!
添加了2個元素!
添加了3個元素!
添加了4個元素!
已經發出了通知
Interruption!!!
添加了5個元素!
添加了6個元素!
添加了7個元素!
添加了8個元素!
添加了9個元素!
添加了10個元素!
執行緒A要等待某個條件滿足時(list.size()==5),才執行操作。執行緒B則向list中新增元素,改變list 的size。
A,B之間如何通訊的呢?也就是說,執行緒A如何知道 list.size() 已經為5了呢?
這裡用到了Object類的 wait() 和 notify() 方法。
當條件未滿足時(list.size() !=5),執行緒A呼叫wait() 放棄CPU,並進入阻塞狀態。—不像②while輪詢那樣佔用CPU
當條件滿足時,執行緒B呼叫 notify()通知 執行緒A,所謂通知執行緒A,就是喚醒執行緒A,並讓它進入可執行狀態。
這種方式的一個好處就是CPU的利用率提高了。
管道通訊
管道流主要用來實現兩個執行緒之間的二進位制資料的傳播,下面以PipedInputStream類和PipedOutputStream類為例,實現生產者-消費者:
package test.pipe;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
/**
* 我們以數字替代產品 生產者每5秒提供5個產品,放入管道
*/
class MyProducer extends Thread {
private PipedOutputStream outputStream;
private int index = 0;
public MyProducer(PipedOutputStream outputStream) {
this.outputStream = outputStream;
}
@Override
public void run() {
while (true) {
try {
for (int i = 0; i < 5; i++) {
outputStream.write(index++);
}
} catch (IOException e) {
e.printStackTrace();
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 消費者每0.5秒從管道中取1件產品,並列印剩餘產品數量,並列印產品資訊(以數字替代)
*/
class MyConsumer extends Thread {
private PipedInputStream inputStream;
public MyConsumer(PipedInputStream inputStream) {
this.inputStream = inputStream;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
int count = inputStream.available();
if (count > 0) {
System.out.println("rest product count: " + count);
System.out.println("get product: " + inputStream.read());
}
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
public class PipeTest1 {
public static void main(String[] args) {
PipedOutputStream pos = new PipedOutputStream();
PipedInputStream pis = new PipedInputStream();
try {
pis.connect(pos);
} catch (IOException e) {
e.printStackTrace();
}
new MyProducer(pos).start();
new MyConsumer(pis).start();
}
}