1. 程式人生 > >java執行緒池ThreadPoolExecutor類使用詳解

java執行緒池ThreadPoolExecutor類使用詳解

在《阿里巴巴java開發手冊》中指出了執行緒資源必須通過執行緒池提供,不允許在應用中自行顯示的建立執行緒,這樣一方面是執行緒的建立更加規範,可以合理控制開闢執行緒的數量;另一方面執行緒的細節管理交給執行緒池處理,優化了資源的開銷。而執行緒池不允許使用Executors去建立,而要通過ThreadPoolExecutor方式,這一方面是由於jdk中Executor框架雖然提供瞭如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等建立執行緒池的方法,但都有其侷限性,不夠靈活;另外由於前面幾種方法內部也是通過ThreadPoolExecutor方式實現,使用ThreadPoolExecutor有助於大家明確執行緒池的執行規則,建立符合自己的業務場景需要的執行緒池,避免資源耗盡的風險。

下面我們就對ThreadPoolExecutor的使用方法進行一個詳細的概述。

首先看下ThreadPoolExecutor的建構函式

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue
<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime
< 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

建構函式的引數含義如下:

corePoolSize:指定了執行緒池中的執行緒數量,它的數量決定了新增的任務是開闢新的執行緒去執行,還是放到workQueue任務佇列中去;

maximumPoolSize:指定了執行緒池中的最大執行緒數量,這個引數會根據你使用的workQueue任務佇列的型別,決定執行緒池會開闢的最大執行緒數量;

keepAliveTime:當執行緒池中執行緒數量超過corePoolSize時,多餘的執行緒會在多長時間內被銷燬;

unit:keepAliveTime的單位

workQueue:任務佇列,被新增到執行緒池中,但尚未被執行的任務;它一般分為直接提交佇列、有界任務佇列、無界任務佇列、優先任務佇列幾種;

threadFactory:執行緒工廠,用於建立執行緒,一般用預設即可;

handler:拒絕策略;當任務太多來不及處理時,如何拒絕任務;

接下來我們對其中比較重要引數做進一步的瞭解:

一、workQueue任務佇列

上面我們已經介紹過了,它一般分為直接提交佇列、有界任務佇列、無界任務佇列、優先任務佇列;

1、直接提交佇列:設定為SynchronousQueue佇列,SynchronousQueue是一個特殊的BlockingQueue,它沒有容量,沒執行一個插入操作就會阻塞,需要再執行一個刪除操作才會被喚醒,反之每一個刪除操作也都要等待對應的插入操作。

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //maximumPoolSize設定為2 ,拒絕策略為AbortPolic策略,直接丟擲異常
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        for(int i=0;i<3;i++) {
            readPool.execute(new ThreadTask());
        }   
    }
}

public class ThreadTask implements Runnable{
    
    public ThreadTask() {
        
    }
    
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}

輸出結果為

pool-1-thread-1
pool-1-thread-2
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
    at com.hhxx.test.ThreadPool.main(ThreadPool.java:17)

可以看到,當任務佇列為SynchronousQueue,建立的執行緒數大於maximumPoolSize時,直接執行了拒絕策略丟擲異常。

使用SynchronousQueue佇列,提交的任務不會被儲存,總是會馬上提交執行。如果用於執行任務的執行緒數量小於maximumPoolSize,則嘗試建立新的程序,如果達到maximumPoolSize設定的最大值,則根據你設定的handler執行拒絕策略。因此這種方式你提交的任務不會被快取起來,而是會被馬上執行,在這種情況下,你需要對你程式的併發量有個準確的評估,才能設定合適的maximumPoolSize數量,否則很容易就會執行拒絕策略;

2、有界的任務佇列:有界的任務佇列可以使用ArrayBlockingQueue實現,如下所示

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用ArrayBlockingQueue有界任務佇列,若有新的任務需要執行時,執行緒池會建立新的執行緒,直到建立的執行緒數量達到corePoolSize時,則會將新的任務加入到等待佇列中。若等待佇列已滿,即超過ArrayBlockingQueue初始化的容量,則繼續建立執行緒,直到執行緒數量達到maximumPoolSize設定的最大執行緒數量,若大於maximumPoolSize,則執行拒絕策略。在這種情況下,執行緒數量的上限與有界任務佇列的狀態有直接關係,如果有界佇列初始容量較大或者沒有達到超負荷的狀態,執行緒數將一直維持在corePoolSize以下,反之當任務佇列已滿時,則會以maximumPoolSize為最大執行緒數上限。

3、無界的任務佇列:有界任務佇列可以使用LinkedBlockingQueue實現,如下所示

pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

使用無界任務佇列,執行緒池的任務佇列可以無限制的新增新的任務,而執行緒池建立的最大執行緒數量就是你corePoolSize設定的數量,也就是說在這種情況下maximumPoolSize這個引數是無效的,哪怕你的任務佇列中快取了很多未執行的任務,當執行緒池的執行緒數達到corePoolSize後,就不會再增加了;若後續有新的任務加入,則直接進入佇列等待,當使用這種任務佇列模式時,一定要注意你任務提交與處理之間的協調與控制,不然會出現佇列中的任務由於無法及時處理導致一直增長,直到最後資源耗盡的問題。

