1. 程式人生 > >Java併發程式設計筆記4-執行緒池

Java併發程式設計筆記4-執行緒池

我們使用執行緒的時候就去建立一個執行緒,但是就會有一個問題:

  如果併發的執行緒數量非常多,而且每個執行緒都是執行一個時間很短的任務就結束了,這樣頻繁建立執行緒就會導致大大降低系統的效率,因為頻繁建立執行緒和銷燬執行緒需要時間。

那麼有沒有一種辦法使得執行緒可以複用,就是執行完一個任務,並不被銷燬,而是可以繼續執行其他的任務?

執行緒池正好能解決這樣的問題。正如名稱所稱的那樣,執行緒池管理一個工作者執行緒的同構池。執行緒池是與工作佇列緊密繫結的。所謂工作佇列,其作用是持有所有等待執行的任務。

工作者執行緒的生活從此輕鬆起來:它從工作佇列中獲取下一個任務,執行它,然後回來等待另外一個執行緒。

這類似於企業應用程式中事務監聽器(transaction monitor)的角色:它將課執行事務的數量控制在一個合理的水平中,不會因過渡濫用事務而耗盡有限資源。

  執行緒池中執行任務執行緒,這方法有很多“每任務每執行緒”無法筆記的優勢。重用存在的執行緒,而不是建立新的執行緒,這可以在處理多請求時抵消執行緒建立,消亡產生的開銷。還有一個好處就是,在請求到達時,工作者執行緒通常已經存在

,用於建立執行緒的等待時間並不會延遲任務的執行,因此提高響應性。通過適當地調整執行緒池的大小,你可以得到足夠多的執行緒以保持處理器忙碌,同時可以還防止過多的執行緒互相競爭資源,導致應用程式耗盡記憶體或者失敗。

每任務每執行緒例子如下:

複製程式碼
public class ThreadPool {

    public static void main(String[] args) throws
IOException { ServerSocket serverSocket = new ServerSocket(80); while (true){ final Socket socket = serverSocket.accept(); Runnable task = new Runnable() { public void run() { handleRequest(socket); } };
new Thread(task).start(); } } private static void handleRequest(Socket socket) { } }
複製程式碼

可以看到這個例子是一個粗製濫造的併發服務端,來一個使用者就建立一個執行緒,你根本就不知道有多少使用者來,要建立多少個執行緒。這樣頻繁建立執行緒就會導致大大降低系統的效率,因為頻繁建立執行緒和銷燬執行緒需要時間,

過渡濫建立執行緒而耗盡有限資源。

由於這原因,java中給我們提供Executor框架。通過Executors中的某個靜態工廠方法來建立一個執行緒池:

  1.newFixedThreadPool 建立一個定長的執行緒池,每當提交一個任務就建立一個執行緒,知道達到池的最大長度,這時執行緒池會保持長度不再變化(如果一個執行緒由於非預期的Exception而結束,執行緒池會補充一個新的執行緒)。

  下面用newFixedThreadPool 建立一個定長的執行緒池來改造上面的例子,如下:

複製程式碼
public class ThreadPool {

    public static void main(String[] args) throws IOException {
        //newFixedThreadPool引數為執行緒池的大小
        Executor executor = Executors.newFixedThreadPool(100);
        
        ServerSocket serverSocket = new ServerSocket(80);
        while (true){
            final Socket socket = serverSocket.accept();
            Runnable task = new Runnable() {
                public void run() {
                    handleRequest(socket);
                }
            };
            //直接將任務丟進執行緒池來執行任務
            executor.execute(task);
        }
    }

    private static void handleRequest(Socket socket) {
    }

}
複製程式碼

這樣就不會發生過渡濫建立執行緒而耗盡有限資源。

  2.newSingleThreadExecutor建立一個單執行緒化的executor,他只建立唯一的工作者執行緒來執行任務,如果這個執行緒異常結束,會有另一個取代它。executor會保證任務依照任務佇列所規定的順序(FIFO,LIFO,優先順序)執行。

  3.newCachedThreadPool建立一個可快取的執行緒池,如果當前執行緒的長度超過了處理的需要,它可以靈活的回收空閒的執行緒,當需求增加時,它可以靈活的增加新的執行緒,並不會對池的長度做任何限制。但是認為改執行緒池的長度沒有任何限制,有可能會把資源耗盡,

這需要自己很好的把控了。

  4.newScheduledThreadPool建立一個定長的執行緒池,而且支援定時的以及週期性的任務執行,類似於Timer。

Executor的生命週期:

  Executor實現通常知識為執行任務而建立執行緒。但是JVM會在所有(非後臺的,nondaemon)執行緒全部終止後才退出。因此,如果無法正確關閉Executor,將會阻止JVM的結束。

因為Executor是非同步地執行任務,所以在任何時間裡,所有之前提交的任務狀態都不能立即可見。這些任務中,有些可能已經完成,有些可能正在執行,其他的還可能在佇列中等待執行。關閉應用程式時,程式會出現很多中情況:從平緩關閉

到最突然的關閉,以及介於這兩種階段情況之間的各種可能。Executor是為應用服務提供服務的,他們理應可以關閉,無論是平緩還是突然。

注意:關閉操作還會影響到記錄應用程式任務狀態的反饋資訊。

Executor就是一個介面,原始碼如下圖:

我們可以進入Executors這個類的原始碼,如下:

可以看到newFixedThreadPool 建立一個定長的執行緒池,返回的是一個ExecutorService,但是我們上面例子接收的是Executor,為什麼Executor也可以接收呢?我們繼續進入ExecutorService原始碼如下:

可以看到原來ExecutorService繼承了Executor。ExecutorService擴充套件了Executor,並且添加了一些用於宣告週期管理的方法。

原始碼如下:

 

 ExecutorService暗示了生命週期有3種狀態:執行、關閉、終止。ExecutorService最初建立後的初始狀態是執行狀態。

shutdown方法會啟動一個平緩的關閉過程:停止接受新的任務,同時等待已經提交的任務完成,包括尚未開始執行的任務。

shutdownNow方法會啟動要給強制的關閉過程:嘗試取消所有執行中的任務和排在佇列中尚未開始執行的任務is。

isShutdown方法:判斷執行緒池(即ExecutorService)是否關閉。

isTerminated方法:是執行緒池(即ExecutorService)是否進入終止狀態。

  在關閉後提交到ExecutorService中的任務,會被拒絕執行處理器(rejected execution handler)處理。拒絕執行處理器是ExecutorService的一種實現,ThreadPoolExecutor提供的,ExecutorService介面中的方法並不提供拒絕執行處理器。拒絕執行處理器可能只是

簡單的放棄任務,也可能會引起execute丟擲一個未檢查的RejectedExecutionException。一旦所有任務全部完成後,ExecutorService會轉入終止狀態。

awaitTermination方法:等待ExecutorService到達終止狀態。

通常shutdown會緊隨awaitTermination之後,這樣可以產生同步地關閉ExecutorService的效果。

上面的Executor的例子的程式是沒辦法關閉執行緒池,會一直跑下去,那麼我們如何寫一個支援關閉的webserver呢?

明顯我們現在要用ExecutorService來改造上面的Executor的例子。虛擬碼如下:

複製程式碼
public class ThreadPool {
    private static ExecutorService executorService = Executors.newCachedThreadPool();
    public static void main(String[] args) throws IOException {

        //newFixedThreadPool引數為執行緒池的大小

        ServerSocket serverSocket = new ServerSocket(80);
        //這裡就不再像上面的例子一樣無限的接受任務了,要根據我的執行緒池是否處於關閉狀態來決定
        while (!executorService.isShutdown()){
            final Socket socket = serverSocket.accept();
            try{
                executorService.execute(new Runnable() {
                    public void run() {
                        handleRequest(socket);
                    }
                });
            }catch (RejectedExecutionException e){
                //如果拒絕服務不是因為我執行緒池關閉導致的,我們要在這裡列印一下日誌
                if (!executorService.isShutdown()){
                    System.out.println("接受任務被拒絕");
                    throw e;
                }
            }
        }
    }

    //用一個公共的方法去關閉執行緒池
    public void stop(){
        executorService.shutdown();
    }


    private static void handleRequest(Socket socket) {
        //獲取請求
        Request req = readRequest(socket);
        //如果請求已經關閉
        if (isShutdownRequest(req)){
            //關閉執行緒池
            stop();
        }else {
            //請求轉發
            dispatchRequest(req);
        }
    }

}
複製程式碼

經過改造,這服務端變的優雅多了。

延時的,並具有周期性的任務

在newScheduledThreadPool出來之前我們一般會用Timer和TimerTask來做。Timer在JDK裡面,是很早的一個API了。

但是Timer存在一些缺陷,Timer只建立唯一的執行緒來執行所有Timer任務。如果一個timer任務的執行很耗時,會導致其他TimerTask的時效準確性出問題。例如一個TimerTask每10秒執行一次,

而另外一個TimerTask每40ms執行一次,重複出現的任務會在後市的任務完成後快速連續的被呼叫4次,要麼完全“丟失”4次呼叫。

Timer的另外一個問題在於,如果TimerTask丟擲未檢查的異常會終止timer執行緒。這種情況下,Timer也不會重新回覆執行緒的執行了;它錯誤的認為整個Timer都被取消了。此時

已經被安排但尚未執行的TimerTask永遠不會再執行了,新的任務也不能被排程了。

現在我們看一下Timer的例子,如下:

複製程式碼
public class Shedule {
    private static long start;

    public static void main(String[] args) {
        TimerTask task = new TimerTask() {
            public void run() {
                System.out.println(System.currentTimeMillis()-start);
                try{
                    Thread.sleep(3000);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        };

        TimerTask task1 = new TimerTask() {
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis()-start);
            }
        };

        Timer timer = new Timer();
        start = System.currentTimeMillis();
        //啟動一個排程任務,1S鍾後執行
        timer.schedule(task,1000);
        //啟動一個排程任務,3S鍾後執行
        timer.schedule(task1,3000);


    }

}
複製程式碼

上面程式我們預想是第一個任務執行後,第二個任務3S後執行的,即輸出一個1000,一個3000.

實際執行結果如下:

 實際執行結果並不如我們所願。世界結果,是過了4S後才輸出第二個任務,即4001約等於4秒。那部分時間時間到哪裡去了呢?那個時間是被我們第一個任務的sleep所佔用了。

現在我們在第一個任務中去掉Thread.sleep();這一行程式碼,執行是否正確了呢?如下:

複製程式碼
public class Shedule {
    private static long start;

    public static void main(String[] args) {
        TimerTask task = new TimerTask() {
            public void run() {
                System.out.println(System.currentTimeMillis()-start);
               
            }
        };

        TimerTask task1 = new TimerTask() {
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis()-start);
            }
        };

        Timer timer = new Timer();
        start = System.currentTimeMillis();
        //啟動一個排程任務,1S鍾後執行
        timer.schedule(task,1000);
        //啟動一個排程任務,3S鍾後執行
        timer.schedule(task1,3000);


    }

}
複製程式碼

執行結果如下:

可以看到確實是第一個任務過了1S後執行,第二個任務在第一個任務執行完後過3S執行了。

這就說明了Timer只建立唯一的執行緒來執行所有Timer任務。如果一個timer任務的執行很耗時,會導致其他TimerTask的時效準確性出問題

 Timer存在一些缺陷,因此你應該考慮使用ScheduledThreadPoolExecutor作為替代品。你可以通過建構函式或者通過newScheduledThreadPool工廠方法建立一個ScheduledThreadPoolExecutor。

如下:

複製程式碼
public class Shedule {
    private static long start;

    private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);

    public static void main(String[] args) {
        TimerTask task = new TimerTask() {
            public void run() {
                System.out.println(System.currentTimeMillis()-start);
                try{
                    Thread.sleep(3000);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        };

        TimerTask task1 = new TimerTask() {
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis()-start);
            }
        };

        Timer timer = new Timer();
        start = System.currentTimeMillis();

        //TimeUnit.MILLISECONDS指定毫秒為單位
        executorService.schedule(task,1000, TimeUnit.MILLISECONDS);
        executorService.schedule(task1,3000, TimeUnit.MILLISECONDS);

    }

}
複製程式碼

執行結果如下:

可以看到執行結果符合預期。

可以看到如果一個timer任務的執行很耗時(例如Thread.sleep),ScheduledThreadPoolExecutor並不會導致其他TimerTask的時效準確性出問題。

還可以看到,這兩個TimerTask互不干擾。

互相干擾還有一個反面:

    如果TimerTask丟擲未檢查的異常會終止timer執行緒。這種情況下,Timer也不會重新回覆執行緒的執行了;它錯誤的認為整個Timer都被取消了。此時

    已經被安排但尚未執行的TimerTask永遠不會再執行了,新的任務也不能被排程了。

例子如下:

複製程式碼
public class Shedule {
    private static long start;

    private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);

    public static void main(String[] args) {
        TimerTask task = new TimerTask() {
            public void run() {
                System.out.println(System.currentTimeMillis()-start);
               throw new RuntimeException();
            }
        };

        TimerTask task1 = new TimerTask() {
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis()-start);
            }
        };

        Timer timer = new Timer();
        start = System.currentTimeMillis();

        timer.schedule(task,1000);
        timer.schedule(task1,3000);
        

    }

}
複製程式碼

如果第一TimerTask出現未知異常,第二個TimerTask還能執行起來嗎?

結果如下:

明顯第一TimerTask出現未知異常,第二個TimerTask不能執行起來了。這就說明

如果TimerTask丟擲未檢查的異常會終止timer執行緒。這種情況下,Timer也不會重新回覆執行緒的執行了;它錯誤的認為整個Timer都被取消了。此時

已經被安排但尚未執行的TimerTask永遠不會再執行了,新的任務也不能被排程了。

ScheduledThreadPoolExecutor可以解決此問題,例子如下:

複製程式碼
public class Shedule {
    private static long start;

    private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);

    public static void main(String[] args) {
        TimerTask task = new TimerTask() {
            public void run() {throw new RuntimeException();
            }
        };

        TimerTask task1 = new TimerTask() {
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis()-start);
            }
        };

        Timer timer = new Timer();
        start = System.currentTimeMillis();

        //TimeUnit.MILLISECONDS指定毫秒為單位
        executorService.schedule(task,1000, TimeUnit.MILLISECONDS);
        executorService.schedule(task1,3000, TimeUnit.MILLISECONDS);


    }

}
複製程式碼

執行結果如下:

可以看到第一個執行緒掛了,第二個執行緒並沒有受到影響。這就說明了ScheduledThreadPoolExecutor可以解決了

如果TimerTask丟擲未檢查的異常會終止timer執行緒。這種情況下,Timer也不會重新回覆執行緒的執行了;它錯誤的認為整個Timer都被取消了。此時

    已經被安排但尚未執行的TimerTask永遠不會再執行了,新的任務也不能被排程了的問題

還要注意一點,Timer是和系統時間掛鉤的,如果當前伺服器的時間一改,Timer就不那麼靠譜了。

還要注意的是ThreadPoolExecutor。

原始碼如下圖:

可以看到new一個FixedThreadPool/newSingleThreadExecutor/newCachedThreadPool/newScheduledThreadPool實際上返回的都是是new ThreadPoolExecutor()。

我們再看一下ThreadPoolExecutor原始碼如下:

可以看到ThreadPoolExecutor配置的非常靈活,如果我們用普通的一個FixedThreadPool/newSingleThreadExecutor/newCachedThreadPool/newScheduledThreadPool沒辦法滿足你的需求了,你可以用

ThreadPoolExecutor靈活的指定引數來完成你的需求。這適合精確的任務執行。還不如說我們的任務被拒絕(RejecedExecutionHandler)後,我們可以用ThreadPoolExecutor靈活處理