1. 程式人生 > >併發新特性—Executor框架與執行緒池(含程式碼)

併發新特性—Executor框架與執行緒池(含程式碼)

Executor框架簡介

在Java 5之後併發程式設計引入了一堆新的啟動、排程和管理執行緒的API。Executor框架便是Java 5中引入的,其內部使用了執行緒池機制,它在java.util.cocurrent 包下,通過該框架來控制執行緒的啟動、執行和關閉,可以簡化併發程式設計的操作。因此,在Java 5之後,通過Executor來啟動執行緒比使用Thread的start方法更好,除了更易管理,效率更好(用執行緒池實現,節約開銷)外,還有關鍵的一點:有助於避免this逃逸問題——如果我們在構造器中啟動一個執行緒,因為另一個任務可能會在構造器結束之前開始執行,此時可能會訪問到初始化了一半的物件用Executor在構造器中。

    Executor框架包括:執行緒池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。

    Executor介面中之定義了一個方法execute(Runnable command),該方法接收一個Runable例項,它用來執行一個任務,任務即一個實現了Runnable介面的類。ExecutorService介面繼承自Executor介面,它提供了更豐富的實現多執行緒的方法,比如,ExecutorService提供了關閉自己的方法,以及可為跟蹤一個或多個非同步任務執行狀況而生成 Future 的方法。 可以呼叫ExecutorService的shutdown()方法來平滑地關閉 ExecutorService,呼叫該方法後,將導致ExecutorService停止接受任何新的任務且等待已經提交的任務執行完成(已經提交的任務會分兩類:一類是已經在執行的,另一類是還沒有開始執行的),當所有已經提交的任務執行完畢後將會關閉ExecutorService

。因此我們一般用該介面來實現和管理多執行緒。

    ExecutorService的生命週期包括三種狀態:執行、關閉、終止。建立後便進入執行狀態,當呼叫了shutdown()方法時,便進入關閉狀態,此時意味著ExecutorService不再接受新的任務,但它還在執行已經提交了的任務,當素有已經提交了的任務執行完後,便到達終止狀態。如果不呼叫shutdown()方法,ExecutorService會一直處在執行狀態,不斷接收新的任務,執行新的任務,伺服器端一般不需要關閉它,保持一直執行即可。

    Executors提供了一系列工廠方法用於創先執行緒池,返回的執行緒池都實現了ExecutorService介面。

    public static ExecutorService newFixedThreadPool(int nThreads)

    建立固定數目執行緒的執行緒池。

    public static ExecutorService newCachedThreadPool()

    建立一個可快取的執行緒池,呼叫execute將重用以前構造的執行緒(如果執行緒可用)。如果現有執行緒沒有可用的,則建立一個新線   程並新增到池中。終止並從快取中移除那些已有 60 秒鐘未被使用的執行緒。

    public static ExecutorService newSingleThreadExecutor()

    建立一個單執行緒化的Executor。

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

    建立一個支援定時及週期性的任務執行的執行緒池,多數情況下可用來替代Timer類。

    這四種方法都是用的Executors中的ThreadFactory建立的執行緒,下面就以上四個方法做個比較




newCachedThreadPool()                                                                                                                                         

-快取型池子,先檢視池中有沒有以前建立的執行緒,如果有,就 reuse.如果沒有,就建一個新的執行緒加入池中
-快取型池子通常用於執行一些生存期很短的非同步型任務
 因此在一些面向連線的daemon型SERVER中用得不多。但對於生存期短的非同步任務,它是Executor的首選。
-能reuse的執行緒,必須是timeout IDLE內的池中執行緒,預設     timeout是60s,超過這個IDLE時長,執行緒例項將被終止及移出池。
  注意,放入CachedThreadPool的執行緒不必擔心其結束,超過TIMEOUT不活動,其會自動被終止。



newFixedThreadPool(int)                                                      

-newFixedThreadPool與cacheThreadPool差不多,也是能reuse就用,但不能隨時建新的執行緒

