1. 程式人生 > >JAVA自定義阻塞型線程池

JAVA自定義阻塞型線程池

executors shutdown @class dna 執行指定 gin eject sta rup

Java的線程池ThreadPoolExecutor是很常用的,常見構造如下:

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              RejectedExecutionHandler handler) {
        
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }

  • corePoolSize: 線程池維護線程的最少數量
  • maximumPoolSize:線程池維護線程的最大數量
  • keepAliveTime: 線程池維護線程所允許的空閑時間
  • unit: 線程池維護線程所允許的空閑時間的單位
  • workQueue: 線程池所使用的緩沖隊列
  • handler: 線程池對拒絕任務的處理策略

  正式使用中一般都會設置一個最大緩沖隊列容量,如果線程池滿它會對繼續添加的任務線程執行指定的拒絕策略,ThreadPoolExcetor 的最後一個參數指定了拒絕策略,JDK提供了四種拒絕策略:


AbortPolicy 策略、CallerRunsPolicy策略、 DiscardOledestPolicy策略、DiscardPolicy策略。

AbortPolicy策略:該策略會直接拋出異常,阻止系統正常工作。

CallerRunsPolicy 策略:只要線程池未關閉,該策略直接在調用者線程中,運行當前的被丟棄的任務。

DiscardOleddestPolicy策略: 該策略將丟棄最老的一個請求,也就是即將被執行的任務,並嘗試再次提交當前任務。

DiscardPolicy策略:該策略默默的丟棄無法處理的任務,不予任何處理。

  可以看到默認提供的四種策略似乎都不太友好,要麽放棄要麽拋異常,有時候我們需要保證任務添加不會失敗,只要被添加的任務能依次順序執行就好了,不需要這個添加動作立即響應,

即讓線程池等待池中的任務完成後再繼續添加新任務,此時需要自定義拒絕策略以及任務緩沖隊列,代碼如下:

package com.montnets.task;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 自定義阻塞型線程池 當池滿時會阻塞任務提交
 * 
 * @ClassName: BlockThreadPool
 * @Description: TODO
 * @author: wangs
 * @date: 2018-1-24 下午5:24:54
 */
public class BlockThreadPool {

    private ThreadPoolExecutor pool = null;
    
    public BlockThreadPool(int poolSize) {
        pool = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5), new CustomThreadFactory(),
                new CustomRejectedExecutionHandler());
    }
    
    public void destory() {
        if (pool != null) {
            pool.shutdownNow();
        }
    }

    private class CustomThreadFactory implements ThreadFactory {
        private AtomicInteger count = new AtomicInteger(0);
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            String threadName = BlockThreadPool.class.getSimpleName() + count.addAndGet(1);
            t.setName(threadName);
            return t;
        }
    }

    private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                // 核心改造點,由blockingqueue的offer改成put阻塞方法
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void execute(Runnable runnable) {
        this.pool.execute(runnable);
    }

    // 測試構造的線程池
    public static void main(String[] args) {
        BlockThreadPool pool = new BlockThreadPool(3);
        for (int i = 1; i < 100; i++) {
            System.out.println("提交第" + i + "個任務!");
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getId() + "=====開始");
                        TimeUnit.SECONDS.sleep(10);
                        System.out.println(Thread.currentThread().getId() + "=====【結束】");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            System.out.println("【提交第" + i + "個任務成功!】");
        }

        // 2.銷毀----此處不能銷毀,因為任務沒有提交執行完,如果銷毀線程池,任務也就無法執行了
        // exec.destory();
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  該類的核心實現還是ThreadPoolExecutor,只是自定義了拒絕策略CustomRejectedExecutionHandler,緩沖隊列也修改成了ArrayBlockingQueue,即實現了阻塞功能,

當核心池和緩存隊列滿了之後外部再調用execute時就會阻塞住,一直等到池裏某個任務完成後釋放出空閑線程以後,再將該任務添加到緩存隊列,而不會拋異常或丟棄該任務。

適用於一些定時掃描觸發任務類場景。


-- 多情為何忘情,無心怎去用心,空有知人之智,恨無自知之明。

JAVA自定義阻塞型線程池