【小家Java】Future、FutureTask、CompletionService、CompletableFuture解決多執行緒併發中歸集問題的效率對比
相關閱讀
【小家java】java5新特性(簡述十大新特性) 重要一躍
【小家java】java6新特性(簡述十大新特性) 雞肋升級
【小家java】java7新特性(簡述八大新特性) 不溫不火
【小家java】java8新特性(簡述十大新特性) 飽受讚譽
【小家java】java9新特性(簡述十大新特性) 褒貶不一
【小家java】java10新特性(簡述十大新特性) 小步迭代
【小家java】java11新特性(簡述八大新特性) 首個重磅LTS版本
前文
開啟執行緒執行任務,不管是使用Runnable(無返回值不支援上報異常)還是Callable(有返回值支援上報異常)介面,都可以輕鬆實現。那麼如果是開啟執行緒池並需要獲取結果歸集的情況下,如何實現,以及優劣?
本文將分別以這四種方式解決歸集的問題,然後看看效率和使用的方便程度即可
1、Futrue
Future介面封裝了取消,獲取執行緒結果,以及狀態判斷是否取消,是否完成這幾個方法,都很有用。
Demo:
使用執行緒池提交Callable介面任務,返回Future介面,新增進list,最後遍歷該List且內部使用while輪詢,併發獲取結果,程式碼如下
/**
* 使用Futrue來實現多執行緒執行歸集操作
*
* @author [email protected]
* @description //
* @date 2018/10/31 11:02
*/
public class FutureDemo {
public static void main(String[] args) {
Long start = Instant.now().toEpochMilli();
//定義一個執行緒池 方便開啟和執行多執行緒 此處為了方便,直接使用 newFixedThreadPool
ExecutorService exs = Executors.newFixedThreadPool(10);
//結果集 裝載在list裡面
List<Integer> list = new ArrayList <>();
List<Future<Integer>> futureList = new ArrayList<>();
try {
//1.高速提交10個任務,每個任務返回一個Future入futureList 裝載起來 這樣10個執行緒就並行去處理和計算了
for (int i = 0; i < 10; i++) {
futureList.add(exs.submit(new CallableTask(i + 1)));
}
Long getResultStart = Instant.now().toEpochMilli();
System.out.println("結果歸集開始時間=" + LocalDateTime.now());
//2.結果歸集,用迭代器遍歷futureList,高速輪詢(模擬實現了併發),任務完成就移除
while (futureList.size() > 0) {
Iterator<Future<Integer>> iterable = futureList.iterator();
//遍歷 輪詢
while (iterable.hasNext()) {
Future<Integer> future = iterable.next();
//如果任務完成就立馬取結果,並且,並且把該任務直接從futureList移除掉 否則判斷下一個任務是否完成
if (future.isDone() && !future.isCancelled()) {
//獲取結果
Integer i = future.get();
System.out.println("任務i=" + i + "獲取完成,移出任務佇列!" + LocalDateTime.now());
//把結果裝入進去 然後把futrue任務移除
list.add(i);
iterable.remove();
} else {
Thread.sleep(1);//避免CPU高速運轉(這就是輪詢的弊端),這裡休息1毫秒,CPU納秒級別
}
}
}
System.out.println("list=" + list); //任務的處理結果
System.out.println("總耗時=" + (System.currentTimeMillis() - start) + ",取結果歸集耗時=" + (System.currentTimeMillis() - getResultStart));
} catch (Exception e) {
e.printStackTrace();
} finally {
exs.shutdown();
}
}
// 任務 採用sleep模擬處理任務需要消耗的時間
static class CallableTask implements Callable<Integer> {
Integer i; //用來編號任務 方便日誌裡輸出識別
public CallableTask(Integer i) {
super();
this.i = i;
}
@Override
public Integer call() throws Exception {
if (i == 1) {
Thread.sleep(3000);//任務1耗時3秒
} else if (i == 5) {
Thread.sleep(5000);//任務5耗時5秒
} else {
Thread.sleep(1000);//其它任務耗時1秒
}
System.out.println("task執行緒:" + Thread.currentThread().getName() + "任務i=" + i + ",完成!" + LocalDateTime.now());
return i;
}
}
}
如上圖,開啟定長為10的執行緒池:ExecutorService exs = Executors.newFixedThreadPool(10);+任務1耗時3秒,任務5耗時5秒,其他1秒。控制檯列印如下:
結果歸集開始時間=2018-10-31T11:01:19.457
task執行緒:pool-1-thread-2任務i=2,完成!2018-10-31T11:01:19.976
task執行緒:pool-1-thread-4任務i=4,完成!2018-10-31T11:01:19.977
task執行緒:pool-1-thread-3任務i=3,完成!2018-10-31T11:01:19.977
任務i=4獲取完成,移出任務佇列!2018-10-31T11:01:19.978
task執行緒:pool-1-thread-9任務i=9,完成!2018-10-31T11:01:19.978
task執行緒:pool-1-thread-8任務i=8,完成!2018-10-31T11:01:19.978
task執行緒:pool-1-thread-7任務i=7,完成!2018-10-31T11:01:19.978
task執行緒:pool-1-thread-6任務i=6,完成!2018-10-31T11:01:19.978
任務i=6獲取完成,移出任務佇列!2018-10-31T11:01:19.979
任務i=7獲取完成,移出任務佇列!2018-10-31T11:01:19.979
task執行緒:pool-1-thread-10任務i=10,完成!2018-10-31T11:01:19.979
任務i=8獲取完成,移出任務佇列!2018-10-31T11:01:19.979
任務i=9獲取完成,移出任務佇列!2018-10-31T11:01:19.979
任務i=10獲取完成,移出任務佇列!2018-10-31T11:01:19.979
任務i=2獲取完成,移出任務佇列!2018-10-31T11:01:19.980
任務i=3獲取完成,移出任務佇列!2018-10-31T11:01:19.980
task執行緒:pool-1-thread-1任務i=1,完成!2018-10-31T11:01:21.964
任務i=1獲取完成,移出任務佇列!2018-10-31T11:01:21.965
task執行緒:pool-1-thread-5任務i=5,完成!2018-10-31T11:01:23.977
任務i=5獲取完成,移出任務佇列!2018-10-31T11:01:23.979
list=[4, 6, 7, 8, 9, 10, 2, 3, 1, 5]
總耗時=5070,取結果歸集耗時=5037
看最後的兩個結果輸出:
list=[4, 6, 7, 8, 9, 10, 2, 3, 1, 5]--》多執行幾遍,最後2個總是1,5最後加進去的,可實現按照任務完成先後順序獲取結果! 因為1需要3s,5需要5s是最慢的,所以最後進入list
總耗時=5046,取結果歸集耗時=5040 ---》符合邏輯,10個任務,定長10執行緒池,其中一個任務耗時3秒,一個任務耗時5秒,由於併發高速輪訓,耗時取最長5秒
建議:此種方法可實現基本目標,任務並行且按照完成順序獲取結果。使用很普遍,老少皆宜,就是CPU有消耗,可以使用!
2、FutureTask
FutureTask是介面RunnableFuture的唯一實現類(實現了Future+Runnable).
1.Runnable介面,可開啟單個執行緒執行。
2.Future介面,可接受Callable介面的返回值,futureTask.get()阻塞獲取結果。
demo:
demo1:兩個步驟:1.開啟單個執行緒執行任務,2.阻塞等待執行結果,分離這兩步驟,可在這兩步中間穿插別的相關業務邏輯
/**
* FutureTask彌補了Future必須用執行緒池提交返回Future的缺陷,實現功能如下:
* 這兩個步驟:一個開啟執行緒執行任務,一個阻塞等待執行結果,分離這兩步驟,可在這兩步中間穿插別的相關業務邏輯。
*
* @author [email protected]
* @description //
* @date 2018/10/31 11:15
*/
public class FutureTaskContorlDemo {
public static void main(String[] args) {
try {
System.out.println("=====例如一個統計公司總部和分部的總利潤是否達標100萬==========");
//利潤 記錄總公司的利潤綜合
Integer count = 0;
//1.定義一個futureTask,假設去遠端http獲取各個分公司業績(任務都比較耗時).
FutureTask<Integer> futureTask = new FutureTask<>(new CallableTask());
Thread futureTaskThread = new Thread(futureTask);
futureTaskThread.start();
System.out.println("futureTaskThread start!" + new Date());
//2.主執行緒先做點別的事
System.out.println("主執行緒查詢總部公司利潤開始時間:" + new Date());
Thread.sleep(5000);
count += 10; //10表示北京集團總部利潤。
System.out.println("主執行緒查詢總部公司利潤結果時間:" + new Date());
//總部已達標100萬利潤,就不再繼續執行獲取分公司業績任務了
if (count >= 100) {
System.out.println("總部公司利潤達標,取消futureTask!" + new Date());
futureTask.cancel(true);//不需要再去獲取結果,那麼直接取消即可
} else {
System.out.println("總部公司利潤未達標,進入阻塞查詢分公司利潤!" + new Date());
//3總部未達標.阻塞獲取,各個分公司結果 然後分別去獲取分公司的利潤
Integer i = futureTask.get();//真正執行CallableTask
System.out.println("i=" + i + "獲取到結果!" + new Date());
}
} catch (Exception e) {
e.printStackTrace();
}
}
// 模擬一個十分耗時的任務 去所有的分公司裡去獲取利潤結果
static class CallableTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("CallableTask-call,查詢分公司利潤,執行開始!" + new Date());
Thread.sleep(10000);
System.out.println("CallableTask-call,查詢分公司利潤,執行完畢!" + new Date());
return 10;
}
}
}
輸出:
=====例如一個統計公司總部和分部的總利潤是否達標100萬==========
futureTaskThread start!Wed Oct 31 11:21:33 CST 2018
主執行緒查詢總部公司利潤開始時間:Wed Oct 31 11:21:33 CST 2018
CallableTask-call,查詢分公司利潤,執行開始!Wed Oct 31 11:21:33 CST 2018
主執行緒查詢總部公司利潤結果時間:Wed Oct 31 11:21:38 CST 2018
總部公司利潤未達標,進入阻塞查詢分公司利潤!Wed Oct 31 11:21:38 CST 2018
CallableTask-call,查詢分公司利潤,執行完畢!Wed Oct 31 11:21:43 CST 2018
i=10獲取到結果!Wed Oct 31 11:21:43 CST 2018
如上,分離之後,futureTaskThread耗時10秒期間,主執行緒還穿插的執行了耗時5秒的操作,大大減小總耗時。且可根據業務邏輯實時判斷是否需要繼續執行futureTask。
Demo2:FutureTask一樣可以併發執行任務並獲取結果,如下:
/**
* FutureTask實現多執行緒併發執行任務並取結果歸集
*
* @author [email protected]
* @description //
* @date 2018/10/31 11:26
*/
public class FutureTaskDemo {
public static void main(String[] args) {
Long start = System.currentTimeMillis();
ExecutorService exs = Executors.newFixedThreadPool(10);
//結果集
List<Integer> list = new ArrayList<>();
List<FutureTask<Integer>> futureList = new ArrayList<>();
try {
//啟動執行緒池 和上面Futrue對比,只有這塊有點不一樣
for (int i = 0; i < 10; i++) {
FutureTask<Integer> futureTask = new FutureTask<>(new CallableTask(i + 1));
//提交任務,新增返回,Runnable特性
exs.submit(futureTask);
//Future特性 提交任務後 把futureTask新增進futureList
futureList.add(futureTask);
}
Long getResultStart = System.currentTimeMillis();
System.out.println("結果歸集開始時間=" + new Date());
//結果歸集
while (futureList.size() > 0) {
Iterator<FutureTask<Integer>> iterable = futureList.iterator();
//遍歷一遍
while (iterable.hasNext()) {
Future<Integer> future = iterable.next();
if (future.isDone() && !future.isCancelled()) {
//Future特性
Integer i = future.get();
System.out.println("任務i=" + i + "獲取完成,移出任務佇列!" + new Date());
list.add(i);
//任務完成移除任務
iterable.remove();
} else {
//避免CPU高速輪循,可以休息一下。
Thread.sleep(1);
}
}
}
System.out.println("list=" + list);
System.out.println("總耗時=" + (System.currentTimeMillis() - start) + ",取結果歸集耗時=" + (System.currentTimeMillis() - getResultStart));
} catch (Exception e) {
e.printStackTrace();
} finally {
exs.shutdown();
}
}
static class CallableTask implements Callable<Integer> {
Integer i;
public CallableTask(Integer i) {
super();
this.i = i;
}
@Override
public Integer call() throws Exception {
if (i == 1) {
Thread.sleep(3000);//任務1耗時3秒
} else if (i == 5) {
Thread.sleep(