1. 程式人生 > >java實現執行緒間通訊的四種方式

java實現執行緒間通訊的四種方式

本文主要針對JAVA多執行緒中執行緒之間的通訊方式進行分析解釋,主要以程式碼結合文字的方式來討論執行緒間的通訊。

synchronized同步

public class MyObject {

    synchronized public void methodA() {
        //do something....
    }

    synchronized public void methodB() {
        //do some other thing
    }
}

public class ThreadA extends Thread {

    private
MyObject object; //省略構造方法 @Override public void run() { super.run(); object.methodA(); } } public class ThreadB extends Thread { private MyObject object; //省略構造方法 @Override public void run() { super.run(); object.methodB(); } } public class
Run {
public static void main(String[] args) { MyObject object = new MyObject(); //執行緒A與執行緒B 持有的是同一個物件:object ThreadA a = new ThreadA(object); ThreadB b = new ThreadB(object); a.start(); b.start(); } }

由於執行緒A和執行緒B持有同一個MyObject類的物件object,儘管這兩個執行緒需要呼叫不同的方法,但是它們是同步執行的,比如:執行緒B需要等待執行緒A執行完了methodA()方法之後,它才能執行methodB()方法。這樣,執行緒A和執行緒B就實現了通訊。

這種方式,本質上就是“共享記憶體”式的通訊。多個執行緒需要訪問同一個共享變數,誰拿到了鎖(獲得了訪問許可權),誰就可以執行。

while輪詢

其實就是多執行緒同時執行,會犧牲部分CPU效能。

在這種方式下,執行緒A不斷地改變條件,執行緒ThreadB不停地通過while語句檢測這個條件(list.size()==5)是否成立 ,從而實現了執行緒間的通訊。但是這種方式會浪費CPU資源。之所以說它浪費資源,是因為JVM排程器將CPU交給執行緒B執行時,它沒做啥“有用”的工作,只是在不斷地測試 某個條件是否成立。就類似於現實生活中,某個人一直看著手機螢幕是否有電話來了,而不是: 在幹別的事情,當有電話來時,響鈴通知TA電話來了。

import java.util.ArrayList;
import java.util.List;

public class MyList {

    private List<String> list = new ArrayList<String>();
    public void add() {
        list.add("elements");
    }
    public int size() {
        return list.size();
    }
}

import mylist.MyList;

public class ThreadA extends Thread {

    private MyList list;

    public ThreadA(MyList list) {
        super();
        this.list = list;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                list.add();
                System.out.println("添加了" + (i + 1) + "個元素");
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

import mylist.MyList;

public class ThreadB extends Thread {

    private MyList list;

    public ThreadB(MyList list) {
        super();
        this.list = list;
    }

    @Override
    public void run() {
        try {
            while (true) {
                if (list.size() == 5) {
                    System.out.println("==5, 執行緒b準備退出了");
                    throw new InterruptedException();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

import mylist.MyList;
import extthread.ThreadA;
import extthread.ThreadB;

public class Test {

    public static void main(String[] args) {
        MyList service = new MyList();

        ThreadA a = new ThreadA(service);
        a.setName("A");
        a.start();

        ThreadB b = new ThreadB(service);
        b.setName("B");
        b.start();
    }
}

wait/notify機制

public class MyList {
    private static List<String> list = new ArrayList<String>();

    public static void add() {
        list.add("anyString");
    }

    public static int size() {
        return list.size();
    }
}


public class ThreadA extends Thread {

    private Object lock;

    public ThreadA(Object lock) {
        super();
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            synchronized (lock) {
                if (MyList.size() != 5) {
                    System.out.println("wait begin " + System.currentTimeMillis());
                    lock.wait();
                    System.out.println("Interruption!!!");
                    //lock.wait();
                    lock.notify();
                    lock.wait();
                    System.out.println("wait end  " + System.currentTimeMillis());
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


public class ThreadB extends Thread {

    private Object lock;

    public ThreadB(Object lock) {
        super();
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            synchronized (lock) {
                for (int i = 0; i < 10; i++) {
                    MyList.add();
                    if (MyList.size() == 5) {
                        lock.notify();
                        System.out.println("已經發出了通知");
                        lock.wait();
                    }
                    System.out.println("添加了" + (i + 1) + "個元素!");
                    Thread.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class Test {
    public static void main(String[] args) {
        try {
            Object lock = new Object();

            ThreadA a = new ThreadA(lock);
            a.start();

            Thread.sleep(50);

            ThreadB b = new ThreadB(lock);
            b.start();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
wait begin 1498007974397
添加了1個元素!
添加了2個元素!
添加了3個元素!
添加了4個元素!
已經發出了通知
Interruption!!!
添加了5個元素!
添加了6個元素!
添加了7個元素!
添加了8個元素!
添加了9個元素!
添加了10個元素!

執行緒A要等待某個條件滿足時(list.size()==5),才執行操作。執行緒B則向list中新增元素,改變list 的size。

A,B之間如何通訊的呢?也就是說,執行緒A如何知道 list.size() 已經為5了呢?

這裡用到了Object類的 wait() 和 notify() 方法。

當條件未滿足時(list.size() !=5),執行緒A呼叫wait() 放棄CPU,並進入阻塞狀態。—不像②while輪詢那樣佔用CPU

當條件滿足時,執行緒B呼叫 notify()通知 執行緒A,所謂通知執行緒A,就是喚醒執行緒A,並讓它進入可執行狀態。

這種方式的一個好處就是CPU的利用率提高了。

這裡寫圖片描述

管道通訊

管道流主要用來實現兩個執行緒之間的二進位制資料的傳播,下面以PipedInputStream類和PipedOutputStream類為例,實現生產者-消費者:

package test.pipe;  

import java.io.IOException;  
import java.io.PipedInputStream;  
import java.io.PipedOutputStream;  

/** 
 * 我們以數字替代產品 生產者每5秒提供5個產品,放入管道 
 */  
class MyProducer extends Thread {  

    private PipedOutputStream outputStream;  

    private int index = 0;  

    public MyProducer(PipedOutputStream outputStream) {  
        this.outputStream = outputStream;  
    }  

    @Override  
    public void run() {  
        while (true) {  
            try {  
                for (int i = 0; i < 5; i++) {  
                    outputStream.write(index++);  
                }  
            } catch (IOException e) {  
                e.printStackTrace();  
            }  

            try {  
                Thread.sleep(5000);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
}  

/** 
 * 消費者每0.5秒從管道中取1件產品,並列印剩餘產品數量,並列印產品資訊(以數字替代) 
 */  
class MyConsumer extends Thread {  

    private PipedInputStream inputStream;  

    public MyConsumer(PipedInputStream inputStream) {  
        this.inputStream = inputStream;  
    }  

    @Override  
    public void run() {  
        while (true) {  
            try {  
                Thread.sleep(500);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
            try {  
                int count = inputStream.available();  
                if (count > 0) {  
                    System.out.println("rest product count: " + count);  
                    System.out.println("get product: " + inputStream.read());  
                }  
            } catch (IOException e1) {  
                e1.printStackTrace();  
            }  
        }  
    }  
}  

public class PipeTest1 {  

    public static void main(String[] args) {  

        PipedOutputStream pos = new PipedOutputStream();  
        PipedInputStream pis = new PipedInputStream();  
        try {  
            pis.connect(pos);  
        } catch (IOException e) {  
            e.printStackTrace();  
        }  

        new MyProducer(pos).start();  
        new MyConsumer(pis).start();  

    }  
}