1. 程式人生 > >Java多執行緒系列--“JUC執行緒池”01之 執行緒池架構

Java多執行緒系列--“JUC執行緒池”01之 執行緒池架構

概要

前面分別介紹了”Java多執行緒基礎”、”JUC原子類”和”JUC鎖”。本章介紹JUC的最後一部分的內容——執行緒池。內容包括:

  • 執行緒池架構圖
  • 執行緒池示例

執行緒池架構圖

執行緒池的架構圖如下: 這裡寫圖片描述

1、Executor

它是”執行者”介面,它是來執行任務的。準確的說,Executor提供了execute()介面來執行已提交的 Runnable 任務的物件。Executor存在的目的是提供一種將”任務提交”與”任務如何執行”分離開來的機制。

它只包含一個函式介面:

void execute(Runnable command)
2、ExecutorService

ExecutorService繼承於Executor。它是”執行者服務”介面,它是為”執行者介面Executor”服務而存在的;準確的話,ExecutorService提供了”將任務提交給執行者的介面(submit方法)”,”讓執行者執行任務(invokeAll, invokeAny方法)”的介面等等。

ExecutorService的函式列表

// 請求關閉、發生超時或者當前執行緒中斷,無論哪一個首先發生之後,都將導致阻塞,直到所有任務完成執行。
boolean awaitTermination(long timeout, TimeUnit unit)
// 執行給定的任務,當所有任務完成時,返回保持任務狀態和結果的 Future 列表。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) // 執行給定的任務,當所有任務完成或超時期滿時(無論哪個首先發生),返回保持任務狀態和結果的 Future 列表。 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) // 執行給定的任務,如果某個任務已成功完成(也就是未丟擲異常),則返回其結果。
<T> T invokeAny(Collection<? extends Callable<T>> tasks) // 執行給定的任務,如果在給定的超時期滿前某個任務已成功完成(也就是未丟擲異常),則返回其結果。 <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) // 如果此執行程式已關閉,則返回 true。 boolean isShutdown() // 如果關閉後所有任務都已完成,則返回 true。 boolean isTerminated() // 啟動一次順序關閉,執行以前提交的任務,但不接受新任務。 void shutdown() // 試圖停止所有正在執行的活動任務,暫停處理正在等待的任務,並返回等待執行的任務列表。 List<Runnable> shutdownNow() // 提交一個返回值的任務用於執行,返回一個表示任務的未決結果的 Future。 <T> Future<T> submit(Callable<T> task) // 提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。 Future<?> submit(Runnable task) // 提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。 <T> Future<T> submit(Runnable task, T result)
3、AbstractExecutorService

AbstractExecutorService是一個抽象類,它實現了ExecutorService介面。AbstractExecutorService存在的目的是為ExecutorService中的函式介面提供了預設實現。

AbstractExecutorService函式列表

由於它的函式列表和ExecutorService一樣,這裡就不再重複列舉了。

4、ThreadPoolExecutor

ThreadPoolExecutor就是大名鼎鼎的”執行緒池”。它繼承於AbstractExecutorService抽象類。

ThreadPoolExecutor函式列表

// 用給定的初始引數和預設的執行緒工廠及被拒絕的執行處理程式建立新的 ThreadPoolExecutor。
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
// 用給定的初始引數和預設的執行緒工廠建立新的 ThreadPoolExecutor。
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
// 用給定的初始引數和預設被拒絕的執行處理程式建立新的 ThreadPoolExecutor。
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
// 用給定的初始引數建立新的 ThreadPoolExecutor。
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

// 基於完成執行給定 Runnable 所呼叫的方法。
protected void afterExecute(Runnable r, Throwable t)
// 如果在保持活動時間內沒有任務到達,新任務到達時正在替換(如果需要),則設定控制核心執行緒是超時還是終止的策略。
void allowCoreThreadTimeOut(boolean value)
// 如果此池允許核心執行緒超時和終止,如果在 keepAlive 時間內沒有任務到達,新任務到達時正在替換(如果需要),則返回 trueboolean allowsCoreThreadTimeOut()
// 請求關閉、發生超時或者當前執行緒中斷,無論哪一個首先發生之後,都將導致阻塞,直到所有任務完成執行。
boolean awaitTermination(long timeout, TimeUnit unit)
// 在執行給定執行緒中的給定 Runnable 之前呼叫的方法。
protected void beforeExecute(Thread t, Runnable r)
// 在將來某個時間執行給定任務。
void execute(Runnable command)
// 當不再引用此執行程式時,呼叫 shutdown。
protected void finalize()
// 返回主動執行任務的近似執行緒數。
int getActiveCount()
// 返回已完成執行的近似任務總數。
long getCompletedTaskCount()
// 返回核心執行緒數。
int getCorePoolSize()
// 返回執行緒保持活動的時間,該時間就是超過核心池大小的執行緒可以在終止前保持空閒的時間值。
long getKeepAliveTime(TimeUnit unit)
// 返回曾經同時位於池中的最大執行緒數。
int getLargestPoolSize()
// 返回允許的最大執行緒數。
int getMaximumPoolSize()
// 返回池中的當前執行緒數。
int getPoolSize()
// 返回此執行程式使用的任務佇列。
BlockingQueue<Runnable> getQueue()
// 返回用於未執行任務的當前處理程式。
RejectedExecutionHandler getRejectedExecutionHandler()
// 返回曾計劃執行的近似任務總數。
long getTaskCount()
// 返回用於建立新執行緒的執行緒工廠。
ThreadFactory getThreadFactory()
// 如果此執行程式已關閉,則返回 trueboolean isShutdown()
// 如果關閉後所有任務都已完成,則返回 trueboolean isTerminated()
// 如果此執行程式處於在 shutdown 或 shutdownNow 之後正在終止但尚未完全終止的過程中,則返回 trueboolean isTerminating()
// 啟動所有核心執行緒,使其處於等待工作的空閒狀態。
int prestartAllCoreThreads()
// 啟動核心執行緒,使其處於等待工作的空閒狀態。
boolean prestartCoreThread()
// 嘗試從工作佇列移除所有已取消的 Future 任務。
void purge()
// 從執行程式的內部佇列中移除此任務(如果存在),從而如果尚未開始,則其不再執行。
boolean remove(Runnable task)
// 設定核心執行緒數。
void setCorePoolSize(int corePoolSize)
// 設定執行緒在終止前可以保持空閒的時間限制。
void setKeepAliveTime(long time, TimeUnit unit)
// 設定允許的最大執行緒數。
void setMaximumPoolSize(int maximumPoolSize)
// 設定用於未執行任務的新處理程式。
void setRejectedExecutionHandler(RejectedExecutionHandler handler)
// 設定用於建立新執行緒的執行緒工廠。
void setThreadFactory(ThreadFactory threadFactory)
// 按過去執行已提交任務的順序發起一個有序的關閉,但是不接受新任務。
void shutdown()
// 嘗試停止所有的活動執行任務、暫停等待任務的處理,並返回等待執行的任務列表。
List<Runnable> shutdownNow()
// 當 Executor 已經終止時呼叫的方法。
protected void terminated()
5、ScheduledExecutorService

ScheduledExecutorService是一個介面,它繼承于于ExecutorService。它相當於提供了”延時”和”週期執行”功能的ExecutorService。

ScheduledExecutorService提供了相應的函式介面,可以安排任務在給定的延遲後執行,也可以讓任務週期的執行。

ScheduledExecutorService函式列表

// 建立並執行在給定延遲後啟用的 ScheduledFuture。
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
// 建立並執行在給定延遲後啟用的一次性操作。
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
// 建立並執行一個在給定初始延遲後首次啟用的定期操作,後續操作具有給定的週期;也就是將在 initialDelay 後開始執行,然後在 initialDelay+period 後執行,接著在 initialDelay + 2 * period 後執行,依此類推。
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
// 建立並執行一個在給定初始延遲後首次啟用的定期操作,隨後,在每一次執行終止和下一次執行開始之間都存在給定的延遲。
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
6、ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor繼承於ThreadPoolExecutor,並且實現了ScheduledExecutorService介面。它相當於提供了”延時”和”週期執行”功能的ScheduledExecutorService。

ScheduledThreadPoolExecutor類似於Timer,但是在高併發程式中,ScheduledThreadPoolExecutor的效能要優於Timer。

ScheduledThreadPoolExecutor函式列表

// 使用給定核心池大小建立一個新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize)
// 使用給定初始引數建立一個新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)
// 使用給定的初始引數建立一個新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)
// 使用給定初始引數建立一個新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)

// 修改或替換用於執行 callable 的任務。
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task)
// 修改或替換用於執行 runnable 的任務。
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task)
// 使用所要求的零延遲執行命令。
void execute(Runnable command)
// 獲取有關在此執行程式已 shutdown 的情況下、是否繼續執行現有定期任務的策略。
boolean getContinueExistingPeriodicTasksAfterShutdownPolicy()
// 獲取有關在此執行程式已 shutdown 的情況下是否繼續執行現有延遲任務的策略。
boolean getExecuteExistingDelayedTasksAfterShutdownPolicy()
// 返回此執行程式使用的任務佇列。
BlockingQueue<Runnable> getQueue()
// 從執行程式的內部佇列中移除此任務(如果存在),從而如果尚未開始,則其不再執行。
boolean remove(Runnable task)
// 建立並執行在給定延遲後啟用的 ScheduledFuture。
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
// 建立並執行在給定延遲後啟用的一次性操作。
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
// 建立並執行一個在給定初始延遲後首次啟用的定期操作,後續操作具有給定的週期;也就是將在 initialDelay 後開始執行,然後在 initialDelay+period 後執行,接著在 initialDelay + 2 * period 後執行,依此類推。
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
// 建立並執行一個在給定初始延遲後首次啟用的定期操作,隨後,在每一次執行終止和下一次執行開始之間都存在給定的延遲。
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
// 設定有關在此執行程式已 shutdown 的情況下是否繼續執行現有定期任務的策略。
void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)
// 設定有關在此執行程式已 shutdown 的情況下是否繼續執行現有延遲任務的策略。
void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)
// 在以前已提交任務的執行中發起一個有序的關閉,但是不接受新任務。
void shutdown()
// 嘗試停止所有正在執行的任務、暫停等待任務的處理,並返回等待執行的任務列表。
List<Runnable> shutdownNow()
// 提交一個返回值的任務用於執行,返回一個表示任務的未決結果的 Future。
<T> Future<T> submit(Callable<T> task)
// 提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。
Future<?> submit(Runnable task)
// 提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。
<T> Future<T> submit(Runnable task, T result)
7、Executors

