1. 程式人生 > >Java多執行緒(二)Java併發工具包concurrent例項簡述

Java多執行緒(二)Java併發工具包concurrent例項簡述

傳統的多執行緒並沒有提供高階特性,例如:訊號量、執行緒池和執行管理器等,而這些特性恰恰有助於建立強大的併發程式。新的Fork/Join框架針對當前的多核系統,也提供了並行程式設計的可能。這塊的內容是java多執行緒資訊量最大的一部分內容,本篇部落格循序漸進的,首先對concurrent包下的各模組常用類和介面進行例項展示,從使用中去了解concurrent包的內容。

1.Java併發工具的體系結構

併發工具包思維導圖 java併發工具包處於java.util.concurrent包中,從上圖中可以看出主要包括同步器、執行器、併發集合、Fork/Join框架、atomic包、locks包。 簡單來說如下所示:

  • tools同步器:為每種特定的同步問題提供瞭解決方案
  • executor 執行器:用來管理執行緒的執行
  • collections併發集合:提供了集合框架中集合的併發版本
  • atomic包:提供了不需要鎖既可以完成併發環境使用的原子性操作
  • locks包:使用Lock介面為併發程式設計提供了同步的另外一種替代方案
  • Fork/Join框架:提供了對並行程式設計的支援

2.同步器

2.1 Semaphore訊號量

描述了一個在作業系統當中比較經典的訊號量,主要作用是通過計數器來控制對共享資源的訪問。

  • 常用API
  1. Semaphore(int count):建構函式,建立擁有count個許可證的訊號量。
  2. acquire()/acquire(int num):獲取1/num個許可證
  3. release()/release(int num):釋放1/num個許可證
  • 例項使用
import cn.ji2h.util.LogUtil;

import java.util.concurrent.Semaphore;

public class SemaphoreDemo {

    public static void main(String[] args){
    	//定義兩個許可證
        Semaphore semaphore = new Semaphore(2);

        Persion p1 = new Persion(semaphore,"A");
        p1.start();

        Persion p2 = new Persion(semaphore,"B");
        p2.start();

        Persion p3 = new Persion(semaphore,"C");
        p3.start();
    }

}

class Persion extends Thread{
    private Semaphore semaphore;

    public Persion(Semaphore semaphore,String name){
        setName(name);
        this.semaphore = semaphore;
    }

    public void run(){
        LogUtil.logger.info(getName() + " is waiting ......");
        try {
            semaphore.acquire();
            LogUtil.logger.info(getName() + " is servicing ......");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        LogUtil.logger.info(getName() + " is done!");
        semaphore.release();
    }
}

模擬銀行櫃員服務的例子,三個客戶,兩個櫃檯。對應的三個執行緒,兩個資源。

  • 執行結果
2018-10-02 16:28:41,229 [C] INFO  cn.ji2h.util.LogUtil - C is waiting ......
2018-10-02 16:28:41,229 [A] INFO  cn.ji2h.util.LogUtil - A is waiting ......
2018-10-02 16:28:41,229 [B] INFO  cn.ji2h.util.LogUtil - B is waiting ......
2018-10-02 16:28:41,231 [A] INFO  cn.ji2h.util.LogUtil - A is servicing ......
2018-10-02 16:28:41,231 [C] INFO  cn.ji2h.util.LogUtil - C is servicing ......
2018-10-02 16:28:42,235 [C] INFO  cn.ji2h.util.LogUtil - C is done!
2018-10-02 16:28:42,235 [A] INFO  cn.ji2h.util.LogUtil - A is done!
2018-10-02 16:28:42,236 [B] INFO  cn.ji2h.util.LogUtil - B is servicing ......
2018-10-02 16:28:43,240 [B] INFO  cn.ji2h.util.LogUtil - B is done!

2.2 CountDownLatch同步器

必須發生指定數量的時間後才可以繼續執行

  • 常用API
  1. CountDownLatch(int count):必須發生count個數量事件才可以開啟鎖存器
  2. await():等待鎖存器
  3. countDown():觸發事件
  • 例項使用

import cn.ji2h.util.LogUtil;

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args){
        //建立計數栓
        CountDownLatch countDownLatch = new CountDownLatch(3);
        //建立三個執行緒
        new Racer(countDownLatch,"A").start();
        new Racer(countDownLatch,"B").start();
        new Racer(countDownLatch,"C").start();

        //開始倒計時3...2...1...
        for(int i = 0; i<3 ;i++){
            try{
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            LogUtil.logger.info((3 - i)+ "");
            if(i == 2){
                LogUtil.logger.info("Start");
            }
            //出發執行起跑
            countDownLatch.countDown();
        }

    }
}

