1. 程式人生 > >Fork/Join 框架的簡單使用

Fork/Join 框架的簡單使用

Fork/Join 框架是 Java 7 提供的一個用於並行執行任務的框架,採用類似分治演算法的思想:把一個複雜的問題分成兩個或更多相同或相似的子問題,直到最後子問題可以簡單地直接求解,原問題的解即子問題的解的合併。

Fork/Join 的使用也很簡單,一些 API 就跟執行緒池差不多。使用的時候要注意的是需要合理地“分而治之”。

Fork/Join 框架最常用的就是這四個類:ForkJoinPool、ForkJoinTask、RecursiveAction 和 RecursiveTask。

RecursiveAction 和 RecursiveTask 繼承了 ForkJoinTask 抽象類:

RecursiveAction 是沒有返回值的,RecursiveTask 是有返回值的。

看一個有返回值的小 Demo:

package test.demo2.deadlock;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.IntStream;

/**
 * Fork/Join task that sums the integers in the closed range {@code [start, end]}.
 *
 * <p>A range whose span ({@code end - start}) exceeds the split threshold is halved
 * recursively; smaller ranges are summed directly. The sum is accumulated as an
 * {@code int}, so it wraps around on overflow. An empty range ({@code start > end})
 * sums to 0.
 *
 * @author Dongguabai
 * @date 2018/12/24 16:55
 */
public class CalculatedForkJoinRecursiveTask extends RecursiveTask<Integer> {

    /** Split threshold used when the caller does not supply one. */
    private static final int DEFAULT_MAX_THRESHOLD = 3;

    private final int start;
    private final int end;
    // Largest span (end - start) that is summed directly instead of being split.
    private final int maxThreshold;

    /**
     * Creates a task over {@code [start, end]} using the default split threshold of 3.
     *
     * @param start first value of the range (inclusive)
     * @param end   last value of the range (inclusive)
     */
    public CalculatedForkJoinRecursiveTask(int start, int end) {
        this(start, end, DEFAULT_MAX_THRESHOLD);
    }

    /**
     * Creates a task over {@code [start, end]} with an explicit split threshold.
     *
     * @param start        first value of the range (inclusive)
     * @param end          last value of the range (inclusive)
     * @param maxThreshold largest span ({@code end - start}) that is summed without splitting
     * @throws IllegalArgumentException if {@code maxThreshold} is negative; a negative
     *                                  threshold could never be satisfied by a non-empty
     *                                  range and would make the task split forever
     */
    public CalculatedForkJoinRecursiveTask(int start, int end, int maxThreshold) {
        if (maxThreshold < 0) {
            throw new IllegalArgumentException("maxThreshold must be >= 0, got " + maxThreshold);
        }
        this.maxThreshold = maxThreshold;
        this.start = start;
        this.end = end;
    }

    /**
     * Sums the range, splitting it in two while the span exceeds the threshold.
     *
     * @return the sum of all integers in {@code [start, end]} (wrapping on int overflow)
     */
    @Override
    protected Integer compute() {
        // Use long arithmetic so neither the span nor the midpoint can overflow int.
        long span = (long) end - start;
        if (span <= maxThreshold) {
            return IntStream.rangeClosed(start, end).sum();
        }
        int middle = (int) (start + span / 2);
        // Subtasks inherit this task's threshold instead of silently falling back to the default.
        CalculatedForkJoinRecursiveTask left =
                new CalculatedForkJoinRecursiveTask(start, middle, maxThreshold);
        CalculatedForkJoinRecursiveTask right =
                new CalculatedForkJoinRecursiveTask(middle + 1, end, maxThreshold);
        // Fork one half and compute the other on the current worker, so this thread does
        // useful work instead of just waiting on two joins.
        left.fork();
        int rightSum = right.compute();
        return left.join() + rightSum;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        try {
            ForkJoinTask<Integer> future = forkJoinPool.submit(new CalculatedForkJoinRecursiveTask(0, 2));
            System.out.println(future.get());
        } finally {
            // Release the pool's worker threads once the demo is done.
            forkJoinPool.shutdown();
        }
    }
}

使用 RecursiveAction:

package test.demo2.deadlock;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

/**
 * Fork/Join action that adds the sum of the integers in the closed range
 * {@code [start, end]} to a shared static accumulator.
 *
 * <p>Because {@link RecursiveAction} has no return value, the result is collected in
 * the static {@code RESULT} counter. That counter is shared by every instance of this
 * class and is never reset, so running several top-level actions accumulates all of
 * their sums together.
 *
 * @author Dongguabai
 * @date 2018/12/24 16:55
 */
public class CalculatedForkJoinRecursiveAction extends RecursiveAction {

    // Accumulated result, shared by all task instances.
    private static final AtomicInteger RESULT = new AtomicInteger();
    private final int start;
    private final int end;
    // Split threshold: a range whose span (end - start) is at most this is summed directly.
    private final int maxThreshold = 3;

    /**
     * Creates an action over {@code [start, end]}.
     *
     * @param start first value of the range (inclusive)
     * @param end   last value of the range (inclusive)
     */
    public CalculatedForkJoinRecursiveAction(int start, int end) {
        this.start = start;
        this.end = end;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        try {
            // compute() waits for its subtasks, so get() returns only after the whole
            // range has been added to RESULT; no timed wait is needed.
            forkJoinPool.submit(new CalculatedForkJoinRecursiveAction(0, 10)).get();
        } finally {
            // Release the pool's worker threads once the demo is done.
            forkJoinPool.shutdown();
        }
        System.out.println(RESULT);
    }

    /**
     * Adds this range's sum to the shared accumulator, splitting the range in two
     * while its span exceeds the threshold. Returns only after both halves finish.
     */
    @Override
    protected void compute() {
        // Use long arithmetic so neither the span nor the midpoint can overflow int.
        long span = (long) end - start;
        if (span <= maxThreshold) {
            RESULT.addAndGet(IntStream.rangeClosed(start, end).sum());
        } else {
            int middle = (int) (start + span / 2);
            CalculatedForkJoinRecursiveAction left = new CalculatedForkJoinRecursiveAction(start, middle);
            CalculatedForkJoinRecursiveAction right = new CalculatedForkJoinRecursiveAction(middle + 1, end);
            // invokeAll forks and joins both halves, so this action is not reported
            // complete while its subtasks are still running.
            invokeAll(left, right);
        }
    }
}

參考資料:

https://www.jianshu.com/p/f32ee3e25c2d