Android小知識-定時任務ScheduledThreadPoolExecutor
本平臺的文章更新會有延遲,大家可以關注微信公眾號-顧林海,包括年底前會更新kotlin由淺入深系列教程,目前計劃在微信公眾號進行首發,如果大家想獲取最新教程,請關注微信公眾號,謝謝!
ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,而ThreadPoolExecutor是執行緒池的核心實現類,用來執行被提交的任務,ScheduledThreadPoolExecutor是一個實現定時任務的類,可以在給定的延遲後執行命令,或者定期執行命令。
ScheduledThreadPoolExecutor定義了四個建構函式,這四個建構函式如下:
/** * @param corePoolSize 核心執行緒池的大小 */ public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); } /** * @param corePoolSize核心執行緒池的大小 * @param threadFactory 用於設定建立執行緒的工廠 */ public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue(), threadFactory); } /** * @param corePoolSize 核心執行緒池的大小 * @param handler飽和策略 */ public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue(), handler); } /** * @param corePoolSize核心執行緒池的大小 * @param threadFactory 用於設定建立執行緒的工廠 * @param handler飽和策略 */ public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue(), threadFactory, handler); }
通過原始碼可以發現,ScheduledThreadPoolExecutor的構造器都是呼叫父類的構造器也就是ThreadPoolExecutor的構造器,以此來建立一個執行緒池。
ThreadPoolExecutor的構造器如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
建立一個執行緒池時需要輸入幾個引數,如下:
-
corePoolSize(執行緒池的基本大小):當提交一個任務到執行緒池時,執行緒池會建立一個執行緒來執行任務,即使其它空閒的基本執行緒能夠執行新任務也會建立執行緒,等到需要執行的任務數大於執行緒池基本大小時就不再建立,會把到達的任務放到快取隊列當中。如果呼叫了執行緒池的prestartAllCoreThreads()方法,執行緒池會提前建立並啟動所有基本執行緒,或呼叫執行緒池的prestartCoreThread()方法,執行緒池會提前建立一個執行緒。
-
maximumPoolSize(執行緒池最大數量):執行緒池允許建立的最大執行緒數。如果佇列滿了,並且已建立的執行緒數小於最大執行緒數,則執行緒池會再建立新的執行緒執行任務。值得注意的是,如果使用了無界的任務佇列這個引數就沒什麼效果。
-
KeepAliveTime(執行緒活動保持時間):執行緒池的工作執行緒空閒後,保持存貨的時間。如果任務很多,並且每個任務執行的時間比較短,可以調大時間,提供執行緒的利用率。
-
unit(執行緒活動保持時間的單位):可選的單位有天(DAYS)、小時(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、微妙(MICROSECONDS)、千分之一毫秒和納秒(NANOSECONDS、千分之一微妙)。
-
workQueue(任務佇列):用於保持等待執行的任務的阻塞佇列,可以選擇以下幾個阻塞佇列。
(1)ArrayBlockingQueue:是一個基於陣列結構的有界阻塞佇列,此佇列按FIFO(先進先出)原則對元素進行排序。
(2)LinkedBlockingQueue:一個基於連結串列結構的阻塞佇列,此佇列按FIFO排序元素,吞吐量通常要高於ArrayBlockingQueue。
(3)SynchronousQueue:一個不儲存元素的阻塞佇列。每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue。
(4)PriorityBlockingQueue:一個具有優先順序的無限阻塞佇列。
-
ThreadFactory:用於設定建立執行緒的工廠,可以通過執行緒工廠給每個創建出來的執行緒設定更有意義的名字。
-
RejectedExecutionHandler(飽和策略):當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。這個策略預設情況下是AbortPolicy,表示無法處理新任務時丟擲異常。在JDK1.5中Java執行緒池框架提供了4種策略(也可通過實現RejectedExecutionHandler介面自定義策略)。
(1)AbortPolicy:直接丟擲異常。
(2)CallerRunsPolicy:只用呼叫者所線上程來執行任務。
(3)DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。
(4)DiscardPolicy:處理,丟棄掉。
在ScheduledThreadPoolExecutor構造器中使用了工作佇列java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue,DelayedWorkQueue是一個無界的BlockingQueue,
用於放置實現了Delayed介面的物件,其中的物件只能在其到期才能從佇列中取走。
由於ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,因此它也實現了ThreadPoolExecutor的方法,如下:
public void execute(Runnable command) { ... } public Future<?> submit(Runnable task) { ... } public <T> Future<T> submit(Runnable task, T result) { ... } public <T> Future<T> submit(Callable<T> task) { ... }
同時它也有自己的定時執行任務的方法:
/** * 延遲delay時間後開始執行task,無法獲取task的執行結果。 */ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { ... } /** * 延遲delay時間後開始執行callable,它接收的是一個Callable例項, * 此方法會返回一個ScheduleFuture物件,通過ScheduleFuture我們 * 可以取消一個未執行的task,也可以獲得這個task的執行結果。 */ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { ... } /** * 延遲initialDelay時間後開始執行command,並且按照period時間週期性 * 重複呼叫,當任務執行時間大於間隔時間時,之後的任務都會延遲,此時與 * Timer中的schedule方法類似。 */ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { ... } /** *延遲initialDelay時間後開始執行command,並且按照period時間週期性重複 *呼叫,這裡的間隔時間delay是等上一個任務完全執行完畢才開始計算。 */ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { ... }
ScheduledThreadPoolExecutor把待排程的任務放到一個DelayedWorkQueue ,並且DelayedWorkQueue 是一個無界佇列,ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中沒有什麼意義。整個ScheduledThreadPoolExecutor的執行可以分為兩大部分。
1、當呼叫ScheduledThreadPoolExecutor的上面4個方法時,會向ScheduledThreadPoolExecutor的DelayedWorkQueue 新增一個實現了RunnableScheduleFuture介面的ScheduledFutureTask,如下ScheduledThreadPoolExecutor其中的一個schedule方法。
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit), sequencer.getAndIncrement())); delayedExecute(t); return t; } private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task);//向ScheduledThreadPoolExecutor的DelayedWorkQueue新增一個實現了RunnableScheduleFuture介面的ScheduledFutureTask if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } }
2、執行緒池中的執行緒從DelayedWorkQueue 中獲取ScheduledFutureTask,然後執行。
ScheduledFutureTask是ScheduledThreadPoolExecutor的內部類並繼承自FutureTask,包含3個成員變數。
//ong型成員變數sequenceNumber,表示這個任務被新增到 //ScheduledThreadPoolExecutor中的序號。 private final long sequenceNumber; //long型成員變數time,表示這個任務將要被執行的具體時間。 private volatile long time; //long型成員變數period,表示任務執行的間隔週期。 private final long period; ScheduledFutureTask內部實現了compareTo()方法,用於對task的排序 public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
排序時,time小的排在前面,如果兩個ScheduledFutureTask的time相同,就比較sequenceNumber,sequenceNumber小的排在前面。
DelayedWorkQueue 內部使用了二叉堆演算法,DelayedWorkQueue 中的元素第一個元素永遠是 延遲時間最小的那個元素。當執行 schedule 方法是。如果不是重複的任務,那任務從 DelayedWorkQueue 取出之後執行完了就結束了。如果是重複的任務,那在執行結束前會重置執行時間並將自己重新加入到 DelayedWorkQueue 中
總結來說,ScheduledThreadPoolExecutor是一個實現ScheduledExecutorService的可以排程任務的執行框架;DelayedWorkQueue是一個數組實現的阻塞佇列,根據任務所提供的時間引數來調整位置,實際上就是個小根堆(優先佇列);ScheduledFutureTask包含任務單元,存有時間、週期、外部任務、堆下標等排程過程中必須用到的引數,被工作執行緒執行。ScheduledThreadPoolExecutor與Timer都是用作定時任務,它們直接的差異是Timer使用的是絕對時間,系統時間的改變會對Timer產生一定的影響;而ScheduledThreadPoolExecutor使用的是相對時間,不會導致這個問題。Timer使用的是單執行緒來處理任務,長時間執行的任務會導致其他任務的延遲處理;而ScheduledThreadPoolExecutor可以自定義執行緒數量。並且Timer沒有對執行時異常進行處理,一旦某個任務觸發執行時異常,會導致整個Timer崩潰;而ScheduledThreadPoolExecutor對執行時異常做了捕獲(通過afterExecute()回撥方法中進行處理),所以更安全。

838794-506ddad529df4cd4.webp.jpg