1. 程式人生 > >java併發學習1---ThreadpoolExecutor

java併發學習1---ThreadpoolExecutor

開發過程中,合理地使用執行緒池可以帶來3個好處:

降低資源消耗:通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。

提高響應速度:當任務到達時,任務可以不需要等到執行緒建立就能立即執行。

提高執行緒的可管理性:執行緒是稀缺資源,如果無限制地建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一分配、調優和監控。

1 執行緒池的建立

ThreadPoolExecutor有以下四個構造方法

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
這裡面需要幾個引數

1 corePoolSize(執行緒池的基本大小):當提交一個任務到執行緒池時,執行緒池會建立一個執行緒來執行任務,即使其他空閒的基本執行緒能夠執行新任務也會建立執行緒,等到需要執行的任務數大於執行緒池基本大小時就不再建立。如果呼叫了prestartAllCoreThreads()方法,執行緒池會提前建立並啟動所有基本執行緒。

2 workQueue(任務佇列) : 用於儲存等待執行的任務的阻塞佇列。可以選擇以下幾個阻塞佇列:

ArrayBlockingQueue:是一個基於陣列結構的有界阻塞佇列,按FIFO原則進行排序
LinkedBlockingQueue:一個基於連結串列結構的阻塞佇列,吞吐量高於ArrayBlockingQueue。靜態工廠方法Excutors.newFixedThreadPool()使用了這個佇列
SynchronousQueue: 一個不儲存元素的阻塞佇列。每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量高於LinkedBlockingQueue,靜態工廠方法Excutors.newCachedThreadPool()使用了這個佇列
PriorityBlockingQueue:一個具有優先順序的無限阻塞佇列。
3 maximumPoolSize(執行緒池最大數量):執行緒池允許建立的最大執行緒數。如果佇列滿了,並且已建立的執行緒數小於最大執行緒數,則執行緒池會再建立新的執行緒執行任務。值得注意的是,如果使用了無界的任務佇列這個引數就沒用了。
4 threadFactory(執行緒工廠):可以通過執行緒工廠為每個創建出來的執行緒設定更有意義的名字,如開源框架guava
5 RejectedExecutionHandler (飽和策略):當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略還處理新提交的任務。它可以有如下四個選項:
AbortPolicy:直接丟擲異常,預設情況下采用這種策略
CallerRunsPolicy:只用呼叫者所線上程來執行任務
DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務
DiscardPolicy:不處理,丟棄掉
更多的時候,我們應該通過實現RejectedExecutionHandler 介面來自定義策略,比如記錄日誌或持久化儲存等。
6 keepAliveTime(執行緒活動時間):執行緒池的工作執行緒空閒後,保持存活的時間。所以如果任務很多,並且每個任務執行的時間比較短,可以調大時間,提高執行緒利用率。

7 TimeUnit(執行緒活動時間的單位):可選的單位有天(Days)、小時(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微秒)。

2 提交任務

可以使用execute和submit兩個方法向執行緒池提交任務

(1)execute方法用於提交不需要返回值的任務,利用這種方式提交的任務無法得知是否正常執行

threadPoolExecutor.execute(new Runnable() {

            @Override
            public
void run() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } });

(2) submit方法用於提交一個任務並帶有返回值,這個方法將返回一個Future型別物件。可以通過這個返回物件判斷任務是否執行成功,並且可以通過future.get()方法來獲取返回值,get()方法會阻塞當前執行緒直到任務完成。
Future

static BlockingQueue blockingQueue=new ArrayBlockingQueue<>(10);

    static ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(10, 20, 1, TimeUnit.MINUTES, blockingQueue);

(2)另外構造了一個實現Runable介面的類TaskWithoutResult,其邏輯很簡單,睡眠1秒
/**
* 無返回值的任務

* @author songxu
 *
 */
class TaskWithoutResult implements Runnable
{
    private int sleepTime=1000;//預設睡眠時間1s
    public TaskWithoutResult(int sleepTime)
    {
        this.sleepTime=sleepTime;
    }
    @Override
    public void run() 
    {
        System.out.println("執行緒"+Thread.currentThread()+"開始執行");
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {//捕捉中斷異常

            System.out.println("執行緒"+Thread.currentThread()+"被中斷");
        }
        System.out.println("執行緒"+Thread.currentThread()+"結束執行");
    }



}

(3)驗證

/**
     * 中斷測試
     */
    public static void  test1()
    {
        for(int i=0;i<10;i++)
        {
            Runnable runnable=new TaskWithoutResult(1000);
            threadPoolExecutor.submit(runnable);
        }
        //threadPoolExecutor.shutdown();//不會觸發中斷
        threadPoolExecutor.shutdownNow();//會觸發中斷
    }

分別測試shutdown和shutdownNow()方法,結果shutdown()方法的呼叫並不會引發中斷,而shutdownNow()方法則會引發中斷。這也正驗證前面所說的,shutdown方法只是發出了停止訊號,等所有執行緒執行完畢會關閉執行緒池;而shutdownNow則是立即停止所有任務。

2 示例2 驗證執行緒池的擴容

在本例中想要驗證執行緒池擴容到核心數量,然後再擴容到最大數量,最後再縮小到核心數量的過程。

(1)首先構造一個執行緒池,用ArrayBlockingQueue作為其等待佇列,佇列初始化容量為1。該執行緒池核心容量為 10,最大容量為20,執行緒存活時間為1分鐘。

static BlockingQueue blockingQueue=new ArrayBlockingQueue<>(1);

    static ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(10, 20, 1, TimeUnit.MINUTES, blockingQueue);

(2)另外構造了一個實現Runable介面的類TaskBusyWithoutResult類,其模擬一個繁忙的任務

class TaskBusyWithoutResult implements Runnable
{
    public TaskBusyWithoutResult()
    {
    }
    @Override
    public void run() 
    {
        System.out.println("執行緒"+Thread.currentThread()+"開始執行");
        int i=10000*10000;
        while(i>0)
        {
            i--;
        }
        System.out.println("執行緒"+Thread.currentThread()+"執行結束");
    }



}

(3)測試驗證,向執行緒池提交20個任務
/**
* 擴容測試

     */
    public static void  test2()
    {
        for(int i=0;i<20;i++)
        {
            Runnable runnable=new TaskBusyWithoutResult();
            threadPoolExecutor.submit(runnable);
        }
    }

(4)驗證結果

在VisualVM中觀察執行緒的變化,在任務提交的瞬間,執行緒池完成了預熱到擴容到最大執行緒,之所以這麼迅速是因為本例中的等待佇列長度只有1,可以適當地增加佇列長度,但不併不一定能看到擴大最大容量,其原因將在下一節中講到。線上程池中任務都執行完畢後,可以看到執行緒池回收了多餘的執行緒,但並沒有完全回收,而是保持在核心執行緒數量。從這裡也可以看出,合理地設定核心執行緒的數量可以減少執行緒的頻繁建立和回收,而這才是執行緒池的真正作用。