1. 程式人生 > >java多執行緒系列:Executors框架

java多執行緒系列:Executors框架

  1. Executor介面介紹
  2. ExecutorService常用介面介紹
  3. 建立執行緒池的一些方法介紹
    • 3.1 newFixedThreadPool方法
    • 3.2 newCachedThreadPool方法
    • 3.3 newScheduledThreadPool方法
  4. 疑問解答
    • 4.1. Runable介面和Callable介面

Executor介面介紹

Executor是一個介面,裡面提供了一個execute方法,該方法接收一個Runable引數,如下

public interface Executor {
    voidexecute(Runnable command);
}

Executor框架的常用類和介面結構圖

Executor框架的常用類和介面結構圖

執行緒物件及執行緒執行返回的物件

執行緒物件及執行緒執行返回的物件

執行緒物件

執行緒物件就是提交給執行緒池的任務,可以實現Runable介面或Callable介面。或許這邊會產生一個疑問,為什麼Runable介面和Callable介面沒有任何關聯,卻都能作為任務來執行?大家可以思考下,文章的結尾會對此進行說明

Future介面

Future介面和FutureTask類是用來接收執行緒非同步執行後返回的結果,可以看到下方ExecutorService介面的submit方法返回的就是Future。

ExecutorService常用介面介紹

接下來我們來看看繼承了Executor介面的ExecutorService

public interface ExecutorService
extends Executor { //正常關閉(不再接收新任務,執行完佇列中的任務) voidshutdown(); //強行關閉(關閉當前正在執行的任務,返回所有尚未啟動的任務清單) List<Runnable> shutdownNow(); booleanisShutdown(); booleanisTerminated(); <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result)
; Future<?> submit(Runnable task); ... }

ThreadPoolExecutor建構函式介紹

在介紹穿件執行緒池的方法之前要先介紹一個類ThreadPoolExecutor,應為Executors工廠大部分方法都是返回ThreadPoolExecutor物件,先來看看它的建構函式吧

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {...}

引數介紹

引數型別含義
corePoolSizeint核心執行緒數
maximumPoolSizeint最大執行緒數
keepAliveTimelong存活時間
unitTimeUnit時間單位
workQueueBlockingQueue存放執行緒的佇列
threadFactoryThreadFactory建立執行緒的工廠
handlerRejectedExecutionHandler多餘的的執行緒處理器(拒絕策略)

建立執行緒池的一些方法介紹

為什麼要講ExecutorService介面呢?是因為我們使用Executors的方法時返回的大部分都是ExecutorService。
Executors提供了幾個建立執行緒池方法,接下來我就介紹一下這些方法

newFixedThreadPool(int nThreads)
建立一個執行緒的執行緒池,若空閒則執行,若沒有空閒執行緒則暫緩在任務佇列中。

newWorkStealingPool()
建立持有足夠執行緒的執行緒池來支援給定的並行級別,並通過使用多個佇列,減少競爭,它需要穿一個並行級別的引數,如果不傳,則被設定為預設的CPU數量。

newSingleThreadExecutor()
該方法返回一個固定數量的執行緒池  
該方法的執行緒始終不變,當有一個任務提交時,若執行緒池空閒,則立即執行,若沒有,則會被暫緩在一個任務佇列只能怪等待有空閒的執行緒去執行。

newCachedThreadPool() 
返回一個可根據實際情況調整執行緒個數的執行緒池,不限制最大執行緒數量,若有空閒的執行緒則執行任務,若無任務則不建立執行緒,並且每一個空閒執行緒會在60秒後自動回收。

newScheduledThreadPool(int corePoolSize)
返回一個SchededExecutorService物件,但該執行緒池可以設定執行緒的數量,支援定時及週期性任務執行。
 
newSingleThreadScheduledExecutor()
建立一個單例執行緒池,定期或延時執行任務。  
 

下面講解下幾個常用的方法,建立單個的就不說明了

newFixedThreadPool方法

該方法建立指定執行緒數量的執行緒池,沒有限制可存放的執行緒數量(無界佇列),適用於執行緒任務執行較快的場景。

FixedThreadPool的execute()的執行示意圖

看看Executors工廠內部是如何實現的

publicstatic ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

可以看到返回的是一個ThreadPoolExecutor物件,核心執行緒數和是最大執行緒數都是傳入的引數,存活時間是0,時間單位是毫秒,阻塞佇列是無界佇列LinkedBlockingQueue。

由於佇列採用的是無界佇列LinkedBlockingQueue,最大執行緒數maximumPoolSize和keepAliveTime都是無效引數,拒絕策略也將無效,為什麼?

這裡又延伸出一個問題,無界佇列說明任務沒有上限,如果執行的任務比較耗時,那麼新的任務會一直存放線上程池中,執行緒池的任務會越來越多,將會導致什麼後果?下面的程式碼可以試試

public class Main {

    publicstaticvoidmain(String[] args){
        ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        while (true){
            pool.submit(new Runnable() {
                @Override
                publicvoidrun() {
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }

    }
}

示例程式碼

public class Main {

    publicstaticvoidmain(String[] args){
        ExecutorService pool = Executors.newFixedThreadPool(4);

        for (int i = 0; i < 8; i++) {
            int finalI = i + 1;
            pool.submit(() -> {
                try {
                    System.out.println("任務"+ finalI +":開始等待2秒,時間:"+LocalTime.now()+",當前執行緒名:"+Thread.currentThread().getName());
                    Thread.sleep(2000);
                    System.out.println("任務"+ finalI +":結束等待2秒,時間:"+LocalTime.now()+",當前執行緒名:"+Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });

        }
        pool.shutdown();
    }
}

輸出結果

任務4:開始等待2秒,時間:17:13:22.048,當前執行緒名:pool-1-thread-4
任務2:開始等待2秒,時間:17:13:22.048,當前執行緒名:pool-1-thread-2
任務3:開始等待2秒,時間:17:13:22.048,當前執行緒名:pool-1-thread-3
任務1:開始等待2秒,時間:17:13:22.048,當前執行緒名:pool-1-thread-1

任務2:結束等待2秒,時間:17:13:24.048,當前執行緒名:pool-1-thread-2
任務3:結束等待2秒,時間:17:13:24.048,當前執行緒名:pool-1-thread-3
任務1:結束等待2秒,時間:17:13:24.048,當前執行緒名:pool-1-thread-1
任務4:結束等待2秒,時間:17:13:24.048,當前執行緒名:pool-1-thread-4
任務6:開始等待2秒,時間:17:13:24.049,當前執行緒名:pool-1-thread-4
任務7:開始等待2秒,時間:17:13:24.049,當前執行緒名:pool-1-thread-1
任務5:開始等待2秒,時間:17:13:24.049,當前執行緒名:pool-1-thread-3
任務8:開始等待2秒,時間:17:13:24.049,當前執行緒名:pool-1-thread-2

任務5:結束等待2秒,時間:17:13:26.050,當前執行緒名:pool-1-thread-3
任務7:結束等待2秒,時間:17:13:26.050,當前執行緒名:pool-1-thread-1
任務8:結束等待2秒,時間:17:13:26.051,當前執行緒名:pool-1-thread-2
任務6:結束等待2秒,時間:17:13:26.050,當前執行緒名:pool-1-thread-4

可以看出任務1-4在同一時間執行,在2秒後執行完畢,同時開始執行任務5-8。說明方法內部只建立了4個執行緒,其他任務存放在佇列中等待執行。

newCachedThreadPool方法

newCachedThreadPool方法建立的執行緒池會根據需要自動建立新執行緒。

CachedThreadPool的execute()的執行示意圖

看看Executors工廠內部是如何實現的

publicstatic ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

newCachedThreadPool方法也是返回ThreadPoolExecutor物件,核心執行緒是0,最大執行緒數是Integer的最MAX_VALUE,存活時間是60,時間單位是秒,SynchronousQueue佇列。

從傳入的引數可以得知,在newCachedThreadPool方法中的空閒執行緒存活時間時60秒,一旦超過60秒執行緒就會被終止。這邊還隱含了一個問題,如果執行的執行緒較慢,而提交任務的速度快於執行緒執行的速度,那麼就會不斷的建立新的執行緒,從而導致cpu和記憶體的增長。

程式碼和newFixedThreadPool一樣迴圈新增新的執行緒任務,我的電腦執行就會出現如下錯誤

An unrecoverable stack overflow has occurred.

Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.www.078881.cn  java:714)
    at java.util.www.wmyl11.com  concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1368)
    at java.util.concurrent.AbstractExecutorService.submit(www.qinlinyule.cn AbstractExecutorService.www.leyou2.net/  java:112)
    at com.learnConcurrency.executor.cachedThreadPool.Main.main(Main.java:11)
Process finished with exit code -1073741571 www.mcyllpt.com (0xC00000FD)

關於SynchronousQueue佇列,它是一個沒有容量的阻塞佇列,任務傳遞的示意圖如下

CachedThreadPool的任務傳遞示意圖

示例程式碼

public class Main {
    publicstaticvoidmain(String[] args) throws Exception{
        ExecutorService pool = Executors.newCachedThreadPool();
        for (int i = 0; i < 8; i++) {
            int finalI = i + 1;
            pool.submit((www.yongxinzaixian.cn) -> {
                try {
                    System.out.println("任務"+ finalI +":開始等待60秒,時間:"+LocalTime.now()+",當前執行緒名:"+Thread.currentThread().getName());
                    Thread.sleep(60000);
                    System.out.println("任務"+ finalI +":結束等待60秒,時間:"+LocalTime.now()+",當前執行緒名:"+Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            //睡眠10秒
            Thread.sleep(10000);
        }
        pool.shutdown();
    }
}

執行結果

任務1:開始等待60秒,時間:17:15:21.570,當前執行緒名:pool-1-thread-1
任務2:開始等待60秒,時間:17:15:31.553,當前執行緒名:pool-1-thread-2
任務3:開始等待60秒,時間:17:15:41.555,當前執行緒名:pool-1-thread-3
任務4:開始等待60秒,時間:17:15:51.554,當前執行緒名:pool-1-thread-4
任務5:開始等待60秒,時間:17:16:01.554,當前執行緒名:pool-1-thread-5
任務6:開始等待60秒,時間:17:16:11.555,當前執行緒名:pool-1-thread-6
任務7:開始等待60秒,時間:17:16:21.555,當前執行緒名:pool-1-thread-7
任務1:結束等待60秒,時間:17:16:21.570,當前執行緒名:pool-1-thread-1
任務2:結束等待60秒,時間:17:16:31.554,當前執行緒名:pool-1-thread-2

任務8:開始等待60秒,時間:17:16:31.556,當前執行緒名:pool-1-thread-2
任務3:結束等待60秒,時間:17:16:41.555,當前執行緒名:pool-1-thread-3
任務4:結束等待60秒,時間:17:16:51.556,當前執行緒名:pool-1-thread-4
任務5:結束等待60秒,時間:17:17:01.556,當前執行緒名:pool-1-thread-5
任務6:結束等待60秒,時間:17:17:11.555,當前執行緒名:pool-1-thread-6
任務7:結束等待60秒,時間:17:17:21.556,當前執行緒名:pool-1-thread-7
任務8:結束等待60秒,時間:17:17:31.557,當前執行緒名:pool-1-thread-2

示例程式碼中每個任務都睡眠60秒,每次迴圈新增任務睡眠10秒,從執行結果來看,新增的7個任務都是由不同的執行緒來執行,而此時執行緒1和2都執行完畢,任務8新增進來由之前建立的pool-1-thread-2執行。

newScheduledThreadPool方法

這個執行緒池主要用來延遲執行任務或者定期執行任務。

看看Executors工廠內部是如何實現的

publicstatic ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

這裡返回的是ScheduledThreadPoolExecutor物件,我們繼續深入進去看看

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

這裡呼叫的是父類的建構函式,ScheduledThreadPoolExecutor的父類是ThreadPoolExecutor,所以返回的也是ThreadPoolExecutor物件。核心執行緒數是傳入的引數corePoolSize,執行緒最大值是Integer的MAX_VALUE,存活時間時0,時間單位是納秒,佇列是DelayedWorkQueue。

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService {}

下面是ScheduledExecutorService的一些方法

public interface ScheduledExecutorService extends ExecutorService {
    //delay延遲時間,unit延遲單位,只執行1次,在經過delay延遲時間之後開始執行
    public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
    //首次執行時間時然後在initialDelay之後,然後在initialDelay+period 後執行,接著在 initialDelay + 2 * period 後執行,依此類推
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
    //首次執行時間時然後在initialDelay之後,然後延遲delay時間執行
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
}

相關推薦

java執行系列Executors框架

Executor介面介紹ExecutorService常用介面介紹建立執行緒池的一些方法介紹3.1 newFixedThreadPool方法3.2 newCachedThreadPool方法3.3 newScheduledThreadPool方法疑問解答4.1. Runabl

java執行系列通過對戰遊戲學習CyclicBarrier

CyclicBarrier是java.util.concurrent包下面的一個工具類,字面意思是可迴圈使用(Cyclic)的屏障(Barrier),通過它可以實現讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,所有被屏障攔截的執

java執行系列CountDownLatch

這篇文章將介紹CountDownLatch這個同步工具類的基本資訊以及通過案例來介紹如何使用這個工具。 CountDownLatch是java.util.concurrent包下面的一個工具類,可以用來協調多個執行緒之間的同步,或者說起到執行緒之間的通訊(而不

Java執行系列---“JUC原子類”02之 框架

轉自:http://www.cnblogs.com/skywang12345/p/3514589.html   根據修改的資料型別,可以將JUC包中的原子操作類可以分為4類。 1. 基本型別: AtomicInteger, AtomicLong, AtomicBoolean ;2.&

Java執行系列---“JUC鎖”01之 框架

轉自:http://www.cnblogs.com/skywang12345/p/3496098.html(含部分修改)   本章,我們介紹鎖的架構;後面的章節將會對它們逐個進行分析介紹。目錄如下: 01. Java多執行緒系列--“JUC鎖”01之 框架 02. 

java執行系列(一)Thread、Runnable、Callable實現執行的區別

實現多執行緒 java實現多執行緒的方法有三種,分別是繼承thread類,實現runnable介面,實現callable介面(call方法有返回值) /** * 繼承Thread */ public class MyThread extends Thread{ int a = 0;

java執行系列3悲觀鎖和樂觀鎖

1.悲觀鎖和樂觀鎖的基本概念 悲觀鎖: 總是認為當前想要獲取的資源存在競爭(很悲觀的想法),因此獲取資源後會立刻加鎖,於是其他執行緒想要獲取該資源的時候就會一直阻塞直到能夠獲取到鎖; 在傳統的關係型資料庫中,例如行鎖、表鎖、讀鎖、寫鎖等,都用到了悲觀鎖。還有java中的同步關鍵字Synchroniz

Java執行系列--“JUC集合”01之 框架

from:http://www.2cto.com/kf/201401/276021.html Java集合包 在“Java 集合系列01之 總體框架”中,介紹java集合的架構。主體內容包括Collection集合和Map類;而Collection集合又可以劃分為List

java執行系列--"JUC"原子類 01 之框架

根據修改的資料型別,可以將JUC包中的原子操作類可以分為4類 1 基本型別:AtomicInteger,AtomicLong,AtomicBoolean; 2 陣列型別:AtomicIntegerArray,AtomicLongArray,AtomicReferenceA

Java執行執行池和Executors類解析

執行緒池 多執行緒技術主要解決處理器單元內多個執行緒執行的問題,它可以顯著減少處理器單元的閒置時間,增加處理器單元的吞吐能力。 假設一個伺服器完成一項任務所需時間為:T1 建立執行緒時間,T2 線上程中執行任務的時間,T3 銷燬執行緒時間。 如果:T1 +

Java執行系列--“JUC原子類”01之 框架

根據修改的資料型別,可以將JUC包中的原子操作類可以分為4類。 基本型別: AtomicInteger,AtomicLong, AtomicBoolean ; 陣列型別: AtomicIntegerArray, AtomicLongArray,Ato

Java執行系列--【JUC集合01】- 框架

參考:http://www.cnblogs.com/skywang12345/p/3498454.html 概要 之前,在""中,講解了Java集合包中的各個類。接下來,將展開對JUC包中的集合進行學習。在學習之前,先溫習一下"Java集合包"。本章內容包括:Jav

Java執行系列--“JUC原子類”03之 AtomicLong原子類

轉自:https://www.cnblogs.com/skywang12345/p/3514593.html(含部分修改) 概要 AtomicInteger, AtomicLong和AtomicBoolean這3個基本型別的原子類的原理和用法相似。本章以AtomicLong對基本型別的原子類進行介紹。內容

Java執行系列---“JUC原子類”04之 AtomicLongArray原子類

轉自:https://www.cnblogs.com/skywang12345/p/3514604.html(含部分修改) 概要 AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray這3個數組型別的原子類的原理和用法相似。本章以AtomicLo

Java執行系列---“JUC原子類”05之 AtomicReference原子類

轉自:http://www.cnblogs.com/skywang12345/p/3514623.html(部分修改) 概要 本章對AtomicReference引用型別的原子類進行介紹。內容包括: AtomicReference介紹和函式列表 AtomicReference原始碼分析(基於J

Java執行系列---“JUC原子類”06之 AtomicLongFieldUpdater原子類

轉自:http://www.cnblogs.com/skywang12345/p/3514635.html (含部分修改) 概要 AtomicIntegerFieldUpdater, AtomicLongFieldUpdater和AtomicReferenceFieldUpdater這3個修改類的成員的原

Java執行系列---“JUC原子類”01之 原子類的實現(CAS演算法)

轉自:https://blog.csdn.net/ls5718/article/details/52563959  & https://blog.csdn.net/mmoren/article/details/79185862(含部分修改)   在JDK 5之前Java語言是靠

JAVA執行(四) Executor併發框架向RabbitMQ推送訊息

github程式碼地址:https://github.com/showkawa/springBoot_2017/tree/master/spb-demo  假設一個需求使用者點選某個頁面,我們後臺需要向MQ推送信資訊 1,模擬的MQ服務,我這邊使用RabbitMQ (關於MQ 傳送和監聽訊息可以

Java執行系列---“JUC鎖”02之 ReentrantLock

轉自:https://www.jianshu.com/p/96c89e6e7e90 & https://blog.csdn.net/Somhu/article/details/78874634 (含部分修改) 一.ReentrantLock鎖 1.Lock介面 Lock,鎖

java執行系列翻譯之java併發/執行教程

原文地址:http://tutorials.jenkov.com/java-concurrency/index.html 以前計算機都是單核,同時只能執行一個程式。之後出現了多重任務處理,這意味著計算機同時可以處理多個程式(又名任務或流程)。但這不是真正的“同時執行”,只是單個CPU被多個程式共