1. 程式人生 > >spring執行緒池ThreadPoolTaskExecutor與阻塞佇列BlockingQueue

spring執行緒池ThreadPoolTaskExecutor與阻塞佇列BlockingQueue

一:  ThreadPoolTaskExecutor是一個spring的執行緒池技術,檢視程式碼可以看到這樣一個欄位:

    private ThreadPoolExecutor threadPoolExecutor;

  可以發現,spring的  ThreadPoolTaskExecutor是使用的jdk中的java.util.concurrent.ThreadPoolExecutor進行實現,

  直接看程式碼:

複製程式碼
    @Override
    protected ExecutorService initializeExecutor(
            ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

        BlockingQueue
<Runnable> queue = createQueue(this.queueCapacity); ThreadPoolExecutor executor = new ThreadPoolExecutor( this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); if (this.allowCoreThreadTimeOut) { executor.allowCoreThreadTimeOut(
true); } this.threadPoolExecutor = executor; return executor; }
複製程式碼

  這是ThreadPoolTaskExecutor用來初始化threadPoolExecutor的方法,BlockingQueue是一個阻塞佇列,這個我們先不管。由於ThreadPoolTaskExecutor的實現方式完全是使用threadPoolExecutor進行實現,我們需要知道這個threadPoolExecutor的一些引數。

複製程式碼
   public ThreadPoolExecutor(int
corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
複製程式碼

  這個是呼叫的建構函式:

  int corePoolSize:執行緒池維護執行緒的最小數量.
  int maximumPoolSize:執行緒池維護執行緒的最大數量.
  long keepAliveTime:空閒執行緒的存活時間.
  TimeUnit unit: 時間單位,現有納秒,微秒,毫秒,秒列舉值.
  BlockingQueue<Runnable> workQueue:持有等待執行的任務佇列.
  RejectedExecutionHandler handler:
  用來拒絕一個任務的執行,有兩種情況會發生這種情況。
  一是在execute方法中若addIfUnderMaximumPoolSize(command)為false,即執行緒池已經飽和;
  二是在execute方法中, 發現runState!=RUNNING || poolSize == 0,即已經shutdown,就呼叫ensureQueuedTaskHandled(Runnable command),在該方法中有可能呼叫reject。

ThreadPoolExecutor池子的處理流程如下:  

1)當池子大小小於corePoolSize就新建執行緒,並處理請求

2)當池子大小等於corePoolSize,把請求放入workQueue中,池子裡的空閒執行緒就去從workQueue中取任務並處理

3)當workQueue放不下新入的任務時,新建執行緒入池,並處理請求,如果池子大小撐到了maximumPoolSize就用RejectedExecutionHandler來做拒絕處理

4)另外,當池子的執行緒數大於corePoolSize的時候,多餘的執行緒會等待keepAliveTime長的時間,如果無請求可處理就自行銷燬

其會優先建立  CorePoolSiz 執行緒, 當繼續增加執行緒時,先放入Queue中,當 CorePoolSiz  和 Queue 都滿的時候,就增加建立新執行緒,當執行緒達到MaxPoolSize的時候,就會丟擲錯 誤 org.springframework.core.task.TaskRejectedException

另外MaxPoolSize的設定如果比系統支援的執行緒數還要大時,會丟擲java.lang.OutOfMemoryError: unable to create new native thread 異常。

  這個是ThreadPoolExecutor的運算流程,既然ThreadPoolTaskExecutor是直接使用ThreadPoolExecutor進行處理,所以運算規則肯定一樣。

在spring中使用ThreadPoolTaskExecutor的配置:

複製程式碼
 <!-- 非同步執行緒池 -->
    <bean id="threadPool"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <!-- 核心執行緒數 -->
        <property name="corePoolSize" value="3" />
        <!-- 最大執行緒數 -->
        <property name="maxPoolSize" value="10" />
        <!-- 佇列最大長度 >=mainExecutor.maxSize -->
        <property name="queueCapacity" value="25" />
        <!-- 執行緒池維護執行緒所允許的空閒時間 -->
        <property name="keepAliveSeconds" value="300" />
        <!-- 執行緒池對拒絕任務(無執行緒可用)的處理策略 ThreadPoolExecutor.CallerRunsPolicy策略 ,呼叫者的執行緒會執行該任務,如果執行器已關閉,則丟棄.  -->
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
        </property>
    </bean>
複製程式碼

Reject策略預定義有四種:
(1)ThreadPoolExecutor.AbortPolicy策略,是預設的策略,處理程式遭到拒絕將丟擲執行時 RejectedExecutionException。
(2)ThreadPoolExecutor.CallerRunsPolicy策略 ,呼叫者的執行緒會執行該任務,如果執行器已關閉,則丟棄.
(3)ThreadPoolExecutor.DiscardPolicy策略,不能執行的任務將被丟棄.
(4)ThreadPoolExecutor.DiscardOldestPolicy策略,如果執行程式尚未關閉,則位於工作佇列頭部的任務將被刪除,然後重試執行程式(如果再次失敗,則重複此過程).

向spring容器中加入ThreadPoolTaskExecutor後,使用時只需要呼叫其的execute方法,其引數為一個Runnable。

複製程式碼
threadPool.execute(new Runnable() {

                @Override
                public void run() {
                    System.out.println("=======");

                }
            });
複製程式碼

ThreadPoolTaskExecutor有兩個execute的過載,但翻看程式碼可以知道呼叫的是同一個方法,所以只調用execute就可以了

ThreadPoolTaskExecutor.execute

在execute中呼叫的是ThreadPoolExecutor中的execute方法,執行了上面的處理流程後執行任務。

ThreadPoolExecutor.execute

二:阻塞佇列BlockingQueue

  在ThreadPoolTaskExecutor原始碼中我們看到了BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);這樣一句話用來得到一個佇列,這個佇列是用來存放任務的。當執行緒池中有空閒執行緒時就回去任務佇列中拿任務並處理。BlockingQueue是一個阻塞併線程安全的一個佇列

  多執行緒環境中,通過佇列可以很容易實現資料共享,比如經典的“生產者”和“消費者”模型中,通過佇列可以很便利地實現兩者之間的資料共享。假設我們有若干 生產者執行緒,另外又有若干個消費者執行緒。如果生產者執行緒需要把準備好的資料共享給消費者執行緒,利用佇列的方式來傳遞資料,就可以很方便地解決他們之間的數 據共享問題。但如果生產者和消費者在某個時間段內,萬一發生資料處理速度不匹配的情況呢?理想情況下,如果生產者產出資料的速度大於消費者消費的速度,並 且當生產出來的資料累積到一定程度的時候,那麼生產者必須暫停等待一下(阻塞生產者執行緒),以便等待消費者執行緒把累積的資料處理完畢,反之亦然。

BlockingQueue的核心方法:
放入資料:
  offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,
    則返回true,否則返回false.(本方法不阻塞當前執行方法的執行緒)
  offer(E o, long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間內,還不能往佇列中
    加入BlockingQueue,則返回失敗。
  put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷
    直到BlockingQueue裡面有空間再繼續.
獲取資料:
  poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,
    取不到時返回null;
  poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的物件,如果在指定時間內,
    佇列一旦有資料可取,則立即返回佇列中的資料。否則知道時間超時還沒有資料可取,返回失敗。
  take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到
    BlockingQueue有新的資料被加入; 
  drainTo():一次性從BlockingQueue獲取所有可用的資料物件(還可以指定獲取資料的個數), 
    通過該方法,可以提升獲取資料效率;不需要多次分批加鎖或釋放鎖。

檢視 ThreadPoolTaskExecutor的程式碼可以發現,其主要是使用 BlockingQueue的一種實現LinkedBlockingQueue進行實現。

複製程式碼
/**
     * Create the BlockingQueue to use for the ThreadPoolExecutor.
     * <p>A LinkedBlockingQueue instance will be created for a positive
     * capacity value; a SynchronousQueue else.
     * @param queueCapacity the specified queue capacity
     * @return the BlockingQueue instance
     * @see java.util.concurrent.LinkedBlockingQueue
     * @see java.util.concurrent.SynchronousQueue
     */
    protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
        if (queueCapacity > 0) {
            return new LinkedBlockingQueue<Runnable>(queueCapacity);
        }
        else {
            return new SynchronousQueue<Runnable>();
        }
    }
