1. 程式人生 > >java併發——執行緒池(一)執行機制和如何使用

java併發——執行緒池(一)執行機制和如何使用

合理利用執行緒池能夠帶來三個好處。
1、第一:降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。
2、第二:提高響應速度。當任務到達時,任務可以不需要的等到執行緒建立就能立即執行。
3、第三:提高執行緒的可管理性。執行緒是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一的分配,調優和監控。但是要做到合理的利用執行緒池,必須對其原理了如指掌。

執行緒池建立

我們可以通過ThreadPoolExecutor來建立一個執行緒池。

    public ThreadPoolExecutor
(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if
(corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this
.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

建立一個執行緒池需要輸入幾個引數:

  • corePoolSize(執行緒池的基本大小):當提交一個任務到執行緒池時,執行緒池會建立一個執行緒來執行任務,即使其他空閒的基本執行緒能夠執行新任務也會建立執行緒,等到需要執行的任務數大於執行緒池基本大小時就不再建立。如果呼叫了執行緒池的prestartAllCoreThreads方法,執行緒池會提前建立並啟動所有基本執行緒。
  • runnableTaskQueue(任務佇列):用於儲存等待執行的任務的阻塞佇列。可以選擇以下幾個阻塞佇列。
    1、ArrayBlockingQueue:是一個基於陣列結構的有界阻塞佇列,此佇列按 FIFO(先進先出)原則對元素進行排序。
    2、LinkedBlockingQueue:一個基於連結串列結構的阻塞佇列,此佇列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個佇列。
    3、SynchronousQueue:一個不儲存元素的阻塞佇列。每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個佇列。
    4、PriorityBlockingQueue:一個具有優先順序得無限阻塞佇列。

  • maximumPoolSize(執行緒池最大大小):執行緒池允許建立的最大執行緒數。如果佇列滿了,並且已建立的執行緒數小於最大執行緒數,則執行緒池會再建立新的執行緒執行任務。值得注意的是如果使用了無界的任務佇列這個引數就沒什麼效果。

  • ThreadFactory:用於設定建立執行緒的工廠,可以通過執行緒工廠給每個創建出來的執行緒設定更有意義的名字,Debug和定位問題時非常又幫助。
  • RejectedExecutionHandler(飽和策略):當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。這個策略預設情況下是AbortPolicy,表示無法處理新任務時丟擲異常。以下是JDK1.5提供的四種策略。n AbortPolicy:直接丟擲異常。
    1、CallerRunsPolicy:只用呼叫者所線上程來執行任務。
    2、DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。
    3、DiscardPolicy:不處理,丟棄掉。
    4、當然也可以根據應用場景需要來實現RejectedExecutionHandler介面自定義策略。如記錄日誌或持久化不能處理的任務。

  • keepAliveTime(執行緒活動保持時間):執行緒池的工作執行緒空閒後,保持存活的時間。所以如果任務很多,並且每個任務執行的時間比較短,可以調大這個時間,提高執行緒的利用率。

  • TimeUnit(執行緒活動保持時間的單位):可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

執行緒池的狀態

執行緒池中定義了五種狀態,這些狀態都和執行緒的執行密切相關。

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
  • RUNNING: 自然是執行狀態,指可以接受任務執行佇列裡的任務。
  • SHUTDOWN: 指呼叫了 shutdown() 方法,不再接受新任務了,但是佇列裡的任務得執行完畢。
  • STOP: 指呼叫了 shutdownNow() 方法,不再接受新任務,同時拋棄阻塞佇列裡的所有任務並中斷所有正在執行任務。
  • TIDYING: 所有任務都執行完畢,在呼叫 shutdown()/shutdownNow() 中都會嘗試更新為這個狀態。
  • TERMINATED: 終止狀態,當執行 terminated() 後會更新為這個狀態。

用圖表示為:
這裡寫圖片描述

執行緒池的流程分析

這裡寫圖片描述
從上圖我們可以看出,當提交一個新任務到執行緒池時,執行緒池的處理流程如下:

  • 首先執行緒池判斷基本執行緒池是否已滿?沒滿,建立一個工作執行緒來執行任務。滿了,則進入下個流程。
  • 其次執行緒池判斷工作佇列是否已滿?沒滿,則將新提交的任務儲存在工作佇列裡。滿了,則進入下個流程。
  • 最後執行緒池判斷整個執行緒池是否已滿?沒滿,則建立一個新的工作執行緒來執行任務,滿了,則交給飽和策略來處理這個任務。

原始碼分析:上面的流程分析讓我們很直觀的瞭解的執行緒池的工作原理,讓我們再通過原始碼來看看是如何實現的。執行緒池執行任務的方法如下:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //獲取當前執行緒池的狀態    
        int c = ctl.get();
        //如果執行緒數量小於核心執行緒池
        if (workerCountOf(c) < corePoolSize) {
            //建立新的執行緒並執行
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如執行緒數大於等於基本執行緒數,執行緒池處於執行狀態,則將當前任務放到工作佇列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //雙重檢查,再次獲取執行緒狀態;如果執行緒狀態變了(非執行狀態)就需要從阻塞佇列移除任務,並嘗試判斷執行緒是否全部執行完畢。同時執行拒絕策略。
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果當前執行緒池為空就新建立一個執行緒並執行    
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果執行緒池不處於執行中或任務無法放入佇列,並且當前執行緒數量小於最大允許的執行緒數量,則建立一個執行緒執行任務。如果失敗則執行拒絕策略。
        else if (!addWorker(command, false))
            reject(command);
    }

工作執行緒:執行緒池建立執行緒時,會將執行緒封裝成工作執行緒Worker,Worker在執行完任務後,還會無限迴圈獲取工作佇列裡的任務來執行。我們可以從Worker的runWorker方法裡看到這點:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

執行緒池相關的類

  • Executor:執行緒池最頂層介面,它只有一個方法execute(Runnable command)。用來執行Runnable型別的介面。
  • ExecutorService:Executor的子介面,聲明瞭管理執行緒池的一些方法,比如關閉執行緒池、檢視執行緒池的狀態。
  • ThreadPoolExecutor:ExecutorService預設的實現類,我們通常建立這個類的例項來實現執行緒池。
  • Executors:執行緒池的工具類,包含了很多靜態方法,可以讓我們建立不同型別的執行緒池。
  • ScheduledExecutorService:解決那些需要任務重複執行的問題。
  • ScheduledThreadPoolExecutor:繼承ThreadPoolExecutor的ScheduledExecutorService介面實現,週期性任務排程的類實現。

執行緒池型別

通過Executors可以建立不同的執行緒池。

  • newFixedThreadPool (int nThreads):固定大小執行緒池

可以看到,corePoolSize和maximumPoolSize的大小是一樣的(實際上,後面會介紹,如果使用無界queue的話maximumPoolSize引數是沒有意義的),keepAliveTime和unit的設值表名什麼?-就是該實現不想keep alive!最後的BlockingQueue選擇了LinkedBlockingQueue,該queue有一個特點,他是無界的。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  • newSingleThreadExecutor():單執行緒的執行緒池,同樣使用無界限的佇列。
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
  • newCachedThreadPool():無界執行緒池,可以進行自動執行緒回收

首先是無界的執行緒池,所以我們可以發現maximumPoolSize為big big。其次BlockingQueue的選擇上使用SynchronousQueue。這樣每當有新任務時都去建立新的執行緒,而60秒內沒有執行任務的執行緒將被銷燬。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

執行緒池執行和關閉

  • 執行緒池提交任務有兩種方式:
    1、我們可以使用execute提交的任務,但是execute方法沒有返回值,所以無法判斷任務知否被執行緒池執行成功。通過以下程式碼可知execute方法輸入的任務是一個Runnable類的例項。
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

2、我們也可以使用submit 方法來提交任務,它會返回一個future,那麼我們可以通過這個future來判斷任務是否執行成功,通過future的get方法來獲取返回值,get方法會阻塞住直到任務完成,而使用get(long timeout, TimeUnit unit)方法則會阻塞一段時間後立即返回,這時有可能任務沒有執行完。

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
  • 執行緒池的關閉
    有執行任務自然也有關閉任務,從上文提到的 5 個狀態就能看出如何來關閉執行緒池。其實無非就是兩個方法 shutdown()/shutdownNow()。但他們有著重要的區別:
    • shutdown():執行後停止接受新任務,會把佇列的任務執行完畢。
    • shutdownNow():也是停止接受新任務,但會中斷所有的任務,將執行緒池狀態變為 stop。

關閉執行緒池的例子:

        long start = System.currentTimeMillis();
        for (int i = 0; i <= 5; i++) {
            pool.execute(new Job());
        }

        pool.shutdown();

        while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
            LOGGER.info("執行緒還在執行。。。");
        }
        long end = System.currentTimeMillis();
        LOGGER.info("一共處理了【{}】", (end - start));

執行緒池的狀態監控

其實 ThreadPool 本身已經提供了不少 api 可以獲取執行緒狀態。這樣我們可以通過呼叫這些方法獲取執行緒池狀態。
甚至我們可以繼承執行緒池擴充套件其中的幾個函式來自定義監控邏輯:

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }