1. 程式人生 > >從零開始學多執行緒之執行緒池(五)

從零開始學多執行緒之執行緒池(五)

單執行緒的缺點&使用多執行緒的好處

圍繞執行任務來管理應用程式時,第一步要指明一個清晰的任務邊界(task boundaries).理想情況下,任務是獨立的活動:它的工作並不依賴於其他任務的狀態、結果或者邊界效應.獨立有利於併發性,如果能得到相應的處理器資源,獨立的任務還可以並行執行.

 

在正常的負載下,伺服器應用程式應該兼具良好的吞吐量和快速的響應性.應用程式提供商希望程式支援儘可能多的使用者,所以會努力降低每個使用者的開銷;而使用者希望儘快獲得響應.進一步講,應用程式應該在負荷過載時平緩地劣化,而不應該負載一高就簡單地以失敗告終.為了達到這些目的,你要選擇一個清晰的任務邊界,並配合一個明確的任務執行策略.

 

大多數伺服器應用程式都選擇了下面這個自然的任務邊界:單獨的客戶請求.將獨立的請求作為任務邊界,可以讓任務兼顧獨立性和適當的大小.

 

單執行緒的缺點: 對於一個單執行緒化的伺服器,阻塞不僅僅延遲了當前請求的完成,而且還完全阻止了需要被處理的等待請求.如果請求阻塞的時間過長,使用者將看不到響應,可能任務伺服器已經不可用了.同時,單執行緒在等待它的I/O操作時,CPU會處於閒置狀態,因此導致了資源利用率非常低.

 

順序化處理幾乎不能為伺服器應用程式提供良好的吞吐量或快速的響應性.不過也有特例,比如當任務的數量很少但生命週期很長時,或者當伺服器只服務於唯一的使用者時,伺服器在同一時刻只需同時處理一個請求---但是大多數伺服器應用程式都不是以這種方式工作的.

 

 

使用多執行緒程式設計,可以提高響應性,避免上述的缺點.

複製程式碼

    public static void main(String[] args) {
        //要執行的任務
        Runnable r = new Runnable() {
            @Override
            public void run() {
                //do something
            }
        };

        //建立一個新的執行緒
        Thread t = new Thread(r);

        //執行任務
        t.start();
    }

複製程式碼

 

  

多執行緒的好處&注意事項:

1. 由新建立的執行緒去執行任務,主執行緒可以更迅速的開始下一個任務.從而提高了響應性

2. 並行處理任務,這使得多個任務可以同時得到處理,如果有多個處理器,或者出於I/O未完成、鎖請求以及資源可用性等任何因素需要阻塞任務時,程式的吞吐量會得到提高.

3. 任務處理程式碼必須是執行緒安全的,因為有多個任務會併發地呼叫它.

 

在中等強度的負載水平下,"每任務每執行緒(thread-per-taks)"方法是對順序化執行的良好改進.只要請求的到達速度尚未超出

伺服器的請求處理能力,那麼這種方法可以同時的帶來更快的響應性和更大的吞吐量,

 

無限制建立執行緒的缺點

1. 執行緒生命週期的開銷.執行緒的建立與關閉不是"免費的".如果請求是頻繁的且輕量的,就像大多數伺服器程式一樣,那麼為每個請求建立一個新執行緒的做法就會消耗大量的計算資源.

 

2. 資源消耗量.活動執行緒會消耗系統資源,尤其是記憶體.如果可執行的執行緒數多於可用的處理器數,執行緒將會空閒.大量空閒執行緒佔用更多記憶體,給垃圾回收器帶來壓力,而且大量執行緒在競爭CPU資源時,還會產生其他的效能開銷.如果你已經有了足夠多的執行緒保持所有CPU忙碌,那麼再建立更多的執行緒是有百害而無一利的.

 

3. 穩定性.應該限制可建立執行緒的數目,否則可能會出現記憶體溢位錯誤.

 

