1. 程式人生 > >【Java併發學習四】如何實現一個定時執行緒池

【Java併發學習四】如何實現一個定時執行緒池

所謂 “定時任務執行緒池” 就是指放入執行緒池的任務,可以按照指定的等待週期迴圈執行。

Java裡面ScheduledThreadPoolExecutor這個類實現了這種功能。Spring裡面的定時任務也是在ScheduledThreadPoolExecutor的基礎上擴充套件而來。

如何實現

如圖,放入執行緒池的任務,線上程數超過corePoolSize的情況下會放入佇列,而執行緒池內的執行緒則不斷從佇列中讀取任務消費。

如果我們想要一個放入的任務每隔一段時間(如一小時)定時執行,似乎挺簡單:
1. 消費完的任務,需要再放進佇列中被消費,
2. 執行緒池中執行緒取任務的時間不能是馬上,得等待一小時後才消費。

第一點不難,關鍵是第二點如何實現。思考下,應該不能從執行緒下手,因為每個任務定時時間是不同的,執行緒消費時是不好控制的。那就只有從佇列下手了。

我們將放入的任務增加一個delay延遲欄位,然後使它被取出時,等待delay這麼長就行。

OK,看到這裡,你已經把定時任務執行緒池的原理理解的差不多啦~ 接下來我們看具體實現細節。

實現細節

1. 延遲佇列的實現

最難的地方也就是 延遲佇列 的實現。我們借鑑下已有的實現,在Java裡面查找了下已有佇列,果然發現一個DelayQueue的類。研究一番後發現,延遲佇列需要用到一種 叫做 “” 的資料結構。

堆其實就是一個完全二叉樹,延遲佇列中用的是 最小堆(父結點<=子結點)。一般用陣列來儲存,i

結點的父結點下標就為(i – 1) / 2。它的左右子結點下標分別為2i + 12i + 2

  • 堆陣列尾部新增元素時,需要不停將該元素與其父結點進行對比交換,類似於元素在“上升”。也就是使新新增的元素插入一個有序的序列中,形成一個新的有序堆序列
  • 堆刪除元素時,總是先刪除根結點,然後將最後一個元素移到根結點,與子結點對比交換,類似於元素在“下沉”。最終形成新的有序堆序列。

關於堆的更多細節可以自行百度谷歌,或者檢視我之前排序演算法總結裡面的堆排序: 資料結構基礎(六)排序

自定義延遲佇列

為什麼延遲佇列用堆實現呢?
     因為堆中元素是有順序的,延遲佇列中排序是以 任務的等待週期

比較,這樣保證了從延遲佇列中取出元素(即堆中獲取頭節點),總是取出的等待週期最小的任務消費。

   /**
     * 定義任務的compareTo方法,用於堆插入或者刪除時的堆排序
     */
    @Override
    public int compareTo(Delayed o) {
        MyScheduledFutureTask x = (MyScheduledFutureTask)o;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else
            return 1;
    }

那延遲佇列的延遲如何實現呢?
    延遲佇列用Condition.awaitNanos(delay)條件變數來實現了執行緒的等待。我們看下延遲佇列的成員變數,queue 就是一個堆結構的任務陣列;lock鎖保證的佇列新增和刪除時的執行緒安全;lockConditionleader(領導執行緒) 一起,控制執行緒的等待和喚醒。具體細節看下面。

/**
 * 自定義延遲佇列
 */
public class MyDelayQueue extends AbstractQueue implements BlockingQueue{

    /** 堆資料結構 */
    private MyScheduledFutureTask[] queue = new MyScheduledFutureTask[16];

    /** 佇列元素個數 */
    private int size = 0;

    /** 鎖,用於佇列新增和刪除時保持執行緒安全 */
    private final transient ReentrantLock lock = new ReentrantLock();

    /** 用於實現佇列延遲取出元素 */
    private final Condition available = lock.newCondition();

    /**
     * 領導執行緒,可理解為正在獲取節點的執行緒
     * 和鎖、Condition一起,
     * 控制佇列延遲獲取節點時,執行緒的等待和喚醒
     */
    private Thread leader = null;

     ………………
}

延遲佇列中新增任務是怎樣的?
     延遲佇列中新增任務很簡單,直接往堆尾部增加節點,然後執行 “上升”操作來重排序即可

public boolean add(Object o) {
        lock.lock();

        try {
            //佇列空間不足時,擴充套件
            if(size >= queue.length-1){
                queue = Arrays.copyOf(queue, queue.length*2);
            }

            MyScheduledFutureTask task = (MyScheduledFutureTask) o;
            //queue沒有任務時,直接往陣列第一個放入任務
            if(size == 0){
                queue[size++] = task;
            }
            //queue已經有任務時,在堆尾部增加任務,並實行堆上浮操作
            else {
                size++;
                siftUp(size-1, task);
            }
        } finally {
            lock.unlock();
        }
        return true;
    }

重點!!!:延遲佇列中取出任務是怎樣的?
     延遲佇列中獲取並刪除任務比較複雜,因為執行緒池中多個執行緒同時在從延遲佇列中取任務,所以需要用lockConditionleader(領導執行緒) 一起,控制當一個執行緒在取任務,其餘執行緒阻塞,等到該任務獲取完畢,再喚醒其餘執行緒。

重點是leader的理解,leader可理解為正在獲取節點的執行緒。當leader為空時,證明沒有執行緒在從佇列中獲取節點,該執行緒可自己成為leader獲取任務節點;當leader不為空時,證明有執行緒正在獲取節點,此時的leader在堵塞倒計時中(awaitNanos(delay)),故該執行緒需要阻塞;當執行緒取元素結束時,都需要喚醒Condition等待上的任一執行緒。

