1. 程式人生 > >java併發包學習系列:執行緒複用之執行緒池

java併發包學習系列:執行緒複用之執行緒池

什麼是執行緒池

頻繁使用new Thread來建立執行緒的方式並不太好。因為每次new Thread新建和銷燬物件效能較差,執行緒缺乏統一管理。好在Java提供了執行緒池,它能夠有效的管理、排程執行緒,避免過多的資源消耗。優點如下:

重用存在的執行緒,減少物件建立、銷燬的開銷。
可有效控制最大併發執行緒數,提高系統資源的使用率,同時避免過多資源競爭,避免堵塞。
提供定時執行、定期執行、單執行緒、併發控制等功能。
執行緒池原理簡單的解釋就是會建立多個執行緒並且進行管理,提交給執行緒的任務會被執行緒池指派給其中的執行緒進行執行,通過執行緒池的統一排程、管理執行緒池的統一排程、管理使得多執行緒的使用更簡單高效。

執行緒池負責管理工作執行緒,包含一個等待執行的任務佇列。執行緒池的任務佇列是一個Runnable集合,工作執行緒負責從任務佇列中取出並執行Runnable物件。

jdk對執行緒池的支援

為了更好的控制多執行緒,jdk提供了一套Executor框架,幫助開發人員有效的進行執行緒控制,其本質就是一個執行緒池。

以上成員都在java.util.concurrent包下,是jdk併發包的核心類。其中ThreadPoolExecutor表示一個執行緒池。Executors類則扮演者執行緒池工廠的角色,通過Executors可以取得一個擁有特定功能的執行緒池。Executors主要提供了以下方法:

static ExecutorService  newCachedThreadPool() 
          Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available.
static ExecutorService  newCachedThreadPool(ThreadFactory threadFactory) 
          Creates a thread pool that creates new
threads as needed, but will reuse previously constructed threads when they are available, and uses the provided ThreadFactory to create new threads when needed. static ExecutorService newFixedThreadPool(int nThreads) Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue, using the provided ThreadFactory to create new threads when needed. static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically. static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically. static ExecutorService newSingleThreadExecutor() Creates an Executor that uses a single worker thread operating off an unbounded queue. static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) Creates an Executor that uses a single worker thread operating off an unbounded queue, and uses the provided ThreadFactory to create a new thread when needed. static ScheduledExecutorService newSingleThreadScheduledExecutor() Creates a single-threaded executor that can schedule commands to run after a given delay, or to execute periodically. static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) Creates a single-threaded executor that can schedule commands to run after a given delay, or to execute periodically.

newCachedThreadPool建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。
newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。
newScheduledThreadPool 建立一個定長執行緒池,支援定時及週期性任務執行。
newSingleThreadExecutor 建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。

newFixedThreadPool示例

/**
 * Created by niehongtao on 16/7/12.
 */
public class ThreadPoolDemo {
    public static class MyTask implements Runnable {

        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":thread id" + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


    public static void main(String[] args) {
        MyTask myTask = new MyTask();
        ExecutorService exec = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            exec.submit(myTask);
        }
    }
}

newScheduledThreadPool示例

/**
 * Created by niehongtao on 16/7/12.
 */
public class ScheduledExecutorServiceDemo {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println(System.currentTimeMillis() / 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };
        pool.scheduleAtFixedRate(runnable, 0, 2, TimeUnit.SECONDS);
    }
}

ScheduledExecutorService有兩個方法,scheduleAtFixedRate和scheduleAtFixedDelay,注意區分。

ThreadPoolExecutor

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

其實核心的幾個執行緒池,其內部都是使用了ThreadPoolExecutor來實現。即他們只是ThreadPoolExecutor類的封裝。下面我們來看一下ThreadPoolExecutor:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

用給定的初始引數和預設的執行緒工廠及處理程式建立新的 ThreadPoolExecutor。使用 Executors 工廠方法之一比使用此通用構造方法方便得多。

引數:

corePoolSize - 池中所儲存的執行緒數,包括空閒執行緒。

maximumPoolSize - 池中允許的最大執行緒數。

keepAliveTime - 當執行緒數大於核心時,此為終止前多餘的空閒執行緒等待新任務的最長時間。

unit - keepAliveTime 引數的時間單位。

workQueue - 執行前用於保持任務的佇列。此佇列僅保持由 execute 方法提交的 Runnable 任務。

threadFactory - 執行程式建立新執行緒時使用的工廠。

handler - 由於超出執行緒範圍和佇列容量而使執行被阻塞時所使用的處理程式。

執行緒的建立和銷燬

執行緒池的基本大小(corePoolSize)、最大大小(maximumPoolSize)以及存活時間等因素共同負責執行緒的建立與銷燬。

基本大小也是執行緒池的目標大小,即在沒有任務執行時執行緒池的大小,並且只有在工作佇列滿了的情況下才會建立超出這個數量的執行緒。

最大大小表示可同時活動的執行緒數量的上限。

如果某個執行緒的空閒時間超過了存活時間,那麼將被標記為可回收的,並且當執行緒池的當前大小超過基本大小時,這個執行緒將被終止。

管理佇列任務

ThreadPoolExecutor允許提供一個BlockingQueue來儲存等待執行的任務。基本的任務排隊方法有3種:有界佇列, 無界佇列, 同步移交(Synchronous Handoff)。

有界佇列飽和策略

有界佇列被填滿後,飽和策略開始發揮作用。飽和策略可以通過呼叫setRejectedExecutionHandler來修改。jdk提供了幾種不同的RejectedExecutionHandler實現:AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy 。

AbortPolicy:預設的飽和策略。丟擲 RejectedExecutionException。呼叫者可以捕獲這個異常,然後根據需求編寫自己的處理程式碼。

CallerRunsPolicy: 不拋棄任務,不丟擲異常,而將任務退回給呼叫者。

DiscardOldestPolicy:放棄最舊的未處理請求,然後重試 execute;如果執行程式已關閉,則會丟棄該任務。

DiscardPolicy:預設情況下它將放棄被拒絕的任務。

擴充套件ThreadPoolExecutor

ThreadPoolExecutor是可擴充套件的,它提供了幾個可以在子類化中改寫的方法:beforeExecute、afterExcute和terminated。在這些方法中,還可以新增日誌、計時、監視或統計資訊收集的功能。

無論任務是從run中正常返回,還是會丟擲一個異常在而返回,afterExcute都會被呼叫。(如果任務執行完成後帶有Error,那麼就不會呼叫afterExcute)。

如果beforeExecute丟擲一個RuntimeException,那麼任務將不被執行,並且afterExcute也會被呼叫。

線上程池完成關閉操作時呼叫terminated,也就是在所有任務都已經完成並且所有工作者執行緒也已經關閉後,terminated可以釋放Executor在其生命週期裡分配的各種資源,此外還可以執行傳送通知,記錄日誌或者收集finalize統計資訊等操作。