Fork/Join 框架的簡單使用
阿新 • • 發佈:2018-12-29
Fork/Join 框架是 Java 7 提供的一個用於並行執行任務的框架,採用類似於分治演算法的思想:把一個複雜的問題分成兩個或更多相同或相似的子問題,直到最後子問題可以簡單地直接求解,原問題的解即子問題的解的合併。
Fork/Join 的使用也很簡單,一些 API 就跟執行緒池差不多。使用的時候要注意的是需要合理的“分而治之”。
Fork/Join 框架最常用的就是這四個類:ForkJoinPool、ForkJoinTask、RecursiveAction、RecursiveTask。
RecursiveAction 和 RecursiveTask 繼承了 ForkJoinTask 抽象類:
RecursiveAction 是沒有返回值的,RecursiveTask 是有返回值的。
看一個有返回值的小 Demo:
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.IntStream;

/**
 * Fork/Join task that sums every integer in the closed range {@code [start, end]}.
 *
 * <p>A range wider than {@code maxThreshold} is split in half; the halves are summed
 * as subtasks and their results are added. The sum uses {@code int} arithmetic and
 * therefore wraps on overflow, exactly like {@link IntStream#sum()}.
 *
 * <p>The original listing declared {@code package test.demo2.deadlock;}. The declaration
 * is left out so the demo compiles from any directory; re-add it if the file lives
 * under that package.
 *
 * @author Dongguabai
 * @date 2018/12/24 16:55
 */
public class CalculatedForkJoinRecursiveTask extends RecursiveTask<Integer> {

    /** Split threshold used when the caller does not supply one. */
    private static final int DEFAULT_MAX_THRESHOLD = 3;

    private final int start;
    private final int end;

    /** Split threshold: a range with {@code end - start <= maxThreshold} is summed directly. */
    private final int maxThreshold;

    /**
     * Creates a task with the default split threshold.
     *
     * @param start first value of the range (inclusive)
     * @param end   last value of the range (inclusive)
     */
    public CalculatedForkJoinRecursiveTask(int start, int end) {
        this(start, end, DEFAULT_MAX_THRESHOLD);
    }

    /**
     * Creates a task with a custom split threshold.
     *
     * @param start        first value of the range (inclusive)
     * @param end          last value of the range (inclusive)
     * @param maxThreshold largest {@code end - start} that is summed without splitting
     */
    public CalculatedForkJoinRecursiveTask(int start, int end, int maxThreshold) {
        this.maxThreshold = maxThreshold;
        this.start = start;
        this.end = end;
    }

    /**
     * Sums the range, splitting it into two subtasks when it is wider than the threshold.
     *
     * @return the (possibly wrapped) {@code int} sum of {@code start..end}; 0 for an empty range
     */
    @Override
    protected Integer compute() {
        // Widen to long: end - start overflows int for ranges wider than Integer.MAX_VALUE.
        long span = (long) end - start;

        // span <= 0 (empty or single-element range) is always a leaf, so a negative
        // threshold can never make the recursion run forever.
        if (span <= 0 || span <= maxThreshold) {
            return IntStream.rangeClosed(start, end).sum();
        }

        // Overflow-safe floor midpoint; guarantees start <= middle < end.
        int middle = start + (int) (span / 2);

        // Pass the threshold on, otherwise subtasks fall back to the default.
        CalculatedForkJoinRecursiveTask left =
                new CalculatedForkJoinRecursiveTask(start, middle, maxThreshold);
        CalculatedForkJoinRecursiveTask right =
                new CalculatedForkJoinRecursiveTask(middle + 1, end, maxThreshold);

        // Fork one half and compute the other in the current worker thread, so the
        // thread does useful work instead of blocking on two joins.
        left.fork();
        int rightSum = right.compute();
        return left.join() + rightSum;
    }

    /**
     * Demo entry point: sums 0..2 on a fresh pool and prints the result.
     *
     * @param args unused
     * @throws ExecutionException   if the task fails with an exception
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        try {
            ForkJoinTask<Integer> future =
                    forkJoinPool.submit(new CalculatedForkJoinRecursiveTask(0, 2));
            System.out.println(future.get());
        } finally {
            // Release the worker threads once the demo is finished.
            forkJoinPool.shutdown();
        }
    }
}
使用 RecursiveAction:
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

/**
 * Fork/Join action (no return value) that adds every integer in the closed range
 * {@code [start, end]} to a shared accumulator.
 *
 * <p>A range wider than the split threshold is halved and both halves are run as
 * subtasks; leaf ranges add their partial sum to {@code RESULT}.
 *
 * <p>The original listing declared {@code package test.demo2.deadlock;}. The declaration
 * is left out so the demo compiles from any directory; re-add it if the file lives
 * under that package.
 *
 * @author Dongguabai
 * @date 2018/12/24 16:55
 */
public class CalculatedForkJoinRecursiveAction extends RecursiveAction {

    /** Shared result accumulator; every leaf task adds its partial sum here. */
    private static final AtomicInteger RESULT = new AtomicInteger();

    /** Split threshold: a range with {@code end - start <= MAX_THRESHOLD} is summed directly. */
    private static final int MAX_THRESHOLD = 3;

    private final int start;
    private final int end;

    /**
     * Creates an action for the given range.
     *
     * @param start first value of the range (inclusive)
     * @param end   last value of the range (inclusive)
     */
    public CalculatedForkJoinRecursiveAction(int start, int end) {
        this.start = start;
        this.end = end;
    }

    /**
     * Demo entry point: sums 0..10 on a fresh pool and prints the accumulated result.
     *
     * @param args unused
     * @throws ExecutionException   declared for signature compatibility; not thrown here
     * @throws InterruptedException declared for signature compatibility; not thrown here
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        try {
            // invoke() blocks until the root action is done. Because compute() waits for
            // its subtasks, the whole tree is finished at that point, so no timed wait
            // is needed and the printed value cannot be a partial sum.
            forkJoinPool.invoke(new CalculatedForkJoinRecursiveAction(0, 10));
        } finally {
            forkJoinPool.shutdown();
        }
        System.out.println(RESULT);
    }

    /**
     * Adds the range's sum to the shared accumulator, splitting into two subtasks when
     * the range is wider than the threshold. Returns only after all subtasks completed.
     */
    @Override
    protected void compute() {
        // Widen to long: end - start overflows int for ranges wider than Integer.MAX_VALUE.
        long span = (long) end - start;

        if (span <= MAX_THRESHOLD) {
            RESULT.addAndGet(IntStream.rangeClosed(start, end).sum());
            return;
        }

        // Overflow-safe floor midpoint; guarantees start <= middle < end.
        int middle = start + (int) (span / 2);

        // invokeAll forks the subtasks AND waits for them, so this action is not
        // reported complete while its children are still running.
        invokeAll(
                new CalculatedForkJoinRecursiveAction(start, middle),
                new CalculatedForkJoinRecursiveAction(middle + 1, end));
    }
}
參考資料: