1. 程式人生 > >Java執行緒池詳解及常用方法

Java執行緒池詳解及常用方法

前言

最近被問到了執行緒池的相關問題。於是準備開始寫一些多執行緒相關的文章。這篇將介紹一下執行緒池的基本使用。

Executors

Executors是concurrent包下的一個類,為我們提供了建立執行緒池的簡便方法。
Executors可以建立我們常用的四種執行緒池:
(1)newCachedThreadPool 建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。不設上限,提交的任務將立即執行。
(2)newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。
(3)newScheduledThreadPool 建立一個定長執行緒池,支援定時及週期性任務執行。

(4)newSingleThreadExecutor 建立一個單執行緒化的執行緒池執行任務。

Executors的壞處

正常來說,我們不應該使用這種方式建立執行緒池,應該使用ThreadPoolExecutor來建立執行緒池。Executors建立的執行緒池也是呼叫的ThreadPoolExcutor的建構函式。通過原來可以看出。

我們也看到了這裡面的LinkedBlockingQueue並沒有指定佇列的大小是一個無界佇列,這樣可能會造成oom。所以我們要使用ThreadPoolExecutor這種方式。

ThreadPoolExecutor

通過原始碼看到ThreadPoolExecutor比較全的建構函式如下:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

分別解釋一下引數的意義
corePoolSize:執行緒池長期維持的執行緒數,即使執行緒處於Idle狀態,也不會回收。
maximumPoolSize:執行緒數的上限
keepAliveTime:空閒的時間,超過這個空閒時間,執行緒將被回收
unit:空閒時間的時間單位
workQueue:任務的排隊佇列,當執行緒都執行的時候,有空的執行緒將從佇列彙總進行拿取
threadFactroy:當核心執行緒小於滿執行緒的時候,又需要多加執行緒,則需要從工廠中獲取執行緒
handler:拒絕策略,當執行緒過多的時候的策略

執行緒池針對於任務的執行順序

首先任務過來之後,看看corePoolSize是否有空閒的,有的話就執行。沒有的話,放入任務佇列裡面。然後任務佇列會通知執行緒工廠,趕緊造幾個執行緒,來執行。當任務超過了最大的執行緒數,就執行拒絕策略,拒絕執行。

submit方法

執行緒池建立完畢之後,我們就需要往執行緒池提交任務。通過執行緒池的submit方法即可。
submit方法接收兩種Runable和Callable。
區別如下:
Runable是實現該介面的run方法,callable是實現介面的call方法。
callable允許使用返回值。
callable允許丟擲異常。

提交任務的方式

Future submit(Callable task):這種方式可以拿到返回的結果。
void execute(Runnable command):這種方式拿不到。
Future<?> submit(Runnable task):這種方式可以get,但是永遠是null。

blockqueue的限制

我們在建立執行緒池的時候,如果使用Executors。建立的是無界佇列,容易造成oom。所以我們要自己執行queue的大小。

BlockingQueue queue = new ArrayBlockingQueue<>(512)

拒絕策略

當任務佇列的queue滿了的時候,在提交任務,就要觸發拒絕策略。佇列中預設的拒絕策略是 AbortPolicy。是直接丟擲異常的一種策略。
如果是想實現自定義的策略,可以實現RejectedExecutionHandler 介面。
執行緒池提供瞭如下的幾種策略供選擇。
AbortPolicy:預設策略,丟擲RejectedExecutionException
DiscardPolicy:忽略當前提交的任務
DiscardOldestPolicy:丟棄任務佇列中最老的任務,給新任務騰出地方
CallerRunsPolicy:由提交任務者執行這個任務

ExecutorService executorService = new ThreadPoolExecutor(2, 2, 
                0, TimeUnit.SECONDS, 
                new ArrayBlockingQueue<>(512), 
                new ThreadPoolExecutor.DiscardPolicy());

捕捉異常

如之前所說Callable介面的實現,可以獲取到結果和異常。通過返回的Future的get方法即可拿到。

ExecutorService executorService = Executors.newFixedThreadPool(5);
Future<Object> future = executorService.submit(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            throw new RuntimeException("exception");// 該異常會在呼叫Future.get()時傳遞給呼叫者
        }
    });
    
try {
  Object result = future.get();
} catch (InterruptedException e) {
} catch (ExecutionException e) {
  e.printStackTrace();
}

正確構造執行緒池的方式

int poolSize = Runtime.getRuntime().availableProcessors() * 2;
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512);
RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy();
executorService = new ThreadPoolExecutor(poolSize, poolSize,
    0, TimeUnit.SECONDS,
            queue,
            policy);

獲取單個結果

通過submit提交一個任務後,可以獲取到一個future,呼叫get方法會阻塞並等待執行結果。get(long timeout, TimeUnit unit)可以指定等待的超時時間。

獲取多個結果

可以使用迴圈依次呼叫,也可以使用ExecutorCompletionService。該類的take方式,會阻塞等待某一任務完成。向CompletionService批量提交任務後,只需呼叫相同次數的CompletionService.take()方法,就能獲取所有任務的執行結果,獲取順序是任意的,取決於任務的完成順序。

void solve(Executor executor, Collection<Callable<Result>> solvers)
   throws InterruptedException, ExecutionException {
   
   CompletionService<Result> ecs = new ExecutorCompletionService<Result>(executor);// 構造器
   
   for (Callable<Result> s : solvers)// 提交所有任務
       ecs.submit(s);
       
   int n = solvers.size();
   for (int i = 0; i < n; ++i) {// 獲取每一個完成的任務
       Result r = ecs.take().get();
       if (r != null)
           use(r);
   }
}

這個類是對執行緒池的一個包裝,包裝完後,聽過他進行submit和take。

單個任務超時

Future.get(long timeout, TimeUnit unit)。方法可以指定等待的超時時間,超時未完成會丟擲TimeoutException。

多個任務超時

等待多個任務完成,並設定最大等待時間,可以通過CountDownLatch完成:

public void testLatch(ExecutorService executorService, List<Runnable> tasks) 
    throws InterruptedException{
      
    CountDownLatch latch = new CountDownLatch(tasks.size());
      for(Runnable r : tasks){
          executorService.submit(new Runnable() {
              @Override
              public void run() {
                  try{
                      r.run();
                  }finally {
                      latch.countDown();// countDown
                  }
              }
          });
      }
      latch.await(10, TimeUnit.SECONDS); // 指定超時時間
  }

await是總的時間,即使100個任務,需要跑20分鐘。我10s超時了 也停止了