1. 程式人生 > >《Java 7併發程式設計實戰手冊》第六章併發集合

《Java 7併發程式設計實戰手冊》第六章併發集合

Snip20140120_1
人民郵電出版社出版的《Java 7併發程式設計實戰手冊》終於出版了,譯者是俞黎敏和申紹勇,該書將於近期上架。之前併發程式設計網組織翻譯過此書,由於郵電出版社在併發網聯絡他們之前就找到了譯者,所以沒有采用併發網的譯稿,但郵電出版社將於併發網展開合作,釋出該書的樣章(樣章由併發網挑選,你也可以回帖告訴我們你想看哪一章的樣章),並組織贈書活動回饋給活躍讀者。活動詳情請時刻關注併發網的微博和微信(微訊號:ifeves),最後祝各位用餐愉快!:)

本章將介紹下列內容:

  • 使用非阻塞式執行緒安全列表
  • 使用阻塞式執行緒安全列表
  • 使用按優先順序排序的阻塞式執行緒安全列表
  • 使用帶有延遲元素的執行緒安全列表
  • 使用執行緒安全可遍歷對映
  • 生成併發隨機數
  • 使用原子變數
  • 使用原子陣列

6.1 簡介

資料結構(Data Structure)是程式設計中的基本元素,幾乎每個程式都使用一種或多種資料結構來儲存和管理資料。Java API提供了包含介面、類和演算法的Java集合框架(Java Collection Framework),它實現了可用在程式中的大量資料結構。

當需要在併發程式中使用資料集合時,必須要謹慎地選擇相應的實現方式。大多數集合類不能直接用於併發應用,因為它們沒有對本身資料的併發訪問進行控制。如果一些併發任務共享了一個不適用於併發任務的資料結構,將會遇到資料不一致的錯誤,並將影響程式的準確執行。這類資料結構的一個例子是ArrayList類。

Java提供了一些可以用於併發程式中的資料集合,它們不會引起任何問題。一般來說,Java提供了兩類適用於併發應用的集合。

  • 阻塞式集合(Blocking Collection):這類集合包括新增和移除資料的方法。當集合已滿或為空時,被呼叫的新增或者移除方法就不能立即被執行,那麼呼叫這個方法的執行緒將被阻塞,一直到該方法可以被成功執行。
  • 非阻塞式集合(Non-Blocking Collection):這類集合也包括新增和移除資料的方法。如果方法不能立即被執行,則返回null或丟擲異常,但是呼叫這個方法的執行緒不會被阻塞。

通過本章的各個小節,你將學會如何在併發應用中使用一些Java集合。

  • 非阻塞式列表對應的實現類:ConcurrentLinkedDeque類;
  • 阻塞式列表對應的實現類:LinkedBlockingDeque 類;
  • 用於資料生成或消費的阻塞式列表對應的實現類:LinkedTransferQueue類;
  • 按優先順序排序列表元素的阻塞式列表對應的實現類:PriorityBlockingQueue類;
  • 帶有延遲列表元素的阻塞式列表對應的實現類:DelayQueue類;
  • 非阻塞式可遍歷對映對應的實現類:ConcurrentSkipListMap類;
  • 隨機數字對應的實現類:ThreadLocalRandom類;
  • 原子變數對應的實現類:AtomicLong和AtomicIntegerArray類。

6.2 使用非阻塞式執行緒安全列表

最基本的集合型別是列表(List)。一個列表包含的元素數量不定,可以在任何位置新增、讀取或移除元素。併發列表允許不同的執行緒在同一時間新增或移除列表中的元素,而不會造成資料不一致。
在本節,將會學到如何在併發程式中使用非阻塞式列表。非阻塞式列表提供了一些操作,如果被執行的操作不能夠立即執行(例如,在列表為空時,從列表取出一個元素),方法會丟擲異常或返回null。Java 7引入了ConcurrentLinkedDeque類來實現非阻塞式併發列表。
將要實現的範例包括以下兩個不同的任務:

  • 新增大量的資料到一個列表中;
  • 從同一個列表中移除大量的資料。

準備工作

本節的範例是在Eclipse IDE裡完成的。無論你使用Eclipse還是其他的IDE(比如NetBeans),都可以開啟這個IDE並且建立一個新的Java工程。

範例實現

按照接下來的步驟實現本節的範例。
1.建立一個名為AddTask的類,實現Runnable介面。