凡事有度,在一定範圍內,增加執行緒可以提高系統的吞吐量,一旦超出了這個範圍,再建立更多的執行緒只會拖垮你的程式.建立過多的執行緒,會導致應用程式面臨崩潰.為了擺脫這種危險,應該設定一個範圍來限制你的應用程式可以建立的執行緒數,然後徹底地測試你的應用程式,確保即使執行緒數到達了這個範圍的極限,程式也不至於耗盡所有的資源.

 

"每任務每執行緒(thread-per-task)"方法的問題在於它沒有對已建立執行緒的數量進行任何限制,除非對客戶端能夠丟擲的請求速率進行限制.像其他的併發危險一樣,無限制建立執行緒的行為可能在原型和開發階段還能表現得執行良好,而當應用程式部署後,並運行於高負載下,它的問題才會暴露出來.所以一個惡意使用者或者足夠多的使用者,都會使你的Web Server的負載超過某個確定的極限值,從而導致伺服器的崩潰.對於一個伺服器,我們希望它具有高可用性,而且在高負載下可以平緩地劣化,但是上面的問題對我們的目標是個嚴重的阻礙.

 

使用Executor框架規避每任務每執行緒的缺點

單執行緒和"每任務每執行緒"的侷限性: 順序執行會產生糟糕的響應性和吞吐量,"每任務每執行緒"可能會導致程式崩潰.

 

上一篇部落格我們說到了如何用有界佇列(BlockingQueue)防止應用程式過載而耗盡記憶體.執行緒池(Thread pool)為執行緒管理提供了同樣的好處.在java中我們不是使用Thread來進行多執行緒的操作的,而是使用Executor.

public interface Executor {

    void execute(Runnable command);
}

 

 

Executor基於生產者-消費者模式.提交任務的執行者是生成者(產生待完成的工作單元),執行任務的執行緒是消費者(消耗掉這些工作單元).如果要在你的程式中實現一個生產者-消費者的模式,使用Executor通常是最簡單的方式.

 

複製程式碼

public class ExecutorTest {
    //可以把這個引數抽取到配置檔案,增加靈活性
    private static  final int NUM = 100;
    // 建立了一個定長的執行緒池,100個執行緒
    private static  final Executor EXECUTOR = Executors.newFixedThreadPool(NUM);

    public static void main(String [] args){

        for (int i = 0; i < 100; i++) {
            Runnable  r = new Runnable() {
                @Override
                public void run() {
                    //do someting.
                }
            };
            EXECUTOR.execute(r);
        }


    }
}

複製程式碼

 

 

我們通過使用Executor,將處理請求任務的提交與它的執行體進行了解耦.只要替換一個不同的Executor實現,就可以改變伺服器的行為.改變Executor的實現或者配置,所產生的影響遠遠小於直接改變任務的執行方式.

 

可以擴充套件Executor,自定義自己的執行策略

每任務每執行緒:

複製程式碼

public class ThreadPerTaskExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        new Thread(command).start();
    }
}

複製程式碼

 

同步進行:

複製程式碼

public class WithinThreadExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        command.run();
    }
}

複製程式碼

 

 

無論何時擋你看到這種形式的程式碼:

new Thread(runnable).start()

並且你可能最終希望獲得一個更加靈活的執行策略時,請認真考慮使用Executor代替Thread

 

執行緒池的具體使用

執行緒池管理一個工作者執行緒的同構池(homogeneous pool).執行緒池是與工作佇列(work queue)緊密繫結的.所謂工作佇列,其作用是持有所有等待執行的任務.工作者執行緒的生活從此輕鬆起來:它從工作佇列中獲取下一個任務,執行它,然後回來繼續等待下一個任務.

 

使用執行緒池的好處:

1.重用存在的執行緒,而不是建立新的執行緒,這可以在處理多請求時抵消縣城建立、消亡產生的開銷.

2.另外一個額外的好處就是,在請求到達時,工作者執行緒通常已經存在,用於建立執行緒的等待時間並不會延遲任務的執行,因此提高了響應性.

3.通過適當地調整執行緒池的大小,你可以得到足夠多的執行緒以保持處理器忙碌,同時還可以 防止過多的執行緒相互競爭資源,導致應用程式耗盡記憶體而失敗.

 

可以使用Executors靜態工廠方法來建立一個執行緒池(已經幫你配置好了):

 

 

複製程式碼

public static void main(String [] args){

        /*
        * 建立一個單執行緒化的executor,它只建立唯一的工作者執行緒來執行任務
        * 如果這個執行緒異常結束,會有另一個取代它.
        * 因為是單執行緒所以可以保證任務依照佇列所規定的的順序(FIFO,LIFO,優先順序)執行
        * */
        Executor singleThreadExecutor = Executors.newSingleThreadExecutor();

        /*
        * 從構造方法傳入引數,建立一個定長的執行緒池,每當提交一個任務就建立一個執行緒,
        * 直到達到池的最大長度,這時執行緒池保持長度不在變化.
        * 如果一個執行緒由於非預期的Exception而結束,執行緒池會補充一個新的執行緒
        * */
        Executor fixedThreadExecutor = Executors.newFixedThreadPool(100);


        /*
        * 這個是功能最強的,建立一個可快取的執行緒池,如果當前執行緒池的長度超過了處理的
        * 需要時,它可以靈活地回收空閒的執行緒,當需求增加時,它可以靈活地新增新的執行緒,
        * 而不會對池的長度做任何限制
        * */
        Executor cachedThreadExecutor = Executors.newCachedThreadPool();

        /*
        * 建立一個定長的執行緒池,而且支援定時的以及週期性的任務執行,類似於Timer(定時任務,
        * 取代了Timer)
        * */
        Executor scheduledThreadExecutor = Executors.newScheduledThreadPool(100);


        /*
        * 上面的工廠方法的內部實現,用的都是這個,只不過引數配置的不同.
        * 當我們需要配置個性化的執行緒池的時候,可以自己進行配置(阿里巴巴規約推薦你自己進行配置)
        * */
        Executor executor = new ThreadPoolExecutor(1,1,1L,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(100));
    }  
}

複製程式碼

 

 

 

 

從"每執行緒每任務"策略遷移到基於池的策略,會對應用程式的穩定性產生重大的影響:Web Server 再也不會因過高的負載失敗了.

 

使用ExecutorService關閉執行緒池

Executor如果沒有關閉那麼JVM也無法結束:Executor通常只是為執行任務而建立執行緒.但是JVM會在所有(非後臺的,nondaemon)執行緒全部終止後才退出.因此,如果無法正確關閉Executor,將會阻止JVM的結束.

 

為了解決這個執行服務的生命週期問題,ExecutorService介面擴充套件了Executor,並且添加了一些用於生命週期管理的方法(同時還有一些用於任務提交的便利方法).

複製程式碼

public interface ExecutorService extends Executor {

    //關閉服務用的方法們...
    void shutdown();

   
    List<Runnable> shutdownNow();

 
    boolean isShutdown();

  
    boolean isTerminated();

    
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

   
    // 任務提交的便利方法..
    <T> Future<T> submit(Callable<T> task);

  
    <T> Future<T> submit(Runnable task, T result);

    
    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

 
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

複製程式碼

 

 

ExecutorService暗示了生命週期有3種狀態:執行(running)、關閉(shutting down)和終止(terminated).ExecutorServie最初建立後的初始狀態是執行狀態.shutdown方法會啟動一個平緩地關閉過程:停止接受新的任務,同時等待已經提交的任務完成----包括尚未開始執行的任務.

shutdownNow方法會啟動一個強制的關閉過程:嘗試取消所有執行中的任務和排在佇列中尚未開始的任務.

 