-其獨特之處:任意時間點,最多隻能有固定數目的活動執行緒存在,此時如果有新的執行緒要建立,只能放在另外的佇列中等待直到當前的執行緒中某個執行緒終止直接被移出池子
-和cacheThreadPool不同,FixedThreadPool沒有IDLE機制(可能也有,但既然文件沒提,肯定非常長,類似依賴上層的TCP或UDP IDLE機制之類的),所以FixedThreadPool多數針對一些很穩定很固定的正規併發執行緒,多用於伺服器
-從方法的原始碼看,cache池和fixed 池呼叫的是同一個底層 池,只不過引數不同:
fixed池執行緒數固定,並且是0秒IDLE(無IDLE)    
cache池執行緒數支援0-Integer.MAX_VALUE(顯然完全沒考慮主機的資源承受能力),60秒IDLE  


newScheduledThreadPool(int)

-排程型執行緒池
-這個池子裡的執行緒可以按schedule依次delay執行,或週期執行

SingleThreadExecutor()

-單例執行緒,任意時間池中只能有一個執行緒
-用的是和cache池和fixed池相同的底層池,但執行緒數目是1-1,0秒IDLE(無IDLE)

    一般來說,CachedTheadPool在程式執行過程中通常會建立與所需數量相同的執行緒,然後在它回收舊執行緒時停止建立新執行緒,因此它是合理的Executor的首選,只有當這種方式會引發問題時(比如需要大量長時間面向連線的執行緒時),才需要考慮用FixedThreadPool。(該段話摘自《Thinking in Java》第四版)

Executor執行Runnable任務

    通過Executors的以上四個靜態工廠方法獲得 ExecutorService例項,而後呼叫該例項的execute(Runnable command)方法即可。一旦Runnable任務傳遞到execute()方法,該方法便會自動在一個執行緒上執行。下面是是Executor執行Runnable任務的示例程式碼:

  1. import java.util.concurrent.ExecutorService;   
  2. import java.util.concurrent.Executors;   
  3. publicclass TestCachedThreadPool{   
  4.     publicstaticvoid main(String[] args){   
  5.         ExecutorService executorService = Executors.newCachedThreadPool();   
  6. //      ExecutorService executorService = Executors.newFixedThreadPool(5);
  7. //      ExecutorService executorService = Executors.newSingleThreadExecutor();
  8.         for (int i = 0; i < 5; i++){   
  9.             executorService.execute(new TestRunnable());   
  10.             System.out.println("************* a" + i + " *************");   
  11.         }   
  12.         executorService.shutdown();   
  13.     }   
  14. }   
  15. class TestRunnable implements Runnable{   
  16.     publicvoid run(){   
  17.         System.out.println(Thread.currentThread().getName() + "執行緒被呼叫了。");   
  18.     }   
  19. }  
  某次執行後的結果如下:

   從結果中可以看出,pool-1-thread-1和pool-1-thread-2均被呼叫了兩次,這是隨機的,execute會首先線上程池中選擇一個已有空閒執行緒來執行任務,如果執行緒池中沒有空閒執行緒,它便會建立一個新的執行緒來執行任務。