public MyScheduledFutureTask take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            for (;;) {
                //取出堆中的頭節點
                MyScheduledFutureTask first = queue[0];
                //如果堆中沒有節點,則掛起執行緒
                if (first == null)
                    available.await();
                else {
                    //獲取節點任務的等待時間
                    long delay = first.getDelay(NANOSECONDS);
                    //如果已經不需要等待,直接返回節點任務,並將堆中尾節點視為頭節點進行堆下沉排序
                    if (delay <= 0)
                        return finishPoll(first);
                    //為下面程式碼執行緒等待時不再持有無用的first物件,直接釋放它
                    first = null;

                    //leader不為空,則某個awaitNanos執行緒已經在取任務,掛起執行緒
                    if (leader != null)
                        available.await();
                    //leader為空,此時沒有執行緒在取任務
                    else {
                        //設定leader為當前執行緒
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        //呼叫awaitNanos方法等待固定時間後,等待其他執行緒的喚醒
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            //leader執行緒被喚醒,下個迴圈將返回節點,此時將leader設定為null
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            //佇列非空時,unlock前 隨機喚醒等待條件上的任一佇列
            if (queue[0] != null)
                available.signal();
            lock.unlock();
        }
    }

   /**
     * 刪除f節點,將堆中尾節點設定為頭節點,然後進行下沉排序
     */
    private MyScheduledFutureTask finishPoll(MyScheduledFutureTask f) {
        int s = --size;
        MyScheduledFutureTask x = queue[s];
        queue[s] = null;
        if (s != 0)
            siftDown(0, x);
        return f;
    }

最後貼出完整的自定義延遲佇列程式碼:

/**
 * 自定義延遲佇列
 */
public class MyDelayQueue extends AbstractQueue implements BlockingQueue{

    /** 堆資料結構 */
    private MyScheduledFutureTask[] queue = new MyScheduledFutureTask[16];

    /** 佇列元素個數 */
    private int size = 0;

    /** 鎖,用於佇列新增和刪除時保持執行緒安全 */
    private final transient ReentrantLock lock = new ReentrantLock();

    /** 用於實現佇列延遲取出元素 */
    private final Condition available = lock.newCondition();

    /**
     * 領導執行緒,可理解為正在獲取節點的執行緒
     * 和鎖、Condition一起,
     * 控制佇列延遲獲取節點時,執行緒的等待和喚醒
     */
    private Thread leader = null;

    @Override
    public int size() {
        return size;
    }

    @Override
    public boolean add(Object o) {
        lock.lock();

        try {
            //佇列空間不足時,擴充套件
            if(size >= queue.length-1){
                queue = Arrays.copyOf(queue, queue.length*2);
            }

            MyScheduledFutureTask task = (MyScheduledFutureTask) o;
            //queue沒有任務時,直接往陣列第一個放入任務
            if(size == 0){
                queue[size++] = task;
            }
            //queue已經有任務時,在堆尾部增加任務,並實行堆上浮操作
            else {
                size++;
                siftUp(size-1, task);
            }
        } finally {
            lock.unlock();
        }
        return true;
    }

    @Override
    public MyScheduledFutureTask take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            for (;;) {
                //取出堆中的頭節點
                MyScheduledFutureTask first = queue[0];
                //如果堆中沒有節點,則掛起執行緒
                if (first == null)
                    available.await();
                else {
                    //獲取節點任務的等待時間
                    long delay = first.getDelay(NANOSECONDS);
                    //如果已經不需要等待,直接返回節點任務,並將下一個節點視為頭節點進行堆排序
                    if (delay <= 0)
                        return finishPoll(first);
                    //下面程式碼執行緒等待時不再持有無用的first物件,直接釋放它
                    first = null;

                    //leader不為空,則某個awaitNanos執行緒已經在取任務,掛起執行緒
                    if (leader != null)
                        available.await();
                    //leader為空,此時沒有執行緒在取任務
                    else {
                        //設定leader為當前執行緒
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        //呼叫awaitNanos方法等待固定時間後,將被喚醒
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            //任務等待完畢,leader執行緒被喚醒,下個迴圈將返回節點,此時將leader設定為null
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            //佇列非空時,unlock前 隨機喚醒等待條件上的任一佇列
            if (queue[0] != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * 刪除f節點,將堆中尾節點設定為頭節點,然後進行下沉排序
     */
    private MyScheduledFutureTask finishPoll(MyScheduledFutureTask f) {
        int s = --size;
        MyScheduledFutureTask x = queue[s];
        queue[s] = null;
        if (s != 0)
            siftDown(0, x);
        return f;
    }

    /**
     * 在堆尾部增加節點,實行堆排序的上浮操作
     */
    private void siftUp(int k, MyScheduledFutureTask key) {
        //如果子節點比父節點大,則替換
        while (k > 0) {
            int parent = (k - 1) / 2;
            MyScheduledFutureTask e = queue[parent];
            if (key.compareTo(e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }

        queue[k] = key;
    }

    /**
     * 從堆的頂部拿取節點,實現堆排序的下沉操作
     */
    private void siftDown(int k, MyScheduledFutureTask key) {
        int half = size / 2;
        while (k < half) {
            int child = (k*2) + 1;
            MyScheduledFutureTask c = queue[child];
            int right = child + 1;
            if (right < size && c.compareTo(queue[right]) > 0)
                c = queue[child = right];
            if (key.compareTo(c) <= 0)
                break;
            queue[k] = c;
            k = child;
        }

        queue[k] = key;
    }


    //…………其餘方法略
}

2. 執行緒任務的實現

Runnable的包裝類,記錄每個任務執行時的等待週期period和下個週期任務應該觸發的時間time,以及run()結束前,將任務再次放入佇列中

/**
 * 定時任務執行類
 */
public class MyScheduledFutureTask
        implements Runnable, Delayed {

    /** 任務觸發時間的納秒值 */
    private long time;

    /** 迴圈間隔的納秒值 */
    private final long period;

    /** 執行緒池中的佇列 */
    private BlockingQueue queue;

    /** 執行任務 */
    private Runnable task;

    public MyScheduledFutureTask(Runnable r, long time, int period, BlockingQueue<Runnable> queue) {
        this.task = r;
        this.time = time;
        this.period = period;
        this.queue = queue;
    }

    /**
     * 自定義任務佇列實現了堆資料結構,此方法用於堆插入或者刪除時的堆排序
     */
    @Override
    public int compareTo(Delayed o) {
        MyScheduledFutureTask x = (MyScheduledFutureTask)o;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else
            return 1;
    }

    /**
     *  獲取觸發時間與當前時間的時間差
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(time - System.nanoTime(), NANOSECONDS);
    }

    @Override
    public void run() {
        //執行邏輯
        task.run();
        //任務執行結束後,將下次任務觸發的時間增加一週期
        time += TimeUnit.SECONDS.toNanos((long)period);
        //重新往執行緒池佇列中加入此任務
        queue.add(this);
    }


}

3. 定時執行緒池的實現

有了上面的基礎,定時執行緒池的實現就很簡單了。scheduleAtFixedRate()方法中,將執行的任務command封裝為包裝類MyScheduledFutureTask;然後放入延遲佇列中;最後呼叫ensurePrestart()方法往執行緒池放入一個空任務,使執行緒池建立執行緒開始不斷讀取佇列中的任務執行。

/**
 * 自定義簡單定時執行緒池
 */
public class MyScheduledThreadPool extends  MyThreadPool{

    public MyScheduledThreadPool(int initPoolNum) {
        super(initPoolNum, Integer.MAX_VALUE, new MyDelayQueue());
    }


    /**
     * 每隔固定時間週期執行任務
     * @param command 任務
     * @param period 時間週期(以秒為單位)
     */
    public void scheduleAtFixedRate(Runnable command, int period) {
        if (command == null)
            throw new NullPointerException();

        if (period <= 0)
            throw new IllegalArgumentException();

        //包裝任務為週期任務
        MyScheduledFutureTask mScheduledTask =
                new MyScheduledFutureTask(command, triggerTime(period), period, getTaskQueue());
        //延遲週期執行
        delayedExecute(mScheduledTask);
    }

    private void delayedExecute(MyScheduledFutureTask task) {
        getTaskQueue().add(task);
        ensurePrestart();
    }


    /**
     * 確保執行緒池已經啟動,有執行緒會去讀取佇列,並執行任務
     */
    void ensurePrestart() {
        execute(null);
    }

    /**
     * 獲取觸發時間
     * @param period 延遲時間
     * @return 返回觸發時間
     */
    long triggerTime(int period) {

        return System.nanoTime() + TimeUnit.SECONDS.toNanos((long)period);
    }
}

/**
 * 自定義簡單執行緒池
 */
public class MyThreadPool{
    /**存放執行緒的集合*/
    private ArrayList<MyWorkThread> threads;
    /**任務佇列*/
    private BlockingQueue<Runnable> taskQueue;
    /**執行緒池初始限定大小*/
    private int threadNum;
    /** 執行緒池最大大小 */
    private int maxThreadNum;
    /**已經工作的執行緒數目*/
    private int workThreadNum;

    private final ReentrantLock mainLock = new ReentrantLock();

    public MyThreadPool(int initPoolNum) {
        this.threadNum = initPoolNum;
        maxThreadNum = initPoolNum;
        this.threads = new ArrayList<>(initPoolNum);
        //任務佇列初始化為執行緒池執行緒數的四倍
        this.taskQueue = new ArrayBlockingQueue<>(initPoolNum*4);

        this.workThreadNum = 0;
    }

    public MyThreadPool(int initPoolNum, int maxThreadNum, BlockingQueue<Runnable> taskQueue) {
        this.threadNum = initPoolNum;
        this.maxThreadNum = maxThreadNum;
        this.threads = new ArrayList<>(initPoolNum);
        //任務佇列初始化為執行緒池執行緒數的四倍
        this.taskQueue = taskQueue;

        this.workThreadNum = 0;
    }

    public void execute(Runnable runnable) {
        try {
            mainLock.lock();
            //執行緒池未滿,每加入一個任務則開啟一個執行緒
            if(workThreadNum < threadNum) {
                MyWorkThread myThead = new MyWorkThread(runnable);
                myThead.start();
                threads.add(myThead);
                workThreadNum++;
            }
            //執行緒池已滿,放入任務佇列,等待有空閒執行緒時執行
            else {
                //佇列已滿,無法新增、且執行緒數小於最大執行緒數時,新增一個任務執行緒跑任務
                if(!taskQueue.offer(runnable) && workThreadNum < maxThreadNum) {
                    MyWorkThread overLimitThead = new MyWorkThread(runnable);
                    overLimitThead.start();
                    threads.add(overLimitThead);
                    workThreadNum++;
                }

                //佇列已滿,無法新增、且執行緒數大於等於最大執行緒數時,拒絕任務
                else{
                    rejectTask();
                }
            }
        } finally {
            mainLock.unlock();
        }
    }

    public BlockingQueue<Runnable> getTaskQueue() {
        return taskQueue;
    }

    private void rejectTask() {
        System.out.println("任務佇列已滿,無法繼續新增,請擴大您的初始化執行緒池!");
    }

    public static void main(String[] args) {
        MyThreadPool myThreadPool = new MyThreadPool(5);
        Runnable task = new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"執行中");
            }
        };

        for (int i = 0; i < 20; i++) {
            myThreadPool.execute(task);
        }
    }

    /**
     * 自定義執行緒類
     */
    class MyWorkThread extends Thread{
        private Runnable task;

        public MyWorkThread(Runnable runnable) {
            this.task = runnable;
        }
        @Override
        public void run() {
            //該執行緒一直啟動著,不斷從任務佇列取出任務執行
            while (true) {
                //如果初始化任務不為空,則執行初始化任務
                if(task != null) {
                    task.run();
                    task = null;
                }
                //否則去任務佇列取任務並執行
                else {
                    Runnable queueTask = null;
                    try {
                        queueTask = taskQueue.take();
                        queueTask.run();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

補充:Java中定時執行緒池的實現

看完上面的內容,你其實已經對Java中ScheduledThreadPoolExecutor的實現原理了解的差不多了。

這裡只補充下ScheduledThreadPoolExecutor的定時執行任務的四種方法差別:

public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {…………}
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit) {…………}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {…………}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit{…………}

前兩個schedule方法是延遲執行任務,且只執行一次不迴圈。它們的區別在於形參中的任務型別不同,一個是Runnable,一個是Callable,其實Runnable最後也被包裝成了Callable型別。

只執行一次的實現是:將任務(ScheduledFutureTask)的成員變數period設定為0,當period為0時任務執行結束不再往佇列中重新加入。

後兩個方法是週期執行任務,initialDelay形參指明任務第一次執行時的延遲時間。它們的差別在於,scheduleAtFixedRate是嚴格按照週期period執行任務,例如任務每隔四秒執行一次。scheduleWithFixedDelay是從任務結束起開始計時執行任務,例如任務執行完成後,再隔四秒執行一次。

它們的實現原理是:將任務(ScheduledFutureTask)的成員變數period設定為正數時,代表fixed-rate方式執行;設定為負數時,代表fixed-delay方式執行任務,。具體體現在:方法setNextRunTime()(設定下一次任務執行時間的)中,根據period的不同,計算方式不同:

private void setNextRunTime() {
            long p = period;
            //fixed-rate方式,直接在上一次執行任務的time上加上週期
            if (p > 0)
                time += p;
            //fixed-delay方式,在現在時間now()上加上週期
            else
                time = triggerTime(-p);
        }

long triggerTime(long delay) {
        return now() +
            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }