1. 程式人生 > >【小家Java】Future、FutureTask、CompletionService、CompletableFuture解決多執行緒併發中歸集問題的效率對比

【小家Java】Future、FutureTask、CompletionService、CompletableFuture解決多執行緒併發中歸集問題的效率對比

相關閱讀

【小家java】java5新特性(簡述十大新特性) 重要一躍
【小家java】java6新特性(簡述十大新特性) 雞肋升級
【小家java】java7新特性(簡述八大新特性) 不溫不火
【小家java】java8新特性(簡述十大新特性) 飽受讚譽
【小家java】java9新特性(簡述十大新特性) 褒貶不一
【小家java】java10新特性(簡述十大新特性) 小步迭代
【小家java】java11新特性(簡述八大新特性) 首個重磅LTS版本


前文

開啟執行緒執行任務,不管是使用Runnable(無返回值不支援上報異常)還是Callable(有返回值支援上報異常)介面,都可以輕鬆實現。那麼如果是開啟執行緒池並需要獲取結果歸集的情況下,如何實現,以及優劣?

本文將分別以這四種方式解決歸集的問題,然後看看效率和使用的方便程度即可

1、Futrue

Future介面封裝了取消,獲取執行緒結果,以及狀態判斷是否取消,是否完成這幾個方法,都很有用。

Demo:

使用執行緒池提交Callable介面任務,返回Future介面,新增進list,最後遍歷該List且內部使用while輪詢,併發獲取結果,程式碼如下

/**
 * 使用Futrue來實現多執行緒執行歸集操作
 *
 * @author [email protected]
 * @description //
 * @date 2018/10/31 11:02
 */
