1. 程式人生 > >Java執行緒池使用與原理

Java執行緒池使用與原理

執行緒池是什麼?

我們可以利用java很容易建立一個新執行緒,同時作業系統建立一個執行緒也是一筆不小的開銷。所以基於執行緒的複用,就提出了執行緒池的概念,我們使用執行緒池創建出若干個執行緒,執行完一個任務後,該執行緒會存在一段時間(使用者可以設定空閒執行緒的存活時間,後面會介紹),等到新任務來的時候就直接複用這個空閒執行緒,這樣就省去了建立、銷燬執行緒損耗。當然空閒執行緒也會是一種資源的浪費(所有才有空閒執行緒存活時間的限制),但總比頻繁的建立銷燬執行緒好太多。
下面是我的測試程式碼

    /*
     * @TODO 執行緒池測試
     */
    @Test
    public
void threadPool(){ /*java提供的統計執行緒執行數,一開始設定其值為50000,每一個執行緒任務執行完 * 呼叫CountDownLatch#coutDown()方法(其實就是自減1) * 當所有的執行緒都執行完其值就為0 */ CountDownLatch count = new CountDownLatch(50000); long start = System.currentTimeMillis(); Executor pool = Executors.newFixedThreadPool(10
);//開啟執行緒池最多會建立10個執行緒 for(int i=0;i<50000;i++){ pool.execute(new Runnable() { @Override public void run() { System.out.println("hello"); count.countDown(); } }); } while
(count.getCount()!=0){//堵塞等待5w個執行緒執行完畢 } long end = System.currentTimeMillis(); System.out.println("50個執行緒都執行完了,共用時:"+(end-start)+"ms"); } /** *@TODO 手動建立執行緒測試 */ @Test public void thread(){ CountDownLatch count = new CountDownLatch(50000); long start = System.currentTimeMillis(); for(int i=0;i<50000;i++){ Thread thread = new Thread(new Runnable() { @Override public void run() { System.out.println("hello"); count.countDown(); } }); thread.start(); } while(count.getCount()!=0){//堵塞等待5w個執行緒執行完畢 } long end = System.currentTimeMillis(); System.out.println("50000個執行緒都執行完了,共用時:"+(end-start)+"ms"); }

使用執行緒池5w執行緒執行完大約為400ms,不使用執行緒池執行大約為4350ms左右,其效率可見一斑(讀者可以自行測試,不過由於電腦配置不一樣,跑出來的資料會有差別,但使用執行緒池絕對是比建立執行緒要快的)。

java如何使用執行緒池?

上面的測試程式碼中已經使用了執行緒池,下面正式介紹一下。

java所有的執行緒池最頂層是一個Executor介面,其只有一個execute方法,用於執行所有的任務,java又提供了ExecutorService介面繼承自Executor並且擴充了一下方法,在往下就是AbstractExecutorService這個抽象類,其實現了ExecutorService,最後就是ThreadPoolExecutor其繼承自上面的抽象類,我們常使用的java執行緒池就是建立的這個類的例項。

而上面我們使用Executors是一個工具類,它就是一個語法糖,為我們把各種不同的業務的執行緒池引數進行封裝,進行new操作。

 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

上面就是Executors.newFixedThreadPool(10)的原始碼。

下面重點來了,說一說ThreadPoolExecutor構造方法各引數的意思。

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

上面這個構造方法是最全的。

下面我們根據原始碼來解釋部分引數意思,這樣更有說服力。

下面是ThreadPoolExecutor#execute方法,就是我們上面介面呼叫的execute實際執行者。

  public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

ctl是一個AtomicInteger例項,是一個提供了原子語句的CAS操作的類,它用來記錄執行緒池中當前執行的執行緒數量加上-2^29,workCountOf方法就取得其絕對值(可以去看原始碼如何實現),當其小於corePoolSize時,會呼叫addWorker方法(是用來建立一個新Workder,Workder會建立一個Thread,所以就是建立執行緒的方法),addWorkd建立執行緒過程中會跟corePoolSize或者maxnumPoolSize的值比較(當傳入true會根corePoolSize比較,false會根據maxnumPoolSize比較,大於等於其值會建立失敗)。可見如何當前執行中的執行緒數量小於corePoolSize就是建立並且也會建立成功(
只簡單的討論執行緒池Running狀態下)。

如果當執行中執行緒數大於等於corePoolSize時,進入第二個if,isRunning是跟SHUTDOWN(其值=0)比較,之前說過c等於當前執行的執行緒數量加上-2^29,如果當前當前執行的執行緒資料達到2^29時其值就=0,isRunning返回false,else中在執行addWorkd也會返回false(addWorkd也對其進行了檢驗),所以這表示執行緒池最多能支援2^29個執行緒同時執行(足夠用了)。

workQueue.offer(command)就是將runnable加入等待佇列,加入等待佇列後runWorker方法會從佇列中獲取任務執行的。如果當前佇列採用的是有界佇列(ArrayBlockingQueue)當佇列滿了offer就會返回false,這是就進入else if,看!這裡傳入了false,說明這裡要跟maxnumPoolSize比較了,如果這裡執行的執行緒數大於等於maxnumPoolSize,那麼這個執行緒任務就要被執行緒池拒絕了,執行reject(command),拒絕方法中使用了我們ThreadPoolExecutor構造方法中的RejectedExecutionHandler(拒絕策略),後面再詳細解釋。

經過上面的結合原始碼的介紹,下面對們ThreadPoolExecutor的引數介紹就好理解了。

執行緒池中執行緒建立和拒絕策略

corePoolSize,maxnumPoolSize,BlockingQueue這三個要一塊說

當執行緒池執行的執行緒小於corePoolSize時,來一個新執行緒任務總是會新建一個執行緒來執行;當大於corePoolSize就會把任務加入到等待佇列blockingQueue中,如果你傳入的BlockingQueue是一個無界佇列(LinkedBlockingQueue)這是佇列可以存放“無窮多”的任務,所有總是會加入佇列成功,跟maxnumPoolSize就沒關係了,這也表示執行緒池中執行緒數最多為corePoolSize個;但是如果你傳入的是有界佇列(ArrayBlockingQueue,SynchronousQueue),當佇列滿時,並且執行緒數小於maxmunPoolSize就是建立新的執行緒直至執行緒數大於maxnumPoolSize;如果當執行緒數量大於maxnumPoolSize時,在加入任務就會被執行緒池拒絕。

RejectedExecutionHandler拒絕策略java給實現了4個AbortPolicy,CallerRunsPolicy,DiscardOldestPolicy,DiscardPolicy使用者也可以自己實現該介面實現自己的拒絕策略;第一個就是直接丟擲異常,我們可以進行trycatch處理;第二個就是該新任務直接執行;第三個是取消佇列中最老的;第四個是取消當前任務。