1. 程式人生 > >盤點java併發包提供的執行緒池和佇列

盤點java併發包提供的執行緒池和佇列

執行緒池
  • newCachedThreadPool()
  • newFixedThreadPool(int nThreads)
  • newSingleThreadPoolExecutor()
  • newScheduledThreadPool(int corePoolSize)
  • newWorkStrealingPool(int parallelism)
佇列
  • SynchronousQueue
  • LinkedBlockingQueue
執行緒池 Executors框架的5個靜態工廠方法
  • CachedThreadPool
經常用來處理短時間任務的執行緒池,它試圖快取執行緒並重用,如果沒有可用的快取執行緒則新建執行緒,如果執行緒閒置60秒會移出快取,這種執行緒池長時間閒置不會佔用什麼資源,但是如果任務併發出現速度過快會導致執行緒建立來不及處理任務導致部分任務等待超時。
  • FixedThreadPool
建立時接收快取執行緒數引數,它只會快取這麼多執行緒,不會多也不會少,如果有工作執行緒退出,會建立新的執行緒補足指定數目。
它的工作佇列是無界的,如果沒有執行緒可用則會在佇列中等待,這導致如果執行緒大小設定不合理以及任務併發大的情況下,請求型別的任務可能會導致請求超時。
為了避免上面的情況導致超時也好,佇列任務過多導致oom也好,可以使用jmap等工具,檢視是否有大量任務物件入隊。
  • SingleThreadPool
它建立的是ScheduledExecutorService,可以定時或週期性的工作排程,執行緒數被限制為1,保證任務是順序執行,執行緒池例項不能被修改。
  • ScheduledThreadPool
它建立的也是ScheduledExecutorService,區別在於它會保持核心執行緒在一定數量
  • WorkStrealingPool
內部構造ForkJoinPool,利用Work-Stealing 演算法,並行處理任務,不保證處理順序。
ThreadPoolExecutor 建立執行緒
new  ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);




• corePoolSize(執行緒池的基本大小):當提交一個任務到執行緒池時,執行緒池會建立一個執行緒來執行任務,即使其他空閒的基本執行緒能夠執行新任務也會建立執行緒,等到需要執行的任務數大於執行緒池基本大小時就不再建立。如果呼叫了執行緒池的prestartAllCoreThreads方法,執行緒池會提前建立並啟動所有基本執行緒。
• runnableTaskQueue(任務佇列):用於儲存等待執行的任務的阻塞佇列。 可以選擇以下幾個阻塞佇列。
• ArrayBlockingQueue:是一個基於陣列結構的有界阻塞佇列,此佇列按 FIFO(先進先出)原則對元素進行排序。
• LinkedBlockingQueue:一個基於連結串列結構的阻塞佇列,此佇列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個佇列。
• SynchronousQueue:一個不儲存元素的阻塞佇列。每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個佇列。
• PriorityBlockingQueue:一個具有優先順序的無限阻塞佇列。
• maximumPoolSize(執行緒池最大大小):執行緒池允許建立的最大執行緒數。如果佇列滿了,並且已建立的執行緒數小於最大執行緒數,則執行緒池會再建立新的執行緒執行任務。值得注意的是如果使用了無界的任務佇列這個引數就沒什麼效果。
• ThreadFactory:用於設定建立執行緒的工廠,可以通過執行緒工廠給每個創建出來的執行緒設定更有意義的名字。
• RejectedExecutionHandler(飽和策略):當佇列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。這個策略預設情況下是AbortPolicy,表示無法處理新任務時丟擲異常。以下是JDK1.5提供的四種策略。
• AbortPolicy:直接丟擲異常。
• CallerRunsPolicy:只用呼叫者所線上程來執行任務。
• DiscardOldestPolicy:丟棄佇列裡最近的一個任務,並執行當前任務。
• DiscardPolicy:不處理,丟棄掉。
• 當然也可以根據應用場景需要來實現RejectedExecutionHandler介面自定義策略。如記錄日誌或持久化不能處理的任務。
• keepAliveTime(執行緒活動保持時間):執行緒池的工作執行緒空閒後,保持存活的時間。所以如果任務很多,並且每個任務執行的時間比較短,可以調大這個時間,提高執行緒的利用率。
• TimeUnit(執行緒活動保持時間的單位):可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
向執行緒池提交任務
任務拒絕策略
當執行緒池的任務快取佇列已滿並且執行緒池中的執行緒數目達到maximumPoolSize,如果還有任務到來就會採取任務拒絕策略,通常有以下四種策略:
• ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。
• ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。
• ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)
• ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務
自定義拒絕策略
/**
 * 執行緒池 請求數 大於 最大執行緒數+佇列大小
 * @Author: duhongjiang
 * @Date: Created in 2018/6/24
 */
public abstract class MyRejectedExecutionHandler implements RejectedExecutionHandler {
    private static Logger logger = LoggerFactory.getLogger(MyRejectedExecutionHandler.class);
    @Override
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
          rejected();
    }
    protected abstract void rejected();
}
執行緒池容量的動態調整
ThreadPoolExecutor提供了動態調整執行緒池容量大小的方法:setCorePoolSize()和setMaximumPoolSize()
• setCorePoolSize:設定核心池大小
• setMaximumPoolSize:設定執行緒池最大能建立的執行緒數目大小
佇列
  • SynchronousQueue
這個佇列不會快取任務,也就是說佇列容量是0,來一個任務就交給執行緒,是CachedThreadPool使用的佇列。
  • LinkedBlockingQueue
基於連結串列的先進先出 無界的佇列,FixedThreadPool使用。但是 無界是相對說的,它的佇列大小預設是Integer.MAX_VALUE,當然也可以指定大小。