1. 程式人生 > >Java併發(十)執行緒池&fork/join框架

Java併發(十)執行緒池&fork/join框架

什麼是執行緒池

  1. 第四種獲取執行緒的方法:執行緒池,一個 ExecutorService,它使用可能的幾個池執行緒之一執行每個提交的任務,通常使用 Executors 工廠方法配置。
  2. 執行緒池可以解決兩個不同問題:由於減少了每個任務呼叫的開銷,它們通常可以在執行大量非同步任務時提供增強的效能,並且還可以提供繫結和管理資源(包括執行任務集時使用的執行緒)的方法。每個ThreadPoolExecutor 還維護著一些基本的統計資料,如完成的任務數。
  3. 為了便於跨大量上下文使用,此類提供了很多可調整的引數和擴充套件鉤子 (hook)。但是,強烈建議程式設計師使用較為方便的 Executors 工廠方法

    • Executors.newCachedThreadPool()(無界執行緒池,可以進行自動執行緒回收)
    • Executors.newFixedThreadPool(int)(固定大小執行緒池)
    • Executors.newSingleThreadExecutor()(單個後臺執行緒)
      它們均為大多數使用場景預定義了設定。

執行緒池的體系結構:

  1. java.util.concurrent.Executor:負責執行緒的使用與排程的根介面

    • |–>ExecutorService 子介面:執行緒池的主要介面
    • |–>ThreadPoolExecutor :執行緒池的實現類
    • |–>ScheduledExecutorService 子介面:負責執行緒池的排程
      • |–>ScheduledThreadPoolExecutor:繼承 ThreadPoolExecutor實現類,實現了SchedualedExecutorService子介面
        這裡寫圖片描述
  2. 工具類:java.util.concurrent.Executors

    • ExecutorService newFixedThreadPool():建立固定大小的執行緒池
    • ExecutorService newCachedThreadPool():快取線程池,執行緒池的數量數量不固定,根據自己的需要更改大小
    • ExecutorService newSingleThreadExecutor():建立單個執行緒池,執行緒池中只有一個執行緒

    • ScheduledExecutorService newSchedualedThreadPool():建立固定大小的執行緒,可以延遲定時的執行任務

舉個例子

普通的Runnable介面

建立一個繼承Runnable介面的類

// 測試的繼承Runnable介面的類
class RunDemo implements Runnable{

    @Override
    public void run() {
        for(int i=0;i<5;i++){
            System.out.println(Thread.currentThread().getName()+"==>"+i);
        }
    }

}

測試主函式:

 ExecutorService pool = Executors.newFixedThreadPool(3);//建立指定三個執行緒的執行緒池

       for(int i=0;i<3;i++){
            pool.submit(new RunDemo());
        }

        pool.shutdown();//當前任務執行完之後關閉
//        pool.shutdownNow();//強制關閉

帶返回值的Callable

         //使用callable普通任務,建立匿名內部類,測試submit
        List<Future<Integer>>list = new ArrayList<Future<Integer>>();
        for (int i = 0; i < 10; i++) {
            Future<Integer> future = pool.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    int sum=0;
                    for(int j=0;j<100;j++){
                        sum+=j;
                    }
                    return sum;
                }
            });

            list.add(future);
        }

        for(Future<Integer>future:list){
            try {
                System.out.println(future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        pool.shutdown();

帶返回值的Callable的定時任務

 ScheduledExecutorService pool2 = Executors.newScheduledThreadPool(3);

//使用callable定時任務 
       for(int i=0;i<5;i++){


       Future<Integer> result = pool2.schedule(new Callable<Integer>() {
                    @Override
                    public Integer call() throws Exception {
                        int num = new Random().nextInt(100);//100以內的隨機數
                        return num;
                    }


       }, 1,TimeUnit.SECONDS);
       try {
        System.out.println(result.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
       }
       pool2.shutdown();

什麼是fork/join

Fork/Join 框架:就是在必要的情況下,將一個大任務,進行拆分(fork)成
若干個小任務(拆到不可再拆時),再將一個個的小任務運算的結果進
行 join 彙總。

與執行緒池的區別

  1. 採用 “工作竊取”模式(work-stealing):
    當執行新的任務時它可以將其拆分分成更小的任務執行,並將小任務加
    到執行緒佇列中,然後再從一個隨機執行緒的佇列中偷一個並把它放在自己的隊
    列中。
  2. 相對於一般的執行緒池實現,fork/join框架的優勢體現在對其中包含的任務
    的處理方式上.在一般的執行緒池中,如果一個執行緒正在執行的任務由於某些
    原因無法繼續執行,那麼該執行緒會處於等待狀態。而在fork/join框架實現中,
    如果某個子問題由於等待另外一個子問題的完成而無法繼續執行。那麼處理
    該子問題的執行緒會主動尋找其他尚未執行的子問題來執行.這種方式減少了
    執行緒的等待時間,提高了效能。

建立自己的fork/join類


class CaculatorForkAndJoin extends RecursiveTask<Long>{

    /**
     * 建立serialVersionUID
     */
    private static final long serialVersionUID = 1L;

    private long start;
    private long end;
    private static final long THURSHOLD = 10000L;  //臨界值

    CaculatorForkAndJoin(long start,long end){
        this.start = start;
        this.end = end;
    }

    //重寫方法
    @Override
    protected Long compute() {
        long length = end - start;

        if(length <= THURSHOLD){
            long sum = new Long(0);

            for(long i = start;i<=end;i++){
                sum+=i;
            }

            return sum;
        }else{
            //中間值
            long mid = (start + end ) / 2;

            CaculatorForkAndJoin left = new CaculatorForkAndJoin(start, mid);
            left.fork();//進行拆分,同時壓入現執行緒佇列

            CaculatorForkAndJoin right = new CaculatorForkAndJoin(mid+1, end);
            right.fork();//進行拆分,同時壓入現執行緒佇列

            return left.join()+right.join();
        }
    }    
}

測試輸出:

public static void main(String[] args) {
        long start = System.currentTimeMillis();
        //建立 執行緒池
        ForkJoinPool pool = new ForkJoinPool();
        //建立任務
        CaculatorForkAndJoin task = new CaculatorForkAndJoin(0L,100000000L);

        //新增任務到執行緒池,獲得返回值
        long sum = pool.invoke(task);

        long end = System.currentTimeMillis();
        System.out.println(sum+"spend:"+(end - start));
    }
5000000050000000spend:1498