1. 程式人生 > >多執行緒併發執行任務,取結果歸集:Future、FutureTask、CompletionService、CompletableFuture

多執行緒併發執行任務,取結果歸集:Future、FutureTask、CompletionService、CompletableFuture

Future

(1)cancle

(2)get

(3)isCancle

(4)isDone

示例:

使用執行緒池提交Callable介面任務,返回Future介面,新增進李斯特,最後遍歷FutureList且內部使用while輪詢,併發獲取結果

package thread.future;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 
 * @ClassName: FutureDemo
 * @Description: Future多執行緒併發任務結果歸集
 * @author denny.zhang
 * @date 2016年11月4日 下午1:50:32
 *
 */
public class FutureDemo{

    public static void main(String[] args)  {
        Long start = System.currentTimeMillis();
        //開啟多執行緒
        ExecutorService exs = Executors.newFixedThreadPool(10);
        try {
            //結果集
            List<Integer> list = new ArrayList<Integer>();
            List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();
            //1.高速提交10個任務,每個任務返回一個Future入list
            for(int i=0;i<10;i++){
                futureList.add(exs.submit(new CallableTask(i+1)));
            }
            Long getResultStart = System.currentTimeMillis();
            System.out.println("結果歸集開始時間="+new Date());
            //2.結果歸集,遍歷futureList,高速輪詢(模擬實現了併發)獲取future狀態成功完成後獲取結果,退出當前迴圈
            for (Future<Integer> future : futureList) {
                while (true) {//CPU高速輪詢:每個future都併發輪循,判斷完成狀態然後獲取結果,這一行,是本實現方案的精髓所在。即有10個future在高速輪詢,完成一個future的獲取結果,就關閉一個輪詢
                    if (future.isDone()&& !future.isCancelled()) {//獲取future成功完成狀態,如果想要限制每個任務的超時時間,取消本行的狀態判斷+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超時異常使用即可。
                        Integer i = future.get();//獲取結果
                        System.out.println("任務i="+i+"獲取完成!"+new Date());
                        list.add(i);
                        break;//當前future獲取結果完畢,跳出while
                    } else {
                        Thread.sleep(1);//每次輪詢休息1毫秒(CPU納秒級),避免CPU高速輪循耗空CPU---》新手別忘記這個
                    }
                }
            }
            System.out.println("list="+list);
            System.out.println("總耗時="+(System.currentTimeMillis()-start)+",取結果歸集耗時="+(System.currentTimeMillis()-getResultStart));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();
        }
    }
    static class CallableTask implements Callable<Integer>{
        Integer i;
        
        public CallableTask(Integer i) {
            super();
            this.i=i;
        }

        @Override
        public Integer call() throws Exception {
            if(i==1){
                Thread.sleep(3000);//任務1耗時3秒
            }else if(i==5){
                Thread.sleep(5000);//任務5耗時5秒
            }else{
                Thread.sleep(1000);//其它任務耗時1秒
            }
            System.out.println("task執行緒:"+Thread.currentThread().getName()+"任務i="+i+",完成!");  
            return i;
        }
    }
}

FutureTask

1.Runnable介面,可開啟單個執行緒執行。

2.Future<v>介面,可接受Callable介面的返回值,futureTask.get阻塞獲取結果。

/**
 * 
 * @ClassName:FutureTaskDemo
 * @Description:FutureTask彌補了Future必須用執行緒池提交返回Future的缺陷,實現功能如下:
 * 1.Runnable介面,可開啟執行緒執行。
 * 2.Future<v>介面,可接受Callable介面的返回值,futureTask.get()阻塞獲取結果。
 * 這兩個步驟:一個開啟執行緒執行任務,一個阻塞等待執行結果,分離這兩步驟,可在這兩步中間穿插別的相關業務邏輯。
 * @author diandian.zhang
 * @date 2017年6月16日上午10:36:05
 */
public class FutureTaskContorlDemo {
    
