1. 程式人生 > >執行緒(4)-執行緒池

執行緒(4)-執行緒池

目錄

1.1JDK對執行緒池的支援

為了更好的能夠控制多執行緒,JDK提供了一套Executor框架,其本質是一個執行緒池。

                                                                                       上面是Executor的框架圖。

如圖Executors類扮演著執行緒池工廠的角色,通過Executors可以取得一個擁有特定功能的執行緒池。從圖中看到ThreadPoolExecutor類實現了Executor介面,因此通過這個介面任何Runnable的物件都可以呼叫。

Executor提供了以下工廠方法:

  • newFixedThreadPool():該方法返回一個固定執行緒數量的執行緒池。執行緒池裡若有空閒執行緒則立即執行,否則暫存在一個任務佇列中,等待空閒執行緒。
  • newSingleThreadExecutor():該方法返回了一個只有一個執行緒的執行緒池。
  • newCachedThreadPool():該方法返回了一個可根據實際情況調整執行緒數量的執行緒。執行緒池裡若有空閒執行緒則立即執行,否則會建立一個新的執行緒。
  • newSingleThreadScheduleExecutor():該方法返回一個ScheduleExecutorService物件,執行緒池大小為1。ScheduleExecutorService介面在ExecutorService介面之上,擴充套件了在給定時間執行某任務的功能。
  • newScheduleThreadPool():該方法也返回一個ScheduleExecutorService介面,但該執行緒池可以指定執行緒池數量。

ScheduleExecutorService物件主要的一些方法:

  • schedule():會在給定時間,對任務進行一次排程。
  • scheduleAtFixedRate():建立一個週期任務。任務開始於給定初始延時。後續的任務按照給定的週期執行:後續第一個任務會在initialDelay+period時執行,後續第二個任務會在initialDelay+2*period時執行,以此類推。
  • scheduleWithFixedDelay():建立並執行一個週期性任務。任務開始於初始延時時間,後續任務將會按照給定的延時進行,即上一個任務結束到下一個任務開始的時間差進行。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduleExecutorServoceDemo {
	public static void main(String[] args) {
		ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
		ses.scheduleAtFixedRate(new Runnable() {
			public void run() {
				// TODO Auto-generated method stub
				try {
					Thread.sleep(1000);
					System.out.println(System.currentTimeMillis()/1000);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				
			}
		}, 1, 2, TimeUnit.SECONDS);
	}
}

 執行結果:

1537947034 1537947036 1537947038 1537947040 1537947042

可以看到這裡時間間隔是2秒。

如果執行時間超過排程時間會發生什麼事情。將上述Thread.sleep(1000);改為

Thread.sleep(3000);

執行結果:

1537947606 1537947609 1537947612 1537947615

發現執行任務的執行週期為3秒。也就是說週期小於執行時間,那麼任務就會在上一個任務結束後立即被呼叫。

如果採用scheduleWithFixedDelay(),執行3秒,排程兩秒,那麼時間間隔為5秒。

1.2核心執行緒池的內部實現

首先了解一下ThreadPoolExecutor最重要的建構函式

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • corePoolSize:指定了執行緒池中的執行緒數量。
  • maximumPoolSize:制定了執行緒池中最大執行緒的數量。
  • keepAliveTime:當執行緒池數量超過corePoolSize時,多餘的空閒執行緒的存活時間。即,超過corePoolSize的空閒執行緒,在多長時間內被銷燬。
  • unit:keepAliveTime的單位。
  • workQueue:任務佇列,被提交但尚未被執行的任務。

workQueue,是一個BlockingQueue介面的物件,僅用於存放Runnable物件。在ThreadPoolExecutor的建構函式中可食用以下幾種BlockingQueue。

  1. 直接提交的佇列:該功能由SynchronousQueue物件提供。SynchronousQueue是一個特殊的BlockingQueue。它沒有容量,每一個插入操作都要等待一個相應的刪除操作,反之每一個刪除操作都要等待一個對應的插入操作。使用SynchronousQueue,提交的任務不會被真實儲存,而總是將新任務交給執行緒執行,如果沒有空閒執行緒,則建立新執行緒,如果執行緒數量達到最大值,則執行拒絕策略。
  2. 有界的任務佇列:該佇列可由ArrayBlockingQueue實現。它的建構函式必須帶一個容量引數。表示該佇列最大容量。有界隊列當在任務執行緒滿的時候才能將執行緒數提升到corePoolSize以上。
  3. 無界的任務佇列:無界的任務佇列可通過LinkedBlockingQueue類實現。與有界任務佇列相比,除非系統資源耗盡,否則不會加入失敗。
  4. 優先任務佇列:優先任務佇列是帶有執行優先順序的任務佇列。他通過PriorityBlockingQueue實現,可以控制任務的執行先後順序。它是一個特殊的無界佇列。
  • threadFactory:用於建立執行緒,一般用預設的即可。

  • handler:拒絕策略。當任務太多來不及處理,如何拒絕任務。

ThreadPoolExecutor的核心排程程式碼

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

程式碼第五行workerCountOf()函式取得了當前執行緒池的執行緒總數。當前執行緒總數小於corePoolSize時,將會通過addWorker()方法直接排程執行。否則在第十行程式碼處(workQueue.offer())進入等待佇列。如果進入等待佇列失敗,則會執行第17行,將任務直接交給執行緒池。如果當前執行緒數已達到maximumPoolSize,則提交失敗,就執行18行的拒絕策略。

                                                         上圖是ThreadPoolExecutor的任務排程邏輯

1.3拒絕策略

JDK內建的拒絕策略如下。

  • AbortPolicy策略:該策略會直接丟擲異常,組織系統工作。
  • CallerRunsPolicy策略:只要執行緒池關閉,該策略直接在呼叫者執行緒中,運行當前被丟棄的任務。顯然這樣做不會真的丟棄任務,但是任務提交的效能會急劇下降。
  • DiscardOldestPolicy策略:該策略丟棄最老的一個請求,也就是即將被執行的任務,並嘗試再次提交當前任務。
  • DiscardPolicy策略:該策略默默丟棄無法處理的任務,不予任何處理。