1. 程式人生 > >執行緒同步工具(七)在併發任務間交換資料

執行緒同步工具(七)在併發任務間交換資料

宣告:本文是《 Java 7 Concurrency Cookbook 》的第三章, 作者: Javier Fernández González 譯者:鄭玉婷

在併發任務間交換資料

Java 併發 API 提供了一種允許2個併發任務間相互交換資料的同步應用。更具體的說,Exchanger 類允許在2個執行緒間定義同步點,當2個執行緒到達這個點,他們相互交換資料型別,使用第一個執行緒的資料型別變成第二個的,然後第二個執行緒的資料型別變成第一個的。

這個類在遇到類似生產者和消費者問題時,是非常有用的。來一個非常經典的併發問題:你有相同的資料buffer,一個或多個數據生產者,和一個或多個數據消費者。只是Exchange類只能同步2個執行緒,所以你只能在你的生產者和消費者問題中只有一個生產者和一個消費者時使用這個類。

在這個指南,你將學習如何使用 Exchanger 類來解決只有一個生產者和一個消費者的生產者和消費者問題。

準備

這個指南的例子使用Eclipse IDE實現。如果你使用Eclipse或其他IDE,如NetBeans,開啟它並建立一個新的Java專案。

怎麼做呢

按照這些步驟來實現下面的例子:

package tool;
import java.util.List;
import java.util.concurrent.Exchanger;

//1. 首先,從實現producer開始吧。建立一個類名為Producer並一定實現 Runnable 介面。
public class Producer implements Runnable {

// 2. 宣告 List<String>物件,名為 buffer。這是等等要被相互交換的資料型別。
private List<String> buffer;

// 3. 宣告 Exchanger<List<String>>; 物件,名為exchanger。這個 exchanger 物件是用來同步producer和consumer的。
private final Exchanger<List<String>> exchanger;

// 4. 實現類的建構函式,初始化這2個屬性。
public Producer(List<String> buffer, Exchanger<List<String>> exchanger) {
this.buffer = buffer;
this.exchanger = exchanger;
}

// 5. 實現 run() 方法. 在方法內,實現10次交換。
@Override
public void run() {
int cycle = 1;
for (int i = 0; i < 10; i++) {			System.out.printf("Producer: Cycle %d\n", cycle);

// 6. 在每次迴圈中,加10個字串到buffer。
for (int j = 0; j <10; j++) {
String message = "Event " + ((i * 10) + j);
System.out.printf("Producer: %s\n", message);
buffer.add(message);
}

// 7. 呼叫 exchange() 方法來與consumer交換資料。此方法可能會丟擲InterruptedException 異常, 加上處理程式碼。
try {
buffer = exchanger.exchange(buffer);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Producer: " + buffer.size());
cycle++;
}
}
}
//8. 現在, 來實現consumer。建立一個類名為Consumer並一定實現 Runnable 介面。
package tool;
import java.util.List;
import java.util.concurrent.Exchanger;
public class Consumer implements Runnable {

// 9. 宣告名為buffer的 List<String>物件。這個物件型別是用來相互交換的。
private List<String> buffer;

// 10. 宣告一個名為exchanger的 Exchanger<List<String>> 物件。用來同步 producer和consumer。
private final Exchanger<List<String>> exchanger;

// 11. 實現類的建構函式,並初始化2個屬性。
public Consumer(List<String>buffer, Exchanger<List<String>> exchanger) {
this.buffer = buffer;
this.exchanger = exchanger;
}

// 12. 實現 run() 方法。在方法內,實現10次交換。
@Override
public void run() {
int cycle = 1;
for (int i = 0; i < 10; i++) {
System.out.printf("Consumer: Cycle %d\n", cycle);

// 13. 在每次迴圈,首先呼叫exchange()方法來與producer同步。Consumer需要消耗資料。此方法可能會丟擲InterruptedException異常, 加上處理程式碼。
try {
buffer = exchanger.exchange(buffer);
} catch (InterruptedException e) {				e.printStackTrace();
}

// 14. 把producer發來的在buffer裡的10字串寫到操控臺並從buffer內刪除,留空。System.out.println("Consumer: " + buffer.size());
for (int j = 0; j <10; j++) {
String message = buffer.get(0);
System.out.println("Consumer: " + message);
buffer.remove(0);
}
cycle++;
}
//15.現在,實現例子的主類通過建立一個類,名為Core並加入 main() 方法。
package tool;
import java.util.ArrayList;
mport java.util.List;
import java.util.concurrent.Exchanger;

public class Core {
public static void main(String[] args) {

// 16. 建立2個buffers。分別給producer和consumer使用.
List<String> buffer1 = new ArrayList<String>();
List<String> buffer2 = new ArrayList<String>();

// 17. 建立Exchanger物件,用來同步producer和consumer。
Exchanger<List<String>> exchanger = new Exchanger<List<String>>();

// 18. 建立Producer物件和Consumer物件。
Producer producer = new Producer(buffer1, exchanger);
Consumer consumer = new Consumer(buffer2, exchanger);

// 19. 建立執行緒來執行producer和consumer並開始執行緒。
Thread threadProducer = new Thread(producer);
Thread threadConsumer = new Thread(consumer); threadProducer.start();
threadConsumer.start();
}

它是怎麼工作的…

消費者開始時是空白的buffer,然後呼叫Exchanger來與生產者同步。因為它需要資料來消耗。生產者也是從空白的buffer開始,然後建立10個字串,儲存到buffer,並使用exchanger與消費者同步。

在這兒,2個執行緒(生產者和消費者執行緒)都是在Exchanger裡並交換了資料型別,所以當消費者從exchange() 方法返回時,它有10個字串在buffer內。當生產者從 exchange() 方法返回時,它有空白的buffer來重新寫入。這樣的操作會重複10遍。

如你執行例子,你會發現生產者和消費者是如何併發的執行任務和在每個步驟它們是如何交換buffers的。與其他同步工具一樣會發生這種情況,第一個呼叫 exchange()方法會進入休眠直到其他執行緒的達到。

更多…

Exchanger 類有另外一個版本的exchange方法:

  • exchange(V data, long time, TimeUnit unit):V是宣告Phaser的引數種類(例子裡是 List)。 此執行緒會休眠直到另一個執行緒到達並中斷它,或者特定的時間過去了。TimeUnit類有多種常量:DAYS, HOURS, MICROSECONDS, MILLISECONDS, MINUTES, NANOSECONDS, 和 SECONDS。