Executor執行Callable任務

    在Java 5之後,任務分兩類:一類是實現了Runnable介面的類,一類是實現了Callable介面的類。兩者都可以被ExecutorService執行,但是Runnable任務沒有返回值,而Callable任務有返回值。並且Callable的call()方法只能通過ExecutorService的submit(Callable<T> task) 方法來執行,並且返回一個 <T>Future<T>,是表示任務等待完成的 Future。

    Callable介面類似於Runnable,兩者都是為那些其例項可能被另一個執行緒執行的類設計的。但是 Runnable 不會返回結果,並且無法丟擲經過檢查的異常而Callable又返回結果,而且當獲取返回結果時可能會丟擲異常。Callable中的call()方法類似Runnable的run()方法,區別同樣是有返回值,後者沒有。

    當將一個Callable的物件傳遞給ExecutorService的submit方法,則該call方法自動在一個執行緒上執行,並且會返回執行結果Future物件。同樣,將Runnable的物件傳遞給ExecutorService的submit方法,則該run方法自動在一個執行緒上執行,並且會返回執行結果Future物件,但是在該Future物件上呼叫get方法,將返回null。

    下面給出一個Executor執行Callable任務的示例程式碼:

  1. import java.util.ArrayList;   
  2. import java.util.List;   
  3. import java.util.concurrent.*;   
  4. publicclass CallableDemo{   
  5.     publicstaticvoid main(String[] args){   
  6.         ExecutorService executorService = Executors.newCachedThreadPool();   
  7.         List<Future<String>> resultList = new ArrayList<Future<String>>();   
  8.         //建立10個任務並執行 
  9.         for (int i = 0; i < 10; i++){   
  10.             //使用ExecutorService執行Callable型別的任務,並將結果儲存在future變數中 
  11.             Future<String> future = executorService.submit(new TaskWithResult(i));   
  12.             //將任務執行結果儲存到List中 
  13.             resultList.add(future);   
  14.         }   
  15.         //遍歷任務的結果 
  16.         for (Future<String> fs : resultList){   
  17.                 try{   
  18.                     while(!fs.isDone);//Future返回如果沒有完成,則一直迴圈等待,直到Future返回完成
  19.                     System.out.println(fs.get());     //列印各個執行緒(任務)執行的結果 
  20.                 }catch(InterruptedException e){   
  21.                     e.printStackTrace();   
  22.                 }catch(ExecutionException e){   
  23.                     e.printStackTrace();   
  24.                 }finally{   
  25.                     //啟動一次順序關閉,執行以前提交的任務,但不接受新任務
  26.                     executorService.shutdown();   
  27.                 }   
  28.         }   
  29.     }   
  30. }   
  31. class TaskWithResult implements Callable<String>{   
  32.     privateint id;   
  33.     public TaskWithResult(int id){   
  34.         this.id = id;   
  35.     }   
  36.     /**  
  37.      * 任務的具體過程,一旦任務傳給ExecutorService的submit方法, 
  38.      * 則該方法自動在一個執行緒上執行 
  39.      */
  40.     public String call() throws Exception {  
  41.         System.out.println("call()方法被自動呼叫!!!    " + Thread.currentThread().getName());   
  42.         //該返回結果將被Future的get方法得到
  43.         return"call()方法被自動呼叫,任務返回的結果是:" + id + "    " + Thread.currentThread().getName();   
  44.     }   
  45. }  
    某次執行結果如下:

   

    從結果中可以同樣可以看出,submit也是首先選擇空閒執行緒來執行任務,如果沒有,才會建立新的執行緒來執行任務。另外,需要注意:如果Future的返回尚未完成,則get()方法會阻塞等待,直到Future完成返回,可以通過呼叫isDone()方法判斷Future是否完成了返回。