public class AddTask implements Runnable {

2.宣告一個私有的ConcurrentLinkedDeque屬性list,並指定它的泛型引數是String型的。

private ConcurrentLinkedDeque list;

3.實現類的構造器來初始化屬性。

public AddTask(ConcurrentLinkedDeque list) {
this.list=list;
}

4.實現run()方法。這個方法將10,000個字串存放到列表中,這些字串由當前執行任務的執行緒的名稱和數字組成。

@Override
public void run() {
String name=Thread.currentThread().getName();
for (int i=0; i<10000; i++){
list.add(name+": Element "+i);
}
}

5.建立名為PollTask的類,並實現Runnable介面。

public class PollTask implements Runnable {

6.宣告一個私有的ConcurrentLinkedDeque屬性list,並指定它的泛型引數是String型的。

private ConcurrentLinkedDeque list;

7.實現類的構造器來初始化屬性。

public PollTask(ConcurrentLinkedDeque list) {
this.list=list;
}

8.實現run()方法。這個方法將列表中的10,000個字串取出,總共取5,000次,每次取兩個元素。

@Override
public void run() {
for (int i=0; i<5000; i++) {
list.pollFirst();
list.pollLast();
}
}

9.建立範例的主類Main,並新增main()方法。

public class Main {
public static void main(String[] args) {

10.建立ConcurrentLinkedDeque物件,並指定它的泛型引數是String型的。

ConcurrentLinkedDeque list=new
ConcurrentLinkedDeque<>();

11.建立執行緒陣列threads,它包含100個執行緒。

Thread threads[]=new Thread[100];

12.建立100個AddTask物件及其對應的執行執行緒。將每個執行緒存放到上一步建立的陣列中,然後啟動執行緒。

for (int i=0; i AddTask task=new AddTask(list);
threads[i]=new Thread(task);
threads[i].start();
}
System.out.printf("Main: %d AddTask threads have been
launched\n",threads.length);

13.使用join()方法等待執行緒完成。

for (int i=0; i<threads.length; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

14.將列表的元素數量列印到控制檯。

System.out.printf("Main: Size of the List: %d\n",list.size());

15.建立100個PollTask物件及其對應的執行執行緒。將每個執行緒存放到上一步建立的陣列中,然後啟動執行緒。

for (int i=0; i< threads.length; i++){
PollTask task=new PollTask(list);
threads[i]=new Thread(task);
threads[i].start();
}
System.out.printf("Main: %d PollTask threads have been
launched\n",threads.length);

16.使用join()方法等待執行緒完成。

for (int i=0; i<threads.length; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

17.將列表的元素數量列印到控制檯。

System.out.printf("Main: Size of the List: %d\n",list.size());

工作原理

本節使用的泛型引數是String類的ConcurrentLinkedDeque物件,用來實現一個非阻塞式併發資料列表。下面的截圖顯示了程式的執行結果。

首先,執行100個AddTask任務將元素新增到ConcurrentLinkedDeque物件list中。每個任務使用add()方法向這個列表中插入10,000個元素。add()方法將新元素新增到列表尾部。當所有任務執行完畢,列表中的元素數量將被列印到控制檯。在這一刻,列表中有1,000,000個元素。
接下來,執行100個PollTask任務將元素從列表中移除。每個任務使用pollFirst()和pollLast()方法從列表中移除10,000個元素。pollFirst()方法返回並移除列表中的第一個元素,pollLast()方法返回並移除列表中的最後一個元素。如果列表為空,這些方法返回null。當所有任務執行完畢,列表中的元素數量將被列印到控制檯。在這一刻,列表中有0個元素。

使用size()方法輸出列表中的元素數量。需要注意的是,這個方法返回的值可能不是真實的,尤其當有執行緒在新增資料或移除資料時,這個方法需要遍歷整個列表來計算元素數量,而遍歷過的資料可能已經改變。僅當沒有任何執行緒修改列表時,才能保證返回的結果是準確的。

更多資訊

ConcurrentLinkedDeque類提供了其他從列表中讀取資料的方法。

  • getFirst()和getLast():分別返回列表中第一個和最後一個元素,返回的元素不會從列表中移除。如果列表為空,這兩個方法丟擲NoSuchElementExcpetion異常。
  • peek()、peekFirst()和peekLast():分別返回列表中第一個和最後一個元素,返回的元素不會從列表中移除。如果列表為空,這些方法返回null。
  • remove()、removeFirst()和removeLast():分別返回列表中第一個和最後一個元素,返回的元素將會從列表中移除。如果列表為空,這些方法丟擲NoSuchElementExcpetion異常。

6.3 使用阻塞式執行緒安全列表

最基本的集合型別是列表。一個列表包含的元素數量不定,可以在任何位置新增、讀取或移除元素。併發列表允許不同的執行緒在同一時間新增或移除列表中的元素,而不會造成資料不一致。
在本節,你會學到如何在併發程式中使用阻塞式列表。阻塞式列表與非阻塞式列表的主要差別是:阻塞式列表在插入和刪除操作時,如果列表已滿或為空,操作不會被立即執行,而是將呼叫這個操作的執行緒阻塞佇列直到操作可以執行成功。Java引入了LinkedBlocking Deque類來實現阻塞式列表。
將要實現的範例包括以下兩個不同的任務:

  • 新增大量的資料到一個列表中;
  • 從同一個列表中移除大量的資料。

準備工作

本節的範例是在Eclipse IDE裡完成的。無論你使用Eclipse還是其他的IDE(比如NetBeans),都可以開啟這個IDE並且建立一個新的Java工程。

範例實現

按照接下來的步驟實現本節的範例。
1.建立名為Client的類,並實現Runnable介面。

public class Client implements Runnable{

2.宣告一個私有的LinkedBlockingDeque屬性requestList,並指定它的泛型引數是String型的。

private LinkedBlockingDeque requestList;

3.實現類的構造器來初始化屬性。

public Client (LinkedBlockingDeque requestList) {
this.requestList=requestList;
}

4.實現run()方法。使用requestList物件的put()方法,每兩秒向列表requestList中插入5個字串。重複3次。

@Override
public void run() {
for (int i=0; i<3; i++) {
for (int j=0; j<5; j++) {
StringBuilder request=new StringBuilder();
request.append(i);
request.append(":");
request.append(j);
try {
requestList.put(request.toString());
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("Client: %s at %s.\n",request,new
Date());
}
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.printf("Client: End.\n");
}

5. 建立範例的主類Main,並新增main()方法。

public class Main {
public static void main(String[] args) throws Exception {

6. 宣告並建立LinkedBlockingDeque屬性list,並指定它的泛型引數是String型的。

LinkedBlockingDeque list=new LinkedBlockingDeque<>(3);

7.將client作為傳入引數建立執行緒Thread並啟動。

Client client=new Client(list);
Thread thread=new Thread(client);
thread.start();

8.使用list物件的take()方法,每300毫秒從列表中取出3個字串物件,重複5次。在控制檯輸出字串。

for (int i=0; i for (int j=0; j<3; j++) {
String request=list.take();
System.out.printf("Main: Request: %s at %s. Size:
%d\n",request,new Date(),list.size());
}
TimeUnit.MILLISECONDS.sleep(300);
}

9.輸出一條表示程式結束的訊息。

System.out.printf("Main: End of the program.\n");

工作原理

本節使用的泛型引數是String的LinkedBlockingDeque物件,用來實現一個阻塞式併發資料列表。
Client類使用put()方法將字串插入到列表中。如果列表已滿(列表生成時指定了固定的容量),呼叫這個方法的執行緒將被阻塞直到列表中有了可用的空間。

Main類使用take()方法從列表中取字串。如果列表為空,呼叫這個方法的執行緒將被阻塞直到列表不為空(即有可用的元素)。

這個例子中使用了LinkedBlockingDeque物件的兩個方法,呼叫它們的執行緒可能會被阻塞,在阻塞時如果執行緒被中斷,方法會丟擲InterruptedException異常,所以必須捕獲和處理這個異常。

更多資訊

LinkedBlockingDeque類也提供了其他存取元素的方法,這些方法不會引起阻塞,而是丟擲異常或返回null。

  • takeFirst()和takeLast():分別返回列表中第一個和最後一個元素,返回的元素會從列表中移除。如果列表為空,呼叫方法的執行緒將被阻塞直到列表中有可用的元素出現。
  • getFirst()和getLast():分別返回列表中第一個和最後一個元素,返回的元素不會從列表中移除。如果列表為空,則丟擲NoSuchElementExcpetinon異常。
  • peek()、peekFirst()和peekLast():分別返回列表中第一個和最後一個元素,返回的元素不會從列表中移除。如果列表為空,返回null。
  • poll()、pollFirst()和pollLast():分別返回列表中第一個和最後一個元素,返回的元素將會從列表中移除。如果列表為空,返回null。
  • add()、addFirst()和addLast(): 分別將元素新增到列表中第一位和最後一位。如果列表已滿(列表生成時指定了固定的容量),這些方法將丟擲IllegalStateException異常。

參見

參見6.3節。


方 騰飛

花名清英,併發網(ifeve.com)創始人,暢銷書《Java併發程式設計的藝術》作者,螞蟻金服技術專家。目前工作於支付寶微貸事業部,關注網際網路金融,併發程式設計和敏捷實踐。微信公眾號aliqinying。