    public static void main(String[] args)  {
        try {
            System.out.println("=====例如一個統計公司總部和分部的總利潤是否達標100萬==========");
            //利潤
            Integer count = 0;
            //1.定義一個futureTask,假設去遠端http獲取各個分公司業績.
            FutureTask<Integer> futureTask = new FutureTask<Integer>(new CallableTask());
            Thread futureTaskThread =  new Thread(futureTask);
            futureTaskThread.start();
            System.out.println("futureTaskThread start!"+new Date());
            
            //2.主執行緒先做點別的事
            System.out.println("主執行緒查詢總部公司利潤開始時間:"+new Date());
            Thread.sleep(5000);
            count+=10;//北京集團總部利潤。
            System.out.println("主執行緒查詢總部公司利潤結果時間:"+new Date());
            
            //總部已達標100萬利潤,就不再繼續執行獲取分公司業績任務了
            if(count>=100){
                System.out.println("總部公司利潤達標,取消futureTask!"+new Date());
                futureTask.cancel(true);//不需要再去獲取結果,那麼直接取消即可
            }else{
                System.out.println("總部公司利潤未達標,進入阻塞查詢分公司利潤!"+new Date());
                //3總部未達標.阻塞獲取,各個分公司結果
                Integer i = futureTask.get();//真正執行CallableTask
                System.out.println("i="+i+"獲取到結果!"+new Date()+new Date());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 
     * @ClassName:CallableTask
     * @Description:一個十分耗時的任務
     * @author diandian.zhang
     * @date 2017年6月16日上午10:39:04
     */
    static class CallableTask implements Callable<Integer>{
        @Override
        public Integer call() throws Exception {
            System.out.println("CallableTask-call,查詢分公司利潤,執行開始!"+new Date());
            Thread.sleep(10000);
            System.out.println("CallableTask-call,查詢分公司利潤,執行完畢!"+new Date());
            return 10;
        }
    }
package thread.future;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/**
 * 
 * @ClassName:FutureTaskDemo
 * @Description:FutureTask實現多執行緒併發執行任務並取結果歸集
 * @author diandian.zhang
 * @date 2017年6月16日上午10:36:05
 */
public class FutureTaskDemo {
    
    public static void main(String[] args)  {
        Long start = System.currentTimeMillis();
        //開啟多執行緒
        ExecutorService exs = Executors.newFixedThreadPool(5);
        try {
            //結果集
            List<Integer> list = new ArrayList<Integer>();
            List<FutureTask<Integer>> futureList = new ArrayList<FutureTask<Integer>>();
            //啟動執行緒池,10個任務固定執行緒數為5
            for(int i=0;i<10;i++){
                 FutureTask<Integer> futureTask = new FutureTask<Integer>(new CallableTask(i+1));
                //提交任務,新增返回
                exs.submit(futureTask);//Runnable特性
                futureList.add(futureTask);//Future特性
            }
            Long getResultStart = System.currentTimeMillis();
            System.out.println("結果歸集開始時間="+new Date());
            //結果歸集
            for (FutureTask<Integer> future : futureList) {
                while (true) {
                    if (future.isDone()&& !future.isCancelled()) {
                        Integer i = future.get();//Future特性
                        System.out.println("i="+i+"獲取到結果!"+new Date());
                        list.add(i);
                        break;
                    }else {
                        Thread.sleep(1);//避免CPU高速輪循,可以休息一下。
                    }
                }
            }
            System.out.println("list="+list);
            System.out.println("總耗時="+(System.currentTimeMillis()-start)+",取結果歸集耗時="+(System.currentTimeMillis()-getResultStart));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();
        }
        
    }

    static class CallableTask implements Callable<Integer>{
        Integer i;
        
        public CallableTask(Integer i) {
            super();
            this.i=i;
        }

        @Override
        public Integer call() throws Exception {
            if(i==1){
                Thread.sleep(3000);//任務1耗時3秒
            }else if(i==5){
                Thread.sleep(5000);//任務5耗時5秒
            }else{
                Thread.sleep(1000);//其它任務耗時1秒
            }
            System.out.println("執行緒:["+Thread.currentThread().getName()+"]任務i="+i+",完成!"+new Date());  
            return i;

        }
        
    }

}

CompletionService

原因:內部通過阻塞佇列+FutureTask,實現了任務先完成可優先獲取到,即結果完成後順序排序。

package thread.future;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 
 * @ClassName: CompletionServiceDemo
 * @Description: CompletionService多執行緒併發任務結果歸集
 * @author denny.zhang
 * @date 2016年11月4日 下午1:50:32
 *
 */
public class CompletionServiceDemo{

    public static void main(String[] args)  {
        Long start = System.currentTimeMillis();
        //開啟3個執行緒
        ExecutorService exs = Executors.newFixedThreadPool(5);
        try {
            int taskCount = 10;
            //結果集
            List<Integer> list = new ArrayList<Integer>();
            //1.定義CompletionService
            CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(exs);  
            List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();
            //2.新增任務
            for(int i=0;i<taskCount;i++){
                futureList.add(completionService.submit(new Task(i+1)));
            }
            //==================結果歸集===================
            //方法1:future是提交時返回的,遍歷queue則按照任務提交順序,獲取結果
//            for (Future<Integer> future : futureList) {
//                System.out.println("====================");
//                Integer result = future.get();//執行緒在這裡阻塞等待該任務執行完畢,按照
//                System.out.println("任務result="+result+"獲取到結果!"+new Date());
//                list.add(result);
//            }

//            //方法2.使用內部阻塞佇列的take()
            for(int i=0;i<taskCount;i++){
                Integer result = completionService.take().get();//採用completionService.take(),內部維護阻塞佇列,任務先完成的先獲取到
                System.out.println("任務i=="+result+"完成!"+new Date());
                list.add(result);
            }
            System.out.println("list="+list);
            System.out.println("總耗時="+(System.currentTimeMillis()-start));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();//關閉執行緒池
        }
        
    }

    static class Task implements Callable<Integer>{
        Integer i;
        
        public Task(Integer i) {
            super();
            this.i=i;
        }

        @Override
        public Integer call() throws Exception {
            if(i==5){
                Thread.sleep(5000);
            }else{
                Thread.sleep(1000);
            }
            System.out.println("執行緒:"+Thread.currentThread().getName()+"任務i="+i+",執行完成!");  
            return i;
        }
        
    }
}

CompletableFuture

JDK1.8才新加入的實現類,實現Future、CompletionStage兩個介面。

package thread.future;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.google.common.collect.Lists;

/**
 * 
 * @ClassName:CompletableFutureDemo
 * @Description:多執行緒併發任務,取結果歸集
 * @author diandian.zhang
 * @date 2017年6月14日下午12:44:01
 */
public class CompletableFutureDemo {
    public static void main(String[] args) {
        Long start = System.currentTimeMillis();
        //結果集
        List<String> list = new ArrayList<String>();
        List<String> list2 = new ArrayList<String>();
        //定長10執行緒池
        ExecutorService exs = Executors.newFixedThreadPool(10);
        //List<CompletableFuture<String>> futureList = new ArrayList<>();
        List<Integer> taskList = Lists.newArrayList(2,1,3,4,5,6,7,8,9,10);
        try {
            //方式一:迴圈建立CompletableFuture list,呼叫sequence()組裝返回一個有返回值的CompletableFuture,返回結果get()獲取
//            for(int i=0;i<taskList.size();i++){
//                final int j=i+1;
//                CompletableFuture<String> future = CompletableFuture.supplyAsync(()->calc(j), exs)//非同步執行
//                        .thenApply(e->Integer.toString(e))//Integer轉換字串    thenAccept只接受不返回不影響結果
////                        .whenComplete((v, e) -> {//如需獲取任務完成先手順序,此處程式碼即可
////                            System.out.println("任務"+v+"完成!result="+v+",異常 e="+e+","+new Date());
////                            list2.add(v);
////                        })
//                        ;
//                futureList.add(future);
//            }
//            //流式獲取結果
//            list = sequence(futureList).get();//[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]此處不理解為什麼是這樣的順序?誰知道求告知
            
            //方式二:全流式處理轉換成CompletableFuture[]+組裝成一個無返回值CompletableFuture,join等待執行完畢。返回結果whenComplete獲取
            @SuppressWarnings("rawtypes")
            CompletableFuture[] cfs = taskList.stream().map(object-> CompletableFuture.supplyAsync(()->calc(object), exs)
                    .thenApply(h->Integer.toString(h))
                    .whenComplete((v, e) -> {//如需獲取任務完成先手順序,此處程式碼即可
                        System.out.println("任務"+v+"完成!result="+v+",異常 e="+e+","+new Date());
                        list2.add(v);
                    }))
                    .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(cfs).join();//封裝後無返回值,必須自己whenComplete()獲取
            System.out.println("list2="+list2+"list="+list+",耗時="+(System.currentTimeMillis()-start));
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            exs.shutdown();
        }
    }
    
    public static Integer calc(Integer i){
        try {
            if(i==1){
                Thread.sleep(3000);//任務1耗時3秒
            }else if(i==5){
                Thread.sleep(5000);//任務5耗時5秒
            }else{
                Thread.sleep(1000);//其它任務耗時1秒
            }
            System.out.println("task執行緒:"+Thread.currentThread().getName()+"任務i="+i+",完成!+"+new Date());  
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }
    
    /**
     * 
     * @Description 組合多個CompletableFuture為一個CompletableFuture,所有子任務全部完成,組合後的任務才會完成。帶返回值,可直接get.
     * @param futures List
     * @return
     * @author diandian.zhang
     * @date 2017年6月19日下午3:01:09
     * @since JDK1.8
     */
    public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        //1.構造一個空CompletableFuture,子任務數為入參任務list size
        CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        //2.流式(每個子任務join操作後轉換為list)往空CompletableFuture中新增結果
        return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.<T>toList()));
    }
   
    /**
     * 
     * @Description Stream流式型別futures轉換成一個CompletableFuture,所有子任務全部完成,組合後的任務才會完成。帶返回值,可直接get.
     * @param futures Stream
     * @return
     * @author diandian.zhang
     * @date 2017年6月19日下午6:23:40
     * @since JDK1.8
     */
    public static <T> CompletableFuture<List<T>> sequence(Stream<CompletableFuture<T>> futures) {
        List<CompletableFuture<T>> futureList = futures.filter(f -> f != null).collect(Collectors.toList());
        return sequence(futureList);
    }
}

最後歡迎大家訪問我的個人網站:1024s​​​​​​​