1. 程式人生 > >實現為fork/join框架生成自定義執行緒的ThreadFactory介面

實現為fork/join框架生成自定義執行緒的ThreadFactory介面

Java 9併發程式設計指南 目錄

實現為fork/join框架生成自定義執行緒的ThreadFactory介面

fork/join框架是Java9中最有趣的特性之一,它是Executor和ExecutorService介面的實現,能夠直接執行Callable和Runnable任務,無需管理執行它們的執行緒。

此執行器目標是執行可以劃分為更小部分的任務,其主要組成部分如下:

  • 由ForkJoinTask類實現的特殊任務。
  • 提供兩個操作,用於將任務劃分為子任務(fork操作)和等待這些子任務結束(join操作)。
  • 它是一種命名為“工作竊取”的演算法,優化執行緒池的使用。當任務等待其子任務時,正在執行此任務的執行緒會去執行其它任務。

fork/join框架的主類是ForkJoinPool類。在內部包括如下兩個元素:

  • 等待被執行的任務佇列
  • 執行任務的執行緒池

ForkJoinWorkerThread向Thread類新增新方法,比如建立執行緒時執行的onStart()方法和清理執行緒使用資源時呼叫的onTermination()方法。ForkJoinPool類使用ForkJoinWorkerThreadFactory介面的實現來建立其使用的工作執行緒。

本節將學習如何實現在ForkJoinPool類中使用的定製工作執行緒,以及在繼承ForkJoinPool類和實現ForkJoinWorkerThreadFactory介面的工廠中如何使用此執行緒。

準備工作

本範例通過Eclipse開發工具實現。如果使用諸如NetBeans的開發工具,開啟並建立一個新的Java專案。

實現過程

