1. 程式人生 > >Java併發程式設計之Exchanger

Java併發程式設計之Exchanger

概述

  用於執行緒間資料的交換。它提供一個同步點,在這個同步點,兩個執行緒可以交換彼此的資料。這兩個執行緒通過exchange方法交換資料,如果第一個執行緒先執行exchange()方法,它會一直等待第二個執行緒也執行exchange方法,當兩個執行緒都到達同步點時,這兩個執行緒就可以交換資料,將本執行緒生產出來的資料傳遞給對方。

  Exchanger 可被視為 SynchronousQueue 的雙向形式。Exchanger在遺傳演算法和管道設計等應用中很有用。

  記憶體一致性:對於通過 Exchanger 成功交換物件的每對執行緒,每個執行緒中在 exchange() 之前的操作 happen-before 從另一執行緒中相應的 exchange() 返回的後續操作。

使用

  提供的方法:

    // 等待另一個執行緒到達此交換點(除非當前執行緒被中斷),然後將給定的物件傳送給該執行緒,並接收該執行緒的物件。
    public V exchange(V x) throws InterruptedException
    //增加超時機制,超過指定時間,拋TimeoutException異常
    public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

  使用示例:

    該類使用 Exchanger 線上程間交換緩衝區,因此,在需要時,填充緩衝區的執行緒獲取一個新騰空的緩衝區,並將填滿的緩衝區傳遞給清空緩衝區的執行緒。

class FillAndEmpty {
    Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
    DataBuffer initialEmptyBuffer = ... a made-up type
    DataBuffer initialFullBuffer = ...
    
    //填充緩衝區執行緒
    class FillingLoop implements Runnable {
        public void run() {
            DataBuffer currentBuffer = initialEmptyBuffer;    //空的緩衝區
            try {
                while (currentBuffer != null) {
                    addToBuffer(currentBuffer);    //填充資料
                    //如果緩衝區被資料填滿,執行exchange。等待清空緩衝區執行緒也執行exchange方法。當兩個執行緒都到達同步點,交換資料。
                    if (currentBuffer.isFull())
                        currentBuffer = exchanger.exchange(currentBuffer);    
                }
            } catch (InterruptedException ex) { ... handle ... }
        }
    }
    
    //清空緩衝區執行緒
    class EmptyingLoop implements Runnable {
        public void run() {
            DataBuffer currentBuffer = initialFullBuffer;    //滿的緩衝區
            try {
                while (currentBuffer != null) {
                    takeFromBuffer(currentBuffer);    //清空緩衝區
                    //如果緩衝區被清空,執行exchange。等待填充緩衝區執行緒也執行exchange方法。當兩個執行緒都到達同步點,交換資料。
                    if (currentBuffer.isEmpty())
                        currentBuffer = exchanger.exchange(currentBuffer);
                }
            } catch (InterruptedException ex) { ... handle ...}
        }
    }

    void start() {
        new Thread(new FillingLoop()).start();
        new Thread(new EmptyingLoop()).start();
    }
}