4、優先任務佇列:優先任務佇列通過PriorityBlockingQueue實現,下面我們通過一個例子演示下

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //優先任務佇列
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
          
        for(int i=0;i<20;i++) {
            pool.execute(new ThreadTask(i));
        }    
    }
}

public class ThreadTask implements Runnable,Comparable<ThreadTask>{
    
    private int priority;
    
    public int getPriority() {
        return priority;
    }

    public void setPriority(int priority) {
        this.priority = priority;
    }

    public ThreadTask() {
        
    }
    
    public ThreadTask(int priority) {
        this.priority = priority;
    }

    //當前物件和其他物件做比較,當前優先順序大就返回-1,優先順序小就返回1,值越小優先順序越高
    public int compareTo(ThreadTask o) {
         return  this.priority>o.priority?-1:1;
    }
    
    public void run() {
        try {
            //讓執行緒阻塞,使後續任務進入快取佇列
            Thread.sleep(1000);
            System.out.println("priority:"+this.priority+",ThreadName:"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    
    }
}

我們來看下執行的結果情況

priority:0,ThreadName:pool-1-thread-1
priority:9,ThreadName:pool-1-thread-1
priority:8,ThreadName:pool-1-thread-1
priority:7,ThreadName:pool-1-thread-1
priority:6,ThreadName:pool-1-thread-1
priority:5,ThreadName:pool-1-thread-1
priority:4,ThreadName:pool-1-thread-1
priority:3,ThreadName:pool-1-thread-1
priority:2,ThreadName:pool-1-thread-1
priority:1,ThreadName:pool-1-thread-1

大家可以看到除了第一個任務直接建立執行緒執行外,其他的任務都被放入了優先任務佇列,按優先順序進行了重新排列執行,且執行緒池的執行緒數一直為corePoolSize,也也就是隻有一個。

通過執行的程式碼我們可以看出PriorityBlockingQueue它其實是一個特殊的無界佇列,它其中無論添加了多少個任務,執行緒池建立的執行緒數也不會超過corePoolSize的數量,只不過其他佇列一般是按照先進先出的規則處理任務,而PriorityBlockingQueue佇列可以自定義規則根據任務的優先順序順序先後執行。

二、拒絕策略

一般我們建立執行緒池時,為防止資源被耗盡,任務佇列都會選擇建立有界任務佇列,但種模式下如果出現任務佇列已滿且執行緒池建立的執行緒數達到你設定的最大執行緒數時,這時就需要你指定ThreadPoolExecutor的RejectedExecutionHandler引數即合理的拒絕策略,來處理執行緒池"超載"的情況。ThreadPoolExecutor自帶的拒絕策略如下:

1、AbortPolicy策略:該策略會直接丟擲異常,阻止系統正常工作;

2、CallerRunsPolicy策略:如果執行緒池的執行緒數量達到上限,該策略會把任務佇列中的任務放在呼叫者執行緒當中執行;

3、DiscardOledestPolicy策略:該策略會丟棄任務佇列中最老的一個任務,也就是當前任務佇列中最先被新增進去的,馬上要被執行的那個任務,並嘗試再次提交;

4、DiscardPolicy策略:該策略會默默丟棄無法處理的任務,不予任何處理。當然使用此策略,業務場景中需允許任務的丟失;

以上內建的策略均實現了RejectedExecutionHandler介面,當然你也可以自己擴充套件RejectedExecutionHandler介面,定義自己的拒絕策略,我們看下示例程式碼:

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //自定義拒絕策略
        pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                Executors.defaultThreadFactory(), new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println(r.toString()+"執行了拒絕策略");
                
            }
        });
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask());
        }    
    }
}

public class ThreadTask implements Runnable{    
    public void run() {
        try {
            //讓執行緒阻塞,使後續任務進入快取佇列
            Thread.sleep(1000);
            System.out.println("ThreadName:"+Thread.currentThread().getName());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    
    }
}

輸出結果:

[email protected]執行了拒絕策略
[email protected]執行了拒絕策略
[email protected]執行了拒絕策略
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1
ThreadName:pool-1-thread-2
ThreadName:pool-1-thread-1

可以看到由於任務加了休眠阻塞,執行需要花費一定時間,導致會有一定的任務被丟棄,從而執行自定義的拒絕策略;

三、ThreadFactory自定義執行緒建立

 執行緒池中執行緒就是通過ThreadPoolExecutor中的ThreadFactory,執行緒工廠建立的。那麼通過自定義ThreadFactory,可以按需要對執行緒池中建立的執行緒進行一些特殊的設定,如命名、優先順序等,下面程式碼我們通過ThreadFactory對執行緒池中建立的執行緒進行記錄與命名

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args )
    {
        //自定義執行緒工廠
        pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                new ThreadFactory() {
            public Thread newThread(Runnable r) {
                System.out.println("執行緒"+r.hashCode()+"建立");
                //執行緒命名
                Thread th = new Thread(r,"threadPool"+r.hashCode());
                return th;
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy());
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask());
        }    
    }
}

