Java並發編程系列之二十八:CompletionService
CompletionService簡介
CompletionService與ExecutorService類似都可以用來執行線程池的任務,ExecutorService繼承了Executor接口,而CompletionService則是一個接口,那麽為什麽CompletionService不直接繼承Executor接口呢?主要是Executor的特性決定的,Executor框架不能完全保證任務執行的異步性,那就是如果需要實現任務(task)的異步性,只要為每個task創建一個線程就實現了任務的異步性。代碼往往包含new Thread(task).start()
。這種方式的問題在於,它沒有限制可創建線程的數量(在ExecutorService可以限制),不過,這樣最大的問題是在高並發的情況下,不斷創建線程異步執行任務將會極大增大線程創建的開銷
一般情況下,如果需要判斷任務是否完成,思路是得到Future列表的每個Future,然後反復調用其get方法,並將timeout參數設為0,從而通過輪詢的方式判斷任務是否完成。為了更精確實現任務的異步執行以及更簡便的完成任務的異步執行,可以使用CompletionService。
CompletionService實現原理
CompletionService實際上可以看做是Executor和BlockingQueue的結合體。CompletionService在接收到要執行的任務時,通過類似BlockingQueue的put和take獲得任務執行的結果。CompletionService的一個實現是ExecutorCompletionService,ExecutorCompletionService把具體的計算任務交給Executor完成。
在實現上,ExecutorCompletionService在構造函數中會創建一個BlockingQueue(使用的基於鏈表的無界隊列LinkedBlockingQueue),該BlockingQueue的作用是保存Executor執行的結果。當計算完成時,調用FutureTask的done方法。當提交一個任務到ExecutorCompletionService時,首先將任務包裝成QueueingFuture,它是FutureTask的一個子類,然後改寫FutureTask的done方法,之後把Executor執行的計算結果放入BlockingQueue中。QueueingFuture的源碼如下:
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
從代碼可以看到,CompletionService將提交的任務轉化為QueueingFuture,並且覆蓋了done方法,在done方法中就是將任務加入任務隊列中。這點與之前對Executor框架的分析是一致的。
使用ExecutorService實現任務
代碼模擬了電商中加載商品詳情這一操作,因為商品屬性的多樣性,所以可以將商品的圖片顯示與商品簡介的顯示設為兩個獨立執行的任務。另外,由於商品的圖片可能有許多張,所以圖片的顯示往往比簡介顯示更慢。這個時候異步執行能夠在一定程度上加快執行的速度提高系統的性能。下面的代碼演示了這點:
package com.rhwayfun.patchwork.concurrency.r0410;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
/**
* Created by rhwayfun on 16-4-10.
*/
public class DisplayProductInfoWithExecutorService {
//線程池
private final ExecutorService executorService = Executors.newFixedThreadPool(2);
//日期格式器
private final DateFormat format = new SimpleDateFormat("HH:mm:ss");
// 模擬電商網站商品詳情的信息展示
// 由於可能商品的圖片可能會有很多張,所以顯示商品的圖片往往會有一定的延遲
// 除了商品的詳情外還包括商品簡介等信息的展示,由於這裏信息主要的是文字為
// 主,所以能夠比圖片更快顯示出來。下面的代碼就以執行這兩個任務為主線,完
// 成這兩個任務的執行。由於這兩個任務的執行存在較大差距,所以想到的第一個
// 思路就是異步執行,首先執行圖像的下載任務,之後(不會很久)開始執行商品
// 簡介信息的展示,如果網絡足夠好,圖片又不是很大的情況下,可能在開始展示
// 商品的時候圖像就下載完成了,所以自然想到使用Executor和Callable完成異
// 步任務的執行。
public void renderProductDetail() {
final List<ProductInfo> productInfos = loadProductImages();
//異步下載圖像的任務
Callable<List<ProductImage>> task = new Callable<List<ProductImage>>() {
@Override
public List<ProductImage> call() throws Exception {
List<ProductImage> imageList = new ArrayList<>();
for (ProductInfo info : productInfos){
imageList.add(info.getImage());
}
return imageList;
}
};
//提交給線程池執行
Future<List<ProductImage>> listFuture = executorService.submit(task);
//展示商品簡介的信息
renderProductText(productInfos);
try {
//顯示商品的圖片
List<ProductImage> imageList = listFuture.get();
renderProductImage(imageList);
} catch (InterruptedException e) {
// 如果顯示圖片發生中斷異常則重新設置線程的中斷狀態
// 這樣做可以讓wait中的線程喚醒
Thread.currentThread().interrupt();
// 同時取消任務的執行,參數false表示在線程在執行不中斷
listFuture.cancel(true);
} catch (ExecutionException e) {
try {
throw new Throwable(e.getCause());
} catch (Throwable throwable) {
throwable.printStackTrace();
}
}
}
private void renderProductImage(List<ProductImage> imageList ) {
for (ProductImage image : imageList){
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + " display products images! "
+ format.format(new Date()));
}
private void renderProductText(List<ProductInfo> productInfos) {
for (ProductInfo info : productInfos){
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + " display products description! "
+ format.format(new Date()));
}
private List<ProductInfo> loadProductImages() {
List<ProductInfo> list = new ArrayList<>();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
ProductInfo info = new ProductInfo();
info.setImage(new ProductImage());
list.add(info);
System.out.println(Thread.currentThread().getName() + " load products info! "
+ format.format(new Date()));
return list;
}
/**
* 商品
*/
private static class ProductInfo{
private ProductImage image;
public ProductImage getImage() {
return image;
}
public void setImage(ProductImage image) {
this.image = image;
}
}
private static class ProductImage{}
public static void main(String[] args){
DisplayProductInfoWithExecutorService cd = new DisplayProductInfoWithExecutorService();
cd.renderProductDetail();
System.exit(0);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
代碼的執行結果如下:
在上面的代碼中,嘗試並行執行商品圖像的下載和簡介信息的任務的執行,雖然這種方式能夠完成任務,但是異構任務的並行對性能的提升還是有限的。考慮一種極端情況,商品圖片的下載的速度遠遠小於簡介信息的加載,那麽這種情況(通常兩者的加載速度的比例會是一個較大的值)下實際上任務的串行的執行效率就差不多了。而且使用了更復雜的代碼,得到的提升卻如此之小。只有當量相互獨立並且同構的任務可以並發處理時,對系統性能的提升才是巨大的 (因為加載圖片和簡介執行速度相差太大,所以不是同構的任務)。
使用CompletionService實現任務
使用CompletionService的一大改進就是把多個圖片的加載分發給多個工作單元進行處理,這樣通過分發的方式就縮小了商品圖片的加載與簡介信息的加載的速度之間的差距,讓這些小任務在線程池中執行,這樣就大大降低了下載所有圖片的時間,所以在這個時候可以認為這兩個任務是同構的。使用CompletionService完成最合適不過了。
package com.rhwayfun.patchwork.concurrency.r0410;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
/**
* Created by rhwayfun on 16-4-10.
*/
public class DisplayProductInfoWithCompletionService {
//線程池
private final ExecutorService executorService;
//日期格式器
private final DateFormat format = new SimpleDateFormat("HH:mm:ss");
public DisplayProductInfoWithCompletionService(ExecutorService executorService) {
this.executorService = executorService;
}
public void renderProductDetail() {
final List<ProductInfo> productInfos = loadProductInfos();
CompletionService<ProductImage> completionService = new ExecutorCompletionService<ProductImage>(executorService);
//為每個圖像的下載建立一個工作任務
for (final ProductInfo info : productInfos) {
completionService.submit(new Callable<ProductImage>() {
@Override
public ProductImage call() throws Exception {
return info.getImage();
}
});
}
//展示商品簡介的信息
renderProductText(productInfos);
try {
//顯示商品圖片
for (int i = 0, n = productInfos.size(); i < n; i++){
Future<ProductImage> imageFuture = completionService.take();
ProductImage image = imageFuture.get();
renderProductImage(image);
}
} catch (InterruptedException e) {
// 如果顯示圖片發生中斷異常則重新設置線程的中斷狀態
// 這樣做可以讓wait中的線程喚醒
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
try {
throw new Throwable(e.getCause());
} catch (Throwable throwable) {
throwable.printStackTrace();
}
}
}
private void renderProductImage(ProductImage image) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " display products images! "
+ format.format(new Date()));
}
private void renderProductText(List<ProductInfo> productInfos) {
for (ProductInfo info : productInfos) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + " display products description! "
+ format.format(new Date()));
}
private List<ProductInfo> loadProductInfos() {
List<ProductInfo> list = new ArrayList<>();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
ProductInfo info = new ProductInfo();
info.setImage(new ProductImage());
list.add(info);
System.out.println(Thread.currentThread().getName() + " load products info! "
+ format.format(new Date()));
return list;
}
/**
* 商品
*/
private static class ProductInfo {
private ProductImage image;
public ProductImage getImage() {
return image;
}
public void setImage(ProductImage image) {
this.image = image;
}
}
private static class ProductImage {
}
public static void main(String[] args) {
DisplayProductInfoWithCompletionService cd = new DisplayProductInfoWithCompletionService(Executors.newCachedThreadPool());
cd.renderProductDetail();
}
}
執行結果與上面的一樣。因為多個ExecutorCompletionService可以共享一個Executor,因此可以創建一個特定某個計算的私有的,又能共享公共的Executor的ExecutorCompletionService。
CompletionService小結
- 相比ExecutorService,CompletionService可以更精確和簡便地完成異步任務的執行
- CompletionService的一個實現是ExecutorCompletionService,它是Executor和BlockingQueue功能的融合體,Executor完成計算任務,BlockingQueue負責保存異步任務的執行結果
- 在執行大量相互獨立和同構的任務時,可以使用CompletionService
- CompletionService可以為任務的執行設置時限,主要是通過BlockingQueue的poll(long time,TimeUnit unit)為任務執行結果的取得限制時間,如果沒有完成就取消任務
Java並發編程系列之二十八:CompletionService