1. 程式人生 > >基本執行緒同步(八)在Lock中使用多個條件

基本執行緒同步(八)在Lock中使用多個條件

宣告:本文是《 Java 7 Concurrency Cookbook 》的第二章,作者: Javier Fernández González     譯者:許巧輝 校對:方騰飛

在Lock中使用多個條件

一個鎖可能伴隨著多個條件。這些條件宣告在Condition介面中。 這些條件的目的是允許執行緒擁有鎖的控制並且檢查條件是否為true,如果是false,那麼執行緒將被阻塞,直到其他執行緒喚醒它們。Condition介面提供一種機制,阻塞一個執行緒和喚醒一個被阻塞的執行緒。

在併發程式設計中,生產者與消費者是經典的問題。我們有一個數據緩衝區,一個或多個數據生產者往緩衝區儲存資料,一個或多個數據消費者從緩衝區中取出資料,正如在這一章中前面所解釋的一樣。

在這個指南中,你將學習如何通過使用鎖和條件來實現生產者與消費者問題。

準備工作…

你應該事先閱讀使用Lock同步程式碼的指南,才能更好的理解這個食譜。

如何做…

按以下步驟來實現的這個例子:

1.首先,讓我們建立一個類來模擬文字檔案。建立FileMock類,包括兩個屬性:一個字串陣列型別,名叫content,另一個int型別,名叫index。它們將儲存檔案內容和被檢索到的模擬檔案的行數。

public class FileMock {
private String content[];
private int index;

2.實現FileMock類的構造器,用隨機字元初始化檔案的內容。

public FileMock(int size, int length){
content=new String[size];
for (int i=0; i<size; i++){
StringBuilder buffer=new StringBuilder(length);
for (int j=0; j<length; j++){
int indice=(int)Math.random()*255;
buffer.append((char)indice);
}
content[i]=buffer.toString();
}
index=0;
}

3.實現hasMoreLines()方法,如果檔案有更多的行來處理,則返回true,如果我們已經取到了模擬檔案的尾部,則返回false。

public boolean hasMoreLines(){
return index<content.length;
}

4.實現getLine()方法,返回index屬性所確定的行數並增加其值。

public String getLine(){
if (this.hasMoreLines()) {
System.out.println("Mock: "+(content.length-index));
return content[index++];
}
return null;
}

5.現在,實現Buffer類,用來實現在生產者與消費者之間共享的緩衝區。

public class Buffer {

6.Buffer類,有6個屬性:

