Java併發(4)深入分析java執行緒池框架及實現原理(一)
阿新 • • 發佈:2019-02-01
先說說我個人對執行緒池的理解:執行緒池顧名思義是一個裝有很多執行緒的池子,這個池子維護著從執行緒建立到銷燬的怎個生命週期以及執行緒的分配,使用者只需要把任務提交給這個執行緒池而不用去關心執行緒池如何建立執行緒,執行緒池會自己給這些任務分配執行緒資源來完成任務。
java的Executor執行緒池框架類圖大致如下:
- Executor:執行者,java執行緒池框架的最上層父介面,地位類似於spring的BeanFactry、集合框架的Collection介面,在Executor這個介面中只有一個execute方法,該方法的作用是向執行緒池提交任務並執行。
- ExecutorService:該介面繼承自Executor介面,添加了shutdown、shutdownAll、submit、invokeAll等一系列對執行緒的操作方法,該介面比較重要,在使用執行緒池框架的時候,經常用到該介面。
- AbstractExecutorService:這是一個抽象類,實現ExecuotrService介面,
- ThreadPoolExecutor:這是Java執行緒池最核心的一個類,該類繼承自AbstractExecutorService,主要功能是建立執行緒池,給任務分配執行緒資源,執行任務。
- ScheduledExecutorSerivce 和 ScheduledThreadPoolExecutor 提供了另一種執行緒池:延遲執行和週期性執行的執行緒池。
- Executors:這是一個靜態工廠類,該類定義了一系列靜態工廠方法,通過這些工廠方法可以返回各種不同的執行緒池。
要深入分析這個執行緒池框架,我們先從使用者的角度入手進行分析,要使用這個框架首先得有個可執行的執行緒任務(實現了Runnable或Callable介面的類):
public class Task implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread().getName()+"---"+i); } } }然後通過靜態工廠類Executors產生一個執行緒池,然後將這個任務提交給執行緒池。
public class TaskTest { public static void main(String[] args){ ExecutorService exec = Executors.newFixedThreadPool從結果中可以看到,雖然提交了五個任務,但是始終是由三個執行緒執行的。(3); // 核心執行緒為3的執行緒池,最大執行緒為3的執行緒池,池子中只會有三個執行緒 // ExecutorService exec = Executors.newCachedThreadPool(); // 核心執行緒為0,最大執行緒為Integer.MAX_VALUE的執行緒池 // ExecutorService exec = Executors.newScheduledThreadPool(3); // 核心執行緒為3的延遲執行緒池 // ExecutorService exec = Executors.newSingleThreadExecutor(); // 相當於Executors.newFixedThreadPool(1); for (int i = 0; i < 5; i++) { exec.execute(new Task());// 提交五個任務給執行緒池 } } }
在以上的使用中我們通過靜態工廠類產生了一種型別的執行緒池,辣麼,就從這個Executors入手進行分析java的這個執行緒池框架,先看看Execuotrs中幾種執行緒池的靜態工廠方法:
1.newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
2.newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
3.newCachedThreadPool
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }對於每一種的執行緒池工廠方法都可以在引數處傳入一個ThreadFactroy物件,用來構造執行緒,執行緒池將按照你的ThreadFactroy中的newThread方法來產生執行緒,如果不傳入這個引數,預設使用Executors的靜態內部類DefaultThreadFactroy作為執行緒生成策略。 DefaultThreadFactroy原始碼如下:
/** * The default thread factory */ static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }預設的執行緒工廠會給執行緒分好組然後給執行緒設定名字,設定執行緒為非守護執行緒,並且將執行緒優先順序設為Thread.NORM_PRIORITY (該值為5,預設執行緒優先順序都是5) 從上面的三種執行緒池的靜態工廠方法中可以看出,通過在new ThreadPoolExecutor的時候傳入不同的引數就可以得到不同的執行緒池了,下面先看看ThreadPoolExecutor構造方法中的各個引數及含義:
public ThreadPoolExecutor(int corePoolSize, //核心執行緒數 int maximumPoolSize, //最大執行緒數 long keepAliveTime, //非核心執行緒存活時間 TimeUnit unit, //時間單位 BlockingQueue<Runnable> workQueue, //排隊執行緒的阻塞佇列 ThreadFactory threadFactory, //執行緒工廠 RejectedExecutionHandler handler) { //當前執行緒已達到最大執行緒數,且阻塞佇列已滿時的異常處理策略}對於以上各引數的理解,可以舉一個栗子進行講解: 我們可以自定義一個 核心執行緒數為1,最大執行緒數為2,非核心閒置執行緒存活時間為100毫秒,阻塞佇列大小為2的執行緒池如下:
ExecutorService executorService = new ThreadPoolExecutor( 1, //核心執行緒 2, //最大執行緒 100, //閒置執行緒存活時間 TimeUnit.MILLISECONDS, //時間單位:毫秒 new ArrayBlockingQueue<Runnable>(2), //大小為3的阻塞佇列 Executors.defaultThreadFactory(), //使用Executors類的預設執行緒工廠 new RejectedExecutionHandler() { //new 一個匿名內部類實現RejectedExecutionHandler介面,作為異常處理策略 @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (r instanceof Task){ System.out.println( "task:" + ((Task) r).taskId + " is Handel!"); try { TimeUnit.MILLISECONDS.sleep(51); } catch (InterruptedException e) { e.printStackTrace(); } } } } );建立一個任務,每個任務有一個自己的id:
public class Task implements Runnable { private static AtomicInteger number = new AtomicInteger(0); public int taskId = number.addAndGet(1); @Override public void run() { System.out.println(Thread.currentThread().getName() + " task:" + taskId + " start..."); try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " task:" + taskId + " end..."); } }接下來向這個執行緒池提交10個任務:
for (int i = 0; i < 10; i++) { executorService.execute(new Task());// 提交十個任務給執行緒池 }executorService.shutdown();
System.out.println("main is over");先看看執行結果: pool-1-thread-1 task:1 start...
pool-1-thread-2 task:4 start...
task:5 is Handel!
task:6 is Handel!
pool-1-thread-1 task:1 end...
pool-1-thread-2 task:4 end...
pool-1-thread-1 task:2 start...
pool-1-thread-2 task:3 start...
task:9 is Handel!
task:10 is Handel!
pool-1-thread-1 task:2 end...
pool-1-thread-2 task:3 end...
pool-1-thread-1 task:7 start...
pool-1-thread-2 task:8 start...
main is over
pool-1-thread-1 task:7 end...
pool-1-thread-2 task:8 end...
結果中: poo-1表示那個執行緒池,-thread-1表示哪一個執行緒在執行任務,task:1表示哪一個任務被執行, start... end...表示任務開和結束
分析一下執行過程:- 在for迴圈中向執行緒池提交第一個任務的時候由於執行緒池中還沒有任何執行緒,並且給執行緒池設定的核心執行緒數為1,所以,這時候執行緒池會通過ThreadFactroy建立一個核心執行緒1並用這個執行緒執行任務1.
- 在向執行緒池提交任務2時,由於執行緒池中核心執行緒數已經用完(Task任務會sleep 100毫秒,所以這時任務1還未結束,執行緒1還在被使用),阻塞佇列還沒滿,所以向阻塞佇列裡面新增任務2,,任務3同理進入阻塞佇列
- 在向執行緒池提交任務4時,由於執行緒池中核心執行緒數已經用完,阻塞佇列已經滿了,但執行緒池中執行緒數還未達到最大執行緒數,所以建立非核心執行緒2,並用這個執行緒執行任務4.
- 在向執行緒池中提交任務5時,由於此時執行緒池中,執行緒數已經達到最大值,並且所有執行緒都在被使用,並且阻塞佇列也已經滿了,所以此時執行緒池就會回撥你傳入的RejectedExecutionHandler的rejectedExecution方法作為異常處理策略處理任務5(該方法其實是在主執行緒中處理的,在該策略中我讓執行緒sleep 51秒,以便用該策略處理兩個Task之後,核心執行緒1執行的任務已經結束),任務6同理會被該策略處理。
- 在向執行緒池中提交第7個任務前2毫秒(理論值),此時,核心執行緒1、非核心執行緒2的任務1、任務4已經執行完成,當核心執行緒1執行完任務之後,發現阻塞佇列中還有任務在排隊等待,這時根據佇列先進先出依次取出佇列中的任務2和任務3分配給兩個執行緒,所以當任務7、任務8提交的時候發現阻塞佇列還沒滿,會依次被執行緒池新增進阻塞佇列.
- 在向執行緒池提交任務9、任務10的時候同第4步原理,會被異常處理策略處理。
- 至此,所有任務都被提交,接著任務2、任務3結束,阻塞佇列中的任務7、任務8被執行,然後任務7、任務8結束,程式結束!