【Java併發學習四】如何實現一個定時執行緒池
所謂 “定時任務執行緒池” 就是指放入執行緒池的任務,可以按照指定的等待週期迴圈執行。
Java裡面ScheduledThreadPoolExecutor
這個類實現了這種功能。Spring裡面的定時任務也是在ScheduledThreadPoolExecutor
的基礎上擴充套件而來。
如何實現
如圖,放入執行緒池的任務,線上程數超過corePoolSize
的情況下會放入佇列,而執行緒池內的執行緒則不斷從佇列中讀取任務消費。
如果我們想要一個放入的任務每隔一段時間(如一小時)定時執行,似乎挺簡單:
1. 消費完的任務,需要再放進佇列中被消費,
2. 執行緒池中執行緒取任務的時間不能是馬上,得等待一小時後才消費。
第一點不難,關鍵是第二點如何實現。思考下,應該不能從執行緒下手,因為每個任務定時時間是不同的,執行緒消費時是不好控制的。那就只有從佇列下手了。
我們將放入的任務增加一個delay
延遲欄位,然後使它被取出時,等待delay
這麼長就行。
OK,看到這裡,你已經把定時任務執行緒池的原理理解的差不多啦~ 接下來我們看具體實現細節。
實現細節
1. 延遲佇列的實現
最難的地方也就是 延遲佇列 的實現。我們借鑑下已有的實現,在Java裡面查找了下已有佇列,果然發現一個DelayQueue
的類。研究一番後發現,延遲佇列需要用到一種 叫做 “堆” 的資料結構。
堆
堆其實就是一個完全二叉樹,延遲佇列中用的是 最小堆(父結點<=子結點)。一般用陣列來儲存,i
(i – 1) / 2
。它的左右子結點下標分別為2i + 1
和2i + 2
。 - 在堆陣列尾部新增元素時,需要不停將該元素與其父結點進行對比交換,類似於元素在“上升”。也就是使新新增的元素插入一個有序的序列中,形成一個新的有序堆序列
- 堆刪除元素時,總是先刪除根結點,然後將最後一個元素移到根結點,與子結點對比交換,類似於元素在“下沉”。最終形成新的有序堆序列。
關於堆的更多細節可以自行百度谷歌,或者檢視我之前排序演算法總結裡面的堆排序: 資料結構基礎(六)排序
自定義延遲佇列
為什麼延遲佇列用堆實現呢?
因為堆中元素是有順序的,延遲佇列中排序是以 任務的等待週期
/**
* 定義任務的compareTo方法,用於堆插入或者刪除時的堆排序
*/
@Override
public int compareTo(Delayed o) {
MyScheduledFutureTask x = (MyScheduledFutureTask)o;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else
return 1;
}
那延遲佇列的延遲如何實現呢?
延遲佇列用Condition.awaitNanos(delay)
條件變數來實現了執行緒的等待。我們看下延遲佇列的成員變數,queue
就是一個堆結構的任務陣列;lock
鎖保證的佇列新增和刪除時的執行緒安全;lock
、Condition
和leader
(領導執行緒) 一起,控制執行緒的等待和喚醒。具體細節看下面。
/**
* 自定義延遲佇列
*/
public class MyDelayQueue extends AbstractQueue implements BlockingQueue{
/** 堆資料結構 */
private MyScheduledFutureTask[] queue = new MyScheduledFutureTask[16];
/** 佇列元素個數 */
private int size = 0;
/** 鎖,用於佇列新增和刪除時保持執行緒安全 */
private final transient ReentrantLock lock = new ReentrantLock();
/** 用於實現佇列延遲取出元素 */
private final Condition available = lock.newCondition();
/**
* 領導執行緒,可理解為正在獲取節點的執行緒
* 和鎖、Condition一起,
* 控制佇列延遲獲取節點時,執行緒的等待和喚醒
*/
private Thread leader = null;
………………
}
延遲佇列中新增任務是怎樣的?
延遲佇列中新增任務很簡單,直接往堆尾部增加節點,然後執行 “上升”操作來重排序即可
public boolean add(Object o) {
lock.lock();
try {
//佇列空間不足時,擴充套件
if(size >= queue.length-1){
queue = Arrays.copyOf(queue, queue.length*2);
}
MyScheduledFutureTask task = (MyScheduledFutureTask) o;
//queue沒有任務時,直接往陣列第一個放入任務
if(size == 0){
queue[size++] = task;
}
//queue已經有任務時,在堆尾部增加任務,並實行堆上浮操作
else {
size++;
siftUp(size-1, task);
}
} finally {
lock.unlock();
}
return true;
}
重點!!!:延遲佇列中取出任務是怎樣的?
延遲佇列中獲取並刪除任務比較複雜,因為執行緒池中多個執行緒同時在從延遲佇列中取任務,所以需要用lock
、Condition
和leader
(領導執行緒) 一起,控制當一個執行緒在取任務,其餘執行緒阻塞,等到該任務獲取完畢,再喚醒其餘執行緒。
重點是leader
的理解,leader
可理解為正在獲取節點的執行緒。當leader
為空時,證明沒有執行緒在從佇列中獲取節點,該執行緒可自己成為leader
獲取任務節點;當leader
不為空時,證明有執行緒正在獲取節點,此時的leader
在堵塞倒計時中(awaitNanos(delay)
),故該執行緒需要阻塞;當執行緒取元素結束時,都需要喚醒Condition
等待上的任一執行緒。
public MyScheduledFutureTask take() throws InterruptedException {
lock.lockInterruptibly();
try {
for (;;) {
//取出堆中的頭節點
MyScheduledFutureTask first = queue[0];
//如果堆中沒有節點,則掛起執行緒
if (first == null)
available.await();
else {
//獲取節點任務的等待時間
long delay = first.getDelay(NANOSECONDS);
//如果已經不需要等待,直接返回節點任務,並將堆中尾節點視為頭節點進行堆下沉排序
if (delay <= 0)
return finishPoll(first);
//為下面程式碼執行緒等待時不再持有無用的first物件,直接釋放它
first = null;
//leader不為空,則某個awaitNanos執行緒已經在取任務,掛起執行緒
if (leader != null)
available.await();
//leader為空,此時沒有執行緒在取任務
else {
//設定leader為當前執行緒
Thread thisThread = Thread.currentThread();
leader = thisThread;
//呼叫awaitNanos方法等待固定時間後,等待其他執行緒的喚醒
try {
available.awaitNanos(delay);
} finally {
//leader執行緒被喚醒,下個迴圈將返回節點,此時將leader設定為null
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//佇列非空時,unlock前 隨機喚醒等待條件上的任一佇列
if (queue[0] != null)
available.signal();
lock.unlock();
}
}
/**
* 刪除f節點,將堆中尾節點設定為頭節點,然後進行下沉排序
*/
private MyScheduledFutureTask finishPoll(MyScheduledFutureTask f) {
int s = --size;
MyScheduledFutureTask x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
return f;
}
最後貼出完整的自定義延遲佇列程式碼:
/**
* 自定義延遲佇列
*/
public class MyDelayQueue extends AbstractQueue implements BlockingQueue{
/** 堆資料結構 */
private MyScheduledFutureTask[] queue = new MyScheduledFutureTask[16];
/** 佇列元素個數 */
private int size = 0;
/** 鎖,用於佇列新增和刪除時保持執行緒安全 */
private final transient ReentrantLock lock = new ReentrantLock();
/** 用於實現佇列延遲取出元素 */
private final Condition available = lock.newCondition();
/**
* 領導執行緒,可理解為正在獲取節點的執行緒
* 和鎖、Condition一起,
* 控制佇列延遲獲取節點時,執行緒的等待和喚醒
*/
private Thread leader = null;
@Override
public int size() {
return size;
}
@Override
public boolean add(Object o) {
lock.lock();
try {
//佇列空間不足時,擴充套件
if(size >= queue.length-1){
queue = Arrays.copyOf(queue, queue.length*2);
}
MyScheduledFutureTask task = (MyScheduledFutureTask) o;
//queue沒有任務時,直接往陣列第一個放入任務
if(size == 0){
queue[size++] = task;
}
//queue已經有任務時,在堆尾部增加任務,並實行堆上浮操作
else {
size++;
siftUp(size-1, task);
}
} finally {
lock.unlock();
}
return true;
}
@Override
public MyScheduledFutureTask take() throws InterruptedException {
lock.lockInterruptibly();
try {
for (;;) {
//取出堆中的頭節點
MyScheduledFutureTask first = queue[0];
//如果堆中沒有節點,則掛起執行緒
if (first == null)
available.await();
else {
//獲取節點任務的等待時間
long delay = first.getDelay(NANOSECONDS);
//如果已經不需要等待,直接返回節點任務,並將下一個節點視為頭節點進行堆排序
if (delay <= 0)
return finishPoll(first);
//下面程式碼執行緒等待時不再持有無用的first物件,直接釋放它
first = null;
//leader不為空,則某個awaitNanos執行緒已經在取任務,掛起執行緒
if (leader != null)
available.await();
//leader為空,此時沒有執行緒在取任務
else {
//設定leader為當前執行緒
Thread thisThread = Thread.currentThread();
leader = thisThread;
//呼叫awaitNanos方法等待固定時間後,將被喚醒
try {
available.awaitNanos(delay);
} finally {
//任務等待完畢,leader執行緒被喚醒,下個迴圈將返回節點,此時將leader設定為null
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//佇列非空時,unlock前 隨機喚醒等待條件上的任一佇列
if (queue[0] != null)
available.signal();
lock.unlock();
}
}
/**
* 刪除f節點,將堆中尾節點設定為頭節點,然後進行下沉排序
*/
private MyScheduledFutureTask finishPoll(MyScheduledFutureTask f) {
int s = --size;
MyScheduledFutureTask x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
return f;
}
/**
* 在堆尾部增加節點,實行堆排序的上浮操作
*/
private void siftUp(int k, MyScheduledFutureTask key) {
//如果子節點比父節點大,則替換
while (k > 0) {
int parent = (k - 1) / 2;
MyScheduledFutureTask e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
k = parent;
}
queue[k] = key;
}
/**
* 從堆的頂部拿取節點,實現堆排序的下沉操作
*/
private void siftDown(int k, MyScheduledFutureTask key) {
int half = size / 2;
while (k < half) {
int child = (k*2) + 1;
MyScheduledFutureTask c = queue[child];
int right = child + 1;
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
if (key.compareTo(c) <= 0)
break;
queue[k] = c;
k = child;
}
queue[k] = key;
}
//…………其餘方法略
}
2. 執行緒任務的實現
Runnable
的包裝類,記錄每個任務執行時的等待週期period
和下個週期任務應該觸發的時間time
,以及run()
結束前,將任務再次放入佇列中
/**
* 定時任務執行類
*/
public class MyScheduledFutureTask
implements Runnable, Delayed {
/** 任務觸發時間的納秒值 */
private long time;
/** 迴圈間隔的納秒值 */
private final long period;
/** 執行緒池中的佇列 */
private BlockingQueue queue;
/** 執行任務 */
private Runnable task;
public MyScheduledFutureTask(Runnable r, long time, int period, BlockingQueue<Runnable> queue) {
this.task = r;
this.time = time;
this.period = period;
this.queue = queue;
}
/**
* 自定義任務佇列實現了堆資料結構,此方法用於堆插入或者刪除時的堆排序
*/
@Override
public int compareTo(Delayed o) {
MyScheduledFutureTask x = (MyScheduledFutureTask)o;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else
return 1;
}
/**
* 獲取觸發時間與當前時間的時間差
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(time - System.nanoTime(), NANOSECONDS);
}
@Override
public void run() {
//執行邏輯
task.run();
//任務執行結束後,將下次任務觸發的時間增加一週期
time += TimeUnit.SECONDS.toNanos((long)period);
//重新往執行緒池佇列中加入此任務
queue.add(this);
}
}
3. 定時執行緒池的實現
有了上面的基礎,定時執行緒池的實現就很簡單了。scheduleAtFixedRate()
方法中,將執行的任務command
封裝為包裝類MyScheduledFutureTask
;然後放入延遲佇列中;最後呼叫ensurePrestart()
方法往執行緒池放入一個空任務,使執行緒池建立執行緒開始不斷讀取佇列中的任務執行。
/**
* 自定義簡單定時執行緒池
*/
public class MyScheduledThreadPool extends MyThreadPool{
public MyScheduledThreadPool(int initPoolNum) {
super(initPoolNum, Integer.MAX_VALUE, new MyDelayQueue());
}
/**
* 每隔固定時間週期執行任務
* @param command 任務
* @param period 時間週期(以秒為單位)
*/
public void scheduleAtFixedRate(Runnable command, int period) {
if (command == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
//包裝任務為週期任務
MyScheduledFutureTask mScheduledTask =
new MyScheduledFutureTask(command, triggerTime(period), period, getTaskQueue());
//延遲週期執行
delayedExecute(mScheduledTask);
}
private void delayedExecute(MyScheduledFutureTask task) {
getTaskQueue().add(task);
ensurePrestart();
}
/**
* 確保執行緒池已經啟動,有執行緒會去讀取佇列,並執行任務
*/
void ensurePrestart() {
execute(null);
}
/**
* 獲取觸發時間
* @param period 延遲時間
* @return 返回觸發時間
*/
long triggerTime(int period) {
return System.nanoTime() + TimeUnit.SECONDS.toNanos((long)period);
}
}
/**
* 自定義簡單執行緒池
*/
public class MyThreadPool{
/**存放執行緒的集合*/
private ArrayList<MyWorkThread> threads;
/**任務佇列*/
private BlockingQueue<Runnable> taskQueue;
/**執行緒池初始限定大小*/
private int threadNum;
/** 執行緒池最大大小 */
private int maxThreadNum;
/**已經工作的執行緒數目*/
private int workThreadNum;
private final ReentrantLock mainLock = new ReentrantLock();
public MyThreadPool(int initPoolNum) {
this.threadNum = initPoolNum;
maxThreadNum = initPoolNum;
this.threads = new ArrayList<>(initPoolNum);
//任務佇列初始化為執行緒池執行緒數的四倍
this.taskQueue = new ArrayBlockingQueue<>(initPoolNum*4);
this.workThreadNum = 0;
}
public MyThreadPool(int initPoolNum, int maxThreadNum, BlockingQueue<Runnable> taskQueue) {
this.threadNum = initPoolNum;
this.maxThreadNum = maxThreadNum;
this.threads = new ArrayList<>(initPoolNum);
//任務佇列初始化為執行緒池執行緒數的四倍
this.taskQueue = taskQueue;
this.workThreadNum = 0;
}
public void execute(Runnable runnable) {
try {
mainLock.lock();
//執行緒池未滿,每加入一個任務則開啟一個執行緒
if(workThreadNum < threadNum) {
MyWorkThread myThead = new MyWorkThread(runnable);
myThead.start();
threads.add(myThead);
workThreadNum++;
}
//執行緒池已滿,放入任務佇列,等待有空閒執行緒時執行
else {
//佇列已滿,無法新增、且執行緒數小於最大執行緒數時,新增一個任務執行緒跑任務
if(!taskQueue.offer(runnable) && workThreadNum < maxThreadNum) {
MyWorkThread overLimitThead = new MyWorkThread(runnable);
overLimitThead.start();
threads.add(overLimitThead);
workThreadNum++;
}
//佇列已滿,無法新增、且執行緒數大於等於最大執行緒數時,拒絕任務
else{
rejectTask();
}
}
} finally {
mainLock.unlock();
}
}
public BlockingQueue<Runnable> getTaskQueue() {
return taskQueue;
}
private void rejectTask() {
System.out.println("任務佇列已滿,無法繼續新增,請擴大您的初始化執行緒池!");
}
public static void main(String[] args) {
MyThreadPool myThreadPool = new MyThreadPool(5);
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"執行中");
}
};
for (int i = 0; i < 20; i++) {
myThreadPool.execute(task);
}
}
/**
* 自定義執行緒類
*/
class MyWorkThread extends Thread{
private Runnable task;
public MyWorkThread(Runnable runnable) {
this.task = runnable;
}
@Override
public void run() {
//該執行緒一直啟動著,不斷從任務佇列取出任務執行
while (true) {
//如果初始化任務不為空,則執行初始化任務
if(task != null) {
task.run();
task = null;
}
//否則去任務佇列取任務並執行
else {
Runnable queueTask = null;
try {
queueTask = taskQueue.take();
queueTask.run();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
補充:Java中定時執行緒池的實現
看完上面的內容,你其實已經對Java中ScheduledThreadPoolExecutor
的實現原理了解的差不多了。
這裡只補充下ScheduledThreadPoolExecutor
的定時執行任務的四種方法差別:
public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {…………}
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit) {…………}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {…………}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit{…………}
前兩個schedule方法是延遲執行任務,且只執行一次不迴圈。它們的區別在於形參中的任務型別不同,一個是Runnable
,一個是Callable
,其實Runnable
最後也被包裝成了Callable
型別。
只執行一次的實現是:將任務(ScheduledFutureTask
)的成員變數period
設定為0,當period
為0時任務執行結束不再往佇列中重新加入。
後兩個方法是週期執行任務,initialDelay
形參指明任務第一次執行時的延遲時間。它們的差別在於,scheduleAtFixedRate
是嚴格按照週期period
執行任務,例如任務每隔四秒執行一次。scheduleWithFixedDelay
是從任務結束起開始計時執行任務,例如任務執行完成後,再隔四秒執行一次。
它們的實現原理是:將任務(ScheduledFutureTask
)的成員變數period
設定為正數時,代表fixed-rate
方式執行;設定為負數時,代表fixed-delay
方式執行任務,。具體體現在:方法setNextRunTime()
(設定下一次任務執行時間的)中,根據period
的不同,計算方式不同:
private void setNextRunTime() {
long p = period;
//fixed-rate方式,直接在上一次執行任務的time上加上週期
if (p > 0)
time += p;
//fixed-delay方式,在現在時間now()上加上週期
else
time = triggerTime(-p);
}
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}