複製程式碼

LinkedBlockingQueue是一個基於連結串列的阻塞佇列,其內部也維持著一個數據緩衝佇列(該佇列由一個連結串列構成),當生產者往佇列中放入一個數據時,佇列會從生產者手中獲取資料,並快取在佇列內部,而生產者立 即返回;只有當佇列緩衝區達到最大值快取容量時(LinkedBlockingQueue可以通過建構函式指定該值),才會阻塞生產者佇列,直到消費者從 佇列中消費掉一份資料,生產者執行緒會被喚醒,反之對於消費者這端的處理也基於同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理 併發資料,還因為其對於生產者端和消費者端分別採用了獨立的鎖來控制資料同步,這也意味著在高併發的情況下生產者和消費者可以並行地操作佇列中的資料,以 此來提高整個佇列的併發效能。

  生成LinkedBlockingQueue時有一個大小限制,其預設為Integer.MAX_VALUE.

  另外LinkedBlockingQueue不接受null值,當新增null的時候,會直接丟擲NullPointerException:

  public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();

 3.另外最近的專案中使用的多執行緒和佇列比較多,多執行緒自不用說,我百度了一下佇列的優點,感覺說的很好,特此抄過來:

1. 解耦

在專案啟動之初來預測將來專案會碰到什麼需求,是極其困難的。訊息佇列在處理過程中間插入了一個隱含的、基於資料的介面層,兩邊的處理過程都要實現這一介面。這允許你獨立的擴充套件或修改兩邊的處理過程,只要確保它們遵守同樣的介面約束。

2. 冗餘

有時在處理資料的時候處理過程會失敗。除非資料被持久化,否則將永遠丟失。訊息佇列把資料進行持久化直到它們已經被完全處理,通過這一方式規避了資料丟失風險。在被許多訊息佇列所採用的"插入-獲取-刪除"正規化中,在把一個訊息從佇列中刪除之前,需要你的處理過程明確的指出該訊息已經被處理完畢,確保你的資料被安全的儲存直到你使用完畢。

3. 擴充套件性

因為訊息佇列解耦了你的處理過程,所以增大訊息入隊和處理的頻率是很容易的;只要另外增加處理過程即可。不需要改變程式碼、不需要調節引數。擴充套件就像調大電力按鈕一樣簡單。

4. 靈活性 & 峰值處理能力

在訪問量劇增的情況下,你的應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見;如果為 以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用訊息佇列能夠使關鍵元件頂住增長的訪問壓力,而不是因為超出負荷的請求而完全崩潰。

5. 可恢復性

當體系的一部分元件失效,不會影響到整個系統。訊息佇列降低了程序間的耦合度,所以即使一個處理訊息的程序掛掉,加入佇列中的訊息仍然可以在系統恢復後被處理。而這種允許重試或者延後處理請求的能力通常是造就一個略感不便的使用者和一個沮喪透頂的使用者之間的區別。

6. 送達保證

訊息佇列提供的冗餘機制保證了訊息能被實際的處理,只要一個程序讀取了該佇列即可。在此基礎上,IronMQ提供了一個"只送達一次"保證。無論有多少進 程在從佇列中領取資料,每一個訊息只能被處理一次。這之所以成為可能,是因為獲取一個訊息只是"預定"了這個訊息,暫時把它移出了佇列。除非客戶端明確的 表示已經處理完了這個訊息,否則這個訊息會被放回佇列中去,在一段可配置的時間之後可再次被處理。

7.排序保證

在許多情況下,資料處理的順序都很重要。訊息佇列本來就是排序的,並且能保證資料會按照特定的順序來處理。

8.緩衝

在任何重要的系統中,都會有需要不同的處理時間的元素。例如,載入一張圖片比應用過濾器花費更少的時間。訊息佇列通過一個緩衝層來幫助任務最高效率的執行--寫入佇列的處理會盡可能的快速,而不受從佇列讀的預備處理的約束。該緩衝有助於控制和優化資料流經過系統的速度。

9. 理解資料流

在一個分散式系統裡,要得到一個關於使用者操作會用多長時間及其原因的總體印象,是個巨大的挑戰。訊息系列通過訊息被處理的頻率,來方便的輔助確定那些表現不佳的處理過程或領域,這些地方的資料流都不夠優化。

10. 非同步通訊

很多時候,你不想也不需要立即處理訊息。訊息佇列提供了非同步處理機制,允許你把一個訊息放入佇列,但並不立即處理它。你想向佇列中放入多少訊息就放多少,然後在你樂意的時候再去處理它們。

ThreadPoolTaskExecutor

此文轉載自部落格園,原文地址:http://www.cnblogs.com/lic309/p/4186880.html

相關推薦

spring執行ThreadPoolTaskExecutor阻塞佇列BlockingQueue

一:  ThreadPoolTaskExecutor是一個spring的執行緒池技術,檢視程式碼可以看到這樣一個欄位: private ThreadPoolExecutor threadPoolExecutor;   可以發現,spring的  ThreadPoolTaskExecutor是

JAVA執行ThreadPoolExecutor阻塞佇列BlockingQueue

池技術是典型的享元模式。 頻繁使用new Thread來建立執行緒的方式並不太好。因為每次new Thread新建和銷燬物件效能較差,執行緒缺乏統一管理。好在java提供了執行緒池,它能夠有效的管理、排程執行緒,避免過多的資源消耗。優點如下: 重用

java執行ThreadPoolExecutor和阻塞佇列BlockingQueue,Executor, ExecutorService

ThreadPoolExecutor 引數最全的建構函式 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

關於執行中的阻塞佇列BlockingQueue

       接上篇文章https://blog.csdn.net/GoSaint/article/details/84345210        對於BlockingQueue阻塞佇列而言,常用在多執行緒生產者

1.1 Spring 執行 --- ThreadPoolTaskExecutor

Spring 擅長對元件的封裝和整合, Spring-context對JDK的併發包做了功能增強。  step 1 :Spring-context.xml 中增加如下程式碼 <bean id="poolTaskExecutor" class="org.springframe

7.執行中的阻塞佇列無限大是否合適

      在設定執行緒池佇列長度時,如果長度設定的不合理就無法發揮出多執行緒的威力。設定執行緒池的佇列長度取決於使用場景;比如全程非同步的系統,佇列可以設定為0,corePoolSize設定為cpu核數。研究tomcat、Dubbo等業界成熟的產品是如何設定執行緒佇列,分析

java多執行執行原理、阻塞佇列

# 一、執行緒池定義和使用 jdk 1.5 之後就引入了執行緒池。 ## 1.1 定義 從上面的空間切換看得出來,執行緒是稀缺資源,它的建立與銷燬是一個相對偏重且耗資源的操作,而Java執行緒依賴於核心執行緒,建立執行緒需要進行作業系統狀態切換。為避免資源過度消耗需要設法重用執行緒執行多個任務

java執行學習(九):阻塞佇列BlockingQueue講解

上一章中學到了執行緒池的詳細使用以及核心執行緒池的部分原始碼,其中就包含有BlockingQueue的資訊,那麼到底BlockingQueue是什麼呢,有什麼用呢,本章就是學這個的。 Blocking翻譯過來為’阻塞’,Queue就是佇列的意思,那麼BlockingQueue就是阻塞隊列了,