 在關閉後提交到ExecutorService中的任務,會被拒絕執行處理器器(rejected execution handler)處理.拒絕執行處理器(拒絕執行處理器是ExecutorService的一種實現.ThreadPoolExecutor提供的,ExecutorService介面中的方法並不提供拒絕執行處理器.)

可能只是簡單地放棄任務,也可能會引起execute丟擲一個未檢查的RejectedExecutionException.一旦所有的任務全部完成後,ExecutorService會轉入終止狀態.你可以呼叫awaitTermination等待ExecutorService到達終止狀態.也可以輪詢檢查isTerminated判斷ExecutorService是否已經終止.通常shutdown會緊隨awaitTermination之後,這樣可以產生同步地關閉ExecutorService的效果.

 

簡單的測試程式碼:

複製程式碼

public class ExecutorServiceTest {

    private final static  ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10);

    public  void start(){

    //根據執行緒池的中斷狀態執行任務
        while (!EXECUTOR_SERVICE.isShutdown()){
                try {
                    Runnable r = new Runnable(){
                        @Override
                        public void run() {
                          //  System.out.println("執行著呢");
                        }
                    };
                    EXECUTOR_SERVICE.execute(r);
                } catch (RejectedExecutionException e) {
                    e.printStackTrace();
                    if (!EXECUTOR_SERVICE.isShutdown()){
                        System.out.println("捕獲了結束異常...");
                    }
            }


        }
    }


    /*
    * 終止現場城池
    * */
    public void stop(){
        EXECUTOR_SERVICE.shutdown();
    }



    public static void main(String [] args) throws InterruptedException {

        ExecutorServiceTest executorServiceTest = new ExecutorServiceTest();

        //建立一個新執行緒,3秒後停止執行緒池
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3000);
                    executorServiceTest.stop();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }).start();

        executorServiceTest.start();
    }

}

複製程式碼

 

 

定時任務

Timer工具管理任務的延遲執行("100ms後執行該任務")以及週期執行("每10ms執行一次任務").但是Timer存在一些缺陷.應該考慮使用ScheduledThreadPoolExecutor替代Timer.

 

Timer的缺點:

1. 時間準確性:Timer只建立唯一的執行緒來執行所有timer任務.如果一個timer任務很耗時,會導致其他TimerTask的時效準確性出問題.例如一個TimerTask每10ms執行一次,而另一個TimerTask每40ms執行一次,重複出現的任務要麼會在耗時的任務完成後快速連續呼叫4次,要麼完全"丟失"4次呼叫(取決於它是否按照固定的頻率或延遲執行排程).排程執行緒池(Scheduled thread pool)解決了這個缺陷,它讓你可以提供多個執行緒來執行延遲、並具週期性的任務.

 

2.執行緒被意外終止不會再啟動:Timer的另一個問題在於,如果TimerTask丟擲未檢查的異常,Timer將會產生無法預料的行為.Timer執行緒並不捕獲異常,所以TimerTask丟擲的未檢查的異常會終止timer執行緒.這種情況下Timer也不會在重新恢復執行緒的執行了:它錯誤地任務整個Timer都被取消了.此時,已經背安排但尚未執行的TimerTask永遠不會再執行了,新的任務也不能被排程了.(這個問題叫執行緒洩露)

 

複製程式碼

public class OutOfTime {

    static class ThrowTask extends TimerTask {
        @Override
        public void run() {
            System.out.println("執行了任務.");
            throw new RuntimeException();
        }
    }


    public static void main(String [] args) throws InterruptedException {
        //建立一個timer
        Timer timer = new Timer();
        //延遲一秒執行執行任務.
        timer.schedule(new ThrowTask(),1);
        Thread.sleep(1000);
        //延遲三秒執行任務
        timer.schedule(new ThrowTask(),3);

    }
}

複製程式碼

 

列印輸出:

執行了任務.
Exception in thread "Timer-0" java.lang.RuntimeException
at cn.bj.lbr.test.chap6.OutOfTime$ThrowTask.run(OutOfTime.java:19)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)
Exception in thread "main" java.lang.IllegalStateException: Timer already cancelled.
at java.util.Timer.sched(Timer.java:397)
at java.util.Timer.schedule(Timer.java:193)
at cn.bj.lbr.test.chap6.OutOfTime.main(OutOfTime.java:31)

 

