1. 程式人生 > >使用Semaphore控制執行緒池任務提交的速率

使用Semaphore控制執行緒池任務提交的速率

使用Semaphore控制執行緒池任務提交的速率

歡迎關注作者部落格
簡書傳送門

介紹

  當執行緒池的工作佇列被填滿後,如果沒有預定義的飽和策略來阻塞任務的執行,則可以通過訊號量Semaphore來限制任務的到達率。Semaphore是一個同步工具類,用來控制同時訪問某個特定資源的運算元量。它的acquire方法返回一個虛擬的許可,如果沒有可用的許可,則阻塞該方法的呼叫執行緒直到有可用許可為止。如果執行緒池使用無界佇列緩衝任務時,如果任務在某一時間增長數量過快,容易導致記憶體耗盡。
  無界佇列和Semaphore搭配使用,通過設定訊號量的上界,來控制任務的提交速率。

四種飽和策略

  • static class ThreadPoolExecutor.AbortPolicy
    用於被拒絕任務的處理程式,它將丟擲 RejectedExecutionException.
  • static class ThreadPoolExecutor.CallerRunsPolicy
    用於被拒絕任務的處理程式,它直接在 execute 方法的呼叫執行緒中執行被拒絕的任務;如果執行程式已關閉,則會丟棄該任務。
  • static class ThreadPoolExecutor.DiscardOldestPolicy
    用於被拒絕任務的處理程式,它放棄最舊的未處理請求,然後重試 execute;如果執行程式已關閉,則會丟棄該任務。
  • static class ThreadPoolExecutor.DiscardPolicy
    用於被拒絕任務的處理程式,預設情況下它將丟棄被拒絕的任務。

原始碼

下面使用Semaphore來控制執行緒池任務提交的速率:

/**
 * @program:
 * @description: 使用Semaphore控制執行緒池任務提交速率
 * @author: zhouzhixiang
 * @create: 2018-11-13 20:48
 */
@ThreadSafe
public class BoundedExecutor {
    private final ExecutorService executor;
private final Semaphore semaphore; public BoundedExecutor(ExecutorService executor, int bound) { this.executor = executor; this.semaphore = new Semaphore(bound); } public void submitTask(final Runnable command) { try { semaphore.acquire(); executor.execute(new Runnable() { @Override public void run() { try { command.run(); }finally { semaphore.release(); } } }); } catch (InterruptedException e) { semaphore.release(); } } public void stop(){ this.executor.shutdown(); } static class MyThread extends Thread { public String name; public MyThread(String name) { this.name = name; } @Override public void run() { System.out.println("Thread-"+name+" is running...."); try { Thread.sleep(new Random().nextInt(10000)); }catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { ExecutorService executorService = new ThreadPoolExecutor(2,2,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(5)); BoundedExecutor executor = new BoundedExecutor(executorService, 5); for (int i = 0; i < 100; i++) { executor.submitTask(new MyThread(""+i)); } executor.stop(); } }

歡迎加入Java猿社群
歡迎加入Java猿社群.png