通過如下步驟實現範例:

  1. 建立名為MyWorkerThread的類,繼承ForkJoinWorkerThread類:

    public class MyWorkerThread extends ForkJoinWorkerThread{
    
  2. 宣告和建立ThreadLocal屬性,由名為taskCounter的Integer類引數化:

    	private final static ThreadLocal<Integer> taskCounter=
    new ThreadLocal<Integer>();
  3. 實現類建構函式:

    	protected MyWorkerThread(ForkJoinPool pool) {
    		super(pool);
    	}
    
  4. 重寫onStart()方法,在其父類上呼叫此方法輸出資訊到控制條,且設定這個執行緒的taskCounter屬性值為零:

    	@Override
    	protected void onStart() {
    		super.onStart();
    		System.out.printf("MyWorkerThread %d: Initializing task counter.\n", getId());	
    		taskCounter.set(0);
    	}
    
  5. 重寫onTermination()方法,輸出此執行緒的taskCounter屬性值到控制檯:

    	@Override
    	protected void onTermination(Throwable exception) {
    		System.out.printf("MyWorkerThread %d: %d\n",
    		getId(),taskCounter.get());
    		super.onTermination(exception);
    	}
    
  6. 實現addTask()方法,遞增taskCounter屬性值:

    	public void addTask(){
    		taskCounter.set(taskCounter.get() + 1);;
    	}
    }
    
  7. 建立名為MyWorkerThreadFactory的類,實現ForkJoinWorkerThreadFactory介面。實現newThread()方法,建立和返回MyWorkerThread物件:

    public class MyWorkerThreadFactory  implements ForkJoinWorkerThreadFactory{
    	@Override
    	public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
    		return new MyWorkerThread(pool);
    	}
    }
    
  8. 建立名為MyRecursiveTask的類,繼承Integer類引數化的RecursiveTask類:

    public class MyRecursiveTask extends RecursiveTask<Integer> {
    
  9. 定義名為array的私有int陣列:

    	private int array[];
    
  10. 定義名為start和end的兩個私有int屬性:

    	private int start, end;
    
  11. 實現類建構函式,初始化這些屬性:

    	public MyRecursiveTask(int array[],int start, int end) {
    		this.array=array;
    		this.start=start;
    		this.end=end;
    	}
    
  12. 實現compute()方法,將陣列中開始和結束位置之間所有元素相加。首先將正在執行任務的執行緒轉換成MyWorkerThread物件,並且使用addTask()為此執行緒遞增任務的計數:

    	@Override
    	protected Integer compute() {
    		Integer ret;
    		MyWorkerThread thread=(MyWorkerThread)Thread.currentThread();
    		thread.addTask();
    
  13. 如果陣列中開始和結束位置區間的元素數大於100,計算中間位置且建立兩個新的MyRecursiveTask任務分別處理前後兩部分。如果區間等於或小於100,計算開始和結束位置區間所有元素的和:

    		if (end-start>100) {
    			int mid=(start+end)/2;
    			MyRecursiveTask task1=new MyRecursiveTask(array,start,mid);
    			MyRecursiveTask task2=new MyRecursiveTask(array,mid,end);
    			invokeAll(task1,task2);
    			ret=addResults(task1,task2);
    		} else {
    			int add=0;
    			for (int i=start; i<end; i++) {
    				add+=array[i];
    			}
    			ret=add;
    		}
    
  14. 設定執行緒休眠10毫秒,返回任務結果:

    		try {
    			TimeUnit.MILLISECONDS.sleep(10);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		return ret;
    	}
    
  15. 實現addResults()介面,計算並返回作為引數接收的兩個任務的結果之和:

    	private Integer addResults(MyRecursiveTask task1, MyRecursiveTask task2) {
    		int value;
    		try {
    			value = task1.get().intValue()+task2.get().intValue();
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    			value=0;
    		} catch (Exception e) {
    			e.printStackTrace();
    			value=0;
    		}	
    		return value;
    	}
    }
    
  16. 通過建立名為Main的類,新增main()方法,實現本範例主類:

    public class Main {
    	public static void main(String[] args)  throws Exception{
    
  17. 建立名為factory的MyWorkerThreadFactory物件:

    		MyWorkerThreadFactory factory=new MyWorkerThreadFactory();
    
  18. 建立名為pool的ForkJoinPool物件,傳遞之前建立的factory物件到建構函式:

    		ForkJoinPool pool=new ForkJoinPool(4, factory, null, false);
    
  19. 建立100000個整數的陣列,初始化所有元素為1:

    		int array[]=new int[100000];
    		for (int i=0; i<array.length; i++){
    			array[i]=1;
    		}
    
  20. 建立新的task物件,計算陣列中所有元素的和:

    		MyRecursiveTask task=new MyRecursiveTask(array,0,array.length);
    
  21. 使用execute()方法傳送任務到執行緒池:

    		pool.execute(task);
    
  22. 使用join()方法等待任務結束:

    		task.join();
    
  23. 使用shutdown()方法關閉執行緒池:

    		pool.shutdown();
    
  24. 使用awaitTermination()方法等待執行器結束:

    		pool.awaitTermination(1, TimeUnit.DAYS);
    
  25. 使用get()方法,輸出任務結果到控制檯:

    		System.out.printf("Main: Result: %d\n",task.get());
    
  26. 輸出指明程式結束的資訊到控制檯:

    		System.out.printf("Main: End of the program\n");
    	}
    }
    

工作原理

fork/join框架使用的執行緒稱為工作執行緒。 Java提供ForkJoinWorkerThread類,此類繼承了Thread類並實現fork/join框架使用的工作執行緒。

本節實現了繼承ForkJoinWorkerThread類的MyWorkerThread類,並重寫ForkJoinWorkerThread類的兩個方法。範例目標是在每個工作執行緒中實現任務計數器,以便了解一個工作執行緒已經執行多少任務。通過ThreadLocal屬性實現計數器,這樣每個執行緒都將以透明的方式擁有自己的計數器。

重寫ForkJoinWorkerThread類的onStart()方法初始化任務計數器,當工作執行緒開始執行時呼叫此方法。還重寫了onTermination()方法輸出任務計數器的值到控制檯,當工作執行緒結束執行時呼叫此方法。此外,在MyWorkerThread類中實現了addTask()方法,用來遞增每個執行緒的任務計數器。

ForkJoinPool類像Java併發API中所有執行器一樣,使用工廠來建立執行緒。所以如果在ForkJoinPool類中使用MyWorkerThread執行緒,需要實現自定義的執行緒工廠。對於fork/join框架框架,這個工廠需要實現ForkJoinPool.ForkJoinWorkerThreadFactory類,此類只有一個方法來建立新的MyWorkerThread物件。

最後,只要使用已經建立的工廠初始化ForkJoinPool類,也就是在Main類中所做的,使用ForkJoinPool類建構函式。

下圖顯示本範例在控制檯輸出的部分執行資訊:

pics/08_05.jpg

可以看到ForkJoinPool物件如何執行四個工作執行緒,以及每個執行緒執行多少個任務。

擴充套件學習

需要注意當執行緒正常結束或者丟擲異常時,才會呼叫ForkJoinWorkerThread類提供的onTermination()方法。此方法將Throwable物件作為引數接收,如果引數值為null,工作執行緒正常結束,但如果引數值不為空,執行緒丟擲異常,所以需要編寫必要的程式來處理這種情形。

更多關注

  • 第五章“Fork/Join框架”中的“建立fork/join池”小節
  • 第一章“執行緒管理”中的“工廠模式建立執行緒”小節