1. 程式人生 > >Java 併發:執行緒間通訊與協作

Java 併發:執行緒間通訊與協作

摘要:

  執行緒與執行緒之間不是相互獨立的個體,它們彼此之間需要相互通訊和協作,最典型的例子就是生產者-消費者問題。本文首先介紹 wait/notify 機制,並對實現該機制的兩種方式——synchronized+wait-notify模式和Lock+Condition模式進行詳細剖析,以作為執行緒間通訊與協作的基礎。進一步地,以經典的生產者-消費者問題為背景,熟練對 wait/notify 機制的使用。最後,對 Thread 類中的 join() 方法進行原始碼分析,並以宿主執行緒與寄生執行緒的協作為例進行說明。

一. 引子

  執行緒與執行緒之間不是相互獨立的個體,它們彼此之間需要相互通訊和協作。比如說最經典的 生產者-消費者模型:

當佇列滿時,生產者需要等待佇列有空間才能繼續往裡面放入商品,而在等待的期間內,生產者必須釋放對臨界資源(即佇列)的佔用權。因為生產者如果不釋放對臨界資源的佔用權,那麼消費者就無法消費佇列中的商品,就不會讓佇列有空間,那麼生產者就會一直無限等待下去。因此,一般情況下,當佇列滿時,會讓生產者交出對臨界資源的佔用權,並進入掛起狀態。然後等待消費者消費了商品,然後消費者通知生產者佇列有空間了。同樣地,當佇列空時,消費者也必須等待,等待生產者通知它佇列中有商品了。這種互相通訊的過程就是執行緒間的協作,也是本文要探討的問題。
  
  在下面的例子中,雖然兩個執行緒實現了通訊,但是憑藉 執行緒B 不斷地通過 while語句輪詢
來檢測某一個條件,這樣會導致CPU的浪費。因此,需要一種機制來減少 CPU資源 的浪費,而且還能實現多個執行緒之間的通訊,即 wait/notify 機制

//資源類
class MyList {

    //臨界資源
    private volatile List<String> list = new ArrayList<String>();

    public void add() {
        list.add("abc");
    }

    public int size() {
        return list.size();
    }
}

// 執行緒A
class ThreadA extends Thread { private MyList list; public ThreadA(MyList list,String name) { super(name); this.list = list; } @Override public void run() { try { for (int i = 0; i < 3; i++) { list.add(); System.out.println("添加了" + (i + 1) + "個元素"); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } } } //執行緒B class ThreadB extends Thread { private MyList list; public ThreadB(MyList list,String name) { super(name); this.list = list; } @Override public void run() { try { while (true) { // while 語句輪詢 if (list.size() == 2) { System.out.println("==2了,執行緒b要退出了!"); throw new InterruptedException(); } } } catch (InterruptedException e) { e.printStackTrace(); } } } //測試 public class Test { public static void main(String[] args) { MyList service = new MyList(); ThreadA a = new ThreadA(service,"A"); ThreadB b = new ThreadB(service,"B"); a.start(); b.start(); } }/* Output(輸出結果不唯一): 添加了1個元素 添加了2個元素 ==2了,執行緒b要退出了! java.lang.InterruptedException at test.ThreadB.run(Test.java:57) 添加了3個元素 *///:~

二. wait/notify 機制

  在這之前,執行緒間通過共享資料來實現通訊,即多個執行緒主動地讀取一個共享資料,通過 同步互斥訪問機制 保證執行緒的安全性。等待/通知機制 主要由 Object類 中的以下三個方法保證:

1、wait()、notify() 和 notifyAll()

  上述三個方法均非Thread類中所宣告的方法,而是Object類中宣告的方法。原因是每個物件都擁有monitor(鎖),所以讓當前執行緒等待某個物件的鎖,當然應該通過這個物件來操作,而不是用當前執行緒來操作,因為當前執行緒可能會等待多個執行緒的鎖,如果通過執行緒來操作,就非常複雜了。

1) wait()

   當前執行緒 (Thread.concurrentThread() 方法所返回的執行緒) 釋放物件鎖並進入等待(阻塞)狀態。

(1) 方法宣告

    public final native void wait(long timeout) throws InterruptedException;

(2) 方法作用

  Causes the current thread to wait until either another thread invokes the notify() method or the notifyAll() method for this object, or a specified amount of time has elapsed. If any threads are waiting on this object, one of them is chosen to be awakened. The choice is arbitrary and occurs at the discretion of the implementation. A thread waits on this object’s monitor by calling one of the wait() methods.

  This method causes the current thread (call it T):

① to place itself in the wait set for this object;

② to relinquish (放棄) any and all synchronization claims on this object;

Thread T becomes disabled for thread scheduling purposes and lies dormant (休眠) until one of four things happens:

  • Some other thread invokes the notify method for this object and thread T happens to be arbitrarily chosen as the thread to be awakened;

  • Some other thread invokes the notifyAll method for this object;

  • Some other thread interrupts thread T;

  • The specified amount of real time has elapsed, more or less. If timeout is zero, however, then real time is not taken into consideration and the thread simply waits until notified (等待時間為 0 意味著永遠等待,直至執行緒被喚醒) .

      The thread T is then removed from the wait set for this object and re-enabled for thread scheduling. It then competes in the usual manner with other threads for the right to synchronize on the object; once it has gained control of the object, all its synchronization claims on the object are restored to the status quo ante - that is, to the situation as of the time that the wait method was invoked. Thread T then returns from the invocation of the wait method. Thus, on return from the wait method, the synchronization state of the object and of thread T is exactly as it was when the wait method was invoked.

(3) 方法使用條件

  This method should only be called by a thread that is the owner of this object’s monitor.

(4) 異常

  • 執行時(不受檢查)異常
    IllegalMonitorStateException: if the current thread is not the owner of this object’s monitor;
    IllegalArgumentException: if the value of timeout is negative;

  • 受檢查異常 (中斷阻塞執行緒,拋 InterruptedException並終止執行緒,釋放鎖,釋放CPU)
    InterruptedException: if any thread interrupted the current thread before or while the current thread was waiting for a notification. The interrupted status of the current thread is cleared when this exception is thrown.

2) notify()

  喚醒一個正在等待相應物件鎖的執行緒,使其進入就緒佇列,以便在當前執行緒釋放鎖後競爭鎖,進而得到CPU的執行。

(1) 方法宣告

    public final native void notify();

(2) 方法作用

  Wakes up a single thread that is waiting on this object’s monitor. If any threads are waiting on this object, one of them is chosen to be awakened. The choice is arbitrary and occurs at the discretion of the implementation.

  The awakened thread will not be able to proceed until the current thread relinquishes (放棄) the lock on this object.(在執行 notify() 方法後,當前執行緒不會馬上釋放該鎖物件,呈 wait 狀態的執行緒也並不能馬上獲取該物件鎖。只有等到執行notify()方法的執行緒退出synchronized程式碼塊/方法後,當前執行緒才會釋放鎖,而相應的呈wait狀態的執行緒才可以去爭取該物件鎖。) The awakened thread will compete in the usual manner with any other threads that might be actively competing to (競爭) synchronize on this object; the awakened thread enjoys no reliable privilege or disadvantage in being the next thread to lock this object.

(3) 方法使用條件

  This method should only be called by a thread that is the owner of this object’s monitor. A thread becomes the owner of the object’s monitor in one of three ways:

  • By executing a synchronized instance method of that object;

  • By executing the body of a synchronized statement that synchronizes on the object;

  • For objects of type Class, by executing a synchronized static method of that class.

      Only one thread at a time can own an object’s monitor(互斥鎖).

(4) 異常

  • 執行時(不受檢查)異常
    IllegalMonitorStateException: if the current thread is not the owner of this object’s monitor.

3) notifyAll()

  喚醒所有正在等待相應物件鎖的執行緒,使它們進入就緒佇列,以便在當前執行緒釋放鎖後競爭鎖,進而得到CPU的執行。

(1) 方法宣告

    public final native void notifyAll();

(2) 方法作用

  Wakes up all threads that are waiting on this object’s monitor. A thread waits on an object’s monitor by calling one of the wait methods.

  The awakened threads will not be able to proceed until the current thread relinquishes (放棄) the lock on this object. The awakened threads will compete in the usual manner with any other threads that might be actively competing to (競爭) synchronize on this object; the awakened threads enjoy no reliable privilege or disadvantage in being the next thread to lock this object.

(3) 方法使用條件

   This method should only be called by a thread that is the owner of this object’s monitor.

(4) 異常

  • 執行時(不受檢查)異常
    IllegalMonitorStateException: if the current thread is not the owner of this object’s monitor.

4) 小結

從以上描述可以得出:

  • wait()、notify() 和 notifyAll()方法是 本地方法,並且為 final 方法,無法被重寫;
  • 呼叫某個物件的 wait() 方法能讓 當前執行緒阻塞,並且當前執行緒必須擁有此物件的monitor(即鎖);
  • 呼叫某個物件的 notify() 方法能夠喚醒 一個正在等待這個物件的monitor的執行緒,如果有多個執行緒都在等待這個物件的monitor,則只能喚醒其中一個執行緒;
  • 呼叫notifyAll()方法能夠喚醒所有正在等待這個物件的monitor的執行緒。

2、方法呼叫與執行緒狀態關係

  每個鎖物件都有兩個佇列,一個是就緒佇列,一個是阻塞佇列。就緒佇列儲存了已就緒(將要競爭鎖)的執行緒,阻塞佇列儲存了被阻塞的執行緒。當一個阻塞執行緒被喚醒後,才會進入就緒佇列,進而等待CPU的排程;反之,當一個執行緒被wait後,就會進入阻塞佇列,等待被喚醒。

           Thread方法與狀態.jpg-73.5kB

3、使用舉例

public class Test {
    public static Object object = new Object();

    public static void main(String[] args) throws InterruptedException {
        Thread1 thread1 = new Thread1();
        Thread2 thread2 = new Thread2();

        thread1.start();

        Thread.sleep(2000);

        thread2.start();
    }

    static class Thread1 extends Thread {
        @Override
        public void run() {
            synchronized (object) {
                System.out.println("執行緒" + Thread.currentThread().getName()
                        + "獲取到了鎖...");
                try {
                    System.out.println("執行緒" + Thread.currentThread().getName()
                            + "阻塞並釋放鎖...");
                    object.wait();
                } catch (InterruptedException e) {
                }
                System.out.println("執行緒" + Thread.currentThread().getName()
                        + "執行完成...");
            }
        }
    }

    static class Thread2 extends Thread {
        @Override
        public void run() {
            synchronized (object) {
                System.out.println("執行緒" + Thread.currentThread().getName()
                        + "獲取到了鎖...");
                object.notify();
                System.out.println("執行緒" + Thread.currentThread().getName()
                        + "喚醒了正在wait的執行緒...");
            }
            System.out
                    .println("執行緒" + Thread.currentThread().getName() + "執行完成...");
        }
    }
}/* Output: 
        執行緒Thread-0獲取到了鎖...
        執行緒Thread-0阻塞並釋放鎖...
        執行緒Thread-1獲取到了鎖...
        執行緒Thread-1喚醒了正在wait的執行緒...
        執行緒Thread-1執行完成...
        執行緒Thread-0執行完成...
 *///:~

4、多個同類型執行緒的場景(wait 的條件發生變化)

//資源類
class ValueObject {
    public static List<String> list = new ArrayList<String>();
}

//元素新增執行緒
class ThreadAdd extends Thread {

    private String lock;

    public ThreadAdd(String lock,String name) {
        super(name);
        this.lock = lock;
    }

    @Override
    public void run() {
        synchronized (lock) {
            ValueObject.list.add("anyString");
            lock.notifyAll();               // 喚醒所有 wait 執行緒
        }
    }
}

//元素刪除執行緒
class ThreadSubtract extends Thread {

    private String lock;

    public ThreadSubtract(String lock,String name) {
        super(name);
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            synchronized (lock) {
                if (ValueObject.list.size() == 0) {
                    System.out.println("wait begin ThreadName=" + Thread.currentThread().getName());
                    lock.wait();
                    System.out.println("wait   end ThreadName=" + Thread.currentThread().getName());
                }
                ValueObject.list.remove(0);
                System.out.println("list size=" + ValueObject.list.size());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

//測試類
public class Run {
    public static void main(String[] args) throws InterruptedException {

        //鎖物件
        String lock = new String("");

        ThreadSubtract subtract1Thread = new ThreadSubtract(lock,"subtract1Thread");
        subtract1Thread.start();

        ThreadSubtract subtract2Thread = new ThreadSubtract(lock,"subtract2Thread");
        subtract2Thread.start();

        Thread.sleep(1000);

        ThreadAdd addThread = new ThreadAdd(lock,"addThread");
        addThread.start();

    }
}/* Output: 
        wait begin ThreadName=subtract1Thread
        wait begin ThreadName=subtract2Thread
        wait   end ThreadName=subtract2Thread
        list size=0
        wait   end ThreadName=subtract1Thread
        Exception in thread "subtract1Thread"
            java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
            at java.util.ArrayList.rangeCheck(Unknown Source)
            at java.util.ArrayList.remove(Unknown Source)
            at test.ThreadSubtract.run(Run.java:49)
 *///:~

  當 執行緒subtract1Thread 被喚醒後,將從 wait處 繼續執行。但由於 執行緒subtract2Thread 先獲取到鎖得到執行,導致 執行緒subtract1Thread 的 wait的條件發生變化(不再滿足),而 執行緒subtract1Thread 卻毫無所知,導致異常產生。

  像這種有多個相同型別的執行緒場景,為防止wait的條件發生變化而導致的執行緒異常終止,我們在阻塞執行緒被喚醒的同時還必須對wait的條件進行額外的檢查,如下所示:

//元素刪除執行緒
class ThreadSubtract extends Thread {

    private String lock;

    public ThreadSubtract(String lock,String name) {
        super(name);
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            synchronized (lock) {
                while (ValueObject.list.size() == 0) {    //將 if 改成 while 
                    System.out.println("wait begin ThreadName=" + Thread.currentThread().getName());
                    lock.wait();
                    System.out.println("wait   end ThreadName=" + Thread.currentThread().getName());
                }
                ValueObject.list.remove(0);
                System.out.println("list size=" + ValueObject.list.size());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}/* Output: 
        wait begin ThreadName=subtract1Thread
        wait begin ThreadName=subtract2Thread
        wait   end ThreadName=subtract2Thread
        list size=0
        wait   end ThreadName=subtract1Thread
        wait begin ThreadName=subtract1Thread
 *///:~

  只需將 執行緒類ThreadSubtract 的 run()方法中的 if 條件 改為 while 條件 即可。

三. Condition

  Condition是在java 1.5中出現的,它用來替代傳統的Object的wait()/notify()實現執行緒間的協作,它的使用依賴於 Lock,Condition、Lock 和 Thread 三者之間的關係如下圖所示。相比使用Object的wait()/notify(),使用Condition的await()/signal()這種方式能夠更加安全和高效地實現執行緒間協作。Condition是個介面,基本的方法就是await()和signal()方法。Condition依賴於Lock介面,生成一個Condition的基本程式碼是lock.newCondition() 。 必須要注意的是,Condition 的 await()/signal() 使用都必須在lock保護之內,也就是說,必須在lock.lock()和lock.unlock之間才可以使用。事實上,Conditon的await()/signal() 與 Object的wait()/notify() 有著天然的對應關係:

  • Conditon中的await()對應Object的wait();
  • Condition中的signal()對應Object的notify();
  • Condition中的signalAll()對應Object的notifyAll()。

             這裡寫圖片描述

                       
    Ps: 關於Lock框架的更多介紹,請讀者移步我的博文《 Java 併發:Lock 框架詳解》

  使用Condition往往比使用傳統的通知等待機制(Object的wait()/notify())要更靈活、高效,例如,我們可以使用多個Condition實現通知部分執行緒:

// 執行緒 A
class ThreadA extends Thread {
    private MyService service;
    public ThreadA(MyService service) {
        super();
        this.service = service;
    }
    @Override
    public void run() {
        service.awaitA();
    }
}
// 執行緒 B
class ThreadB extends Thread {
    public ThreadB(MyService service) {
        super();
        this.service = service;
    }
    @Override
    public void run() {
        service.awaitB();
    }
}

class MyService {
    private Lock lock = new ReentrantLock();
    // 使用多個Condition實現通知部分執行緒
    public Condition conditionA = lock.newCondition();
    public Condition conditionB = lock.newCondition();

    public void awaitA() {
        lock.lock();
        try {
            System.out.println("begin awaitA時間為" + System.currentTimeMillis()
                    + " ThreadName=" + Thread.currentThread().getName());
            conditionA.await();
            System.out.println("  end awaitA時間為" + System.currentTimeMillis()
                    + " ThreadName=" + Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void awaitB() {
        lock.lock();
        try {
            System.out.println("begin awaitB時間為" + System.currentTimeMillis()
                    + " ThreadName=" + Thread.currentThread().getName());
            conditionB.await();
            System.out.println("  end awaitB時間為" + System.currentTimeMillis()
                    + " ThreadName=" + Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void signalAll_A() {
        try {
            lock.lock();
            System.out.println("  signalAll_A時間為" + System.currentTimeMillis()
                    + " ThreadName=" + Thread.currentThread().getName());
            conditionA.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void signalAll_B() {
        try {
            lock.lock();
            System.out.println("  signalAll_B時間為" + System.currentTimeMillis()
                    + " ThreadName=" + Thread.currentThread().getName());
            conditionB.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

// 測試
public class Run {
    public static void main(String[] args) throws InterruptedException {
        MyService service = new MyService();

        ThreadA a = new ThreadA(service);
        a.setName("A");
        a.start();

        ThreadB b = new ThreadB(service);
        b.setName("B");
        b.start();

        Thread.sleep(3000);
        service.signalAll_A();
    }
}

  輸出結果如下圖所示,我們可以看到只有執行緒A被喚醒,執行緒B仍然阻塞。

            多個Condition通知部分執行緒.png-13.1kB

  實際上,Condition 實現了一種分組機制,將所有對臨界資源進行訪問的執行緒進行分組,以便實現執行緒間更精細化的協作,例如通知部分執行緒。我們可以從上面例子的輸出結果看出,只有conditionA範圍內的執行緒A被喚醒,而conditionB範圍內的執行緒B仍然阻塞。

四. 生產者-消費者模型

  等待/通知機制 最經典的應用就是 生產者-消費者模型。下面以多生產者-多消費者問題為背景,分別運用兩種模式 —— synchronized+wait-notify模式和Lock+Condition模式實現 wait-notify 機制。

Case 1: 傳統實現方式

//資源類
class MyStack {
    // 共享佇列
    private List list = new ArrayList();

    // 生產
    @SuppressWarnings("unchecked")
    public synchronized void push() {
        try {
            while (list.size() == 1) {    // 多個生產者
                System.out.println("佇列已滿,執行緒 "
                        + Thread.currentThread().getName() + " 呈wait狀態...");
                this.wait();
            }
            list.add("anyString=" + Math.random());
            System.out.println("執行緒 " + Thread.currentThread().getName()
                    + " 生產了,佇列已滿...");
            this.notifyAll();                   // 防止生產者僅通知生產者
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 消費
    public synchronized String pop() {
        String returnValue = "";
        try {
            while (list.size() == 0) {              // 多個消費者
                System.out.println("佇列已空,執行緒 "
                        + Thread.currentThread().getName() + " 呈wait狀態...");
                this.wait();
            }
            returnValue = "" + list.get(0);
            list.remove(0);
            System.out.println("執行緒 " + Thread.currentThread().getName()
                    + " 消費了,佇列已空...");
            this.notifyAll();                   // 防止消費者僅通知消費者
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return returnValue;
    }
}

//生產者
class P_Thread extends Thread {

    private MyStack myStack;

    public P_Thread(MyStack myStack,String name) {
        super(name);
        this.myStack = myStack;
    }

    public void pushService() {
        myStack.push();
    }

    @Override
    public void run() {
        while (true) {     
            myStack.push();
        }
    }
}

//消費者
class C_Thread extends Thread {

    private MyStack myStack;

    public C_Thread(MyStack myStack,String name) {
        super(name);
        this.myStack = myStack;
    }

    @Override
    public void run() {
        while (true) {
            myStack.pop();
        }
    }
}

//測試類
public class Run {
    public static void main(String[] args) throws InterruptedException {
        MyStack myStack = new MyStack();

        P_Thread pThread1 = new P_Thread(myStack, "P1");
        P_Thread pThread2 = new P_Thread(myStack, "P2");
        P_Thread pThread3 = new P_Thread(myStack, "P3");
        P_Thread pThread4 = new P_Thread(myStack, "P4");
        P_Thread pThread5 = new P_Thread(myStack, "P5");
        P_Thread pThread6 = new P_Thread(myStack, "P6");
        pThread1.start();
        pThread2.start();
        pThread3.start();
        pThread4.start();
        pThread5.start();
        pThread6.start();

        C_Thread cThread1 = new C_Thread(myStack, "C1");
        C_Thread cThread2 = new C_Thread(myStack, "C2");
        C_Thread cThread3 = new C_Thread(myStack, "C3");
        C_Thread cThread4 = new C_Thread(myStack, "C4");
        C_Thread cThread5 = new C_Thread(myStack, "C5");
        C_Thread cThread6 = new C_Thread(myStack, "C6");
        C_Thread cThread7 = new C_Thread(myStack, "C7");
        C_Thread cThread8 = new C_Thread(myStack, "C8");
        cThread1.start();
        cThread2.start();
        cThread3.start();
        cThread4.start();
        cThread5.start();
        cThread6.start();
        cThread7.start();
        cThread8.start();
    }
}/* Output: 
        執行緒 P1 生產了,佇列已滿...
        佇列已滿,執行緒 P1 呈wait狀態...
        執行緒 C5 消費了,佇列已空...
        佇列已空,執行緒 C5 呈wait狀態...
        佇列已空,執行緒 C8 呈wait狀態...
        佇列已空,執行緒 C2 呈wait狀態...
        佇列已空,執行緒 C7 呈wait狀態...
        佇列已空,執行緒 C4 呈wait狀態...
        佇列已空,執行緒 C6 呈wait狀態...
        佇列已空,執行緒 C3 呈wait狀態...
        佇列已空,執行緒 C1 呈wait狀態...
        執行緒 P6 生產了,佇列已滿...
        佇列已滿,執行緒 P6 呈wait狀態...
        佇列已滿,執行緒 P5 呈wait狀態...
        佇列已滿,執行緒 P4 呈wait狀態...
        ...
 *///:~

對於生產者-消費者問題,有兩個要點需要注意:

  • 在多個同類型執行緒(多個生產者執行緒或者消費者執行緒)的場景中,為防止wait的條件發生變化而導致執行緒異常終止,我們在阻塞執行緒被喚醒的同時還必須對wait的條件進行額外的檢查,即 使用 while 迴圈代替 if 條件

  • 在多個同類型執行緒(多個生產者執行緒或者消費者執行緒)的場景中,為防止生產者(消費者)喚醒生產者(消費者),保證生產者和消費者互相喚醒,需要 使用 notify 替代 notifyAll.

Case 2: 使用 Condition 實現方式

// 執行緒A
class MyThreadA extends Thread {

    private MyService myService;

    public MyThreadA(MyService myService, String name) {
        super(name);
        this.myService = myService;
    }

    @Override
    public void run() {
        while (true)
            myService.set();
    }
}

// 執行緒B
class MyThreadB extends Thread {

    private MyService myService;

    public MyThreadB(MyService myService, String name) {
        super(name);
        this.myService = myService;
    }

    @Override
    public void run() {
        while (true)
            myService.get();
    }
}

// 資源類
class MyService {

    private ReentrantLock lock = new ReentrantLock();
    private Condition conditionA = lock.newCondition();   // 生產執行緒
    private Condition conditionB = lock.newCondition();  // 消費執行緒
    private boolean hasValue = false;

    public void set() {
        try {
            lock.lock();
            while (hasValue == true) {
                System.out.println("[生產執行緒] " + " 執行緒"
                        + Thread.currentThread().getName() + " await...");
                conditionA.await();
            }
            System.out.println("[生產中] " + " 執行緒" + Thread.currentThread().getName() + " 生產★");
            Thread.sleep(1000);
            hasValue = true;
            System.out.println("執行緒" + Thread.currentThread().getName() + " 生產完畢...");
            System.out.println("[喚醒所有消費執行緒] " + " 執行緒"
                    + Thread.currentThread().getName() + "...");
            conditionB.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void get() {
        try {
            lock.lock();
            while (hasValue == false) {
                System.out.println("[消費執行緒] " + " 執行緒"
                        + Thread.currentThread().getName() + " await...");
                conditionB.await();
            }
            System.out.println("[消費中] " + " 執行緒"
                    + Thread.currentThread().getName() + " 消費☆");
            Thread.sleep(1000);
            System.out.println("執行緒" + Thread.currentThread().getName() + " 消費完畢...");
            hasValue = false;
            System.out.println("[喚醒所有生產執行緒] " + " 執行緒"
                    + Thread.currentThread().getName() + "...");
            conditionA.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

public class Run {
    public static void main(String[] args) throws InterruptedException {
        MyService service = new MyService();

        MyThreadA[] threadA = new MyThreadA[10];
        MyThreadB[] threadB = new MyThreadB[10];

        for (int i = 0; i < 10; i++) {
            threadA[i] = new MyThreadA(service, "ThreadA-" + i);
            threadB[i] = new MyThreadB(service, "ThreadB-" + i);
            threadA[i].start();
            threadB[i].start();
        }
    }
}/* Output: 
        [生產中]  執行緒ThreadA-0 生產★
        執行緒ThreadA-0 生產完畢...
        [喚醒所有消費執行緒]  執行緒ThreadA-0...
        [生產執行緒]  執行緒ThreadA-0 await...
        [消費中]  執行緒ThreadB-0 消費☆
        執行緒ThreadB-0 消費完畢...
        [喚醒所有生產執行緒]  執行緒ThreadB-0...
        [消費執行緒]  執行緒ThreadB-0 await...
        [生產中]  執行緒ThreadA-1 生產★
        執行緒ThreadA-1 生產完畢...
        [喚醒所有消費執行緒]  執行緒ThreadA-1...
        [生產執行緒]  執行緒ThreadA-1 await...
        [消費中]  執行緒ThreadB-1 消費☆
        執行緒ThreadB-1 消費完畢...
        [喚醒所有生產執行緒]  執行緒ThreadB-1...
        [消費執行緒]  執行緒ThreadB-1 await...
        [生產中]  執行緒ThreadA-2 生產★
        執行緒ThreadA-2 生產完畢...
        [喚醒所有消費執行緒]  執行緒ThreadA-2...
        ...
 *///:~

五.執行緒間的通訊:管道

  PipedInputStream類 與 PipedOutputStream類 用於在應用程式中建立管道通訊。一個PipedInputStream例項物件必須和一個PipedOutputStream例項物件進行連線而產生一個通訊管道。PipedOutputStream可以向管道中寫入資料,PipedIntputStream可以讀取PipedOutputStream向管道中寫入的資料,這兩個類主要用來完成執行緒之間的通訊。一個執行緒的PipedInputStream物件能夠從另外一個執行緒的PipedOutputStream物件中讀取資料,如下圖所示:

            執行緒通訊示意圖之管道.jpg-30.5kB

  PipedInputStream和PipedOutputStream的實現原理類似於”生產者-消費者”原理,PipedOutputStream是生產者,PipedInputStream是消費者。在PipedInputStream中,有一個buffer位元組陣列,預設大小為1024,作為緩衝區,存放”生產者”生產出來的東西。此外,還有兩個變數in和out —— in用來記錄”生產者”生產了多少,out是用來記錄”消費者”消費了多少,in為-1表示消費完了,in==out表示生產滿了。當消費者沒東西可消費的時候,也就是當in為-1的時候,消費者會一直等待,直到有東西可消費。
  
  在 Java 的 JDK 中,提供了四個類用於執行緒間通訊:

  • 位元組流:PipedInputStream 和 PipedOutputStream;
  • 字元流:PipedReader 和 PipedWriter;
//讀執行緒
class ThreadRead extends Thread {

    private ReadData read;
    private PipedInputStream input;

    public ThreadRead(ReadData read, PipedInputStream input) {
        super();
        this.read = read;
        this.input = input;
    }

    public void readMethod(PipedInputStream input) {
        try {
            System.out.println("read  :");
            byte[] byteArray = new byte[20];
            int readLength = input.read(byteArray);
            while (readLength != -1) {
                String newData = new String(byteArray, 0, readLength);
                System.out.print(newData);
                readLength = input.read(byteArray);
            }
            System.out.println();
            input.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        this.readMethod(input);
    }
}

//寫執行緒
class ThreadWrite extends Thread {

    private WriteData write;
    private PipedOutputStream out;

    public ThreadWrite(WriteData write, PipedOutputStream out) {
        super();
        this.write = write;
        this.out = out;
    }

    public void writeMethod(PipedOutputStream out) {
        try {
            System.out.println("write :");
            for (int i = 0; i < 30; i++) {
                String outData = "" + (i + 1);
                out.write(outData.getBytes());
                System.out.print(outData);
            }
            System.out.println();
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        this.writeMethod(out);
    }
}

//測試
public class Run {

    public static void main(String[] args) {
        try {
            WriteData writeData = new WriteData();
            ReadData readData = new ReadData();

            PipedInputStream inputStream = new PipedInputStream();
            PipedOutputStream outputStream = new PipedOutputStream();

            // inputStream.connect(outputStream);   // 效果相同
            outputStream.connect(inputStream);

            ThreadRead threadRead = new ThreadRead(readData, inputStream);
            threadRead.start();

            Thread.sleep(2000);

            ThreadWrite threadWrite = new ThreadWrite(writeData, outputStream);
            threadWrite.start();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}/* Output: 
        read  :
        write :
        123456789101112131415161718192021222324252627282930
        123456789101112131415161718192021222324252627282930
 *///:~

六. 方法 join() 的使用

1). join() 的定義

  假如在main執行緒中呼叫thread.join方法,則main執行緒會等待thread執行緒執行完畢或者等待一定的時間。詳細地,如果呼叫的是無參join方法,則等待thread執行完畢;如果呼叫的是指定了時間引數的join方法,則等待一定的時間。join()方法有三個過載版本:

public final synchronized void join(long millis) throws InterruptedException {...}
public final synchronized void join(long millis, int nanos) throws InterruptedException {...}
public final void join() throws InterruptedException {...}

  以 join(long millis) 方法為例,其內部呼叫了Object的wait()方法,如下圖:
            join 的定義.png-39.6kB

  根據以上原始碼可以看出,join()方法是通過wait()方法 (Object 提供的方法) 實現的。當 millis == 0 時,會進入 while(isAlive()) 迴圈,並且只要子執行緒是活的,宿主執行緒就不停的等待。 wait(0) 的作用是讓當前執行緒(宿主執行緒)等待,而這裡的當前執行緒是指 Thread.currentThread() 所返回的執行緒。所以,雖然是子執行緒物件(鎖)呼叫wait()方法,但是阻塞的是宿主執行緒。

2). join() 使用例項及原理

//示例程式碼
public class Test {

    public static void main(String[] args) throws IOException  {
        System.out.println("進入執行緒"+Thread.currentThread().getName());
        Test test = new Test();
        MyThread thread1 = test.new MyThread();
        thread1.start();
        try {
            System.out.println("執行緒"+Thread.currentThread().getName()+"等待");
            thread1.join();
            System.out.println("執行緒"+Thread.currentThread().getName()+"繼續執行");
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    } 

    class MyThread extends Thread{
        @Override
        public void run() {
            System.out.println("進入執行緒"+Thread.currentThread().getName());
            try {
                Thread.currentThread().sleep(5000);
            } catch (InterruptedException e) {
                // TODO: handle exception
            }
            System.out.println("執行緒"+Thread.currentThread().getName()+"執行完畢");
        }
    }
}/* Output:
        進入執行緒main
        執行緒main等待
        進入執行緒Thread-0
        執行緒Thread-0執行完畢
        執行緒main繼續執行
 *///~

  看上面的例子,當 main執行緒 執行到 thread1.join() 時,main執行緒會獲得執行緒物件thread1的鎖(wait 意味著拿到該物件的鎖)。只要 thread1執行緒 存活, 就會呼叫該物件鎖的wait()方法阻塞 main執行緒。那麼,main執行緒被什麼時候喚醒呢?

  事