Executors是個靜態工廠類。它通過靜態工廠方法返回ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 等類的物件。

Executors函式列表

// 返回 Callable 物件,呼叫它時可執行給定特權的操作並返回其結果。
static Callable<Object> callable(PrivilegedAction<?> action)
// 返回 Callable 物件,呼叫它時可執行給定特權的異常操作並返回其結果。
static Callable<Object> callable(PrivilegedExceptionAction<?> action)
// 返回 Callable 物件,呼叫它時可執行給定的任務並返回 null。
static Callable<Object> callable(Runnable task)
// 返回 Callable 物件,呼叫它時可執行給定的任務並返回給定的結果。
static <T> Callable<T> callable(Runnable task, T result)
// 返回用於建立新執行緒的預設執行緒工廠。
static ThreadFactory defaultThreadFactory()
// 建立一個可根據需要建立新執行緒的執行緒池,但是在以前構造的執行緒可用時將重用它們。
static ExecutorService newCachedThreadPool()
// 建立一個可根據需要建立新執行緒的執行緒池,但是在以前構造的執行緒可用時將重用它們,並在需要時使用提供的 ThreadFactory 建立新執行緒。
static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
// 建立一個可重用固定執行緒數的執行緒池,以共享的無界佇列方式來執行這些執行緒。
static ExecutorService newFixedThreadPool(int nThreads)
// 建立一個可重用固定執行緒數的執行緒池,以共享的無界佇列方式來執行這些執行緒,在需要時使用提供的 ThreadFactory 建立新執行緒。
static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
// 建立一個執行緒池,它可安排在給定延遲後執行命令或者定期地執行。
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
// 建立一個執行緒池,它可安排在給定延遲後執行命令或者定期地執行。
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
// 建立一個使用單個 worker 執行緒的 Executor,以無界佇列方式來執行該執行緒。
static ExecutorService newSingleThreadExecutor()
// 建立一個使用單個 worker 執行緒的 Executor,以無界佇列方式來執行該執行緒,並在需要時使用提供的 ThreadFactory 建立新執行緒。
static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
// 建立一個單執行緒執行程式,它可安排在給定延遲後執行命令或者定期地執行。
static ScheduledExecutorService newSingleThreadScheduledExecutor()
// 建立一個單執行緒執行程式,它可安排在給定延遲後執行命令或者定期地執行。
static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)
// 返回 Callable 物件,呼叫它時可在當前的訪問控制上下文中執行給定的 callable 物件。
static <T> Callable<T> privilegedCallable(Callable<T> callable)
// 返回 Callable 物件,呼叫它時可在當前的訪問控制上下文中,使用當前上下文類載入器作為上下文類載入器來執行給定的 callable 物件。
static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable)
// 返回用於建立新執行緒的執行緒工廠,這些新執行緒與當前執行緒具有相同的許可權。
static ThreadFactory privilegedThreadFactory()
// 返回一個將所有已定義的 ExecutorService 方法委託給指定執行程式的物件,但是使用強制轉換可能無法訪問其他方法。
static ExecutorService unconfigurableExecutorService(ExecutorService executor)
// 返回一個將所有已定義的 ExecutorService 方法委託給指定執行程式的物件,但是使用強制轉換可能無法訪問其他方法。
static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor)

執行緒池示例

下面通過示例來對執行緒池的使用做簡單演示。

import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

public class ThreadPoolDemo1 {

    public static void main(String[] args) {
        // 建立一個可重用固定執行緒數的執行緒池
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // 建立實現了Runnable介面物件,Thread物件當然也實現了Runnable介面
        Thread ta = new MyThread();
        Thread tb = new MyThread();
        Thread tc = new MyThread();
        Thread td = new MyThread();
        Thread te = new MyThread();
        // 將執行緒放入池中進行執行
        pool.execute(ta);
        pool.execute(tb);
        pool.execute(tc);
        pool.execute(td);
        pool.execute(te);
        // 關閉執行緒池
        pool.shutdown();
    }
}

class MyThread extends Thread {

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+ " is running.");
    }
}

執行結果:

pool-1-thread-1 is running.
pool-1-thread-2 is running.
pool-1-thread-1 is running.
pool-1-thread-2 is running.
pool-1-thread-1 is running.

結果說明:主執行緒中建立了執行緒池pool,執行緒池的容量是2。即,執行緒池中最多能同時執行2個執行緒。緊接著,將ta,tb,tc,td,te這3個執行緒新增到執行緒池中執行。最後,通過shutdown()關閉執行緒池。