1. 程式人生 > >Fork/Join框架(二)建立一個Fork/Join池

Fork/Join框架(二)建立一個Fork/Join池

宣告:本文是《 Java 7 Concurrency Cookbook 》的第五章,作者: Javier Fernández González     譯者:許巧輝 校對:方騰飛

建立一個Fork/Join池

在這個指南中,你將學習如何使用Fork/Join框架的基本元素。它包括:

  • 建立一個ForkJoinPool物件來執行任務。
  • 建立一個ForkJoinPool執行的ForkJoinTask類。

你將在這個示例中使用Fork/Join框架的主要特點,如下:

  • 你將使用預設構造器建立ForkJoinPool。
  • 在這個任務中,你將使用Java API文件推薦的結構:
If (problem size < default size){
tasks=divide(task);
execute(tasks);
} else {
resolve problem using another algorithm;
}

  • 你將以一種同步方式執行任務。當一個任務執行2個或2個以上的子任務時,它將等待它們的結束。通過這種方式 ,正在執行這些任務的執行緒(工作執行緒)將會查詢其他任務(尚未執行的任務)來執行,充分利用它們的執行時間。
  • 你將要實現的任務將不會返回任何結果,所以你將使用RecursiveAction作為它們實現的基類。

準備工作

這個指南中的例子使用Eclipse IDE實現。如果你使用Eclipse或其他IDE,如NetBeans,開啟它並建立一個新的Java專案。

如何做…

在這個指南中,你將繼續實現一個任務來修改產品列表的價格。任務最初是負責更新一個佇列中的所有元素。你將會使用10作為參考大小,如果一個任務必須更新超過10個元素,這些元素將被劃分成兩個部分,並建立兩個任務來更新每個部分中的產品的價格。

按以下步驟來實現這個示例:

1.建立類Product,將用來儲存產品的名稱和價格。

public class Product {

2.宣告一個私有的String型別的屬性name和一個私有的double型別的屬性price。

private String name;
private double price;

3.實現這些方法,用來設定和獲取這兩個屬性的值。

public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {

this.price = price;
}

4.建立ProductListGenerator類,用來產生隨機產品的數列。

public class ProductListGenerator {

5.實現generate()方法。它接收一個數列大小 的int型別引數,返回一個產生產品數列的List<Product>物件。

public List<Product> generate (int size) {

6.建立返回產品數列的物件。

List<Product> ret=new ArrayList<Product>();

7.建立產品佇列。給所有產品賦予相同值。比如,10用來檢查程式是否工作得很好。

for (int i=0; i<size; i++){
Product product=new Product();
product.setName("Product"+i);
product.setPrice(10);
ret.add(product);
}
return ret;
}

8.建立Task類,指定它繼承RecursiveAction類。

public class Task extends RecursiveAction {

9.宣告類的序列版本UID。這個元素是必需的,因為RecursiveAction類的父類ForkJoinTask實現了Serializable介面。

private static final long serialVersionUID = 1L;

10.宣告一個私有的、List<Product>型別的屬性products。

private List&lt;Product&gt; products;

11.宣告兩個私有的、int型別的屬性first和last。這些屬性將決定這個任務產品的阻塞過程。

private int first;
private int last;

12.宣告一個私有的、double型別的屬性increment,用來儲存產品價格的增長。

private double increment;

13.實現這個類的構造器,初始化所有屬性。

public Task (List&lt;Product&gt; products, int first, int last, double increment) {
this.products=products;
this.first=first;
this.last=last;
this.increment=increment;
}

14.實現compute()方法 ,該方法將實現任務的邏輯。

@Override
protected void compute() {

15.如果last和first的差小於10(任務只能更新價格小於10的產品),使用updatePrices()方法遞增的設定產品的價格。

if (last-first<10) {
updatePrices();

16.如果last和first的差大於或等於10,則建立兩個新的Task物件,一個處理產品的前半部分,另一個處理產品的後半部分,然後在ForkJoinPool中,使用invokeAll()方法執行它們。

} else {
int middle=(last+first)/2;
System.out.printf("Task: Pending tasks:
%s\n",getQueuedTaskCount());
Task t1=new Task(products, first,middle+1, increment);
Task t2=new Task(products, middle+1,last, increment);
invokeAll(t1, t2);
}

17.實現updatePrices()方法。這個方法更新產品佇列中位於first值和last值之間的產品。

private void updatePrices() {
for (int i=first; i<last; i++){
Product product=products.get(i);
product.setPrice(product.getPrice()*(1+increment));
}
}

18.實現這個示例的主類,通過建立Main類,並實現main()方法。

public class Main {
public static void main(String[] args) {

19.使用ProductListGenerator類建立一個包括10000個產品的數列。

ProductListGenerator generator=new ProductListGenerator();
List<Product> products=generator.generate(10000);

20.建立一個新的Task物件,用來更新產品佇列中的產品。first引數使用值0,last引數使用值10000(產品數列的大小)。

Task task=new Task(products, 0, products.size(), 0.20);

21.使用無參構造器建立ForkJoinPool物件。

ForkJoinPool pool=new ForkJoinPool();

22.在池中使用execute()方法執行這個任務 。

pool.execute(task);

23.實現一個顯示關於每隔5毫秒池中的變化資訊的程式碼塊。將池中的一些引數值寫入到控制檯,直到任務完成它的執行。

do {
System.out.printf("Main: Thread Count: %d\n",pool.getActiveThreadCount());
System.out.printf("Main: Thread Steal: %d\n",pool.getStealCount());
System.out.printf("Main: Parallelism: %d\n",pool.getParallelism());
try {
TimeUnit.MILLISECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (!task.isDone());

24.使用shutdown()方法關閉這個池。

pool.shutdown();

25.使用isCompletedNormally()方法檢查假設任務完成時沒有出錯,在這種情況下,寫入一條資訊到控制檯。

if (task.isCompletedNormally()){
System.out.printf("Main: The process has completed
normally.\n");
}

26.在增長之後,所有產品的價格應該是12。將價格不是12的所有產品的名稱和價格寫入到控制檯,用來檢查它們錯誤地增長它們的價格。

for (int i=0; i<products.size(); i++){
Product product=products.get(i);
if (product.getPrice()!=12) {
System.out.printf("Product %s: %f\n",product.getName(),product.getPrice());
}
}

27.寫入一條資訊到控制檯表明程式的結束。

System.out.println("Main: End of the program.\n");

它是如何工作的…

在這個示例中,你已經建立一個ForkJoinPool物件和一個在池中執行的ForkJoinTask類的子類。為了建立ForkJoinPool物件,你已經使用了無參構造器,所以它會以預設的配置來執行。它建立一個執行緒數等於計算機處理器數的池。當ForkJoinPool物件被建立時,這些執行緒被建立並且在池中等待,直到有任務到達讓它們執行。

由於Task類沒有返回結果,所以它繼承RecursiveAction類。在這個指南中,你已經使用了推薦的結構來實現任務。如果這個任務更新超過10產品,它將被分解成兩部分,並建立兩個任務,一個任務執行一部分。你已經在Task類中使用first和last屬性,用來了解這個任務要更新的產品佇列的位置範圍。你已經使用first和last屬性,只複製產品數列一次,而不是為每個任務建立不同的數列。

它呼叫invokeAll()方法,執行每個任務所建立的子任務。這是一個同步呼叫,這個任務在繼續(可能完成)它的執行之前,必須等待子任務的結束。當任務正在等待它的子任務(結束)時,正在執行它的工作執行緒執行其他正在等待的任務。在這種行為下,Fork/Join框架比Runnable和Callable物件本身提供一種更高效的任務管理。

ForkJoinTask類的invokeAll()方法是執行者(Executor)和Fork/Join框架的一個主要區別。在執行者框架中,所有任務被提交給執行者,而在這種情況下,這些任務包括執行和控制這些任務的方法都在池內。你已經在Task類中使用invokeAll()方法,它是繼承了繼承ForkJoinTask類的RecursiveAction類。

你使用execute()方法提交唯一的任務給這個池,用來所有產品數列。在這種情況下,它是一個非同步呼叫,而主執行緒繼續它的執行。

你已經使用ForkJoinPool類的一些方法,用來檢查正在執行任務的狀態和變化。基於這個目的,這個類包括更多的方法。參見有這些方法完整列表的監控一個Fork/Join池指南。

最後,與執行者框架一樣,你應該使用shutdown()方法結束ForkJoinPool。
 以下截圖顯示這個示例執行的一部分:

2

你可以看出任務正在完成它們的工作和產品價格的更新。

不止這些…

ForkJoinPool類提供其他的方法,用來執行一個任務。這些方法如下:

  • execute (Runnable task):這是在這個示例中,使用的execute()方法的另一個版本。在這種情況下,你可以提交一個Runnable物件給ForkJoinPool類。注意:ForkJoinPool類不會對Runnable物件使用work-stealing演算法。它(work-stealing演算法)只用於ForkJoinTask物件。
  • invoke(ForkJoinTask<T> task):當execute()方法使用一個非同步呼叫ForkJoinPool類,正如你在本示例中所學的,invoke()方法使用同步呼叫ForkJoinPool類。這個呼叫不會(立即)返回,直到傳遞的引數任務完成它的執行。
  • 你也可以使用在ExecutorService介面的invokeAll()和invokeAny()方法。這些方法接收一個Callable物件作為引數。ForkJoinPool類不會對Callable物件使用work-stealing演算法,所以你最好使用執行者去執行它們。

ForkJoinTask類同樣提供在示例中使用的invokeAll()的其他版本。這些版本如下:

  • invokeAll(ForkJoinTask<?>… tasks):這個版本的方法使用一個可變引數列表。你可以傳入許多你想要執行的ForkJoinTask物件作為引數。
  • invokeAll(Collection<T> tasks):這個版本的方法接收一個泛型型別T物件的集合(如:一個ArrayList物件,一個LinkedList物件或者一個TreeSet物件)。這個泛型型別T必須是ForkJoinTask類或它的子類。

即使ForkJoinPool類被設計成用來執行一個ForkJoinTask,你也可以直接執行Runnable和Callable物件。你也可以使用ForkJoinTask類的adapt()方法來執行任務,它接收一個Callable物件或Runnable物件(作為引數)並返回一個ForkJoinTask物件。 參見

  • 在第8章,測試併發應用程式中的監控一個Fork/Join池的指南