1. 程式人生 > >Java併發程式設計系列-(6) Java執行緒池

Java併發程式設計系列-(6) Java執行緒池

6. 執行緒池

6.1 基本概念

在web開發中,伺服器需要接受並處理請求,所以會為一個請求來分配一個執行緒來進行處理。如果每次請求都新建立一個執行緒的話實現起來非常簡便,但是存在一個問題:如果併發的請求數量非常多,但每個執行緒執行的時間很短,這樣就會頻繁的建立和銷燬執行緒,如此一來會大大降低系統的效率。可能出現伺服器在為每個請求建立新執行緒和銷燬執行緒上花費的時間和消耗的系統資源要比處理實際的使用者請求的時間和資源更多。

那麼有沒有一種辦法使執行完一個任務,並不被銷燬,而是可以繼續執行其他的任務呢?這就是執行緒池的目的了。執行緒池為執行緒生命週期的開銷和資源不足問題提供瞭解決方案。通過對多個任務重用執行緒,執行緒建立的開銷被分攤到了多個任務上。

什麼時候使用執行緒池?

  • 單個任務處理時間比較短
  • 需要處理的任務數量很大

使用執行緒池好處

  • 降低資源消耗。通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。
  • 提高響應速度。當任務到達時,任務可以不需要的等到執行緒建立就能立即執行。
  • 提高執行緒的可管理性。執行緒是稀缺資源,如果無限制的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一的分配,調優和監控。

6.2 實現自己的執行緒池

實現的執行緒池需要滿足以下基本條件:

1、執行緒必須在池子已經建立好了,並且可以保持住,要有容器儲存多個執行緒;
2、執行緒還要能夠接受外部的任務,執行這個任務。容器保持這個來不及執行的任務.

以下是執行緒池的具體實現:

執行緒池中實現了任務佇列,用來儲存所有的任務;工作執行緒,來執行具體的任務。

public class MyThreadPool2 {
    // 執行緒池中預設執行緒的個數為5
    private static int WORK_NUM = 5;
    // 佇列預設任務個數為100
    private static int TASK_COUNT = 100;  
    
    // 使用者在構造這個池,希望的啟動的執行緒數
    private final int worker_num;
    // 工作執行緒組
    private WorkThread[] workThreads;
    // 任務佇列,作為一個緩衝
    private final BlockingQueue<Runnable> taskQueue;

    // 建立具有預設執行緒個數的執行緒池
    public MyThreadPool2() {
        this(WORK_NUM,TASK_COUNT);
    }

    // 建立執行緒池,worker_num為執行緒池中工作執行緒的個數
    public MyThreadPool2(int worker_num,int taskCount) {
        if (worker_num<=0) worker_num = WORK_NUM;
        if(taskCount<=0) taskCount = TASK_COUNT;
        this.worker_num = worker_num;
        taskQueue = new ArrayBlockingQueue<>(taskCount);
        workThreads = new WorkThread[worker_num];
        for(int i=0;i<worker_num;i++) {
            workThreads[i] = new WorkThread();
            workThreads[i].start();
        }
    }

    // 執行任務,其實只是把任務加入任務佇列,什麼時候執行有執行緒池管理器決定
    public void execute(Runnable task) {
        try {
            taskQueue.put(task);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 銷燬執行緒池,該方法保證在所有任務都完成的情況下才銷燬所有執行緒,否則等待任務完成才銷燬
    public void destroy() {
        // 工作執行緒停止工作,且置為null
        System.out.println("ready close pool.....");
        for(int i=0;i<worker_num;i++) {
            workThreads[i].stopWorker();
            workThreads[i] = null;//help gc
        }
        taskQueue.clear();// 清空任務佇列
    }

    // 覆蓋toString方法,返回執行緒池資訊:工作執行緒個數和已完成任務個數
    @Override
    public String toString() {
        return "WorkThread number:" + worker_num
                + "  wait task number:" + taskQueue.size();
    }

    /**
     * 內部類,工作執行緒
     */
    private class WorkThread extends Thread{
        
        @Override
        public void run(){
            Runnable r = null;
            try {
                while (!isInterrupted()) {
                    r = taskQueue.take();
                    if(r!=null) {
                        System.out.println(getId()+" ready exec :"+r);
                        r.run();
                    }
                    r = null;//help gc;
                } 
            } catch (Exception e) {
                // TODO: handle exception
            }
        }
        
        public void stopWorker() {
            interrupt();
        }
        
    }
}

以下是測試程式:

分別建立多個任務,並放入執行緒池進行執行。

public class TestMyThreadPool {
    public static void main(String[] args) throws InterruptedException {
        // 建立3個執行緒的執行緒池
        MyThreadPool2 t = new MyThreadPool2(3,0);
        t.execute(new MyTask("testA"));
        t.execute(new MyTask("testB"));
        t.execute(new MyTask("testC"));
        t.execute(new MyTask("testD"));
        t.execute(new MyTask("testE"));
        t.execute(new MyTask("testF"));
        t.execute(new MyTask("testG"));
        t.execute(new MyTask("testH"));
        System.out.println(t);
        Thread.sleep(10000);
        t.destroy();// 所有執行緒都執行完成才destory
        System.out.println(t);
    }

    // 任務類
    static class MyTask implements Runnable {

        private String name;
        private Random r = new Random();

        public MyTask(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }

        @Override
        public void run() {// 執行任務
            try {
                Thread.sleep(r.nextInt(1000)+2000);
            } catch (InterruptedException e) {
                System.out.println(Thread.currentThread().getId()+" sleep InterruptedException:"
                        +Thread.currentThread().isInterrupted());
            }
            System.out.println("任務 " + name + " 完成");
        }
    }
}

6.3 Executor框架

Executor框架是一個根據一組執行策略呼叫,排程,執行和控制的非同步任務的框架,目的是提供一種將”任務提交”與”任務如何執行”分離開來的機制。

Executor框架的類繼承關係如下圖:

J.U.C中有三個Executor介面:

  • Executor:一個執行新任務的簡單介面;
  • ExecutorService:擴充套件了Executor介面。添加了一些用來管理執行器生命週期和任務生命週期的方法;
  • ScheduledExecutorService:擴充套件了ExecutorService。支援Future和定期執行任務。

下面分別進行介紹:

1. Executor介面

Executor介面只有一個execute方法,用來替代通常建立或啟動執行緒的方法。

public interface Executor {
    void execute(Runnable command);
}

Executor介面只有一個execute方法,用來替代通常建立或啟動執行緒的方法。

executor.execute(new Thread())

對於不同的Executor實現,execute()方法可能是建立一個新執行緒並立即啟動,也有可能是使用已有的工作執行緒來執行傳入的任務,也可能是根據設定執行緒池的容量或者阻塞佇列的容量來決定是否要將傳入的執行緒放入阻塞佇列中或者拒絕接收傳入的執行緒。

2. ExecutorService介面

ExecutorService介面繼承自Executor介面,提供了管理終止的方法,以及可為跟蹤一個或多個非同步任務執行狀況而生成 Future 的方法。增加了shutDown(),shutDownNow(),invokeAll(),invokeAny()和submit()等方法。如果需要支援即時關閉,也就是shutDownNow()方法,則任務需要正確處理中斷。

3. ScheduledExecutorService介面

ScheduledExecutorService擴充套件ExecutorService介面並增加了schedule方法。呼叫schedule方法可以在指定的延時後執行一個Runnable或者Callable任務。ScheduledExecutorService介面還定義了按照指定時間間隔定期執行任務的scheduleAtFixedRate()方法和scheduleWithFixedDelay()方法。

4. Executor框架基本使用流程

基本使用流程如下:

6.4 ThreadPoolExecutor分析

ThreadPoolExecutor繼承自AbstractExecutorService,也實現了ExecutorService介面。JDK中的提供的內建執行緒池基本都基於ThreadPoolExecutor實現,後面會仔細介紹。

建構函式及引數意義

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

構造方法中的欄位含義如下:

  • corePoolSize:執行緒池中核心執行緒數,執行的執行緒數<corePoolSize,就會建立新執行緒,>= corePoolSize,這個任務就會儲存到BlockingQueue,如果呼叫prestartAllCoreThreads()方法就會一次性的啟動corePoolSize個數的執行緒。
  • maximumPoolSize: 允許的最大執行緒數,BlockingQueue也滿了,< maximumPoolSize時候就會再次建立新的執行緒.
  • keepAliveTime: 執行緒空閒下來後,存活的時間,這個引數只在 >corePoolSize 才有用.
  • TimeUnit unit: 存活時間的單位值.
  • workQueue:儲存等待執行的任務的阻塞佇列,當提交一個新的任務到執行緒池以後, 執行緒池會根據當前執行緒池中正在執行著的執行緒的數量來決定對該任務的處理方式,主要有以下幾種處理方式:

    1. 使用直接切換佇列:這種方式常用的佇列是SynchronousQueue.
    2. 使用無界佇列:一般使用基於連結串列的阻塞佇列LinkedBlockingQueue。如果使用這種方式,那麼執行緒池中能夠建立的最大執行緒數就是corePoolSize,而maximumPoolSize就不會起作用了(後面也會說到)。當執行緒池中所有的核心執行緒都是RUNNING狀態時,這時一個新的任務提交就會放入等待佇列中。
    3. 使用有界佇列:一般使用ArrayBlockingQueue。使用該方式可以將執行緒池的最大執行緒數量限制為maximumPoolSize,這樣能夠降低資源的消耗,但同時這種方式也使得執行緒池對執行緒的排程變得更困難,因為執行緒池和佇列的容量都是有限的值,所以要想使執行緒池處理任務的吞吐率達到一個相對合理的範圍,又想使執行緒排程相對簡單,並且還要儘可能的降低執行緒池對資源的消耗,就需要合理的設定這兩個數量。
  • threadFactory:它是ThreadFactory型別的變數,用來建立新執行緒。預設使用Executors.defaultThreadFactory() 來建立執行緒。使用預設的ThreadFactory來建立執行緒時,會使新建立的執行緒具有相同的NORM_PRIORITY優先順序並且是非守護執行緒,同時也設定了執行緒的名稱。

  • handler:它是RejectedExecutionHandler型別的變數,表示執行緒池的飽和策略。如果阻塞佇列滿了並且沒有空閒的執行緒,這時如果繼續提交任務,就需要採取一種策略處理該任務。

執行緒池提供了4種策略:

  1. AbortPolicy:直接丟擲異常,這是預設策略;
  2. CallerRunsPolicy:用呼叫者所在的執行緒來執行任務;
  3. DiscardOldestPolicy:丟棄阻塞佇列中靠最前的任務,並執行當前任務;
  4. DiscardPolicy:直接丟棄任務;

任務執行

提交任務執行,主要有execute和submit兩種方式,主要區別是後者需要有返回值。

  • execute(Runnable command)
  • Future

下面主要介紹execute的流程:

簡單來說,在執行execute()方法時且狀態一直是RUNNING時,的執行過程如下:

  1. 如果workerCount < corePoolSize,則建立並啟動一個執行緒來執行新提交的任務;
  2. 如果workerCount >= corePoolSize,且執行緒池內的阻塞佇列未滿,則將任務新增到該阻塞佇列中;
  3. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且執行緒池內的阻塞佇列已滿,則建立並啟動一個執行緒來執行新提交的任務;
  4. 如果workerCount >= maximumPoolSize,並且執行緒池內的阻塞佇列已滿, 則根據拒絕策略來處理該任務, 預設的處理方式是直接拋異常。

整個流程可以用下圖來總結:

接下來結合程式碼進行分析:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * clt記錄著runState和workerCount
     */
    int c = ctl.get();
    /*
     * workerCountOf方法取出低29位的值,表示當前活動的執行緒數;
     * 如果當前活動執行緒數小於corePoolSize,則新建一個執行緒放入執行緒池中;
     * 並把任務新增到該執行緒中。
     */
    if (workerCountOf(c) < corePoolSize) {
        /*
         * addWorker中的第二個引數表示限制新增執行緒的數量是根據corePoolSize來判斷還是maximumPoolSize來判斷;
         * 如果為true,根據corePoolSize來判斷;
         * 如果為false,則根據maximumPoolSize來判斷
         */
        if (addWorker(command, true))
            return;
        /*
         * 如果新增失敗,則重新獲取ctl值
         */
        c = ctl.get();
    }
    /*
     * 如果當前執行緒池是執行狀態並且任務新增到佇列成功
     */
    if (isRunning(c) && workQueue.offer(command)) {
        // 重新獲取ctl值
        int recheck = ctl.get();
        // 再次判斷執行緒池的執行狀態,如果不是執行狀態,由於之前已經把command新增到workQueue中了,
        // 這時需要移除該command
        // 執行過後通過handler使用拒絕策略對該任務進行處理,整個方法返回
        if (! isRunning(recheck) && remove(command))
            reject(command);
        /*
         * 獲取執行緒池中的有效執行緒數,如果數量是0,則執行addWorker方法
         * 這裡傳入的引數表示:
         * 1. 第一個引數為null,表示線上程池中建立一個執行緒,但不去啟動;
         * 2. 第二個引數為false,將執行緒池的有限執行緒數量的上限設定為maximumPoolSize,新增執行緒時根據maximumPoolSize來判斷;
         * 如果判斷workerCount大於0,則直接返回,在workQueue中新增的command會在將來的某個時刻被執行。
         */
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    /*
     * 如果執行到這裡,有兩種情況:
     * 1. 執行緒池已經不是RUNNING狀態;
     * 2. 執行緒池是RUNNING狀態,但workerCount >= corePoolSize並且workQueue已滿。
     * 這時,再次呼叫addWorker方法,但第二個引數傳入為false,將執行緒池的有限執行緒數量的上限設定為maximumPoolSize;
     * 如果失敗則拒絕該任務
     */
    else if (!addWorker(command, false))
        reject(command);
}

addWorker方法的主要工作是線上程池中建立一個新的執行緒並執行,firstTask引數 用於指定新增的執行緒執行的第一個任務,core引數為true表示在新增執行緒時會判斷當前活動執行緒數是否少於corePoolSize,false表示新增執行緒前需要判斷當前活動執行緒數是否少於maximumPoolSize,程式碼如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // 獲取執行狀態
        int rs = runStateOf(c);
        /*
         * 這個if判斷
         * 如果rs >= SHUTDOWN,則表示此時不再接收新任務;
         * 接著判斷以下3個條件,只要有1個不滿足,則返回false:
         * 1. rs == SHUTDOWN,這時表示關閉狀態,不再接受新提交的任務,但卻可以繼續處理阻塞佇列中已儲存的任務
         * 2. firsTask為空
         * 3. 阻塞佇列不為空
         * 
         * 首先考慮rs == SHUTDOWN的情況
         * 這種情況下不會接受新提交的任務,所以在firstTask不為空的時候會返回false;
         * 然後,如果firstTask為空,並且workQueue也為空,則返回false,
         * 因為佇列中已經沒有任務了,不需要再新增執行緒了
         */
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            // 獲取執行緒數
            int wc = workerCountOf(c);
            // 如果wc超過CAPACITY,也就是ctl的低29位的最大值(二進位制是29個1),返回false;
            // 這裡的core是addWorker方法的第二個引數,如果為true表示根據corePoolSize來比較,
            // 如果為false則根據maximumPoolSize來比較。
            // 
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 嘗試增加workerCount,如果成功,則跳出第一個for迴圈
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 如果增加workerCount失敗,則重新獲取ctl的值
            c = ctl.get();  // Re-read ctl
            // 如果當前的執行狀態不等於rs,說明狀態已被改變,返回第一個for迴圈繼續執行
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 根據firstTask來建立Worker物件
        w = new Worker(firstTask);
        // 每一個Worker物件都會建立一個執行緒
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // rs < SHUTDOWN表示是RUNNING狀態;
                // 如果rs是RUNNING狀態或者rs是SHUTDOWN狀態並且firstTask為null,向執行緒池中新增執行緒。
                // 因為在SHUTDOWN時不會在新增新的任務,但還是會執行workQueue中的任務
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers是一個HashSet
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize記錄著執行緒池中出現過的最大執行緒數量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 啟動執行緒
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

關閉執行緒池

關閉執行緒池通常有如下兩種方式:

  • shutdownNow():設定執行緒池的狀態,還會嘗試停止正在執行或者暫停任務的執行緒
  • shutdown():設定執行緒池的狀態,只會中斷所有沒有執行任務的執行緒

執行緒池的引數配置

通常來講,根據任務的性質來分,可以劃分為:計算密集型(CPU),IO密集型,混合型。

  • 計算密集型:加密,大數分解,正則等,執行緒數適當小一點,最大推薦:機器的Cpu核心數+1,為什麼+1,防止頁缺失,(機器的Cpu核心=Runtime.getRuntime().availableProcessors();)
  • IO密集型:讀取檔案,資料庫連線,網路通訊, 執行緒數適當大一點,可以設定為機器的Cpu核心數*2。
  • 混合型:儘量拆分,IO密集型>>計算密集型,拆分意義不大,IO密集型~=計算密集型
    佇列的選擇上,應該使用有界,無界佇列可能會導致記憶體溢位,發生OOM。

執行緒池的狀態

執行緒池的執行狀態. 執行緒池一共有五種狀態, 分別是:

  1. RUNNING :能接受新提交的任務,並且也能處理阻塞佇列中的任務;
  2. SHUTDOWN:關閉狀態,不再接受新提交的任務,但卻可以繼續處理阻塞佇列中已儲存的任務。線上程池處於 RUNNING 狀態時,呼叫 shutdown()方法會使執行緒池進入到該狀態。(finalize() 方法在執行過程中也會呼叫shutdown()方法進入該狀態);
  3. STOP:不能接受新任務,也不處理佇列中的任務,會中斷正在處理任務的執行緒。線上程池處於 RUNNING 或 SHUTDOWN 狀態時,呼叫 shutdownNow() 方法會使執行緒池進入到該狀態;
  4. TIDYING:如果所有的任務都已終止了,workerCount (有效執行緒數) 為0,執行緒池進入該狀態後會呼叫 terminated() 方法進入TERMINATED 狀態。
  5. TERMINATED:在terminated() 方法執行完後進入該狀態,預設terminated()方法中什麼也沒有做。
    進入TERMINATED的條件如下:
    • 執行緒池不是RUNNING狀態;
    • 執行緒池狀態不是TIDYING狀態或TERMINATED狀態;
    • 如果執行緒池狀態是SHUTDOWN並且workerQueue為空;
    • workerCount為0;
    • 設定TIDYING狀態成功。

下圖是執行緒池的狀態轉換過程,

6.5 Executors內建執行緒池

通常開發者都是利用 Executors 提供的通用執行緒池建立方法,去建立不同配置的執行緒池,主要區別在於不同的 ExecutorService 型別或者不同的初始引數。
Executors 目前提供了 5 種不同的執行緒池建立配置:

  • newCachedThreadPool(),它是一種用來處理大量短時間工作任務的執行緒池,具有幾個鮮明特點:它會試圖快取執行緒並重用,當無快取執行緒可用時,就會建立新的工作執行緒;如果執行緒閒置的時間超過60秒,則被終止並移出快取;長時間閒置時,這種執行緒池,不會消耗什麼資源。其內部使用 SynchronousQueue 作為工作佇列。
    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to {@code execute} will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  • newFixedThreadPool(int nThreads),建立固定數目(nThreads)的執行緒,其背後使用的是無界的工作佇列,任何時候最多有 nThreads 個工作執行緒是活動的。這意味著,如果任務數量超過了活動佇列數目,將在工作佇列中等待空閒執行緒出現;如果有工作執行緒退出,將會有新的工作執行緒被建立,以補足指定的數目nThreads。
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  • newSingleThreadExecutor(),它的特點在於工作執行緒數目被限制為1,操作一個無界的工作佇列,所以它保證了所有任務的都是被順序執行,最多會有一個任務處於活動狀態,並且不允許使用者改動執行緒池例項,因此可以避免其改變執行緒數目。
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
  • newWorkStealingPool(int parallelism),這是一個經常被人忽略的執行緒池,Java 8 才加入這個建立方法,其內部會構建ForkJoinPool,利用Work-Stealing演算法,並行地處理任務,不保證處理順序。
    public ForkJoinPool() {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false,
             0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
    }
  • newSingleThreadScheduledExecutor() 和 newScheduledThreadPool(int corePoolSize),建立的是個 ScheduledExecutorService,可以進行定時或週期性的工作排程,區別在於單一工作執行緒還是多個工作執行緒。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
}
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

以下是ScheduledThreadPoolExecutor的建構函式,該類繼承於ThreadPoolExecutor,可以看到任務存放在DelayedWorkQueue。

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }

類中提供了多種執行定時任務的方法,

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);

總結下來,主要分三種:

  • schedule:只執行一次,任務還可以延時執行
  • scheduleAtFixedRate:提交固定時間間隔的任務
  • scheduleWithFixedDelay:提交固定延時間隔執行的任務

注意scheduleAtFixedRate和scheduleWithFixedDelay的區別,下圖給出了兩者執行任務時間上的示意圖。scheduleAtFixedRate總是間隔固定的時間來執行task,但是如果下圖中Task1執行超時,也就是超過了Fixed Time,當Task1執行完之後,Task2將立刻執行。scheduleWithFixedDelay不同的是,每個任務總是在上一個任務結束之後,等待固定的Fixed Delay Time後開始執行。

public class ScheduleWorkerTime implements Runnable{
    public final static int Long_8 = 8;//任務耗時8秒
    public final static int Short_2 = 2;//任務耗時2秒
    public final static int Normal_5 = 5;//任務耗時5秒

    public static SimpleDateFormat formater = new SimpleDateFormat(
            "HH:mm:ss");
    public static AtomicInteger count = new AtomicInteger(0);
    
    @Override
    public void run() {
        if(count.get()==0) {
            System.out.println("Long_8....begin:"+formater.format(new Date()));
            SleepTools.second(Long_8);
            System.out.println("Long_8....end:"+formater.format(new Date())); 
            count.incrementAndGet();
        }else if(count.get()==1) {
            System.out.println("Short_2 ...begin:"+formater.format(new Date()));
            SleepTools.second(Short_2);
            System.out.println("Short_2 ...end:"+formater.format(new Date()));
            count.incrementAndGet();            
        }else {
            System.out.println("Normal_5...begin:"+formater.format(new Date()));
            SleepTools.second(Normal_5);
            System.out.println("Normal_5...end:"+formater.format(new Date()));
            count.incrementAndGet(); 
        }
    }
    
    public static void main(String[] args) {
            ScheduledThreadPoolExecutor schedule = new ScheduledThreadPoolExecutor(1);
            //任務間隔6秒
            schedule.scheduleAtFixedRate(new ScheduleWorkerTime(),
                    0, 6000, TimeUnit.MILLISECONDS);
    }
}

程式碼中定義了3個任務,分別執行8s,2s,5s,設定的固定間隔為6s。從輸出結果可以看到,第一個場任務結束後,第二個任務立刻開始執行,第二個任務執行完時,到了10s,此時等待2s後,第三個任務開始執行。由此可以看到,當前序任務沒超時,後續任務會按照指定的時間進行執行;如果有超時,則會馬上執行。

執行結果如下:
Long_8....begin:14:56:27
Long_8....end:14:56:35
Short_2 ...begin:14:56:35
Short_2 ...end:14:56:37
Normal_5...begin:14:56:39
Normal_5...end:14:56:44

注意最好在提交給ScheduledThreadPoolExecutor的任務要catch異常,否則發生異常之後,程式會終止執行。

6.6 CompletionService

使用場景

當向Executor提交多個任務並且希望獲得它們在完成之後的結果,如果用FutureTask,可以迴圈獲取task,並呼叫get方法去獲取task執行結果,但是如果task還未完成,獲取結果的執行緒將阻塞直到task完成,由於不知道哪個task優先執行完畢,使用這種方式效率不會很高。

在jdk5時候提出介面CompletionService,它整合了Executor和BlockingQueue的功能,可以更加方便在多個任務執行時,按任務完成順序獲取結果。

使用流程

CompletionService的使用流程如下:

  1. 宣告task執行載體,執行緒池executor;

  2. 宣告CompletionService,來包裝執行task的執行緒池,存放已完成狀態task的阻塞佇列,佇列預設為基於連結串列結構的阻塞佇列LinkedBlockingQueue;

  3. 呼叫submit方法提交task;

  4. 呼叫take方法獲取已完成狀態task。

public class CompletionServiceTest {
    
    // 宣告執行緒池
    private static ExecutorService executorService = Executors.newFixedThreadPool(100);
    
    public void test() {
        
        // 宣告CompletionService包裝Executor
        CompletionService<Long>  completionService = new ExecutorCompletionService<Long>(executorService);
        
        final int groupNum = 10000000 / 100;
        
        for ( int i = 1; i <= 100; i++) {
            int start = (i-1) * groupNum + 1;
            int end = i * groupNum;
            
            completionService.submit(new Callable<Long>() {
                
                @Override
                public Long call() throws Exception {
                    Long sum = 0L;
                    
                    for (int j = start; j <= end; j++) {
                        sum += j;
                    }
                    return sum;
                }
            });
        }
        
        long result = 0L;
        try {
            for (int i = 0; i < 100; i++) {
                long taskResult = completionService.take().get();
                System.out.println(taskResult);
                result += taskResult;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        System.out.println("the result is " + result);
    }
    
    public static void main(String[] args) {
        new CompletionServiceTest().test();
    }
}

原始碼分析

CompletionService介面提供五個方法:

  • Future

  • Future

  • Future

  • Future

  • Future

CompletionService與普通用FutureTask獲取結果的最大不同是,可以按照任務完成的順序返回結果。具體是如何實現的呢?

內部封裝了一個QueueingFuture物件,並且實現了done方法,在task執行完成之後將當前task新增到completionQueue。

    private static class QueueingFuture<V> extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task,
                       BlockingQueue<Future<V>> completionQueue) {
            super(task, null);
            this.task = task;
            this.completionQueue = completionQueue;
        }
        private final Future<V> task;
        private final BlockingQueue<Future<V>> completionQueue;
        protected void done() { completionQueue.add(task); }
    }

done方法將在FutureTask的finishCompletion方法中被呼叫。只是預設done方法是空的,completionQueue實現了該方法。

    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (WAITERS.weakCompareAndSet(this, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

參考:

  • https://www.jianshu.com/p/c4a31f914cc7

本文由『後端精進之路』原創,首發於部落格 http://teckee.github.io/ , 轉載請註明出處

搜尋『後端精進之路』關注公眾號,立刻獲取最新文章和價值2000元的BATJ精品面試課程。

相關推薦

Java併發程式設計系列-(6) Java執行

6. 執行緒池 6.1 基本概念 在web開發中,伺服器需要接受並處理請求,所以會為一個請求來分配一個執行緒來進行處理。如果每次請求都新建立一個執行緒的話實現起來非常簡便,但是存在一個問題:如果併發的請求數量非常多,但每個執行緒執行的時間很短,這樣就會頻繁的建立和銷燬執行緒,如此一來會大大降低系統的效率。

原創】Java併發程式設計系列2:執行概念與基礎操作

## 【原創】Java併發程式設計系列2:執行緒概念與基礎操作 > 偉大的理想只有經過忘我的鬥爭和犧牲才能勝利實現。 本篇為【Dali王的技術部落格】Java併發程式設計系列第二篇,講講有關執行緒的那些事兒。主要內容是如下這些: - 執行緒概念 - 執行緒基礎操作 ### 執行緒概念 程序代表了執

Java併發程式設計:4種執行和緩衝佇列BlockingQueue

一. 執行緒池簡介 1. 執行緒池的概念:           執行緒池就是首先建立一些執行緒,它們的集合稱為執行緒池。使用執行緒池可以很好地提高效能,執行緒池在系統啟動時即建立大量空閒的執行緒,程式將一個任務傳給執行緒池,執行緒池就會啟動一

Java併發程式設計札記-(六)JUC執行-01概述

前面的例子中總是需要執行緒時就建立,不需要就銷燬它。但頻繁建立和銷燬執行緒是很耗資源的,在併發量較高的情況下頻繁建立和銷燬執行緒會降低系統的效率。執行緒池可以通過重複利用已建立的執行緒降低執行緒建立和銷

Java併發程式設計中四種執行及自定義執行使用教程

引言 通過前面的文章,我們學習了Executor框架中的核心類ThreadPoolExecutor ,對於執行緒池的核心排程機制有了一定的瞭解,並且成功使用ThreadPoolExecutor 建立了執行緒池。 而在Java中,除了ThreadPoolExecutor ,Executor框

Java併發程式設計的藝術(十)——執行(1)

執行緒池的作用 減少資源的開銷 減少了每次建立執行緒、銷燬執行緒的開銷。 提高響應速度 每次請求到來時,由於執行緒的建立已經完成,故可以直接執行任務,因此提高了響應速度。 提高執行緒的可管理性 執行緒是一種稀缺資源,若不加以限制,不僅會佔用大量資源

Java併發程式設計系列-(7) Java執行安全

7. 執行緒安全 7.1 執行緒安全的定義 如果多執行緒下使用這個類,不過多執行緒如何使用和排程這個類,這個類總是表示出正確的行為,這個類就是執行緒安全的。 類的執行緒安全表現為: 操作的原子性 記憶體的可見性 不做正確的同步,在多個執行緒之間共享狀態的時候,就會出現執行緒不安全。 7.2 如何保證執

實戰Java併發程式設計(3.2 執行

1.Executor jdk提供了一套Executor框架,本質上是一個執行緒池。 newFixedThreadPool()方法:該方法返回一個固定數量的執行緒池。該執行緒池中的執行緒數量始終不變,當有一個新任務提交時,執行緒池中若有空閒執行緒,則立即執行,若沒有,則任務會暫存在一個任

java併發程式設計實戰》之 執行安全性

1.執行緒安全性 當多個執行緒訪問某個類時,不管執行時環境採用何種排程方式或者這些執行緒將如何交替執行,並且在主調程式碼中不需要任何額外的同步或協同,這個類都能表現出正確的行為,那麼這個類就是執行緒安全的。 無狀態物件一定是執行緒安全的,何為無狀態,就是類中不包含任何域,也不包含各種其

[讀書筆記][Java併發程式設計實戰]第二章 執行安全性

                                          第二章 執行緒安全性 1-什麼是執行緒安全的類? 當多個執行緒訪問某一個類時,不管執行時環境採用何種排程方式或者這些執行緒將如何交替執行,並且在主調程式碼中不需要任何額外的同步或協同,這個

(十)java併發程式設計--建立和啟動執行java.lang.Thread 、java.lang.Runnable)

執行緒建立的幾種方式. 建立和啟動一個執行緒 建立一個執行緒. Thread thread = new Thread(); 啟動java執行緒. thread.start(); 這兩個例子並沒有執行執行緒執行體,執行緒將會啟動後然後

學了Java併發程式設計藝術及多執行核心程式設計技術,以及最開始學的程式設計思想那本書,今天做些總結

併發Map分析位碼shift預設值是28,對hash值右移28位,取高四位,獲得segments位置,掩碼mask預設值16-1,作一個與值,不知道有何用處,兩個都是不可修改,初始值和併發度有關,一旦確立下來決定了segments陣列大小,包括segments陣列物件不可修改

Java粗淺認識-併發程式設計(五)-執行

執行緒池 先來總攬一下執行緒池結構 以上是執行緒池結構,常用的工具java.util.concurrent.Executors 結構如下 在Executors中常用的方法 Executors.newCachedThreadPool() 建立執行緒池核心poolSi

Java併發程式設計:什麼是執行安全,以及併發必須知道的幾個概念

廢話 眾所周知,在Java的知識體系中,併發程式設計是非常重要的一環,也是面試的必問題,一個好的Java程式設計師是必須對併發程式設計這塊有所瞭解的。為了追求成為一個好的Java程式設計師,我決定從今天開始死磕Java的併發程式設計,儘量彌補自己在這方面的知識缺陷。 併發必須知道的概念

Java併發程式設計中的多執行是怎麼實現的?

眾所周知,在Java的知識體系中,併發程式設計是非常重要的一環,也是面試中必問的題,一個好的Java程式設計師是必須對併發程式設計這塊有所瞭解的。 併發必須知道的概念在深入學習併發程式設計之前,我們需要了解幾個基本的概念。同步和非同步同步和非同步用請求返回呼叫的方式來理解相對簡單。 同步:

Java併發程式設計:如何建立執行、程序?

在前面一篇文章中已經講述了在程序和執行緒的由來,今天就來講一下在Java中如何建立執行緒,讓執行緒去執行一個子任務。下面先講述一下Java中的應用程式和程序相關的概念知識,然後再闡述如何建立執行緒以及如何建立程序。下面是本文的目錄大綱:   一.Java中關於應

21、Java併發類庫提供的執行有哪幾種? 分別有什麼特點?(高併發程式設計----7)

目錄 今天我要問你的問題是,Java 併發類庫提供的執行緒池有哪幾種? 分別有什麼特點? 典型回答 考點分析 知識擴充套件 下面我就從原始碼角度,分析執行緒池的設計與實現,我將主要圍繞最基礎的 ThreadPoolExecutor 原始碼。 進一步分析,執行緒池既然

Java併發程式設計的藝術(四)——執行的狀態

執行緒的狀態 初始態:NEW 建立一個Thread物件,但還未呼叫start()啟動執行緒時,執行緒處於初始態。 執行態:RUNNABLE 在Java中,執行態包括就緒態 和 執行態。 就緒態 該狀態下的執行緒已經獲得執行所需的所有資源

java併發程式設計(一) 執行安全(1)

最近想了解併發程式設計,二執行緒安全是它的基礎。所以看了下java相關的執行緒安全知識。 執行緒安全的核心是程式碼正確性(一般是輸出的結果); 首先無狀態的物件是執行緒安全的;因為一個無狀態的物件即不包含其他域;也沒有對其他域的引用; (1)原子性    原子性:即程式碼不

Java併發程式設計規則:構建執行安全的共享物件

構建執行緒安全的共享物件,使其在多執行緒環境下能夠提供安全的訪問。編寫正確的併發程式關鍵在於控制共享、可變的狀態進行訪問管理。synchornized關鍵字既可以阻塞程式,也可以維護操作的原子性,它是一個執行緒安全與非執行緒安全的臨界區標識,通過它我們可以控制物件的記憶體可