自定義執行緒池

    自定義執行緒池,可以用ThreadPoolExecutor類建立,它有多個構造方法來建立執行緒池,用該類很容易實現自定義的執行緒池,這裡先貼上示例程式:
  1. import java.util.concurrent.ArrayBlockingQueue;   
  2. import java.util.concurrent.BlockingQueue;   
  3. import java.util.concurrent.ThreadPoolExecutor;   
  4. import java.util.concurrent.TimeUnit;   
  5. publicclass ThreadPoolTest{   
  6.     publicstaticvoid main(String[] args){   
  7.         //建立等待佇列 
  8.         BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);   
  9.         //建立執行緒池,池中儲存的執行緒數為3,允許的最大執行緒數為5
  10.         ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue);   
  11.         //建立七個任務 
  12.         Runnable t1 = new MyThread();   
  13.         Runnable t2 = new MyThread();   
  14.         Runnable t3 = new MyThread();   
  15.         Runnable t4 = new MyThread();   
  16.         Runnable t5 = new MyThread();   
  17.         Runnable t6 = new MyThread();   
  18.         Runnable t7 = new MyThread();   
  19.         //每個任務會在一個執行緒上執行
  20.         pool.execute(t1);   
  21.         pool.execute(t2);   
  22.         pool.execute(t3);   
  23.         pool.execute(t4);   
  24.         pool.execute(t5);   
  25.         pool.execute(t6);   
  26.         pool.execute(t7);   
  27.         //關閉執行緒池 
  28.         pool.shutdown();   
  29.     }   
  30. }   
  31. class MyThread implements Runnable{   
  32.     @Override
  33.     publicvoid run(){   
  34.         System.out.println(Thread.currentThread().getName() + "正在執行。。。");   
  35.         try{   
  36.             Thread.sleep(100);   
  37.         }catch(InterruptedException e){   
  38.             e.printStackTrace();   
  39.         }   
  40.     }   
  41. }  
    執行結果如下:

    從結果中可以看出,七個任務是線上程池的三個執行緒上執行的。這裡簡要說明下用到的ThreadPoolExecuror類的構造方法中各個引數的含義。

public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long         keepAliveTime, TimeUnit unit,BlockingQueue<

相關推薦

併發特性Executor框架執行程式碼

Executor框架簡介 在Java 5之後,併發程式設計引入了一堆新的啟動、排程和管理執行緒的API。Executor框架便是Java 5中引入的,其內部使用了執行緒池機制,它在java.util.cocurrent 包下,通過該框架來控制執行緒的啟動、執行

併發特性Executor 框架執行

蘭亭風雨 · 更新於 2018-11-14 09:00:31 併發新特性—Executor 框架與執行緒池 Executor 框架簡介 在 Java 5 之後,併發程式設計引入了一堆新的啟動、排程和管理執行緒的API。Executor 框架便是 Java 5 中引入的,其內部使用了執行緒池機

轉:【Java並發編程】之十九:並發特性Executor框架線程代碼

接口類 容易 20px 了解 大小 執行c 生命周期 schedule p s Executor框架簡介 在Java 5之後,並發編程引入了一堆新的啟動、調度和管理線程的API。Executor框架便是Java 5中引入的,其內部使用了線程池機制,它在java.

【JVM第九篇】:Executor框架執行

Executor框架簡介 在Java 5之後,併發程式設計引入了一堆新的啟動、排程和管理執行緒的API。Executor框架便是Java 5中引入的,其內部使用了執行緒池機制,它在java.util.cocurrent 包下,通過該框架來控制執行緒的啟動、執行和關閉,可以簡化併發程式設計

java執行Executor框架執行

執行緒雖然在web開發中用的不算特別多,但在特定的情況下還是能發揮重要重要作用的,因此即使用的少還是掌握下比較好;下面先回顧下比較執行緒的常規實現方法 1 繼承Thread類 2 實現runnable介面(使用較多) java5之後有了新的執行緒實現方式,java5可以使用

Java併發程式設計2執行中斷程式碼

使用interrupt()中斷執行緒當一個執行緒執行時,另一個執行緒可以呼叫對應的Thread物件的interrupt()方法來中斷它,該方法只是在目標執行緒中設定一個標誌,表示它已經被中斷,並立即返回。這裡需要注意的是,如果只是單純的呼叫interrupt()方法,執行緒並沒有實際被中斷,會繼續往下執行。

Java併發基礎知識—— Executor框架執行

在Java併發(基礎知識)—— 建立、執行以及停止一個執行緒中講解了兩種建立執行緒的方式:直接繼承Thread類以及實現Runnable介面並賦給Thread,這兩種建立執行緒的方式線上程比較少的時候是沒有問題的,但是當需要建立大量執行緒時就會出現問題,因為

JAVA Executor框架建立執行

為了更好的控制多執行緒,JDK提供理論一套執行緒框架Executor,幫助開發人員有效的進行執行緒控制。它們都在java.util.concurrent包中,是JDK併發包的核心。其中有一個比較重要的類:Executors,他扮演著執行緒工廠的角色,我們通過Executors可以建立特定功能的執行緒

Java的Executor框架執行實現原理

一,Java的Executor框架 1,Executor介面 public interface Executor {     void execute(Runnable command); } Executor介面是Executor框架中最基礎的部分,定義了一個用於

Executor框架執行

在JDK5後主要提供的多執行緒處理都在java.util.concurrent包中,多執行緒的主要抽象不是Thread,而是Executor,Executor為介面,定義在java.util.concurrent包下,只定義了一個方法: public in

程序執行總結比較全面

1.程序和執行緒 1.1 概述: 程序是具有一定獨立功能的程式關於某個資料集合上的一次執行活動,程序是系統進行資源分配和排程的一個獨立單位. 執行緒是程序的一個實體,是CPU排程和分派的基本單位,它是比程序更小的能獨立執行的基本單位.執行緒自己基本上不擁有系統資源,只擁有一點在執行中必不

Android執行執行

前言,學習安卓很久了,一直也沒有學部落格的習慣,下決心從今天開始要養成寫部落格總結學習經驗的好習慣! 一.Android中執行緒與執行緒池的簡介 在Android中執行緒主要可以分為兩大類:一個用於處理介面相關與使用者互動的執行緒-主執行緒;一個用於處理耗時任務-子執行緒

socket,執行TCP通訊

Server 1 package day20150914socket; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; impo

Linux雜談: 實現一種簡單實用的執行C語言

基本功能 1. 實現一個執行緒的佇列,佇列中的執行緒啟動後不再釋放; 2. 沒有任務執行時,執行緒處於pending狀態,等待喚醒,不佔cpu; 3. 當有任務需要執行時,從執行緒佇列中取出一個執行緒執行任務; 4. 任務執行完成後執行緒再次進入pending狀態,等待喚醒;   擴充套件功能 1.

java8特性:CompletableFuture多執行併發非同步程式設計

首先因為現在的應用越來越複雜,越來越多模組多系統之間相互依賴,一個操作可能需要使用多個模組或者多個系統提供的多個服務來完成一個功能,如果每個服務順序的執行,可能需要消耗很多時間,或者前端使用者需要得到及時響應,不需要等待所有服務完成便可以返回部分結果,而且現在的計算機處理器效能越來越強大,多核處理器越來越普遍

Java併發程式設計3執行掛起、恢復終止的正確方法程式碼

JAVA大資料中高階架構 2018-11-06 14:24:56掛起和恢復執行緒Thread 的API中包含兩個被淘汰的方法,它們用於臨時掛起和重啟某個執行緒,這些方法已經被淘汰,因為它們是不安全的,不穩定的。如果在不合適的時候掛起執行緒(比如,鎖定共享資源時),此時便可能會發生死鎖條件——其他執行緒在等待該

C++11 併發執行未完成

從C++11新標準開始,C++語言本身增加了對多執行緒的支援,意味著使用C++可實現多執行緒程式的可移植,跨平臺。 在標準的C++程式中,主執行緒從main()開始執行,我們自己在C++中建立的執行緒,也需要從一個函式開始執行(這個函式叫做初始函式),一旦這個函式執行完

java socket 服務端併發處理 執行的使用

package yiwangzhibujian.threadserver; import java.io.InputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.

jdk8特性Lambda表示式結合spring 執行,一行程式碼實現多執行

1.配置spring 執行緒池 @Configuration @EnableAsync @ConfigurationProperties(prefix="threadpool") public class ExecutePoolConfiguration { @V

java8特性:Stream多執行並行資料處理

將一個順序執行的流轉變成一個併發的流只要呼叫 parallel()方法 public static long parallelSum(long n){     return Stream.iterate(1L, i -> i +1).limit(n).parallel().reduce(0L,Long