1. 程式人生 > >Java執行緒之fork/join框架

Java執行緒之fork/join框架

fork/join框架是用多執行緒的方式實現分治法來解決問題。fork指的是將問題不斷地縮小規模,join是指根據子問題的計算結果,得出更高層次的結果。

fork/join框架的使用有一定的約束條件:

1. 除了fork()  和  join()方法外,執行緒不得使用其他的同步工具。執行緒最好也不要sleep()

2. 執行緒不得進行I/O操作

3. 執行緒不得丟擲checked exception

此框架有幾個核心類:ForkJoinPool是實現了工作竊取演算法的執行緒池。ForkJoinTask是任務類,他有2個子類:RecursiveAction無返回值,RecursiveTask有返回值,在定義自己的任務時,一般都是從這2類中挑一個,通過繼承的方式定義自己的新類。由於ForkJoinTask類實現了Serializable介面,因此,定義自己的任務類時,應該定義serialVersionUID屬性。

在編寫任務時,推薦的寫法是這樣的:

  1. If (problem size > default size){  
  2. task s = divide(task);  
  3. execute(tasks);  
  4. else {  
  5. resolve problem using another algorithm;  
  6. }  

ForkJoinPool實現了工作竊取演算法(work-stealing),執行緒會主動尋找新建立的任務去執行,從而保證較高的執行緒利用率。它使用守護執行緒(deamon)來執行任務,因此無需對他顯示的呼叫shutdown()來關閉。一般情況下,一個程式只需要唯一的一個ForkJoinPool,因此應該按如下方式建立它:

static final ForkJoinPool mainPool = new ForkJoinPool(); //執行緒的數目等於CPU的核心數

下面給出一個非常簡單的例子,功能是將一個數組中每一個元素的值加1。具體實現為:將大陣列不斷分解為更短小的子陣列,當子陣列長度不超過10的時候,對其中所有元素進行加1操作。

  1. publicclass Test {  
  2.     publicfinalstatic ForkJoinPool mainPool = new ForkJoinPool();  
  3.     publicstaticvoid main(String[] args){  
  4.         int
     n = 26;  
  5.         int[] a = newint[n];  
  6.         for(int i=0; i<n; i++) {  
  7.             a[i] = i;  
  8.         }  
  9.         SubTask task = new SubTask(a, 0, n);  
  10.         mainPool.invoke(task);  
  11.         for(int i=0; i<n; i++) {  
  12.             System.out.print(a[i]+" ");  
  13.         }  
  14.     }  
  15. }  
  16. class SubTask extends RecursiveAction {  
  17.     privatestaticfinallong serialVersionUID = 1L;  
  18.     privateint[] a;  
  19.     privateint beg;  
  20.     privateint end;  
  21.     public SubTask(int[] a, int beg, int end) {  
  22.         super();  
  23.         this.a = a;  
  24.         this.beg = beg;  
  25.         this.end = end;  
  26.     }  
  27.     @Override
  28.     protectedvoid compute() {  
  29.         if(end-beg>10) {  
  30.             int mid = (beg+end) / 2;  
  31.             SubTask t1 = new SubTask(a, beg, mid);  
  32.             SubTask t2 = new SubTask(a, mid, end);  
  33.             invokeAll(t1, t2);  
  34.         }else {  
  35.             for(int i=beg; i<end; i++) {  
  36.                 a[i] = a[i] + 1;  
  37.             }  
  38.         }  
  39.     }  
  40. }  

例子2,任務擁有返回值。隨機生成一個數組,每個元素均是0-999之間的整數,統計該陣列中每個數字出現1的次數的和。

實現方法,將該陣列不斷的分成更小的陣列,直到每個子陣列的長度為1,即只包含一個元素。此時,統計該元素中包含1的個數。最後彙總,得到陣列中每個數字共包含了多少個1。

  1. publicclass Test {  
  2.     publicfinalstatic ForkJoinPool mainPool = new ForkJoinPool();  
  3.     publicstaticvoid main(String[] args){  
  4.         int n = 26;  
  5.         int[] a = newint[n];  
  6.         Random rand = new Random();  
  7.         for(int i=0; i<n; i++) {  
  8.             a[i] = rand.nextInt(1000);  
  9.         }  
  10.         SubTask task = new SubTask(a, 0, n);  
  11.         int count = mainPool.invoke(task);  
  12.         for(int i=0; i<n; i++) {  
  13.             System.out.print(a[i]+" ");  
  14.         }  
  15.         System.out.println("\n陣列中共出現了" + count + "個1");  
  16.     }  
  17. }  
  18. class SubTask extends RecursiveTask<Integer> {  
  19.     privatestaticfinallong serialVersionUID = 1L;  
  20.     privateint[] a;  
  21.     privateint beg;  
  22.     privateint end;  
  23.     public SubTask(int[] a, int beg, int end) {  
  24.         super();  
  25.         this.a = a;  
  26.         this.beg = beg;  
  27.         this.end = end;  
  28.     }  
  29.     @Override
  30.     protected Integer compute() {  
  31.         int result = 0;  
  32.         if(end-beg>1) {  
  33.             int mid = (beg+end)/2;  
  34.             SubTask t1 = new SubTask(a, beg, mid);  
  35.             SubTask t2 = new SubTask(a, mid, end);  
  36.             invokeAll(t1, t2);  
  37.             try {  
  38.                 result = t1.get()+t2.get();  
  39.             } catch (InterruptedException | ExecutionException e) {  
  40.                 e.printStackTrace();  
  41.             }  
  42.         } else {  
  43.             result = count(a[beg]);  
  44.         }  
  45.         return result;  
  46.     }  
  47.     //統計一個整數中出現了幾個1
  48.     privateint count(int n) {  
  49.         int result = 0;  
  50.         while(n>0) {  
  51.             if(n % 10==1) {  
  52.                 result++;  
  53.             }  
  54.             n = n / 10;  
  55.         }  
  56.         return result;  
  57.     }  
  58. }  

例子3,非同步執行任務。前面兩個例子都是同步執行任務,當啟動任務後,主執行緒陷入了阻塞狀態,直到任務執行完畢。若建立新任務後,希望當前執行緒能繼續執行而非陷入阻塞,則需要非同步執行。ForkJoinPool執行緒池提供了execute()方法來非同步啟動任務,而作為任務本身,可以呼叫fork()方法非同步啟動新的子任務,並呼叫子任務的join()方法來取得計算結果。需要注意的是,非同步使用ForkJoin框架,無法使用“工作竊取”演算法來提高執行緒的利用率,針對每個子任務,系統都會啟動一個新的執行緒。

本例的功能是查詢硬碟上某一型別的檔案。給定副檔名後,將硬碟上所有該型別的檔名列印顯示出來。作為主程式,啟動任務後,繼續顯示任務的執行進度,每3秒鐘列印顯示一個黑點,表示任務在繼續。最後,當所有執行緒都結束了,列印顯示結果。

  1. publicclass ThreadLocalTest {  
  2.     publicstaticvoid main(String[] args) throws Exception {  
  3.         Path p = Paths.get("D:/");  
  4.         List<Path> roots = (List<Path>) FileSystems.getDefault().getRootDirectories();  
  5.         List<Path> result = new ArrayList<>();  
  6.         List<MyTask> tasks = new ArrayList<>();  
  7.         ForkJoinPool pool = new ForkJoinPool();  
  8.         for(Path root:roots) {  
  9.             MyTask t = new MyTask(root, "pdf");  
  10.             pool.execute(t);  
  11.             tasks.add(t);  
  12.         }  
  13.         System.out.print("正在處理中");  
  14.         while(isAllDone(tasks) == false) {  
  15.             System.out.print(". ");  
  16.             TimeUnit.SECONDS.sleep(3);  
  17.         }  
  18.         for(MyTask t:tasks) {  
  19.             result.addAll(t.get());  
  20.         }  
  21. 相關推薦

    Java執行fork/join框架

    fork/join框架是用多執行緒的方式實現分治法來解決問題。fork指的是將問題不斷地縮小規模,join是指根據子問題的計算結果,得出更高層次的結果。 fork/join框架的使用有一定的約束條件: 1. 除了fork()  和  join()方法外,執行緒不得使用

    Java執行 Fork/Join框架的同步和非同步

                    在Fork/Join框架中,提交任務的時候,有同步和非同步兩種方式。以前使用的invokeAll(

    Java執行fork/join框架詳解

    這個框架的目的主要是更好地利用底層平臺上的多核CPU和多處理器來進行處理,解決問題時通常使用分治演算法或map/reduce演算法來進行.這個框架的名稱來源於使用時的兩個基本操作fork和join,可以類比於map/reduce中的map和reduce操作.fork操作的作

    Java併發(十)執行池&fork/join框架

    什麼是執行緒池 第四種獲取執行緒的方法:執行緒池,一個 ExecutorService,它使用可能的幾個池執行緒之一執行每個提交的任務,通常使用 Executors 工廠方法配置。 執行緒池可以解決兩個不同問題:由於減少了每個任務呼叫的開銷,它們通常可以在執

    定製併發類(七)實現ThreadFactory介面生成自定義的執行Fork/Join框架

    宣告:本文是《 Java 7 Concurrency Cookbook 》的第七章,作者: Javier Fernández González     譯者:許巧輝 實現ThreadFactory介面生成自定義的執行緒給Fork/Join框架 Fork/Join框架是Java7中最有趣的特徵之

    Java程式設計技術分享:Java併發Fork-Join框架分析

    1、什麼是Fork/Join框架 及產生背景 Fork/Join框架是Java7提供了的一個用於並行執行任務的框架, 是一個把大任務分割成若干個小任務,最終彙總每個小任務結果後得到大任務結果的框架。上邊是書上的定義。 我們用粗話說:Fork/Join是一個框架,來解決執行效率,手段是並行,但

    java執行Executor框架執行

    執行緒雖然在web開發中用的不算特別多,但在特定的情況下還是能發揮重要重要作用的,因此即使用的少還是掌握下比較好;下面先回顧下比較執行緒的常規實現方法 1 繼承Thread類 2 實現runnable介面(使用較多) java5之後有了新的執行緒實現方式,java5可以使用

    java執行wait(),notify(),notifyAll(),synchronized(obj){},join()

    wait(),notify(),notifyAll(),synchronized wait()和notify()是java.lang.Object的物件方法,synchronized是java關鍵字;wait()和notify()必須配合synchronized

    JAVA執行join)

    /** * Waits at most <code>millis</code> milliseconds for this thread to *

    Java執行join

    簡述 Thread類的join方法用來使main執行緒進入阻塞狀態,進而等待呼叫join方法的執行緒執行,join有三個過載方法: public final void join() 使主執行緒進

    java 執行執行狀態

    java執行緒狀態 1. 初始(NEW):新建立了一個執行緒物件,但還沒有呼叫start()方法。 2. 執行(RUNNABLE): Java執行緒中將就緒(ready)和執行中(running)兩種狀態籠統的稱為“執行”。 執行緒物件建立後,其他執行緒(比如main執行緒)呼叫了該物件

    Java執行semaphore和Exchanger

    Semaphore是Java執行緒的一個計數訊號量。我們可用於多執行緒的併發訪問控制。 就像我們常見的執行緒池,資料庫連線池就可以使用Semaphore進行邏輯的實現。Semaphore中我們就介紹兩個最常用的兩個方法。 acquire() 從Semaphore獲取許可,如果計數不小於0

    Java執行非同步回撥(Callback)

    ●介紹      有時候執行一個任務需要很長時間,單執行緒下執行緒會處於阻塞狀態。這個時候我們會考慮一種非阻塞的處理模式。非阻塞任務在任何程式語言裡都必不可少,Java也不例外。多執行緒就是一個很好的解決辦法。     

    java執行Latch設計模式見解

    CountDownLatch :(個人理解)使用閥門值,直到閥門值為0之前,一直阻塞執行緒。實則使用物件鎖,不釋放物件鎖,一直佔用資源,(這裡是一個缺點)。閥門值為0時,呼叫釋放物件鎖的方法,釋放資源。應用的場景,我覺得是需要一些執行緒先完成的前提下,再使用其他執行緒。也就是我就是要一些重要的執行緒

    Java執行Synchronized同步————Java學習路(17)

    前言——不進則退啊,部落格幾天沒寫,排名第一次下降了,得勤奮更新,不能偷懶。。 歡迎轉載,轉載請註明來處。 目錄 一.演示同步問題 此前,我們用的多執行緒都是較為簡單的例子,都沒有涉及到多個執行緒共享同一個物件或者資源的情況。倘若多執行緒共享資

    Java執行訊號量

     計數訊號量用來控制同時訪問某個特定資源的運算元量,或者同時執行某個指定操作的數量。計數訊號量還可以用來實現某種資源池,或者對容器施加邊界。  Semaphore中管理者一組虛擬的許可,許可的數量可通過建構函式來指定。在互相操作時可以首先獲得許可(只要還有剩餘的許可),並在使用以後釋放許可。

    Java執行閉鎖

     閉鎖是一種同步工具類,可以延遲執行緒的進度直到其到達終止狀態。閉鎖的作用相當於一扇門:在閉鎖到達結束狀態之前,這扇門一直是關閉的,並且沒有任何執行緒能通過,當到達結束狀態時,這扇門會開啟並允許所有的執行緒通過。當閉鎖到達結束狀態後,將不會再改變狀態,因此這扇門將永遠保持開啟狀態。閉鎖可以用

    Java執行銀行存取款

    銀行的存取款可以分為存款和取款: 當去存款的時候會先顯示賬戶資訊,然後將存進去的錢和賬戶原有的錢數相加,返回存款之後賬戶資訊; 當去取款的時候會先顯示賬戶資訊,然後將取錢數和賬戶裡面的錢相對比,如果取

    Java執行執行池的使用

     在Java中進行非同步操作時,執行緒必不可少,但如果頻繁的建立、銷燬一個執行緒,這是很耗效能的,所以執行緒池就應運而生了,Java中主要有newFixedThreadPool、newCachedThreadPool、newSingleThreadExecuter及newSchedule

    Java執行執行

     Executor框架可以將任務的提交與任務的執行策略解耦開來。就像許多對複雜過程的解耦操作那樣,這種論斷多少有些言過其實了。雖然Executor框架為定製和修改執行策略提供了相當大的靈活性,但並非所有的任務都能適用所有的執行策略。有些型別的任務需要明確的指定執行策略。