1. 程式人生 > >java內置線程池ThreadPoolExecutor源碼學習記錄

java內置線程池ThreadPoolExecutor源碼學習記錄

news executor 性能優化 rup 基本實現 返回 全部 img throws

背景

公司業務性能優化,使用java自帶的Executors.newFixedThreadPool()方法生成線程池。但是其內部定義的LinkedBlockingQueue容量是Integer.MAX_VALUE。考慮到如果數據庫中待處理數據量很大有可能會在短時間內往LinkedBlockingQueue中填充很多數據,導致內存溢出。於是看了一下線程池這塊的源碼,並在此記錄。

類圖

技術分享圖片

  • Executor是一個頂層接口,在它裏面只聲明了一個方法execute(Runnable),返回值為void,參數為Runnable類型,從字面意思可以理解,就是用來執行傳進去的任務的;

  • ExecutorService接口繼承了Executor接口,並聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等

  • 抽象類AbstractExecutorService實現了ExecutorService接口,基本實現了ExecutorService中聲明的所有方法;submit() 方法

  • ThreadPoolExecutor繼承了類AbstractExecutorService。實現了execute(Runnable)方法。

  • Executors提供的集中工廠方法都是調用的ThreadPoolExecutor的構造方法。因為這個構造方法參數比較多 所以提供了幾個經典的實現。

ExecutorService newCachedThreadPool = Executors.newFixedThreadPool();
ExecutorService newCachedThreadPool 
= Executors.newSingleThreadExecutor(); ExecutorService newCachedThreadPool = Executors.newCachedThreadPool(); ExecutorService newCachedThreadPool = Executors.newScheduledThreadPool();
  • 本篇違章主要包括以下幾點內容。這也是解決背景中提到的問題的主要歷程。

    1.ThreadPoolExecutor構造方法

    2.ExecutorService submit() 方法的實現

    2.Executor execute() 方法的實現

    3.reject() 拒絕策略

ThreadPoolExecutor構造方法

構造方法中賦值的成員標量:

// 構造方法中用到的成員變量
private volatile int   corePoolSize;     //核心池的大小(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列)
private volatile int   maximumPoolSize;   //線程池最大能容忍的線程數
private volatile long  keepAliveTime;    //線程空閑之後存貨時間 (線程數量大於corePoolSize之後)
private final BlockingQueue<Runnable> workQueue;              //任務緩存隊列,用來存放等待執行的任務
private volatile ThreadFactory threadFactory;   //線程工廠,用來創建線程 
private volatile RejectedExecutionHandler handler; //任務拒絕策略

通過代碼可以知道 Executors提供的集中工廠方法實際都是調用的同一個ThreadPoolExecutor的構造方法。當然我們也可以通過自己調用ThreadPoolExecutor構造方法 自己設置參數 從而獲得很貼合我們業務的線程池。

AbstractExecutorService submit() 方法

/**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

其實是調用了execute() 方法,execute()方法 由ThreadPoolExecutor類實現。

ThreadPoolExecutor execute()方法

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        // 29位
    private static final int COUNT_BITS = Integer.SIZE - 3;
        // 0001 1111 1111 1111 1111 1111 1111 1111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
        // 高三位 代表 狀態
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
        // 低三位 代表 數量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 把狀態和數量兩個值 揉在一起
        // private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static int ctlOf(int rs, int wc) { return rs | wc; }
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // 獲取到當前有效的線程數和線程池的狀態
        int c = ctl.get();
              // 1.獲取當前正在運行線程數是否小於核心線程池,是則新創建一個線程執行任務,否則將任務放到任務隊列中
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
                 // 2.當前核心線程池中全部線程都在運行workerCountOf(c) >= corePoolSize,所以此時將線程放到任務隊列中
                 // 線程池是否處於運行狀態,且是否任務插入任務隊列成功。註意這塊 && 是做了優化如果前面條件失敗後面語句不會處理
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
              //在此檢查線程池是否處於運行狀態,如果不是則使剛剛的任務出隊。和上面一樣 && 是做了優化如果前面條件失敗後面語句不會處理
            if (! isRunning(recheck) && remove(command))
                reject(command);
              // 如果沒有執行的線程,就再開啟一個線程(有可能沒有核心線程)
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
              // 3.插入隊列不成功 offer() 方法失敗是因為隊列滿了,此時就新創建線程去執行任務,創建失敗拋出異常
        else if (!addWorker(command, false))
            reject(command);
    }
// CAS修改clt的值+1,成功退出cas循環,失敗繼續
if (compareAndIncrementWorkerCount(c))
                    break retry;
//將新建的線程加入到線程池中
workers.add(w);
int s = workers.size();
//修正largestPoolSize的值
if (s > largestPoolSize)
  largestPoolSize = s;
workerAdded = true;

addWorker()方法 總結起來就兩部分

1.CAS+失敗重試操作來將線程數加1

2.新建一個線程並啟用。

RejectedExecutionHandler拒絕策略

java 內置的四種拒絕策略。

 public static class AbortPolicy implements RejectedExecutionHandler  // 拋出java.util.concurrent.RejectedExecutionException異常
 public static class CallerRunsPolicy implements RejectedExecutionHandler //直接在 execute 方法的調用線程中運行被拒絕的任務。如果執行程序已關閉,則會丟棄該任務
 public static class DiscardPolicy implements RejectedExecutionHandler  // 不做任何處理 直接丟棄
 public static class DiscardOldestPolicy implements RejectedExecutionHandler  // 丟棄老的

自定義拒絕策略:

new RejectedExecutionHandler() {
            // 自定義拒絕策略
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                try {
                    // 如果LinkedBlockingQueue存滿了,阻塞等待有空間後再加入元素。(put方法是阻塞的)
                    LOGGER.info("LinkedBlockingQueue has been full ");
                      // put() 方法是阻塞的,如果隊列沒有空間會一直等待。
                    executor.getQueue().put(r);
                    LOGGER.info("thread has been put in");

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

總結一點:當用java內置的一些工具的時候,如果有不理解的一定要 深入去看源碼。從根本上找解決思路。

java內置線程池ThreadPoolExecutor源碼學習記錄