class Racer extends Thread{
    private CountDownLatch countDownLatch;

    public Racer(CountDownLatch countDownLatch,String name){
        setName(name);
        this.countDownLatch = countDownLatch;
    }

    public void run(){
        try{
            countDownLatch.await();
            for (int i = 0;i<3 ;i ++){
                LogUtil.logger.info(getName() + " run : " + i);
            }
        } catch(InterruptedException e){
            e.printStackTrace();
        }

    }

}

模擬倒計數3-2-1-start後,三個執行緒開始跑步的例子。

  • 執行結果
2018-10-02 16:37:23,316 [main] INFO  cn.ji2h.util.LogUtil - 3
2018-10-02 16:37:24,323 [main] INFO  cn.ji2h.util.LogUtil - 2
2018-10-02 16:37:25,324 [main] INFO  cn.ji2h.util.LogUtil - 1
2018-10-02 16:37:25,324 [main] INFO  cn.ji2h.util.LogUtil - Start
2018-10-02 16:37:25,324 [A] INFO  cn.ji2h.util.LogUtil - A : 0
2018-10-02 16:37:25,324 [B] INFO  cn.ji2h.util.LogUtil - B : 0
2018-10-02 16:37:25,325 [B] INFO  cn.ji2h.util.LogUtil - B : 1
2018-10-02 16:37:25,325 [B] INFO  cn.ji2h.util.LogUtil - B : 2
2018-10-02 16:37:25,324 [A] INFO  cn.ji2h.util.LogUtil - A : 1
2018-10-02 16:37:25,324 [C] INFO  cn.ji2h.util.LogUtil - C : 0
2018-10-02 16:37:25,325 [C] INFO  cn.ji2h.util.LogUtil - C : 1
2018-10-02 16:37:25,325 [A] INFO  cn.ji2h.util.LogUtil - A : 2
2018-10-02 16:37:25,325 [C] INFO  cn.ji2h.util.LogUtil - C : 2

2.3 CyclicBarrier迴圈屏障同步器

適合於只有多個執行緒都到達與定點時才可以繼續執行

  • 常用API
  1. CyclicBarrier(int num);等待執行緒的數量
  2. CyclicBarrier(int num, Runnable action):等待執行緒的數量以及所有執行緒到達後的操作
  3. await():到達臨界點後暫停執行緒
  • 例項使用
import cn.ji2h.util.LogUtil;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    public static void main(String[] args){
        //定義一個迴圈屏障,這定義了三個
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
            public void run() {
                LogUtil.logger.info("Game start!");
            }
        });

        //建立三個工作執行緒
        new Player(cyclicBarrier,"A").start();
        new Player(cyclicBarrier,"B").start();
        new Player(cyclicBarrier,"C").start();
    }
}

class Player extends Thread{
    private CyclicBarrier cyclicBarrier;
    public Player(CyclicBarrier cyclicBarrier,String name){
        setName(name);
        this.cyclicBarrier = cyclicBarrier;
    }

    public void run(){
        LogUtil.logger.info(getName() + " is waiting other players ...");

        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

模擬鬥地主,三個人都準備好了再開始比賽的場景。每個執行緒在迴圈屏障處等待,主執行緒通過定義的迴圈屏障後繼續執行。

2.4 Exchanger交換器同步器

簡化兩個執行緒間資料的交換

  • 常用API
  1. Exchanger:指定進行交換的資料型別
  2. V exchange(V object):等待執行緒到達,交換資料。
  • 例項使用
import cn.ji2h.util.LogUtil;

import java.util.concurrent.Exchanger;

public class ExchangerDemo {
    public static void main(String[] args){
        //定義一個交換器
        Exchanger<String> exchanger = new Exchanger<String>();

        //A或B誰先執行不確定
        //若A先執行則A-B-B-A-A-B執行順序
        //若B先執行則B-A-A-B-B-A執行順序
        new A(exchanger).start();
        new B(exchanger).start();
    }
}

class A extends Thread{
    private Exchanger<String> exchanger;
    public A(Exchanger<String> exchanger){
        this.exchanger = exchanger;
    }
    public void run(){
        String str = null;
        try {
            str = exchanger.exchange("Hello");
            LogUtil.logger.info(str);

            str = exchanger.exchange("A");
            LogUtil.logger.info(str);

            str = exchanger.exchange("B");
            LogUtil.logger.info(str);

        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}


class B extends Thread{
    private Exchanger<String> exchanger;
    public B(Exchanger<String> exchanger){
        this.exchanger = exchanger;
    }
    public void run(){
        String str = null;
        try {
            str = exchanger.exchange("Hi");
            LogUtil.logger.info(str);

            str = exchanger.exchange("1");
            LogUtil.logger.info(str);

            str = exchanger.exchange("2");
            LogUtil.logger.info(str);

        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}
  • 執行結果
2018-10-02 17:09:26,396 [Thread-1] INFO  cn.ji2h.util.LogUtil - Hello
2018-10-02 17:09:26,396 [Thread-0] INFO  cn.ji2h.util.LogUtil - Hi
2018-10-02 17:09:26,398 [Thread-0] INFO  cn.ji2h.util.LogUtil - 1
2018-10-02 17:09:26,398 [Thread-1] INFO  cn.ji2h.util.LogUtil - A
2018-10-02 17:09:26,398 [Thread-1] INFO  cn.ji2h.util.LogUtil - B
2018-10-02 17:09:26,398 [Thread-0] INFO  cn.ji2h.util.LogUtil - 2

2.5 Phsaer同步器

工作方式與CyclicBarrier類似,但是可以定義多個階段

  • 常用API
  1. Phaser()/Phaser(int num):使用指定0/num個party建立Phaser
  2. register():註冊party
  3. arriveAndAdvance():到達時等待所有party到達
  4. arriveAndDeregister():到達時登出執行緒自己
  • 例項使用
import cn.ji2h.util.LogUtil;

import java.util.concurrent.Phaser;

public class PhaserDemo {
    public static void main(String[] args){

        Phaser phaser = new Phaser(1);

        LogUtil.logger.info("Starting ...");

        //假設三個階段
        new Worker(phaser,"服務員").start();
        new Worker(phaser,"廚師").start();
        new Worker(phaser,"上菜").start();

        for (int i = 1;i<= 3;i++){
            //一共三個訂單,對於每個訂單,必須要求三個階段都完成才可以進行下個訂單
            phaser.arriveAndAwaitAdvance();
            LogUtil.logger.info("order " + i + " finished!");
        }

        phaser.arriveAndDeregister();
        LogUtil.logger.info("all done!");
    }
}

class Worker extends Thread{
    private Phaser phaser;

    public Worker(Phaser phaser,String name) {
        this.setName(name);
        this.phaser = phaser;
        //將自己註冊進來
        phaser.register();
    }

    public void run(){
        for (int i = 1;i<=3;i++){
            LogUtil.logger.info(" current order is : " + i + ":" + getName());

            if(i == 3){
                //都處理完了登出自己
                phaser.arriveAndDeregister();
            }else {
                //自己處理完了,等待其他的階段處理
                phaser.arriveAndAwaitAdvance();
            }

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  • 執行結果
2018-10-02 17:31:05,755 [main] INFO  cn.ji2h.util.LogUtil - Starting ...
2018-10-02 17:31:05,757 [服務員] INFO  cn.ji2h.util.LogUtil -  current order is : 1:服務員
2018-10-02 17:31:05,758 [廚師] INFO  cn.ji2h.util.LogUtil -  current order is : 1:廚師
2018-10-02 17:31:05,758 [上菜] INFO  cn.ji2h.util.LogUtil -  current order is : 1:上菜
2018-10-02 17:31:05,758 [main] INFO  cn.ji2h.util.LogUtil - order 1 finished!
2018-10-02 17:31:06,761 [上菜] INFO  cn.ji2h.util.LogUtil -  current order is : 2:上菜
2018-10-02 17:31:06,763 [廚師] INFO  cn.ji2h.util.LogUtil -  current order is : 2:廚師
2018-10-02 17:31:06,763 [服務員] INFO  cn.ji2h.util.LogUtil -  current order is : 2:服務員
2018-10-02 17:31:06,764 [main] INFO  cn.ji2h.util.LogUtil - order 2 finished!
2018-10-02 17:31:07,768 [廚師] INFO  cn.ji2h.util.LogUtil -  current order is : 3:廚師
2018-10-02 17:31:07,768 [服務員] INFO  cn.ji2h.util.LogUtil -  current order is : 3:服務員
2018-10-02 17:31:07,768 [上菜] INFO  cn.ji2h.util.LogUtil -  current order is : 3:上菜
2018-10-02 17:31:07,768 [main] INFO  cn.ji2h.util.LogUtil - order 3 finished!
2018-10-02 17:31:07,768 [main] INFO  cn.ji2h.util.LogUtil - all done!

3.執行器

3.1基本概念

  • 用於啟動並控制執行緒的執行
  • 核心介面Executor,包含一個execute(Runnable)用於指定被執行的執行緒
  • ExecutorService介面用於控制執行緒執行和管理執行緒
  • 預定義瞭如下執行器:ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool

3.2 Callable與Future

Callable<V>:表示具有返回值的執行緒,
V:表示返回值型別 
call():執行任務 

Future<V>:表示Callable的返回值, 
V:返回值型別 
get():獲取返回值
  • 例項使用
import cn.ji2h.util.LogUtil;

import java.util.concurrent.*;

public class ExecutorDemo {

    public static void main(String args[]){
        //定義執行器,固定了2個執行緒的一個執行緒池
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Future<Integer> r1 = executorService.submit(new MC(1,100));
        Future<Integer> r2 = executorService.submit(new MC(100,10000));
        try {
            LogUtil.logger.info(r1.get() + "   " + r2.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        executorService.shutdown();
    }


}

class MC implements Callable<Integer> {

    private int begin,end;
    public MC(int begin,int end){
        this.begin = begin;
        this.end = end;
    }

    @Override
    public Integer call() throws Exception {

        int sum = 0;

        for (int i= begin; i<end; i++){
            sum += i;
        }
        return sum;
    }
}
  • 執行結果
2018-10-02 17:38:58,803 [main] INFO  cn.ji2h.util.LogUtil - 4950   49990050

4.鎖

java.util.concurrent.locks包中提供了對鎖的支援。鎖的作用是為使用synchronized控制對資源訪問提供了替代機制。

  1. 基本操作模型:訪問資源之前申請鎖,訪問完畢之後釋放鎖。
  2. lock/tryLock:申請鎖
  3. unlock:釋放鎖
  4. Lock是一個介面,具體鎖類ReentrantLock實現了Lock介面 4 Lock是一個介面,具體鎖類ReentrantLock實現了Lock介面
  • 使用例項
mport cn.ji2h.util.LogUtil;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockDemo {

    public static void main(String args[]){
        new MT().start();
        new MT().start();
        new MT().start();
        new MT().start();
    }

}


class Data{
    static int i = 0;

    static Lock lock = new ReentrantLock();

    static void operate(){
        lock.lock();
        i++;
        LogUtil.logger.info(i + "");
        lock.unlock();
    }
}

class MT extends Thread{
    public void run(){
        while(true){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Data.operate();
        }
    }
}
  • 執行結果
2018-10-02 20:36:52,143 [Thread-0] INFO  cn.ji2h.util.LogUtil - 1
2018-10-02 20:36:52,144 [Thread-1] INFO  cn.ji2h.util.LogUtil - 2
2018-10-02 20:36:52,144 [Thread-2] INFO  cn.ji2h.util.LogUtil - 3
2018-10-02 20:36:52,144 [Thread-3] INFO  cn.ji2h.util.LogUtil - 4
2018-10-02 20:36:53,148 [Thread-0] INFO  cn.ji2h.util.LogUtil - 5
2018-10-02 20:36:53,149 [Thread-1] INFO  cn.ji2h.util.LogUtil - 6
2018-10-02 20:36:53,150 [Thread-2] INFO  cn.ji2h.util.LogUtil - 7
2018-10-02 20:36:53,150 [Thread-3] INFO  cn.ji2h.util.LogUtil - 8
2018-10-02 20:36:54,153 [Thread-0] INFO  cn.ji2h.util.LogUtil - 9
2018-10-02 20:36:54,154 [Thread-1] INFO  cn.ji2h.util.LogUtil - 10
2018-10-02 20:36:54,154 [Thread-2] INFO  cn.ji2h.util.LogUtil - 11
2018-10-02 20:36:54,154 [Thread-3] INFO  cn.ji2h.util.LogUtil - 12

5.原子操作

這裡的例項是4中的變種 java.util.concurrent.atom包中提供了對原子操作的支援,原子操作主要是提供了不需要鎖以及其他同步機制就可以進行的一些不可中斷操作。通過原子操作,能夠簡化對鎖的一些控制。主要操作為:獲取、設定、比較等。

  • 使用例項
import cn.ji2h.util.LogUtil;

import java.util.concurrent.atomic.AtomicInteger;

public class AtomDemo {
    public static void main(String args[]){
        new MT1().start();
        new MT1().start();
        new MT1().start();
        new MT1().start();
    }
}

class AtomData{
    static int i = 0;

    static AtomicInteger ai = new AtomicInteger(0);

    static void operate(){
        LogUtil.logger.info(ai.incrementAndGet() + "");
    }
}

class MT1 extends Thread{
    public void run(){
        while(true){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            AtomData.operate();
        }
    }
}
  • 執行結果
2018-10-02 20:46:02,220 [Thread-3] INFO  cn.ji2h.util.LogUtil - 1
2018-10-02 20:46:02,220 [Thread-1] INFO  cn.ji2h.util.LogUtil - 4
2018-10-02 20:46:02,220 [Thread-0] INFO  cn.ji2h.util.LogUtil - 2
2018-10-02 20:46:02,220 [Thread-2] INFO  cn.ji2h.util.LogUtil - 3
2018-10-02 20:46:03,225 [Thread-1] INFO  cn.ji2h.util.LogUtil - 7
2018-10-02 20:46:03,225 [Thread-2] INFO  cn.ji2h.util.LogUtil - 8
2018-10-02 20:46:03,225 [Thread-0] INFO  cn.ji2h.util.LogUtil - 5
2018-10-02 20:46:03,225 [Thread-3] INFO  cn.ji2h.util.LogUtil - 6
2018-10-02 20:46:04,230 [Thread-2] INFO  cn.ji2h.util.LogUtil - 10
2018-10-02 20:46:04,230 [Thread-3] INFO  cn.ji2h.util.LogUtil - 12
2018-10-02 20:46:04,230 [Thread-0] INFO  cn.ji2h.util.LogUtil - 11
2018-10-02 20:46:04,230 [Thread-1] INFO  cn.ji2h.util.LogUtil - 9

執行緒執行順序可忽視

6. 併發集合

這裡我們使用Concurrent進行舉例說明

ConcurrentHashMap 和 java.util.HashTable 類很相似,但 ConcurrentHashMap 能夠提供比 HashTable 更好的併發效能。在你從中讀取物件的時候 ConcurrentHashMap 並不會把整個 Map 鎖住。此外,在你向其中寫入物件的時候,ConcurrentHashMap 也不會鎖住整個 Map。它的內部只是把 Map中正在被寫入的部分進行鎖定。

  • 使用例項
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


public class ConcurrentHashMapDemo {
    private static int INPUT_NUMBER = 100000;
    public static void main(String[] args) throws InterruptedException {
//        Map<Integer, String> map = new Hashtable<>(12 * INPUT_NUMBER);
        Map<Integer, String> map = new ConcurrentHashMap<>(12 * INPUT_NUMBER);
        long begin = System.currentTimeMillis();
        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            service.execute(new InputWorker(map, i));
        }
        service.shutdown();
        service.awaitTermination(1, TimeUnit.DAYS);
        long end = System.currentTimeMillis();
        System.out.println("span time = "+(end-begin)+", map size = "+map.size());
    }

    private static class InputWorker implements Runnable {
        private static Random rand = new Random(System.currentTimeMillis());
        private final Map<Integer, String> map;
        private final int flag;

        private InputWorker(Map<Integer, String> map, int begin) {
            this.map = map;
            this.flag = begin;
        }

        @Override
        public void run() {
            int input = 0;
            while (input < INPUT_NUMBER) {
                int x = rand.nextInt();
                if (!map.containsKey(x)) {
                    map.put(x, "Alex Wang" + x);
                    input++;
                }
            }
            System.out.println("InputWorker" + flag + " is over.");
        }
    }
}
  • 執行結果
InputWorker1 is over.
InputWorker3 is over.
InputWorker4 is over.
InputWorker8 is over.
InputWorker2 is over.
InputWorker0 is over.
InputWorker6 is over.
InputWorker9 is over.
InputWorker7 is over.
InputWorker5 is over.
span time = 324, map size = 1000000

這段程式碼中,生成10個子執行緒,每個執行緒向map中插入10萬個鍵值對,然後計算耗時。ConcurrentHashMap平均耗時324ms,Hashtable平均耗時680ms,如此看來在我的普通PC機(i7,4核,16G記憶體)上,ConcurrentHashMap的耗時僅佔Hashtable耗時的不到一半。

7.Fork、Join框架

7.1 Fork/Join框架中的主要類

  • ForkJoinTask:描述任務的抽象類
  • ForkJoinPool:管理ForkJoinTask的執行緒池
  • RecursiveAction:ForkJoinTask子類,描述無返回值的任務
  • RecursiveTask:ForkJoinTask子類,描述有返回值的任務

7.2分而治之策略

  • 將任務遞迴劃分成更小的子任務,直到子任務足夠小,從而能夠被連續地處理掉為止。
  • 優勢是處理過程可以使用並行發生,這種情況特別適合基於多核處理器的並行程式設計
  • 根據Java API中定義,分而治之的建議臨界點定義在100-1000個操作(加減乘除等)中的某個位置

7.3 計算1-1000000的和

  • 使用例項
import cn.ji2h.util.LogUtil;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class ForkJoinDemo {
    public static void main(String[] args){
        ForkJoinPool forkJoinPool  = new ForkJoinPool();
        Future<Long> result = forkJoinPool.submit(new MTask(0,1000001));
        try {
            LogUtil.logger.info(result.get() + "");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        forkJoinPool.shutdown();
    }

}


class MTask extends RecursiveTask<Long>{

    public static final int THRESHOLD = 1000;

    private int begin,end;

    public MTask(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long sum = 0;
        //判斷任務是否滿足條件1000個以內的操作
        if((end - begin) <= THRESHOLD){
            for(int i = begin;i<end;i++){
                sum += i;
            }
        } else {
            int mid =  (begin +end)/2;
            MTask left = new MTask(begin,mid);
            left.fork();
            MTask right = new MTask(mid +1,end);
            right.fork();

            //將兩個子結果合併
            Long lr = left.join();
            LogUtil.logger.info(begin + "-" + mid + ":" + lr);
            Long rr = right.join();
            LogUtil.logger.info((mid + 1) + "-" + end + ":" + rr);

            sum = lr + rr;

        }

        return sum;
    }
}
  • 執行結果
···
2018-10-02 21:30:01,689 [ForkJoinPool-1-worker-2] INFO  cn.ji2h.util.LogUtil - 999026-1000001:974525175
2018-10-02 21:30:01,689 [ForkJoinPool-1-worker-2] INFO  cn.ji2h.util.LogUtil - 998049-1000001:1949096799
2018-10-02 21:30:01,689 [ForkJoinPool-1-worker-2] INFO  cn.ji2h.util.LogUtil - 996096-1000001:3894383295
2018-10-02 21:30:01,689 [ForkJoinPool-1-worker-2] INFO  cn.ji2h.util.LogUtil - 992190-1000001:7773525378
2018-10-02 21:30:01,689 [ForkJoinPool-1-worker-2] INFO  cn.ji2h.util.LogUtil - 984377-1000001:15487070286
2018-10-02 21:30:01,689 [ForkJoinPool-1-worker-2] INFO  cn.ji2h.util.LogUtil - 968752-1000001:30730249947
2018-10-02 21:30:01,689 [ForkJoinPool-1-worker-2] INFO  cn.ji2h.util.LogUtil - 937502-1000001:60484937394
2018-10-02 21:30:01,688 [ForkJoinPool-1-worker-0] INFO  cn.ji2h.util.LogUtil - 843752-875001:26827999947
2018-10-02 21:30:01,689 [ForkJoinPool-1-worker-0] INFO  cn.ji2h.util.LogUtil - 812502-875001:52680437394
2018-10-02 21:30:01,689 [ForkJoinPool-1-worker-0] INFO  cn.ji2h.util.LogUtil - 750002-875001:101458624788
2018-10-02 21:30:01,689 [ForkJoinPool-1-worker-0] INFO  cn.ji2h.util.LogUtil - 875002-1000001:117067624788
2018-10-02 21:30:01,689 [ForkJoinPool-1-worker-3] INFO  cn.ji2h.util.LogUtil - 750002-1000001:218526249576
2018-10-02 21:30:01,689 [ForkJoinPool-1-worker-1] INFO  cn.ji2h.util.LogUtil - 500001-1000001:374616999162
2018-10-02 21:30:01,694 [main] INFO  cn.ji2h.util.LogUtil - 499488998835

8.參考連結