可以看到"執行了任務"就輸出了一次,後面的任務在丟擲異常後沒有繼續執行.

 

再次重申: 應該使用ScheduledThreadPoolExecutor來替換Timer.

 

可攜帶結果的任務:Callable和Future

Callable就是一個可以有返回值的Runnable.

複製程式碼

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

複製程式碼

 

 

 Executors包含了一些工具方法,可以把其他型別的任務封裝成一個Callable,比如Runnable和

java.security.PrivilegedAction.

 

一個Executor執行的任務的生命週期有四個階段:建立、提交、開始和完成.由於任務的執行可能會話費很長時間,我們也希望可以取消一個任務.在Executor框架中,總可以取消已經提交但尚未開始的任務,但是對於已經開始的任務,只有它們響應中斷,才可以取消.取消一個已經完成的任務沒有影響.

 

Future描述了任務的生命週期,並提供了相關的方法來獲得任務的結果、取消任務以及校驗任務是否已經完成還是被取消.Future的歸約中暗示了任務的生命週期是單向的,不能後退--就像ExecutorService的生命週期一樣,一旦任務完成,他就永遠停留在完成狀態上了.

 

任務的狀態(尚未開始,執行中,完成)決定了get方法的行為,如果任務已經完成,get會立即返回或者丟擲一個Excption,如果任務沒有完成,get會阻塞直到它完成.如果任務丟擲了異常,get會將該異常封裝為ExecutionException,然後重新丟擲;如果任務被取消,get會丟擲CancellationException.當丟擲ExecutionException時,可以用getCause重新獲得被封裝的原始異常.

 

有很多種方法可以建立一個描述任務的Future.ExecutorService中的所有submit方法都返回一個Future,因此你可以將一個Runnable或一個Callable提交給executor,然後得到一個Future,用它來重新獲得任務執行的結果,或取消任務.

 

複製程式碼

public static void main(String [] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Callable<String> callable = new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "返回callable的結果";
            }
        };

        Future<String> future = executorService.submit(callable);

        future.get();
        //取消任務
        //future.cancel(true);
    }

複製程式碼

 

 

你也可以顯示地為給定的Runnable或Callable例項化一個FutureTask.(FutureTask實現了Runnable,所以既可以將它提交給Executor來執行,又可以直接呼叫run方法執行).

複製程式碼

        //建立一個callable
        Callable<String> callable = new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "返回callable的結果";
            }
        };

        //包裝進一個FutureTask
        FutureTask<String> futureTask = new FutureTask<String>(callable);
        //必須run否則會阻塞..
        futureTask.run();
        //獲得結果..
        String s = futureTask.get();
        System.out.println("s = " + s);

複製程式碼

 

 ExecutorService.submit()之所以能返回FutureTask是通過呼叫內部的newTaskFor方法,把Runnable或Callable包裝成一個FutureTask.

 

AbstractExecutorService,ExecutorService的實現類:

複製程式碼

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }


protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

複製程式碼

 

 

 將Runnable或Callable提交到Executor的行為,可以建立一個安全釋出(可以看第二篇部落格),以保證Runnable或Callable從提交執行緒暴露到最終執行任務的執行緒的過程是執行緒安全的.類似地,設定Future結果值的行為,也可以簡歷一個安全釋出,以保證這個結果從計算它的執行緒暴露到通過get重獲他的任何執行緒都是執行緒安全的.示例:

複製程式碼

ExecutorService executorService = Executors.newCachedThreadPool();


        //將物件宣告為final的,保證執行緒傳遞物件的執行緒安全性.
        //將物件宣告為final的,是安全釋出的一種形式.
        final List<Object> list = new ArrayList<>();
        Callable<List> callable = new Callable<List>() {
            @Override
            public List call() throws Exception {
                List<Object> newList = new ArrayList<>();
                for (Object o : list) {
                    newList.add(o);
                }
                return  newList;
            }
        };


        Future<List> future = executorService.submit(callable);
        //將物件宣告為final的,保證執行緒傳遞物件的執行緒安全性.
        final  List result = future.get();