public class
FutureDemo { public static void main(String[] args) { Long start = Instant.now().toEpochMilli(); //定義一個執行緒池 方便開啟和執行多執行緒 此處為了方便,直接使用 newFixedThreadPool ExecutorService exs = Executors.newFixedThreadPool(10); //結果集 裝載在list裡面 List<Integer> list = new ArrayList
<>(); List<Future<Integer>> futureList = new ArrayList<>(); try { //1.高速提交10個任務,每個任務返回一個Future入futureList 裝載起來 這樣10個執行緒就並行去處理和計算了 for (int i = 0; i < 10; i++) { futureList.add(exs.submit(new CallableTask(i + 1))); } Long getResultStart = Instant.now().toEpochMilli(); System.out.println("結果歸集開始時間=" + LocalDateTime.now()); //2.結果歸集,用迭代器遍歷futureList,高速輪詢(模擬實現了併發),任務完成就移除 while (futureList.size() > 0) { Iterator<Future<Integer>> iterable = futureList.iterator(); //遍歷 輪詢 while (iterable.hasNext()) { Future<Integer> future = iterable.next(); //如果任務完成就立馬取結果,並且,並且把該任務直接從futureList移除掉 否則判斷下一個任務是否完成 if (future.isDone() && !future.isCancelled()) { //獲取結果 Integer i = future.get(); System.out.println("任務i=" + i + "獲取完成,移出任務佇列!" + LocalDateTime.now()); //把結果裝入進去 然後把futrue任務移除 list.add(i); iterable.remove(); } else { Thread.sleep(1);//避免CPU高速運轉(這就是輪詢的弊端),這裡休息1毫秒,CPU納秒級別 } } } System.out.println("list=" + list); //任務的處理結果 System.out.println("總耗時=" + (System.currentTimeMillis() - start) + ",取結果歸集耗時=" + (System.currentTimeMillis() - getResultStart)); } catch (Exception e) { e.printStackTrace(); } finally { exs.shutdown(); } } // 任務 採用sleep模擬處理任務需要消耗的時間 static class CallableTask implements Callable<Integer> { Integer i; //用來編號任務 方便日誌裡輸出識別 public CallableTask(Integer i) { super(); this.i = i; } @Override public Integer call() throws Exception { if (i == 1) { Thread.sleep(3000);//任務1耗時3秒 } else if (i == 5) { Thread.sleep(5000);//任務5耗時5秒 } else { Thread.sleep(1000);//其它任務耗時1秒 } System.out.println("task執行緒:" + Thread.currentThread().getName() + "任務i=" + i + ",完成!" + LocalDateTime.now()); return i; } } }

如上圖,開啟定長為10的執行緒池:ExecutorService exs = Executors.newFixedThreadPool(10);+任務1耗時3秒,任務5耗時5秒,其他1秒。控制檯列印如下:

結果歸集開始時間=2018-10-31T11:01:19.457
task執行緒:pool-1-thread-2任務i=2,完成!2018-10-31T11:01:19.976
task執行緒:pool-1-thread-4任務i=4,完成!2018-10-31T11:01:19.977
task執行緒:pool-1-thread-3任務i=3,完成!2018-10-31T11:01:19.977
任務i=4獲取完成,移出任務佇列!2018-10-31T11:01:19.978
task執行緒:pool-1-thread-9任務i=9,完成!2018-10-31T11:01:19.978
task執行緒:pool-1-thread-8任務i=8,完成!2018-10-31T11:01:19.978
task執行緒:pool-1-thread-7任務i=7,完成!2018-10-31T11:01:19.978
task執行緒:pool-1-thread-6任務i=6,完成!2018-10-31T11:01:19.978
任務i=6獲取完成,移出任務佇列!2018-10-31T11:01:19.979
任務i=7獲取完成,移出任務佇列!2018-10-31T11:01:19.979
task執行緒:pool-1-thread-10任務i=10,完成!2018-10-31T11:01:19.979
任務i=8獲取完成,移出任務佇列!2018-10-31T11:01:19.979
任務i=9獲取完成,移出任務佇列!2018-10-31T11:01:19.979
任務i=10獲取完成,移出任務佇列!2018-10-31T11:01:19.979
任務i=2獲取完成,移出任務佇列!2018-10-31T11:01:19.980
任務i=3獲取完成,移出任務佇列!2018-10-31T11:01:19.980
task執行緒:pool-1-thread-1任務i=1,完成!2018-10-31T11:01:21.964
任務i=1獲取完成,移出任務佇列!2018-10-31T11:01:21.965
task執行緒:pool-1-thread-5任務i=5,完成!2018-10-31T11:01:23.977
任務i=5獲取完成,移出任務佇列!2018-10-31T11:01:23.979
list=[4, 6, 7, 8, 9, 10, 2, 3, 1, 5]
總耗時=5070,取結果歸集耗時=5037

看最後的兩個結果輸出:

list=[4, 6, 7, 8, 9, 10, 2, 3, 1, 5]--》多執行幾遍,最後2個總是15最後加進去的,可實現按照任務完成先後順序獲取結果! 因為1需要3s,5需要5s是最慢的,所以最後進入list
總耗時=5046,取結果歸集耗時=5040 ---》符合邏輯,10個任務,定長10執行緒池,其中一個任務耗時3秒,一個任務耗時5秒,由於併發高速輪訓,耗時取最長5

建議:此種方法可實現基本目標,任務並行且按照完成順序獲取結果。使用很普遍,老少皆宜,就是CPU有消耗,可以使用!

2、FutureTask

FutureTask是介面RunnableFuture的唯一實現類(實現了Future+Runnable).
1.Runnable介面,可開啟單個執行緒執行。
2.Future介面,可接受Callable介面的返回值,futureTask.get()阻塞獲取結果。

demo:

demo1:兩個步驟:1.開啟單個執行緒執行任務,2.阻塞等待執行結果,分離這兩步驟,可在這兩步中間穿插別的相關業務邏輯

/**
 * FutureTask彌補了Future必須用執行緒池提交返回Future的缺陷,實現功能如下:
 * 這兩個步驟:一個開啟執行緒執行任務,一個阻塞等待執行結果,分離這兩步驟,可在這兩步中間穿插別的相關業務邏輯。
 *
 * @author [email protected]
 * @description //
 * @date 2018/10/31 11:15
 */
public class FutureTaskContorlDemo {

    public static void main(String[] args) {
        try {
            System.out.println("=====例如一個統計公司總部和分部的總利潤是否達標100萬==========");
            //利潤 記錄總公司的利潤綜合
            Integer count = 0;
            //1.定義一個futureTask,假設去遠端http獲取各個分公司業績(任務都比較耗時).
            FutureTask<Integer> futureTask = new FutureTask<>(new CallableTask());
            Thread futureTaskThread = new Thread(futureTask);
            futureTaskThread.start();
            System.out.println("futureTaskThread start!" + new Date());

            //2.主執行緒先做點別的事
            System.out.println("主執行緒查詢總部公司利潤開始時間:" + new Date());
            Thread.sleep(5000);
            count += 10; //10表示北京集團總部利潤。
            System.out.println("主執行緒查詢總部公司利潤結果時間:" + new Date());

            //總部已達標100萬利潤,就不再繼續執行獲取分公司業績任務了
            if (count >= 100) {
                System.out.println("總部公司利潤達標,取消futureTask!" + new Date());
                futureTask.cancel(true);//不需要再去獲取結果,那麼直接取消即可
            } else {
                System.out.println("總部公司利潤未達標,進入阻塞查詢分公司利潤!" + new Date());

                //3總部未達標.阻塞獲取,各個分公司結果  然後分別去獲取分公司的利潤
                Integer i = futureTask.get();//真正執行CallableTask
                System.out.println("i=" + i + "獲取到結果!" + new Date());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 模擬一個十分耗時的任務  去所有的分公司裡去獲取利潤結果
    static class CallableTask implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            System.out.println("CallableTask-call,查詢分公司利潤,執行開始!" + new Date());
            Thread.sleep(10000);
            System.out.println("CallableTask-call,查詢分公司利潤,執行完畢!" + new Date());
            return 10;
        }
    }
}

輸出:

=====例如一個統計公司總部和分部的總利潤是否達標100==========
futureTaskThread start!Wed Oct 31 11:21:33 CST 2018
主執行緒查詢總部公司利潤開始時間:Wed Oct 31 11:21:33 CST 2018
CallableTask-call,查詢分公司利潤,執行開始!Wed Oct 31 11:21:33 CST 2018
主執行緒查詢總部公司利潤結果時間:Wed Oct 31 11:21:38 CST 2018
總部公司利潤未達標,進入阻塞查詢分公司利潤!Wed Oct 31 11:21:38 CST 2018
CallableTask-call,查詢分公司利潤,執行完畢!Wed Oct 31 11:21:43 CST 2018
i=10獲取到結果!Wed Oct 31 11:21:43 CST 2018

如上,分離之後,futureTaskThread耗時10秒期間,主執行緒還穿插的執行了耗時5秒的操作,大大減小總耗時。且可根據業務邏輯實時判斷是否需要繼續執行futureTask。

Demo2:FutureTask一樣可以併發執行任務並獲取結果,如下:

/**
 * FutureTask實現多執行緒併發執行任務並取結果歸集
 *
 * @author [email protected]
 * @description //
 * @date 2018/10/31 11:26
 */
public class FutureTaskDemo {

    public static void main(String[] args) {
        Long start = System.currentTimeMillis();
        ExecutorService exs = Executors.newFixedThreadPool(10);
        //結果集
        List<Integer> list = new ArrayList<>();
        List<FutureTask<Integer>> futureList = new ArrayList<>();
        try {
            //啟動執行緒池  和上面Futrue對比,只有這塊有點不一樣
            for (int i = 0; i < 10; i++) {
                FutureTask<Integer> futureTask = new FutureTask<>(new CallableTask(i + 1));
                //提交任務,新增返回,Runnable特性
                exs.submit(futureTask);
                //Future特性 提交任務後  把futureTask新增進futureList
                futureList.add(futureTask);
            }

            Long getResultStart = System.currentTimeMillis();
            System.out.println("結果歸集開始時間=" + new Date());
            //結果歸集
            while (futureList.size() > 0) {
                Iterator<FutureTask<Integer>> iterable = futureList.iterator();
                //遍歷一遍
                while (iterable.hasNext()) {
                    Future<Integer> future = iterable.next();
                    if (future.isDone() && !future.isCancelled()) {
                        //Future特性
                        Integer i = future.get();
                        System.out.println("任務i=" + i + "獲取完成,移出任務佇列!" + new Date());
                        list.add(i);
                        //任務完成移除任務
                        iterable.remove();
                    } else {
                        //避免CPU高速輪循,可以休息一下。
                        Thread.sleep(1);
                    }
                }
            }

            System.out.println("list=" + list);
            System.out.println("總耗時=" + (System.currentTimeMillis() - start) + ",取結果歸集耗時=" + (System.currentTimeMillis() - getResultStart));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();
        }
    }

    static class CallableTask implements Callable<Integer> {
        Integer i;

        public CallableTask(Integer i) {
            super();
            this.i = i;
        }

        @Override
        public Integer call() throws Exception {
            if (i == 1) {
                Thread.sleep(3000);//任務1耗時3秒
            } else if (i == 5) {
                Thread.sleep(