1. 程式人生 > >JAVA併發程式設計:執行緒池Executors

JAVA併發程式設計:執行緒池Executors

Java中對執行緒池提供了很好的支援,有了執行緒池,我們就不需要自已再去建立執行緒。如果併發的執行緒數量很多,並且每個執行緒都是執行一個時間很短的任務就結束了,頻繁建立執行緒就會大大降低系統的效率,因為頻繁建立執行緒和銷燬執行緒需要時間。JAVA的執行緒池中的執行緒可以在執行完任務後,不銷燬,繼續執行其他的任務。所以瞭解Java的執行緒池對我們掌握併發程式設計是很有幫助的,下面我就從Executors這個多執行緒框架開始講起,首先看一下Executors中主要的方法

 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }


如果我們要Exectutors來建立執行緒池,基本離不開上面幾個方法,我們可以看出上面幾個方法中都用到了ThreadPoolExecutor這個類,且都返回ExecutorService物件。我們先看一下ExecutorService的主要方法。
public interface ExecutorService extends Executor {
    void shutdown();

    List<Runnable> shutdownNow();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);
}
shutdown() 啟動一次順序關閉,執行以前提交的任務,但不接受新任務

shutdownnow()試圖停止所有正在執行的活動任務,暫停處理正在等待的任務,並返回等待執行的任務列表

submit()用來提交任務。

Executors一般如下使用

	ExecutorService service = Executors.newFixedThreadPool(1);
		for(int i=0;i<10;i++){
			service.submit(new Task(i));
		}
		service.shutdown();
Exectors呼叫靜態方法,利用ThreadPoolExecutor來建立執行緒池,然後submit提交任務,我們看一下submit的程式碼
 public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Object> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

ThreaPoolExecutor實現原理深入分析

我們先來看一下Executors執行緒池框架用到幾個主要的類(介面)這件的關係。

ThreadPoolExecutor、AbstractExecutorService(抽象類)、ExecutorService(介面)和Executor(介面)幾個之間的關係如下。

Executor是一個頂層介面,在它裡面只聲明瞭一個方法execute(Runnable),返回值為void,引數為Runnable型別,從字面意思可以理解,就是用來執行傳進去的任務的;

然後ExecutorService介面繼承了Executor介面,並聲明瞭一些方法:submit、invokeAll、invokeAny以及shutDown等;

抽象類AbstractExecutorService實現了ExecutorService介面,基本實現了ExecutorService中宣告的所有方法;

然後ThreadPoolExecutor繼承了類AbstractExecutorService。

在ThreadPoolExecutor類中有幾個非常重要的方法:

execute()
submit()
shutdown()
shutdownNow()

execute()方法實際上是Executor介面中宣告的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向執行緒池提交一個任務,交由執行緒池去執行。所以上面ExecutorService中submit()方法中呼叫的execute()方法是在ThreadPollExecutor中實現的。所以我們研究ThreadPoolExecutor這個類中首先從execute()這個方法開始,在開始之間先看一下ThreadPoolExecutor中的建構函式和一些主要的成員變數

ThreadPoolExecutor建構函式

ThreadPoolExecutor類中提供了四個構造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {
	.....
	public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
	        BlockingQueue<Runnable> workQueue);

	public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
	        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);

	public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
	        BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);

	public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
		BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
	...
}

從上面的程式碼可以得知,ThreadPoolExecutor繼承了AbstractExecutorService類,並提供了四個構造器,事實上,通過觀察每個構造器的原始碼具體實現,發現前面三個構造器都是呼叫的第四個構造器進行的初始化工作。

下面解釋下一下構造器中各個引數的含義:

  • corePoolSize:核心池的大小,這個引數跟後面講述的執行緒池的實現原理有非常大的關係。在建立了執行緒池後,預設情況下,執行緒池中並沒有任何執行緒,而是等待有任務到來才建立執行緒去執行任務,除非呼叫了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預建立執行緒的意思,即在沒有任務到來之前就建立corePoolSize個執行緒或者一個執行緒。預設情況下,在建立了執行緒池後,執行緒池中的執行緒數為0,當有任務來之後,就會建立一個執行緒去執行任務,當執行緒池中的執行緒數目達到corePoolSize後,就會把到達的任務放到快取隊列當中;
  • maximumPoolSize:執行緒池最大執行緒數,這個引數也是一個非常重要的引數,它表示線上程池中最多能建立多少個執行緒;
  • keepAliveTime:表示執行緒沒有任務執行時最多保持多久時間會終止。預設情況下,只有當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime才會起作用,直到執行緒池中的執行緒數不大於corePoolSize,即當執行緒池中的執行緒數大於corePoolSize時,如果一個執行緒空閒的時間達到keepAliveTime,則會終止,直到執行緒池中的執行緒數不超過corePoolSize。但是如果呼叫了allowCoreThreadTimeOut(boolean)方法,線上程池中的執行緒數不大於corePoolSize時,keepAliveTime引數也會起作用,直到執行緒池中的執行緒數為0;
  • unit:引數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:
TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小時
TimeUnit.MINUTES;           //分鐘
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //納秒

  • workQueue:一個阻塞佇列,用來儲存等待執行的任務,這個引數的選擇也很重要,會對執行緒池的執行過程產生重大影響,一般來說,這裡的阻塞佇列有以下幾種選擇:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;

 ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。執行緒池的排隊策略與BlockingQueue有關。

  • threadFactory:執行緒工廠,主要用來建立執行緒;
  • handler:表示當拒絕處理任務時的策略,有以下四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。 
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。 
ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務 

ThreadPoolExecutor主要成員變數

private final BlockingQueue<Runnable> workQueue;              //任務快取佇列,用來存放等待執行的任務
private final ReentrantLock mainLock = new ReentrantLock();   //執行緒池的主要狀態鎖,對執行緒池狀態(比如執行緒池大小
															  //、runState等)的改變都要使用這個鎖
private final HashSet<Worker> workers = new HashSet<Worker>();  //用來存放工作集

private volatile long  keepAliveTime;    //執行緒存貨時間    
private volatile boolean allowCoreThreadTimeOut;   //是否允許為核心執行緒設定存活時間
private volatile int   corePoolSize;     //核心池的大小(即執行緒池中的執行緒數目大於這個引數時,提交的任務會被放進任務快取佇列)
private volatile int   maximumPoolSize;   //執行緒池最大能容忍的執行緒數

private volatile int   poolSize;       //執行緒池中當前的執行緒數

private volatile RejectedExecutionHandler handler; //任務拒絕策略

private volatile ThreadFactory threadFactory;   //執行緒工廠,用來建立執行緒

private int largestPoolSize;   //用來記錄執行緒池中曾經出現過的最大執行緒數

private long completedTaskCount;   //用來記錄已經執行完畢的任務個數

執行緒池狀態

在ThreadPoolExecutor中定義了一個volatile變數,另外定義了幾個static final變量表示執行緒池的各個狀態:

volatile int runState;
static final int RUNNING    = 0;
static final int SHUTDOWN   = 1;
static final int STOP       = 2;
static final int TERMINATED = 3;

runState表示當前執行緒池的狀態,它是一個volatile變數用來保證執行緒之間的可見性;

下面的幾個static final變量表示runState可能的幾個取值。

當建立執行緒池後,初始時,執行緒池處於RUNNING狀態;

如果呼叫了shutdown()方法,則執行緒池處於SHUTDOWN狀態,此時執行緒池不能夠接受新的任務,它會等待所有任務執行完畢;

如果呼叫了shutdownNow()方法,則執行緒池處於STOP狀態,此時執行緒池不能接受新的任務,並且會去嘗試終止正在執行的任務;

當執行緒池處於SHUTDOWN或STOP狀態,並且所有工作執行緒已經銷燬,任務快取佇列已經清空或執行結束後,執行緒池被設定為TERMINATED狀態。

ThreadPoolExecutor實現原理

介紹完ThreadPoolExecutor的構函式、成員變數和執行緒狀態,我們來介紹最核心的內容。上文已經介紹過當我們把任務用ExecutorService的submit提交後是呼叫的ThreadPoolExecutor的execute來執行的,我們來看一下execute的程式碼

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }

我們開分析一下上面的程式碼,如果提交的任務command為null,則丟擲空異常,

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))

由於是或條件運算子,所以先計算前半部分的值,如果執行緒池中當前執行緒數不小於核心池大小,那麼就會直接進入下面的if語句塊了。

如果執行緒池中當前執行緒數小於核心池大小,則接著執行後半部分,也就是執行

addIfUnderCorePoolSize(command)

如果執行完addIfUnderCorePoolSize這個方法返回false,則繼續執行下面的if語句塊,否則整個方法就直接執行完畢了。

如果執行完addIfUnderCorePoolSize這個方法返回false,然後接著判斷:

if (runState == RUNNING && workQueue.offer(command))

如果當前執行緒池處於RUNNING狀態,則將任務放入任務快取佇列;如果當前執行緒池不處於RUNNING狀態或者任務放入快取佇列失敗,則執行:

addIfUnderMaximumPoolSize(command)

如果執行addIfUnderMaximumPoolSize方法失敗,則執行reject()方法進行任務拒絕處理。回到前面:

if (runState == RUNNING && workQueue.offer(command))

 這句的執行,如果說當前執行緒池處於RUNNING狀態且將任務放入任務快取佇列成功,則繼續進行判斷:

if (runState != RUNNING || poolSize == 0)

 這句判斷是為了防止在將此任務新增進任務快取佇列的同時其他執行緒突然呼叫shutdown或者shutdownNow方法關閉了執行緒池的一種應急措施。如果是這樣就執行

ensureQueuedTaskHandled(command)

進行應急處理,從名字可以看出是保證 新增到任務快取佇列中的任務得到處理。

我們接著看2個關鍵方法的實現:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < corePoolSize && runState == RUNNING)
            t = addThread(firstTask);        //建立執行緒去執行firstTask任務    
        } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

這個是addIfUnderCorePoolSize方法的具體實現,從名字可以看出它的意圖就是當低於核心吃大小時執行的方法。下面看其具體實現,首先獲取到鎖,因為這地方涉及到執行緒池狀態的變化,先通過if語句判斷當前執行緒池中的執行緒數目是否小於核心池大小,有朋友也許會有疑問:前面在execute()方法中不是已經判斷過了嗎,只有執行緒池當前執行緒數目小於核心池大小才會執行addIfUnderCorePoolSize方法的,為何這地方還要繼續判斷?原因很簡單,前面的判斷過程中並沒有加鎖,因此可能在execute方法判斷的時候poolSize小於corePoolSize,而判斷完之後,在其他執行緒中又向執行緒池提交了任務,就可能導致poolSize不小於corePoolSize了,所以需要在這個地方繼續判斷。然後接著判斷執行緒池的狀態是否為RUNNING,原因也很簡單,因為有可能在其他執行緒中呼叫了shutdown或者shutdownNow方法。然後就是執行

t = addThread(firstTask);

這個方法也非常關鍵,傳進去的引數為提交的任務,返回值為Thread型別。然後接著在下面判斷t是否為空,為空則表明建立執行緒失敗(即poolSize>=corePoolSize或者runState不等於RUNNING),否則呼叫t.start()方法啟動執行緒。

我們來看一下addThread方法的實現:

private Thread addThread(Runnable firstTask) {
    Worker w = new Worker(firstTask);
    Thread t = threadFactory.newThread(w);  //建立一個執行緒,執行任務    
    if (t != null) {
        w.thread = t;            //將建立的執行緒的引用賦值為w的成員變數        
        workers.add(w);
        int nt = ++poolSize;     //當前執行緒數加1        
        if (nt > largestPoolSize)
            largestPoolSize = nt;
    }
    return t;
}

在addThread方法中,首先用提交的任務建立了一個Worker物件,然後呼叫執行緒工廠threadFactory建立了一個新的執行緒t,然後將執行緒t的引用賦值給了Worker物件的成員變數thread,接著通過workers.add(w)將Worker物件新增到工作集當中。
下面我們看一下Worker類的實現:
private final class Worker implements Runnable {
    private final ReentrantLock runLock = new ReentrantLock();
    private Runnable firstTask;
    volatile long completedTasks;
    Thread thread;
    Worker(Runnable firstTask) {
        this.firstTask = firstTask;
    }
    boolean isActive() {
        return runLock.isLocked();
    }
    void interruptIfIdle() {
        final ReentrantLock runLock = this.runLock;
        if (runLock.tryLock()) {
            try {
	    if (thread != Thread.currentThread())
		thread.interrupt();
            } finally {
                runLock.unlock();
            }
        }
    }
    void interruptNow() {
        thread.interrupt();
    }

    private void runTask(Runnable task) {
        final ReentrantLock runLock = this.runLock;
        runLock.lock();
        try {
            if (runState < STOP &&
                Thread.interrupted() &&
                runState >= STOP)
            boolean ran = false;
            beforeExecute(thread, task);   //beforeExecute方法是ThreadPoolExecutor類的一個方法,沒有具體實現,使用者可以根據
            //自己需要過載這個方法和後面的afterExecute方法來進行一些統計資訊,比如某個任務的執行時間等            
            try {
                task.run();
                ran = true;
                afterExecute(task, null);
                ++completedTasks;
            } catch (RuntimeException ex) {
                if (!ran)
                    afterExecute(task, ex);
                throw ex;
            }
        } finally {
            runLock.unlock();
        }
    }

    public void run() {
        try {
            Runnable task = firstTask;
            firstTask = null;
            while (task != null || (task = getTask()) != null) {
                runTask(task);
                task = null;
            }
        } finally {
            workerDone(this);   //當任務佇列中沒有任務時,進行清理工作        
        }
    }
}
既然Worker實現了Runnable介面,那麼自然最核心的方法便是run()方法了:
public void run() {
    try {
        Runnable task = firstTask;
        firstTask = null;
        while (task != null || (task = getTask()) != null) {
            runTask(task);
            task = null;
        }
    } finally {
        workerDone(this);
    }
}

這裡意思是如果傳遞的task不為空,則立即執行,注意這裡執行完了,並不是該執行緒就結束了,假如一開始的時候我們corePoolSize的大小為5,那麼如果提交了5個任務,那麼每個任務執行 runTask(task)後並沒有結束,而是每個任務都對呼叫task = getTask(),去佇列中取任務,這也是ThreadPoolExecutor提升效率的關鍵地方getTask是ThreadPoolExecutor類中的方法,並不是Worker類中的方法,下面是getTask方法的實現:
Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果執行緒數大於核心池大小或者允許為核心池執行緒設定空閒時間,
            	//則通過poll取任務,若等待一定的時間取不到任務,則返回null
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {    //如果沒取到任務,即r為null,則判斷當前的worker是否可以退出
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();   //中斷處於空閒狀態的worker
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

在getTask中,先判斷當前執行緒池狀態,如果runState大於SHUTDOWN(即為STOP或者TERMINATED),則直接返回null。

如果runState為SHUTDOWN或者RUNNING,則從任務快取佇列取任務。

如果當前執行緒池的執行緒數大於核心池大小corePoolSize或者允許為核心池中的執行緒設定空閒存活時間,則呼叫poll(time,timeUnit)來取任務,這個方法會等待一定的時間,如果取不到任務就返回null。

然後判斷取到的任務r是否為null,為null則通過呼叫workerCanExit()方法來判斷當前worker是否可以退出,我們看一下workerCanExit()的實現:

private boolean workerCanExit() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    boolean canExit;
    //如果runState大於等於STOP,或者任務快取佇列為空了
    //或者  允許為核心池執行緒設定空閒存活時間並且執行緒池中的執行緒數目大於1
    try {
        canExit = runState >= STOP ||
            workQueue.isEmpty() ||
            (allowCoreThreadTimeOut &&
             poolSize > Math.max(1, corePoolSize));
    } finally {
        mainLock.unlock();
    }
    return canExit;
}

我們再看addIfUnderMaximumPoolSize方法的實現,這個方法的實現思想和addIfUnderCorePoolSize方法的實現思想非常相似,唯一的區別在於addIfUnderMaximumPoolSize方法是線上程池中的執行緒數達到了核心池大小並且往任務佇列中新增任務失敗的情況下執行的:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < maximumPoolSize && runState == RUNNING)
            t = addThread(firstTask);
    } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

看到沒有,其實它和addIfUnderCorePoolSize方法的實現基本一模一樣,只是if語句判斷條件中的poolSize < maximumPoolSize不同而已。

  到這裡,大部分朋友應該對任務提交給執行緒池之後到被執行的整個過程有了一個基本的瞭解,下面總結一下:

  1)首先,要清楚corePoolSize和maximumPoolSize的含義;

  2)其次,要知道Worker是用來起到什麼作用的;

  3)要知道任務提交給執行緒池之後的處理策略,這裡總結一下主要有4點:

  • 如果當前執行緒池中的執行緒數目小於corePoolSize,則每來一個任務,就會建立一個執行緒去執行這個任務;
  • 如果當前執行緒池中的執行緒數目>=corePoolSize,則每來一個任務,會嘗試將其新增到任務快取隊列當中,若新增成功,則該任務會等待空閒執行緒將其取出去執行;若新增失敗(一般來說是任務快取佇列已滿),則會嘗試建立新的執行緒去執行這個任務(當corePoolSize小於maximumPoolSize時候才會建立新的執行緒);
  • 如果當前執行緒池中的執行緒數目達到maximumPoolSize,則會採取任務拒絕策略進行處理;
  • 如果執行緒池中的執行緒數量大於 corePoolSize時,如果某執行緒空閒時間超過keepAliveTime,執行緒將被終止,直至執行緒池中的執行緒數目不大於corePoolSize;如果允許為核心池中的執行緒設定存活時間,那麼核心池中的執行緒空閒時間超過keepAliveTime,執行緒也會被終止。
我們在舉個例子說明下,new ThreadPoolExecutor(5, 10, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10));加入我們建立了一個這樣的執行緒池,請問,該執行緒池最小可以容納多少執行緒?注意最小,我們可以假設執行緒線上程池中駐留的時間比較長。該例子中,
corePoolSize = 5;maximumPoolSize = 10;阻塞佇列的大小為10。假設我們提交的執行緒數為x。
1、當x<=5時候,addIfUnderCorePoolSize函式將任務提交併執行,執行完任務後不結束(沒有設定keepAliveTime),會一直從佇列中取任務
2、當x>5<=15時候,任務被提交的到佇列,等待執行,此時執行緒池中容納的執行緒為5+10=15;
3、當x>15<=20時候,因為在提交任務,佇列已經滿了提交不上,當此時的corePoolSize=5<maximumPoolSize=10,所以此時我們還可以提交5個任務,那麼總的提交的任務數為20個
4,、當x>20時,會採取拒絕策略。
一般化一下:corePoolSize=a,maximumPoolSize=b,佇列大小為c,那麼執行緒池的總容量可以最小為a+c+(b-a)=b+c
補充:

任務快取佇列及排隊策略

在前面我們多次提到了任務快取佇列,即workQueue,它用來存放等待執行的任務。

  workQueue的型別為BlockingQueue<Runnable>,通常可以取下面三種類型:

  1)ArrayBlockingQueue:基於陣列的先進先出佇列,此佇列建立時必須指定大小;

  2)LinkedBlockingQueue:基於連結串列的先進先出佇列,如果建立時沒有指定此佇列大小,則預設為Integer.MAX_VALUE;

  3)synchronousQueue:這個佇列比較特殊,它不會儲存提交的任務,而是將直接新建一個執行緒來執行新來的任務。

任務拒絕策略

 workQueue的型別為BlockingQueue<Runnable>,通常可以取下面四種類型:

ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。 
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。 
ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務 

執行緒池的關閉

  ThreadPoolExecutor提供了兩個方法,用於執行緒池的關閉,分別是shutdown()和shutdownNow(),其中:

  • shutdown():不會立即終止執行緒池,而是要等所有任務快取佇列中的任務都執行完後才終止,但再也不會接受新的任務
  • shutdownNow():立即終止執行緒池,並嘗試打斷正在執行的任務,並且清空任務快取佇列,返回尚未執行的任務

執行緒池容量的動態調整

  ThreadPoolExecutor提供了動態調整執行緒池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

  • setCorePoolSize:設定核心池大小
  • setMaximumPoolSize:設定執行緒池最大能建立的執行緒數目大小

  當上述引數從小變大時,ThreadPoolExecutor進行執行緒賦值,還可能立即建立新的執行緒來執行任務。

例子:
package com.sunny.thread.threadpool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test {
	 public static void main(String[] args) {    
		 ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, 
				 new ArrayBlockingQueue<Runnable>(5));
		 Test t = new Test();
		 for(int i=0;i<15;i++){
			 MyTask myTask = t.new MyTask(i);
			 executor.execute(myTask);
			 System.out.println("執行緒池中執行緒數目:"+executor.getPoolSize()+",佇列中等待執行的任務數目:"+
			 executor.getQueue().size()+",已執行玩別的任務數目:"+executor.getCompletedTaskCount());
		 }
		 executor.shutdown();
	 } 
	 class MyTask implements Runnable {
			private int taskNum;
			
			public MyTask(int num) {
				this.taskNum = num;
			}
			
			@Override
			public void run() {
				System.out.println("正在執行task "+taskNum);
				try {
					Thread.currentThread().sleep(4000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println("task "+taskNum+"執行完畢");
			}
		}
}
結果:
正在執行task 0
執行緒池中執行緒數目:1,佇列中等待執行的任務數目:0,已執行玩別的任務數目:0
執行緒池中執行緒數目:2,佇列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 1
執行緒池中執行緒數目:3,佇列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 2
執行緒池中執行緒數目:4,佇列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 3
執行緒池中執行緒數目:5,佇列中等待執行的任務數目:0,已執行玩別的任務數目:0
執行緒池中執行緒數目:5,佇列中等待執行的任務數目:1,已執行玩別的任務數目:0
執行緒池中執行緒數目:5,佇列中等待執行的任務數目:2,已執行玩別的任務數目:0
正在執行task 4
執行緒池中執行緒數目:5,佇列中等待執行的任務數目:3,已執行玩別的任務數目:0
執行緒池中執行緒數目:5,佇列中等待執行的任務數目:4,已執行玩別的任務數目:0
執行緒池中執行緒數目:5,佇列中等待執行的任務數目:5,已執行玩別的任務數目:0
執行緒池中執行緒數目:6,佇列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 10
執行緒池中執行緒數目:7,佇列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 11
執行緒池中執行緒數目:8,佇列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 12
執行緒池中執行緒數目:9,佇列中等待執行的任務數目:5,已執行玩別的任務數目:0
執行緒池中執行緒數目:10,佇列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 13
正在執行task 14
task 0執行完畢
正在執行task 5
task 1執行完畢
正在執行task 6
task 3執行完畢
正在執行task 7
task 2執行完畢
正在執行task 8
task 10執行完畢
正在執行task 9
task 11執行完畢
task 12執行完畢
task 14執行完畢
task 4執行完畢
task 13執行完畢
task 6執行完畢
task 5執行完畢
task 7執行完畢
task 8執行完畢
task 9執行完畢

參考文章:

http://www.cnblogs.com/dolphin0520/p/3932921.html

相關推薦

JAVA併發程式設計執行Executors

Java中對執行緒池提供了很好的支援,有了執行緒池,我們就不需要自已再去建立執行緒。如果併發的執行緒數量很多,並且每個執行緒都是執行一個時間很短的任務就結束了,頻繁建立執行緒就會大大降低系統的效率,因為頻繁建立執行緒和銷燬執行緒需要時間。JAVA的執行緒池中的執行緒可以在執

Java併發(二十一)執行實現原理 Java併發(十八)阻塞佇列BlockingQueue Java併發(十八)阻塞佇列BlockingQueue Java併發程式設計執行的使用

一、總覽 執行緒池類ThreadPoolExecutor的相關類需要先了解:  (圖片來自:https://javadoop.com/post/java-thread-pool#%E6%80%BB%E8%A7%88) Executor:位於最頂層,只有一個 execute(Runnab

Java併發程式設計執行的使用

如果併發的執行緒數量很多,並且每個執行緒都是執行一個時間很短的任務就結束了,這樣頻繁建立執行緒就會大大降低系統的效率,因為頻繁建立執行緒和銷燬執行緒需要時間。   那麼有沒有一種辦法使得執行緒可以複用,就是執行完一個任務,並不被銷燬,而是可以繼續執行其他的任務?   在J

Java併發程式設計執行的使用(轉載)

轉載自:https://www.cnblogs.com/dolphin0520/p/3932921.html Java併發程式設計:執行緒池的使用   在前面的文章中,我們使用執行緒的時候就去建立一個執行緒,這樣實現起來非常簡便,但是就會有一個問題:   如果併發的執行緒數量很多,並且每個執行緒都是執行

JAVA併發程式設計執行 ThreadPoolExecutor

生活 前期追深度,否則會華而不實,後期追廣度,否則會坐井觀天; 前言 在前面,我們已經對Thread有了比較深入的瞭解,並且已經學會了通過new Thread()來建立一個執行緒,並通過start方法來啟動一個執行緒,這種方法非常簡單,同樣也存在弊端: 1、每次通過new Thr

Java併發程式設計執行ThreadPoolExecutor

  多執行緒的程式的確能發揮多核處理器的效能。雖然與程序相比,執行緒輕量化了很多,但是其建立和關閉同樣需要花費時間。而且執行緒多了以後,也會搶佔記憶體資源。如果不對執行緒加以管理的話,是一個非常大的隱患。而執行緒池的目的就是管理執行緒。當你需要一個執行緒時,你就可以拿一個空閒執行緒去執行任務,當任務執行完後,

Java併發程式設計執行

這裡首先介紹了java5中的併發的小工具包:java.util.concurrent.atomic,然後介紹了執行緒池的概念,對使用java5的方式建立不同形式的執行緒進行了演示,之後介紹了兩個 物件:Callable和Future,用於獲取執行緒執行後的結果,

java併發程式設計一一執行原理分析(三)

合理的設定執行緒池的大小 接著上一篇探討執行緒留下的尾巴。如果合理的設定執行緒池的大小。 要想合理的配置執行緒池的大小、首先得分析任務的特性,可以從以下幾個角度分析: 1、任務的性質:CPU密集型任務、IO密集型任務、混合型任務等; 2、任務的優先順序:高、中、低; 3、任務的執行時

java併發程式設計一一執行原理分析(二)

2、執行緒池 1、什麼是執行緒池 Java中的執行緒池是運用場景最多的併發框架,幾乎所有需要非同步或併發執行任務的程式都可以使用執行緒池。 在開發工程中,合理的使用執行緒池能夠帶來3個好處。 第一:降低資源的消耗,通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗 第二:提

java併發程式設計一一執行原理分析(一)

1、併發包 1、CountDownLatch(計數器) CountDownLatch 類位於 java.util.concurrent 包下,利用它可以實現類似於計數器的功能。 比如有一個任務A,它要等待其他4個任務執行完成之後才能執行,此時就可以利用CountDownLatch

Java併發程式設計執行(三)

一.介紹 Java通過Executors提供四種執行緒池,分別為: (1)newCachedThreadPool:建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。 (2)newFixedThreadPool: 建立一個定長執行緒池,可控制

17-Java併發程式設計執行間協作的兩種方式wait、notify、notifyAll和Condition

Java併發程式設計:執行緒間協作的兩種方式:wait、notify、notifyAll和Condition   在前面我們將了很多關於同步的問題,然而在現實中,需要執行緒之間的協作。比如說最經典的生產者-消費者模型:當佇列滿時,生產者需要等待佇列有空間才能繼續往裡面放

Java併發程式設計執行安全和ThreadLocal

執行緒安全的概念:當多個執行緒訪問某一個類(物件或方法)時,這個類始終都能表現出正確的行為,那麼這個類(物件或方法)就是執行緒安全的。 執行緒安全 說的可能比較抽象,下面就以一個簡單的例子來看看什麼是執行緒安全問題。 public class MyThread

Java 併發程式設計執行間的協作(wait/notify/sleep/yield/join)

Java併發程式設計系列: 一、執行緒的狀態    Java中執行緒中狀態可分為五種:New(新建狀態),Runnable(就緒狀態),Running(執行狀態),Blocked(阻塞狀態),Dead(死亡狀態)。   New:新建狀態,當執行緒建立完成時為新建狀態,即

Java併發程式設計執行間協作的兩種方式wait、notify、notifyAll和Condition

在前面我們將了很多關於同步的問題,然而在現實中,需要執行緒之間的協作。比如說最經典的生產者-消費者模型:當佇列滿時,生產者需要等待佇列有空間才能繼續往裡面放入商品,而在等待的期間內,生產者必須釋放對臨界資源(即佇列)的佔用權。因為生產者如果不釋放對臨界資源的佔用權,那麼消費者

Java併發程式設計執行的生命週期是個怎樣的過程?

前言 在日常開發過程中,如果我們需要執行一些比較耗時的程式的話,一般來說都是開啟一個新執行緒,把耗時的程式碼放線上程裡,然後開啟執行緒執行。但執行緒是會耗費系統資源的,如果有多個執行緒同時執行,互相之間搶佔系統資源,那無疑會對系統造成極大的壓力。所以,怎麼操作執行緒,保證不影響整個應用功能是很重要的,而這就

java併發程式設計執行

前言 本文介紹幾種java常用的執行緒池如:FixedThreadPool,ScheduledThreadPool,CachedThreadPool等執行緒池,並分析介紹Executor框架,做到“知其然”:會用執行緒池,正確使用執行緒池。並且“知其所以然”:

Java併發程式設計執行、Callable和Future使用

知識儲備 收藏幾篇好文章: 目錄結構 Callable和Future使用 執行緒池使用 Callable和Future使用 多執行緒實現方式很多,分為兩類:1、沒有返回值的;2、有返回值的。 針對“沒有返回值的”這類可以參

java併發程式設計執行原理分析及ThreadPoolExecutor原始碼實現

執行緒池簡介:  多執行緒技術主要解決處理器單元內多個執行緒執行的問題,它可以顯著減少處理器單元的閒置時間,增加處理器單元的吞吐能力。         假設一個伺服器完成一項任務所需時間為:T1 建立執行緒時間,T2 線上程中執行任務的時間,T3 銷燬執行緒時間。    

JAVA 併發程式設計-基於執行設計的ScheduledExecutor(八)

    上篇部落格《JAVA 併發程式設計-執行緒池(七)》中曾介紹到newScheduledThreadPool(intcorePoolSize),建立corePoolSize大小的執行緒池。此執行緒池支援定時以及週期性執行任務的需求。    接下來我們一起來分析一下Jav