複製程式碼

 

 

 在中斷異常中傳遞中斷狀態&取消任務:

複製程式碼

  try {
            final  List result = future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
            //傳遞中斷狀態,讓呼叫棧上層程式碼對中斷操作進行處理
            Thread.currentThread().interrupt();

            //取消future任務
            future.cancel(true);    
        }

複製程式碼

 

 

我們在開發並行程式的時候,最好不要並行執行不同種類的任務.

兩個人可以有效公平地分擔清洗餐餐具的工作:一個人清洗的同時,另一個人烘乾.但是很好地按照比例分配不同種類的任務是困難的;當有更多的人出現時,如何確保他們在幫忙的同時沒有變的礙手礙腳的,或者沒有導致大範圍的勞動力重新分工,這並不是很容易辦到的.如果沒有在相似的人物之間發現更好的並行性,那麼並行執行方法應有的好處會逐漸減少.

 

更為嚴重的問題是,在給多個工作者劃分相異任務時,各個任務的大小可能完全不同.比如你為兩個工作者劃分了兩個任務A和B,但是A執行花費的時間是B的10倍,那麼你的整個過程僅僅加速了9%而已.最後在多個工作者之間劃分任務,總會涉及到一些任務協調上的開銷;為了使任務劃分是值得的,這一開銷不能多於通過並行性帶來的生產力的提高.

 

大量相互獨立且同類的任務進行並行處理,會將程式的任務量分配到不同的任務中,這樣才能真正獲得性能的提升.

 

CompletionService:當Executor遇見BlockingQueue

如果你向Executor提交了一個批處理任務,並且希望在它們完成後獲得結果,為此你可以儲存在每個任務相關聯的Future,然後不斷地呼叫timeout為零的get,來檢驗Future是否完成.這樣太麻煩了,可以使用完成服務(completion service)

 

CompletionService整合了Executor和BlockingQueue的功能.你可以將Callable任務提交給它去執行,然後使用類似於佇列中的take和poll方法,在結果完整可用時獲得這個結果,像一個打爆的Future.ExecutorCompletionService是實現CompletionService介面的一個類,並將計算任務委託給一個Executor.簡單的使用:

複製程式碼

//建立一個執行緒池
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        //建立一個CompletionService,執行緒池作為引數傳進來
        CompletionService completionService = new ExecutorCompletionService(executorService);
        for (int i = 0; i < 5; i++) {
            completionService.submit(() -> {
                Thread.sleep(1000);
                Random r = new Random();
                int num = r.nextInt();
                System.out.println(num);
                return num;
            });
        }



        for (int i = 0; i < 5; i++) {
            //內部是一個阻塞佇列,.take()是從阻塞佇列裡面拿值沒有會阻塞,返回一個future,再取值的時候.get()會阻塞結果
            System.out.println("completionService.take().get() = " + completionService.take().get());;
        }

複製程式碼

 

 

completionService.take().get()得到的結果的順序並不是任務執行的順序,而是按結果返回的順序.比如第一個任務執行一秒第二個任務執行3秒第三個任務執行2秒,返回的結果就是任務一任務三.任務二. 改進版測試:

複製程式碼

  //建立一個執行緒池,執行緒的數量改為5
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        //建立一個CompletionService,執行緒池作為引數傳進來
        CompletionService completionService = new ExecutorCompletionService(executorService);
        for (int i = 0; i < 5; i++) {
            completionService.submit(() -> {
                //執行緒休眠的時間改為隨機值.
                int num = (int) (Math.random()*1000);

                System.out.println(num);
                Thread.sleep(num);

                return num;
            });
        }



        for (int i = 0; i < 5; i++) {
            //內部是一個阻塞佇列,.take()是從阻塞佇列裡面拿值沒有會阻塞,返回一個future,再取值的時候.get()會阻塞結果
            System.out.println("completionService.take().get() = " + completionService.take().get());;
        }

複製程式碼

 

 

 輸出的結果:

317
738
415
731
636
completionService.take().get() = 317
completionService.take().get() = 415
completionService.take().get() = 636
completionService.take().get() = 731
completionService.take().get() = 738

看輸出的順序

 

為任務設定時限

有時候如果一個活動無法在某個確定時間內完成,那麼它的結果就失敗了,此時程式可以放棄該活動.(例如獲取廣告資訊,超過時限返回預設廣告資訊)

 

Future.get的限時版本指定等待的時間,超時則丟擲TimeoutException.

 

使用限時任務的第二個情況是,當他們超時後應該能夠停止它們,這樣才不會為繼續計算一個無用的結果而浪費計算資源.為了達到這個目的,可以讓任務自己嚴格管理他的預定時間,超時後就中止執行;或者也可以在超出時限後取消任務.如果你編寫的任務是可取消的.就可以更靈敏地中止它,以避免消耗更多的資源.示例:

複製程式碼

//要返回的結果
        Object obj = null;
        //假裝的future
        Future future = new FutureTask(() -> "假裝要實現的任務");
        //執行future
        ((FutureTask) future).run();
        try {
            //等待10s.
            obj = future.get(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
            //賦預設值
            obj = "預設值";
        } catch (TimeoutException e) {
            e.printStackTrace();
            //賦預設值&取消任務
            obj = "預設值";
            future.cancel(true);
        }

        return obj;

複製程式碼

 

 

更高階的用法:使用ExecutorService的invokeAll()可以處理一個任務的容器(collection,就是一個集合裡面都是任務),並返回一個Future的容器(就是一個集合裡面都是結果集),當所有任務都完成時、呼叫執行緒被中斷時或者超過實現時,限時版本的invokeAll都會返回結果.超過時限後,任何尚未完成的任務都會被取消.作為invokeAll的返回值,每個任務要麼正常地完成,要麼被取消;客戶端程式碼可以呼叫get或者isCancelled來查明是屬於哪一種情況.

複製程式碼

ExecutorService executorService = Executors.newCachedThreadPool();
        List<Callable<String>> list = new ArrayList<>();
        Callable<String> callable1 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(2000);
                return "任務一";
            }
        };
        Callable<String> callable2 = new Callable() {
            @Override
            public String call() throws Exception {
                Thread.sleep(3000);
                return "任務二";
            }
        };
        Callable<String> callable3 = new Callable() {
            @Override
            public String call()  {
                //執行緒休息3秒
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.getCause().printStackTrace();
                    System.out.println("進入超時異常");
                }
                return "任務三";
            }
        };
        list.add(callable1);
        list.add(callable2);
        list.add(callable3);

        //超過設定時間的任務會被取消
        List<Future<String>> futures = null;
        try {
            futures = executorService.invokeAll(list, 5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        for(Future f : futures){
            try {
                System.out.println(f.get());
            } catch (ExecutionException e) {
                System.out.println("進入了f.get()的執行異常");
                //e.printStackTrace();
            } catch (InterruptedException e) {
                System.out.println("進入了f.get()的中斷異常");
                //e.printStackTrace();
            }catch (CancellationException e){
                System.out.println("進入了f.get()的取消異常");
                e.printStackTrace();
            }
        }

複製程式碼

 

 最後CancellationException是手動加上的,不加也會拋這個異常,但是加上可以做一些特別的處理(比如找到取消的任務,持久化下次再操作). 還有一個就是新增任務的順序和返回的future的順序是一一對應的.

來源:https://www.cnblogs.com/xisuo/p/9786996.html

 

鄭州市不孕不育醫院

鄭州哪個醫院治療不孕不育好

鄭州不孕不育哪家好

鄭州看不孕不育哪裡好