public class ThreadTask implements Runnable{    
    public void run() {
        //輸出執行執行緒的名稱
        System.out.println("ThreadName:"+Thread.currentThread().getName());
    }
}

我們看下輸出結果

執行緒118352462建立
執行緒1550089733建立
執行緒865113938建立
ThreadName:threadPool1550089733
ThreadName:threadPool118352462
執行緒1442407170建立
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool1550089733
ThreadName:threadPool865113938
ThreadName:threadPool865113938
ThreadName:threadPool118352462
ThreadName:threadPool1550089733
ThreadName:threadPool1442407170

可以看到執行緒池中,每個執行緒的建立我們都進行了記錄輸出與命名。

四、ThreadPoolExecutor擴充套件

ThreadPoolExecutor擴充套件主要是圍繞beforeExecute()、afterExecute()和terminated()三個介面實現的,

1、beforeExecute:執行緒池中任務執行前執行

2、afterExecute:執行緒池中任務執行完畢後執行

3、terminated:執行緒池退出後執行

通過這三個介面我們可以監控每個任務的開始和結束時間,或者其他一些功能。下面我們可以通過程式碼實現一下

public class ThreadPool {
    private static ExecutorService pool;
    public static void main( String[] args ) throws InterruptedException
    {
        //實現自定義介面
        pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),
                new ThreadFactory() {
            public Thread newThread(Runnable r) {
                System.out.println("執行緒"+r.hashCode()+"建立");
                //執行緒命名
                Thread th = new Thread(r,"threadPool"+r.hashCode());
                return th;
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy()) {
    
            protected void beforeExecute(Thread t,Runnable r) {
                System.out.println("準備執行:"+ ((ThreadTask)r).getTaskName());
            }
            
            protected void afterExecute(Runnable r,Throwable t) {
                System.out.println("執行完畢:"+((ThreadTask)r).getTaskName());
            }
            
            protected void terminated() {
                System.out.println("執行緒池退出");
            }
        };
          
        for(int i=0;i<10;i++) {
            pool.execute(new ThreadTask("Task"+i));
        }    
        pool.shutdown();
    }
}

public class ThreadTask implements Runnable{    
    private String taskName;
    public String getTaskName() {
        return taskName;
    }
    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }
    public ThreadTask(String name) {
        this.setTaskName(name);
    }
    public void run() {
        //輸出執行執行緒的名稱
        System.out.println("TaskName"+this.getTaskName()+"---ThreadName:"+Thread.currentThread().getName());
    }
}

我看下輸出結果

執行緒118352462建立
執行緒1550089733建立
準備執行:Task0
準備執行:Task1
TaskNameTask0---ThreadName:threadPool118352462
執行緒865113938建立
執行完畢:Task0
TaskNameTask1---ThreadName:threadPool1550089733
執行完畢:Task1
準備執行:Task3
TaskNameTask3---ThreadName:threadPool1550089733
執行完畢:Task3
準備執行:Task2
準備執行:Task4
TaskNameTask4---ThreadName:threadPool1550089733
執行完畢:Task4
準備執行:Task5
TaskNameTask5---ThreadName:threadPool1550089733
執行完畢:Task5
準備執行:Task6
TaskNameTask6---ThreadName:threadPool1550089733
執行完畢:Task6
準備執行:Task8
TaskNameTask8---ThreadName:threadPool1550089733
執行完畢:Task8
準備執行:Task9
TaskNameTask9---ThreadName:threadPool1550089733
準備執行:Task7
執行完畢:Task9
TaskNameTask2---ThreadName:threadPool118352462
TaskNameTask7---ThreadName:threadPool865113938
執行完畢:Task7
執行完畢:Task2
執行緒池退出

可以看到通過對beforeExecute()、afterExecute()和terminated()的實現,我們對執行緒池中執行緒的執行狀態進行了監控,在其執行前後輸出了相關列印資訊。另外使用shutdown方法可以比較安全的關閉執行緒池, 當執行緒池呼叫該方法後,執行緒池中不再接受後續新增的任務。但是,此時執行緒池不會立刻退出,直到新增到執行緒池中的任務都已經處理完成,才會退出。

五、執行緒池執行緒數量

執行緒吃執行緒數量的設定沒有一個明確的指標,根據實際情況,只要不是設定的偏大和偏小都問題不大,結合下面這個公式即可

            /**
             * Nthreads=CPU數量
             * Ucpu=目標CPU的使用率,0<=Ucpu<=1
             * W/C=任務等待時間與任務計算時間的比率
             */
            Nthreads = Ncpu*Ucpu*(1+W/C)

以上就是對ThreadPoolExecutor類從建構函式、拒絕策略、自定義執行緒建立等方面介紹了其詳細的使用方法,從而我們可以根據自己的需要,靈活配置和使用執行緒池建立執行緒,其中如有不足與不正確的地方還望指出與海涵。