Java定時任務Timer排程器【二】 多執行緒原始碼分析(圖文版)
上一節通過一個小例子分析了Timer執行過程,牽涉的執行執行緒雖然只有兩個,但實際場景會比上面複雜一些。
首先通過一張簡單類圖(只列出簡單的依賴關係)看一下Timer暴露的介面。
為了演示Timer所暴露的介面,下面舉一個極端的例子(每一個介面方法面向單獨的執行執行緒),照樣以鬧鐘為例(原始碼只列出關鍵部分,下同)。
public class ScheduleDemo { public static void main(String[] args) throws Exception { AlarmTask alarm1 = new AlarmTask("鬧鐘1"); AlarmTask alarm2 = new AlarmTask("鬧鐘2"); new Thread("執行緒1"){ public void run() { log.info("["+Thread.currentThread().getName()+"]排程鬧鐘1"); timer.schedule(alarm1,delay,period); } }.start(); new Thread("執行緒2"){ public void run() { log.info("["+Thread.currentThread().getName()+"]排程鬧鐘2"); timer.schedule(alarm2,delay,period); } }.start(); Thread.sleep(1500); new Thread("執行緒3"){ public void run() { log.info("["+Thread.currentThread().getName()+"]取消鬧鐘2"); alarm2.cancel(); } }.start(); new Thread("執行緒4"){ public void run() { log.info("["+Thread.currentThread().getName()+"]清理無用鬧鐘"); timer.purge(); } }.start(); new Thread("執行緒5"){ public void run() { log.info("["+Thread.currentThread().getName()+"]關閉所有鬧鐘"); timer.cancel(); } }.start(); } /** * 模擬鬧鐘 */ static class AlarmTask extends TimerTask{ String name ; public AlarmTask(String name){ this.name=name; } public void run() { log.info("["+Thread.currentThread().getName()+"]-["+name+"]嘀。。。"); Thread.sleep(1000); //模擬鬧鐘執行時間 } } }
執行結果
[執行緒2]排程鬧鐘2
[執行緒1]排程鬧鐘1
[Timer-0]-[鬧鐘2]嘀。。。
[執行緒3]取消鬧鐘2
[執行緒4]清理無用鬧鐘
[執行緒5]關閉所有鬧鐘
下面我們依次檢視一下每個介面方法的原始碼。
1. 檢視Timer.sched()原始碼
public void schedule(TimerTask task, long delay, long period) { sched(task, System.currentTimeMillis()+delay, -period); } private void sched(TimerTask task, long time, long period) { // 如果period無限大,保證其在一個合理的範圍內 if (Math.abs(period) > (Long.MAX_VALUE >> 1)) period >>= 1; // 加queue鎖,保證佇列操作的執行緒安全 synchronized(queue) { // 加lock鎖,保證任務狀態的一致性(多執行緒環境下) synchronized(task.lock) { task.nextExecutionTime = time; task.period = period; task.state = TimerTask.SCHEDULED; } // 將任務加入佇列實現排序 queue.add(task); if (queue.getMin() == task) queue.notify(); } }
其中queue.add(task在)將任務加入佇列的同時實現了內部排序。
void add(TimerTask task) { // 佇列不足時,以兩倍容量擴增 if (size + 1 == queue.length) // 從效能上要快於new一個數組的效率 queue = Arrays.copyOf(queue, 2 * queue.length); queue[++size] = task; // 利用二分查詢演算法實現任務排序 fixUp(size); } private void fixUp(int k) { while (k > 1) { int j = k >> 1; if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime) break; TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp; k = j; } }
從方法sched()可以看到,該方法一方面持有queue鎖,用來維護佇列排序的執行緒安全;一方面持有lock鎖,用來維護任務狀態的執行緒安全。
2. 檢視TimerTask.cancel()原始碼
public abstract class TimerTask implements Runnable {
final Object lock = new Object();
public boolean cancel() {
synchronized(lock) {
boolean result = (state == SCHEDULED);
state = CANCELLED;
return result;
}
}
對於任務的取消操作,只是簡單的修改一下任務狀態,中途也只佔有一個lock鎖!接著看一下執行任務的執行緒邏輯。
class TimerThread extends Thread {
private TaskQueue queue;
public void run() {
mainLoop();
}
private void mainLoop() {
while (true) {
synchronized(queue) {
while (queue.isEmpty() && newTasksMayBeScheduled)
queue.wait();
task = queue.getMin();
// 此處加task鎖,防止其他執行緒同時呼叫task.cancel()
synchronized(task.lock) {
// ...維護鬧鐘狀態
}
}
if (!taskFired) // 時間未到
queue.wait(executionTime - currentTime);
}
if (taskFired)
// 執行鬧鐘時,沒有保持任何鎖
task.run();
}
}
可以看到當TimerThead真正執行鬧鐘時,是沒有持鎖的,所以當鬧鐘正在執行的時候AlarmTask.cancel()對其是不起作用的,換言之,只能取消下一次將要執行的鬧鐘。
3. 檢視Timer.purge()原始碼
public class Timer {
private final TaskQueue queue = new TaskQueue();
// 保證被取消的task能及時進行垃圾回收
public int purge() {
int result = 0;
synchronized(queue) {
for (int i = queue.size(); i > 0; i--) {
if (queue.get(i).state == TimerTask.CANCELLED) {
queue.quickRemove(i);
result++;
}
}
if (result != 0)
// 重新整理佇列中有效的任務
queue.heapify();
}
return result;
}
進一步檢視queue.quickRemove(i)和queue.heapify()。
class TaskQueue {
void quickRemove(int i) {
queue[i] = queue[size];
queue[size--] = null; //清除無效任務,防止記憶體洩漏
}
private void fixDown(int k) {
int j;
while ((j = k << 1) <= size && j > 0) {
if (j < size &&
queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)
j++; // j indexes smallest kid
if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)
break;
TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;
k = j;
}
}
void heapify() {
for (int i = size/2; i >= 1; i--)
fixDown(i);
}
可以看到Timer.purge()在持有queue鎖時主要做兩件事
1.及時清除佇列中無效的鬧鐘防止記憶體洩漏。
2.重新規整佇列中鬧鐘。
4. 最後看一下Timer.cancel()原始碼
public class Timer {
private final TaskQueue queue = new TaskQueue();
private final TimerThread thread = new TimerThread(queue);
public void cancel() {
synchronized(queue) {
thread.newTasksMayBeScheduled = false;
queue.clear();
//防止佇列為空的情況下,TimerThead無限等待
queue.notify();
}
}
該方法在清除所有鬧鐘的同時,與TimerThread發生了一次執行緒通訊——喚醒TimerThread並讓其永久退出。
private void mainLoop() {
while (true) {
synchronized(queue) {
while (queue.isEmpty() && newTasksMayBeScheduled)
queue.wait();
if (queue.isEmpty())
break; // TimerThread永久退出
queue.wait(executionTime - currentTime);
}
}
}
以上是整個過程的靜態分析,現在捕捉一個執行緒快照進行動態分析。為了dump一個特定時刻的執行緒快照,現在在Timer.sched()打一個斷點(注意斷點的方式與位置)。
以debug模式執行下面的例子。
public class ScheduleDemo {
public static void main(String[] args) throws Exception {
AlarmTask alarm1 = new AlarmTask("鬧鐘1");
AlarmTask alarm2 = new AlarmTask("鬧鐘2");
new Thread("執行緒1"){
public void run() {
log.info("["+Thread.currentThread().getName()+"]排程鬧鐘1");
timer.schedule(alarm1,delay,period);
}
}.start();
new Thread("執行緒2"){
public void run() {
log.info("["+Thread.currentThread().getName()+"]排程鬧鐘2");
timer.schedule(alarm2,delay,period);
}
}.start();
Thread.sleep(1500);
new Thread("執行緒3"){
public void run() {
log.info("["+Thread.currentThread().getName()+"]取消鬧鐘2");
alarm2.cancel();
}
}.start();
new Thread("執行緒4"){
public void run() {
log.info("["+Thread.currentThread().getName()+"]清理無用鬧鐘");
timer.purge();
}
}.start();
new Thread("執行緒5"){
public void run() {
log.info("["+Thread.currentThread().getName()+"]關閉所有鬧鐘");
timer.cancel();
}
}.start();
}
/**
* 模擬鬧鐘
*/
static class AlarmTask extends TimerTask{
String name ;
public AlarmTask(String name){
this.name=name;
}
public void run() {
log.info("["+Thread.currentThread().getName()+"]-["+name+"]嘀。。。");
Thread.sleep(1000); //模擬鬧鐘執行時間
}
}
}
下圖是visualVM工具dump出的執行緒快照(斷點處)
通過上面的快照可以看到,當“執行緒1“(持有兩把鎖)處於RUNNABLE狀態時,”執行緒2“、“執行緒3”、“執行緒4”、“執行緒5”都處於BLOCKED狀態。需要注意的是,因為TimerThread的時間未到,暫時處於WATING狀態(等待喚醒)。
下面是一個簡單的形象圖
總結:Timer為了保證執行緒安全,使用了大量的鎖機制,整體上對CPU的利用率不高。