(八) Java多執行詳解之阻塞佇列BlockingQueue佇列優先順序詳解

阻塞佇列 阻塞佇列與普通佇列的區別在於當佇列是空時從佇列中獲取元素的操作將會被阻塞,或者當佇列是滿時往佇列裡新增元素的操作會被阻塞。試圖從空的阻塞佇列中獲取元素的執行緒將會被阻塞,直到其他的執行緒往空的佇列插入新的元素,同樣試圖往已滿的阻塞佇列中新增新元素的執

Java多執行/併發26、阻塞佇列BlockingQueue

BlockingQueue介面定義了一種佇列,這種佇列通常容量是提前固定(確定了容量大小)的。容量滿時往BlockingQueue中新增資料時會造成阻塞,容量為空時取元素操作會阻塞。 我們可以認為BlockingQueue佇列是一個水庫。水庫滿了的時侯,上游的

2、使用SPRING中的執行ThreadPoolTaskExecutor實現JAVA併發

new Thread的弊端如下:a. 每次new Thread新建物件效能差。b. 執行緒缺乏統一管理,可能無限制新建執行緒,相互之間競爭,及可能佔用過多系統資源導致宕機或oom。c. 缺乏更多功能,如定時執行、定期執行、執行緒中斷。相比new Thread,Java提供的四種執行緒池的好處在於:a

使用SPRING中的執行ThreadPoolTaskExecutor實現JAVA併發

使用SPRING中的執行緒池ThreadPoolTaskExecutor實現併發。 一:不需要返回值的情況  1,初始化執行緒池 Java程式碼   ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPool

實現Spring整合執行ThreadPoolTaskExecutor

在之前的專案裡面用到了執行緒池的功能,這裡記錄一下。 我們為什麼要實現執行緒池,下面是我百度查的: 在Java中,如果每當一個請求到達就建立一個新執行緒,開銷是相當大的。在實際使用中,每個請求建立新執行緒的伺服器在建立和銷燬執行緒上花費的時間和消耗的系統資源,甚至可能要比花

使用SPRING中的執行ThreadPoolTaskExecutor並且得到任務執行的結果

XML配置 <bean id="threadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <!-- 核心執行

Junit單元測試+aop+spring+執行,在進行Junit測試時切面中執行內呼叫的方法不執行

一、問題背景: 寫了一個切面,指向某service包下的所有類及方法,當該service包下方法被呼叫時切面執行,切面中用了執行緒池ExecutorService pool = Executors.newFixedThreadPool(5);執行緒池內呼叫了dao層的方法。 二、問題描述:單

Java 執行中斷(interrupt)阻塞 (park)的區別

    很多Java開發人員(包括我),尤其是剛進入軟體行業的新手,認為Java設定執行緒中斷就是表示執行緒停止了,不往前執行了, Thread.currentThread().interrupt()    其實不是這樣的,執行緒中斷只是一個狀態而已,true表示已

執行安全實現 CLH 佇列

阻塞同步 在 Java 中,我們經常使用 synchronized 關鍵字來做到互斥同步以解決多執行緒併發訪問共享資料的問題。synchronzied 關鍵字在編譯後,會在 synchronized 所包含的同步程式碼塊前後分別加入 monitorenter 和 m

執行ThreadPoolTaskExecutor配置說明

     一般實際開發中經常用到多執行緒,所以需要使用執行緒池了,  ThreadPoolTaskExecutor通常通過XML方式配置,或者通過Executors的工廠方法進行配置。  XML方式配置程式碼如下:交給spring來管理; <bean id="taskExecutor"

python執行ThreadPoolExecutor程序ProcessPoolExecutor

1 import time 2 from concurrent.futures import ThreadPoolExecutor 3 4 def get_thread_time(times): 5 time.sleep(times) 6 return times 7 8

Spring 執行使用

Spring 中預設自帶執行緒池org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor,一般有可以直接使用,這是時候使用的是預設的配置,直接使用@Autowired注入使用     @Autowire