Java粗淺認識-併發程式設計(五)-執行緒池
執行緒池
先來總攬一下執行緒池結構
以上是執行緒池結構,常用的工具java.util.concurrent.Executors
結構如下
在Executors中常用的方法
Executors.newCachedThreadPool()
建立執行緒池核心poolSize = 0,最大poolSize=Integer.MAX_VALUE,執行緒任務執行完後,如果沒有新任務,會在60s後被回收,使用SynchronousQueue佇列做阻塞佇列,佇列容量是0,每次新增時,必須阻塞等待刪除,每次刪除都要等待新增,對任務工作時間比較小的任務,效率很高,對執行緒的影響如果一直有任務新增,並且正在工作的執行緒都沒有結束,會一直new新執行緒出來執行,執行緒數量上限是Integer.MAX_VALUE.,預設拒絕策略,AbortPolicy,直接拋錯,結束當前執行緒。
例項程式碼
private static void newCachedThreadPool() {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
executorService.execute(new Task());
}
executorService.shutdown();
}
Executors.newFixedThreadPool(5)
固定執行緒池大小的執行緒,核心poolSize=maxPoolSize,執行緒一旦啟動,就不會被回收,如果其中某個執行緒掛掉了,會新啟動一個執行緒來填充,使用LinkedBlockingQueue阻塞佇列為等待佇列,如果執行緒全都在執行,那麼新新增進來的任務全部會放到阻塞佇列中去,最大值是Integer.MAX_VALUE,預設拒絕策略,AbortPolicy,直接拋錯,結束當前執行緒。
例項程式碼
private static void newFixedThreadPool() { ExecutorService executorService = Executors.newFixedThreadPool(5); for (int i = 0; i < 100; i++) { executorService.execute(new Task()); } executorService.shutdown(); }
Executors.newScheduledThreadPool(5)
編排任務的執行緒池,定時執行,必須初始化一個核心執行緒數量大小,最大執行緒數Integer.MAX_VALUE,執行緒一旦new出來就不會回收,使用DelayedWorkQueue任務編排的阻塞佇列,預設拒絕策略,AbortPolicy,直接拋錯,結束當前執行緒。
示例程式碼,有三種類型,
第一種是直接延遲一定時間後,執行一次任務;
第二種,在延遲一定時間後,第一次執行,後續執行等待前一個執行緒執行完畢後再延遲一定時候後執行;
第三種,在延遲一定時候後,第一次執行,後續執行等待前一個任務執行開始的時間計時延遲,如果已經到了就執行,如果沒到等待時刻到來執行。
/**
* 在initialDelay時間延遲後第一次執行,之後以上一個任務開始時間點開始計算,每間隔delay後執行一次,如果delay已經到了
* 則立即執行,如果沒有到,等待時刻到來再執行
*/
private static void scheduleAtFixedRate() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
executorService.scheduleAtFixedRate(new Task(), 5, 10, TimeUnit.SECONDS);
// executorService.shutdown();
}
/**
* 在initialDelay時間延遲後第一次執行,之後以上一個任務結束時間點開始計算,每間隔delay後執行一次
*/
private static void scheduleWithFixedDelay() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
executorService.scheduleWithFixedDelay(new Task(), 5, 10, TimeUnit.SECONDS);
// executorService.shutdown();
}
/**
* 延遲一定時間後執行
*/
private static void scheduledDelayThreadPool() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
executorService.schedule(new Task(), 5, TimeUnit.SECONDS);
executorService.shutdown();
}
Executors.newSingleThreadExecutor()
單執行緒的執行緒池,如果在任務時,執行緒掛掉了,會立即新啟一個執行緒,最大執行緒1,核心執行緒1,阻塞佇列LinkedBlockingQueue,不會回收執行緒,預設拒絕策略,AbortPolicy,直接拋錯,結束當前執行緒。
/**
* 單執行緒的執行緒池,如果在任務時,執行緒掛掉了,會立即新啟一個執行緒
*/
private static void newSingleThreadExecutor() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Task());
executorService.shutdown();
}
Executors.newWorkStealingPool() java 1.8
1.8新增的執行緒池
public static void newWorkStealingPool() throws InterruptedException {
//forkJoin
ExecutorService forkJoinPool = Executors.newWorkStealingPool();
List<CallableTask> callableTaskList = new ArrayList<>();
callableTaskList.add(new CallableTask());
callableTaskList.add(new CallableTask());
callableTaskList.add(new CallableTask());
List<Future<Integer>> futures = forkJoinPool.invokeAll(callableTaskList);
futures.forEach(integerFuture -> {
try {
System.out.println(integerFuture.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
這個例項不太好,下次想好了,再改哈,根據上面的層級關係圖,forkJoin不屬於ThreadPoolExecutor,接下來我們看看ForkJoinPool執行緒池的實現初始化時,執行緒數是核心數,非同步,異常處理為null
在使用ThreadPoolExecutor時,建議自己建立,不使用Executors工具類建立。用Executors裡面對拒絕任務新增時很不友好,而使用自己建立ThreadPoolExecutor時,可以自己重寫拒絕策略。
示例程式碼
private static void threadPoolExecutor() {
ExecutorService executorService = new ThreadPoolExecutor(4, 16, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1 << 4),new MyRejectedExecutionHandler());
}
public static class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("需要日誌儲存的訊息。" + r.toString() + executor.toString());
}
}
優先順序佇列線上程池中的使用
模擬VIP使用者,VIP等級高的優先執行。
public static class VIP implements Comparable<VIP>, Runnable {
private String name;
private int vipLevel;
public VIP(String name, int vipLevel) {
this.name = name;
this.vipLevel = vipLevel;
}
@Override
public int compareTo(VIP o) {
return Integer.compare(o.vipLevel, this.vipLevel);
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " " + name + " 執行。vip等級" + vipLevel);
}
}
private static void threadPoolExecutor() {
PriorityBlockingQueue<Runnable> priorityBlockingQueue = new PriorityBlockingQueue();
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("VIP 執行緒-%d").build();
ExecutorService executorService =
new ThreadPoolExecutor(4,
16,
10,
TimeUnit.SECONDS,
priorityBlockingQueue,
threadFactory,
new MyRejectedExecutionHandler());
for (int i = 0; i < 100; i++) {
int vipLevel = i % 2;
executorService.execute(new VIP("使用者" + i, vipLevel));
}
}
總結
本章節對執行緒池的處理進行了講解,需要多加練習。