  • 一個型別為LinkedList<String>,名為buffer的屬性,用來儲存共享資料
  • 一個型別為int,名為maxSize的屬性,用來儲存緩衝區的長度
  • 一個名為lock的ReentrantLock物件,用來控制修改緩衝區程式碼塊的訪問
  • 兩個名分別為lines和space,型別為Condition的屬性
  • 一個Boolean型別,名為pendingLines的屬性,表明如果緩衝區中有行
private LinkedList<String> buffer;
private int maxSize;
private ReentrantLock lock;
private Condition lines;
private Condition space;
private boolean pendingLines;

7.實現Buffer類的構造器,初始化前面描述的所有屬性。

public Buffer(int maxSize) {
this.maxSize=maxSize;
buffer=new LinkedList<>();
lock=new ReentrantLock();
lines=lock.newCondition();
space=lock.newCondition();
pendingLines=true;
}

8. 實現insert()方法,接收一個String型別引數並試圖將它儲存到緩衝區。首先,它獲得鎖的控制。當它有鎖的控制,它將檢查緩衝區是否有空閒空 間。如果緩衝區已滿,它將呼叫await()方法在space條件上等待釋放空間。如果其他執行緒在space條件上呼叫signal()或 signalAll()方法,這個執行緒將被喚醒。當這種情況發生,這個執行緒在緩衝區上儲存行並且在lines條件上呼叫signallAll()方法,稍 後我們將會看到,這個條件將會喚醒所有在緩衝行上等待的執行緒。

public void insert(String line) {
lock.lock();
try {
while (buffer.size() == maxSize) {
space.await();
}
buffer.offer(line);
System.out.printf("%s: Inserted Line: %d\n", Thread.
currentThread().getName(),buffer.size());
lines.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

9. 實現get()方法,它返回儲存在緩衝區上的第一個字串。首先,它獲取鎖的控制。當它擁有鎖的控制,它檢查緩衝區是否有行。如果緩衝區是空的,它呼叫 await()方法在lines條件上等待緩衝區中的行。如果其他執行緒在lines條件上呼叫signal()或signalAll()方法,這個執行緒將 被喚醒。當它發生時,這個方法獲取緩衝區的首行,並且在space條件上呼叫signalAll()方法,然後返回String。

public String get() {
String line=null;
lock.lock();
try {
while ((buffer.size() == 0) &&(hasPendingLines())) {
lines.await();
}
if (hasPendingLines()) {
line = buffer.poll();
System.out.printf("%s: Line Readed: %d\n",Thread.
currentThread().getName(),buffer.size());

space.signalAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return line;
}

10.實現setPendingLines()方法,用來設定pendingLines的值。當沒有更多的行生產時,它將被生產者呼叫。

public void setPendingLines(boolean pendingLines) {
this.pendingLines=pendingLines;
}

11.實現hasPendingLines()方法,如果有更多的行被處理時,返回true,否則返回false。

public boolean hasPendingLines() {
return pendingLines || buffer.size()>0;
}

12.現在輪到生產者,實現Producer類,並指定其實現Runnable介面。

public class Producer implements Runnable {

13.宣告兩個屬性:一個FileMock類物件,另一個Buffer類物件。

private FileMock mock;
private Buffer buffer;

14.實現Producer類的構造器,初始化這兩個屬性。

public Producer (FileMock mock, Buffer buffer){
this.mock=mock;
this.buffer=buffer;
}

15.實現run()方法,讀取在FileMock物件中建立的所有行,並使用insert()方法將它們儲存到緩衝區。一旦這個過程結束,使用setPendingLines()方法警告緩衝區,不會再產生更多的行。

@Override
public void run() {
buffer.setPendingLines(true);
while (mock.hasMoreLines()){
String line=mock.getLine();
buffer.insert(line);
}
buffer.setPendingLines(false);
}

16.接下來輪到消費者,實現Consumer類,並指定它實現Runnable介面。

public class Consumer implements Runnable {

17.宣告Buffer物件,實現Consumer構造器來初始化這個物件。

private Buffer buffer;
public Consumer (Buffer buffer) {
this.buffer=buffer;
}

18.實現run()方法,當緩衝區有等待的行,它將獲取一個並處理它。

@Override
public void run() {
while (buffer.hasPendingLines()) {
String line=buffer.get();
processLine(line);
}
}

19.實現輔助方法processLine(),它只睡眠10毫秒,用來模擬某種行的處理。

private void processLine(String line) {
try {
Random random=new Random();
Thread.sleep(random.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
}

20.通過建立類名為Main,且包括main()方法來實現這個示例的主類。

public class Main {
public static void main(String[] args) {

21.建立一個FileMock物件。

FileMock mock=new FileMock(100, 10);

22.建立一個Buffer物件。

Buffer buffer=new Buffer(20);

23.建立Producer物件,並且用10個執行緒執行它。

Producer producer=new Producer(mock, buffer);
Thread threadProducer=new Thread(producer,"Producer");

24.建立Consumer物件,並且用10個執行緒執行它。

Consumer consumers[]=new Consumer[3];
Thread threadConsumers[]=new Thread[3];
for (int i=0; i<3; i++){
consumers[i]=new Consumer(buffer);
threadConsumers[i]=new Thread(consumers[i],"Consumer "+i);
}

25.啟動producer和3個consumers。

threadProducer.start();
for (int i=0; i<3; i++){
threadConsumers[i].start();
}

它是如何工作的…

所 有Condition物件都與鎖有關,並且使用宣告在Lock介面中的newCondition()方法來建立。使用condition做任何操作之前, 你必須獲取與這個condition相關的鎖的控制。所以,condition的操作一定是在以呼叫Lock物件的lock()方法為開頭,以呼叫相同 Lock物件的unlock()方法為結尾的程式碼塊中。

當一個執行緒在一個condition上呼叫await()方法時,它將自動釋放鎖的控制,所以其他執行緒可以獲取這個鎖的控制並開始執行相同操作,或者由同個鎖保護的其他臨界區。

註釋:當一個執行緒在一個condition上呼叫signal()或signallAll()方法,一個或者全部在這個condition上等待的執行緒將被喚醒。這並不能保證的使它們現在睡眠的條件現在是true,所以你必須在while迴圈內部呼叫await()方法。你不能離開這個迴圈,直到 condition為true。當condition為false,你必須再次呼叫 await()方法。

你必須十分小心 ,在使用await()和signal()方法時。如果你在condition上呼叫await()方法而卻沒有在這個condition上呼叫signal()方法,這個執行緒將永遠睡眠下去。

在呼叫await()方法後,一個執行緒可以被中斷的,所以當它正在睡眠時,你必須處理InterruptedException異常。

不止這些…

Condition介面提供不同版本的await()方法,如下:

  • await(long time, TimeUnit unit):這個執行緒將會一直睡眠直到:

(1)它被中斷

(2)其他執行緒在這個condition上呼叫singal()或signalAll()方法

(3)指定的時間已經過了

(4)TimeUnit類是一個列舉型別如下的常量:

DAYS,HOURS, MICROSECONDS, MILLISECONDS, MINUTES, NANOSECONDS,SECONDS

  • awaitUninterruptibly():這個執行緒將不會被中斷,一直睡眠直到其他執行緒呼叫signal()或signalAll()方法
  • awaitUntil(Date date):這個執行緒將會一直睡眠直到:

(1)它被中斷

(2)其他執行緒在這個condition上呼叫singal()或signalAll()方法

(3)指定的日期已經到了

你可以在一個讀/寫鎖中的ReadLock和WriteLock上使用conditions。

參見