1. 程式人生 > >死磕 java執行緒系列之自己動手寫一個執行緒池

死磕 java執行緒系列之自己動手寫一個執行緒池

歡迎關注我的公眾號“彤哥讀原始碼”,檢視更多原始碼系列文章, 與彤哥一起暢遊原始碼的海洋。

(手機橫屏看原始碼更方便)


問題

(1)自己動手寫一個執行緒池需要考慮哪些因素?

(2)自己動手寫的執行緒池如何測試?

簡介

執行緒池是Java併發程式設計中經常使用到的技術,那麼自己如何動手寫一個執行緒池呢?本文彤哥將手把手帶你寫一個可用的執行緒池。

屬性分析

執行緒池,顧名思義它首先是一個“池”,這個池裡面放的是執行緒,執行緒是用來執行任務的。

首先,執行緒池中的執行緒應該是有類別的,有的是核心執行緒,有的是非核心執行緒,所以我們需要兩個變數標識核心執行緒數量coreSize和最大執行緒數量maxSize。

為什麼要區分是否為核心執行緒呢?這是為了控制系統中執行緒的數量。

當執行緒池中執行緒數未達到核心執行緒數coreSize時,來一個任務加一個執行緒是可以的,也可以提高任務執行的效率。

當執行緒池中執行緒數達到核心執行緒數後,得控制一下執行緒的數量,來任務了先進佇列,如果任務執行足夠快,這些核心執行緒很快就能把佇列中的任務執行完畢,完全沒有新增執行緒的必要。

當佇列中任務也滿了,這時候光靠核心執行緒就無法及時處理任務了,所以這時候就需要增加新的執行緒了,但是執行緒也不能無限制地增加,所以需要控制其最大執行緒數量maxSize。

其次,我們需要一個任務佇列來存放任務,這個佇列必須是執行緒安全的,我們一般使用BlockingQueue阻塞佇列來充當,當然使用ConcurrentLinkedQueue也是可以的(注意ConcurrentLinkedQueue不是阻塞佇列,不能運用在jdk的執行緒池中)。

最後,當任務越來越多而執行緒處理卻不及時,遲早會達到一種狀態,佇列滿了,執行緒數也達到最大執行緒數了,這時候怎麼辦呢?這時候就需要走拒絕策略了,也就是這些無法及時處理的任務怎麼辦的一種策略,常用的策略有丟棄當前任務、丟棄最老的任務、呼叫者自己處理、丟擲異常等。

根據上面的描述,我們定義一個執行緒池一共需要這麼四個變數:核心執行緒數coreSize、最大執行緒數maxSize、阻塞佇列BlockingQueue、拒絕策略RejectPolicy。

另外,為了便於給執行緒池一個名稱,我們再加一個變數:執行緒池的名稱name。

所以我們得出了執行緒池的屬性及構造方法大概如下:

public class MyThreadPoolExecutor implements Executor {
    /**
     * 執行緒池的名稱
     */
    private String name;
    /**
     * 核心執行緒數
     */
    private int coreSize;
    /**
     * 最大執行緒數
     */
    private int maxSize;
    /**
     * 任務佇列
     */
    private BlockingQueue<Runnable> taskQueue;
    /**
     * 拒絕策略
     */
    private RejectPolicy rejectPolicy;

    public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
        this.name = name;
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.taskQueue = taskQueue;
        this.rejectPolicy = rejectPolicy;
    }
}

任務流向分析

根據上面的屬性分析,基本上我們已經得到了任務流向的完整邏輯:

首先,如果執行的執行緒數小於核心執行緒數,直接建立一個新的核心執行緒來執行新的任務。

其次,如果執行的執行緒數達到了核心執行緒數,則把新任務入佇列。

然後,如果佇列也滿了,則建立新的非核心執行緒來執行新的任務。

最後,如果非核心執行緒數也達到最大了,那就執行拒絕策略。

程式碼邏輯大致如下:

    @Override
    public void execute(Runnable task) {
        // 正在執行的執行緒數
        int count = runningCount.get();
        // 如果正在執行的執行緒數小於核心執行緒數,直接加一個執行緒
        if (count < coreSize) {
            // 注意,這裡不一定新增成功,addWorker()方法裡面還要判斷一次是不是確實小
            if (addWorker(task, true)) {
                return;
            }
            // 如果新增核心執行緒失敗,進入下面的邏輯
        }

        // 如果達到了核心執行緒數,先嚐試讓任務入隊
        // 這裡之所以使用offer(),是因為如果佇列滿了offer()會立即返回false
        if (taskQueue.offer(task)) {
            // do nothing,為了邏輯清晰這裡留個空if
            // 【本篇文章由公眾號“彤哥讀原始碼”原創】
        } else {
            // 如果入隊失敗,說明佇列滿了,那就新增一個非核心執行緒
            if (!addWorker(task, false)) {
                // 如果新增非核心執行緒失敗了,那就執行拒絕策略
                rejectPolicy.reject(task, this);
            }
        }
    }

建立執行緒邏輯分析

首先,建立執行緒的依據是正在執行的執行緒數量有沒有達到核心執行緒數或者最大執行緒數,所以我們還需要一個變數runningCount用來記錄正在執行的執行緒數。

其次,這個變數runningCount需要在併發環境下加加減減,所以這裡需要使用到Unsafe的CAS指令來控制其值的修改,用了CAS就要給這個變數加上volatile修飾,為了方便我們這裡直接使用AtomicInteger來作為這個變數的型別。

然後,因為是併發環境中,所以需要判斷runningCount < coreSize(或maxSize)(條件一)的同時修改runningCount CAS加一(條件二)成功了才表示可以增加一個執行緒,如果條件一失敗則表示不能再增加執行緒了直接返回false,如果條件二失敗則表示其它執行緒先修改了runningCount的值,則重試。

最後,建立一個執行緒並執行新任務,且不斷從佇列中拿任務來執行【本篇文章由公眾號“彤哥讀原始碼”原創】。

程式碼邏輯如下:

    private boolean addWorker(Runnable newTask, boolean core) {
        // 自旋判斷是不是真的可以建立一個執行緒
        for (; ; ) {
            // 正在執行的執行緒數
            int count = runningCount.get();
            // 核心執行緒還是非核心執行緒
            int max = core ? coreSize : maxSize;
            // 不滿足建立執行緒的條件,直接返回false
            if (count >= max) {
                return false;
            }
            // 修改runningCount成功,可以建立執行緒
            if (runningCount.compareAndSet(count, count + 1)) {
                // 執行緒的名字
                String threadName = (core ? "core_" : "") + name + sequence.incrementAndGet();
                // 建立執行緒並啟動
                new Thread(() -> {
                    System.out.println("thread name: " + Thread.currentThread().getName());
                    // 執行的任務
                    Runnable task = newTask;
                    // 不斷從任務佇列中取任務執行,如果取出來的任務為null,則跳出迴圈,執行緒也就結束了
                    while (task != null || (task = getTask()) != null) {
                        try {
                            // 執行任務
                            task.run();
                        } finally {
                            // 任務執行完成,置為空
                            task = null;
                        }
                    }
                }, threadName).start();

                break;
            }
        }

        return true;
    }

取任務邏輯分析

從佇列中取任務應該使用take()方法,這個方法會一直阻塞直至取到任務或者中斷,如果中斷了就返回null,這樣當前執行緒也就可以安靜地結束了,另外還要注意中斷了記得把runningCount減一。

    private Runnable getTask() {
        try {
            // take()方法會一直阻塞直到取到任務為止
            return taskQueue.take();
        } catch (InterruptedException e) {
            // 執行緒中斷了,返回null可以結束當前執行緒
            // 當前執行緒都要結束了,理應要把runningCount的數量減一
            runningCount.decrementAndGet();
            return null;
        }
    }

好了,到這裡我們自己的執行緒池就寫完了,下面我們一起來想想怎麼測試呢?

測試邏輯分析

我們再來回顧下自己的寫的執行緒池的構造方法:

    public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
        this.name = name;
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.taskQueue = taskQueue;
        this.rejectPolicy = rejectPolicy;
    }

name,這個隨便傳;

coreSize,我們假設為5;

maxSize,我們假設為10;

taskQueue,任務佇列,既然我們設定的是有邊界的,我們就用最簡單的ArrayBlockingQueue好吧,容量設定為15,這樣裡面最多可以儲存15條任務;

rejectPolicy,拒絕策略,我們假設使用丟棄當前任務的策略,OK,我們來實現一個。

/**
 * 丟棄當前任務
 */
public class DiscardRejectPolicy implements RejectPolicy {
    @Override
    public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) {
        // do nothing
        System.out.println("discard one task");
    }
}

OK,這樣一個執行緒池就建立完成了,下面就是執行任務了,我們假設通過for迴圈連續不斷地新增100個任務好不好。

public class MyThreadPoolExecutorTest {
    public static void main(String[] args) {
        Executor threadPool = new MyThreadPoolExecutor("test", 5, 10, new ArrayBlockingQueue<>(15), new DiscardRejectPolicy());
        AtomicInteger num = new AtomicInteger(0);

        for (int i = 0; i < 100; i++) {
            threadPool.execute(()->{
                try {
                    Thread.sleep(1000);
                    System.out.println("running: " + System.currentTimeMillis() + ": " + num.incrementAndGet());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

我們分析下這段程式:

(1)先連續建立了5個核心執行緒,並執行了新任務;

(2)後面的15個任務進了佇列;

(3)佇列滿了,又連續建立了5個執行緒,並執行了新任務;

(4)後面的任務就沒得執行了,全部走了丟棄策略;

(5)所以真正執行成功的任務應該是 5 + 15 + 5 = 25 條任務;

執行之:

thread name: core_test2
thread name: core_test5
thread name: core_test3
thread name: core_test4
thread name: core_test1
thread name: test6
thread name: test7
thread name: test8
thread name: test9
discard one task
thread name: test10
discard one task
...省略被拒絕的任務
【本篇文章由公眾號“彤哥讀原始碼”原創】
discard one task
running: 1570546871851: 2
running: 1570546871851: 8
running: 1570546871851: 7
running: 1570546871851: 6
running: 1570546871851: 5
running: 1570546871851: 3
running: 1570546871851: 4
running: 1570546871851: 1
running: 1570546871851: 10
running: 1570546871851: 9
running: 1570546872852: 14
running: 1570546872852: 20
running: 1570546872852: 19
running: 1570546872852: 17
running: 1570546872852: 18
running: 1570546872852: 16
running: 1570546872852: 15
running: 1570546872852: 12
running: 1570546872852: 13
running: 1570546872852: 11
running: 1570546873852: 21
running: 1570546873852: 24
running: 1570546873852: 23
running: 1570546873852: 25
running: 1570546873852: 22

可以看到,建立了5個核心執行緒、5個非核心執行緒,成功執行了25條任務,完成沒問題,完美^^。

總結

(1)自己動手寫一個執行緒池需要考慮的因素主要有:核心執行緒數、最大執行緒數、任務佇列、拒絕策略。

(2)建立執行緒的時候要時刻警惕併發的陷阱;

彩蛋

我們知道,jdk自帶的執行緒池還有兩個引數:keepAliveTime、unit,它們是幹什麼的呢?

答:它們是用來控制何時銷燬非核心執行緒的,當然也可以銷燬核心執行緒,具體的分析請期待下一章吧。

完整原始碼

Executor介面

public interface Executor {
    void execute(Runnable command);
}

MyThreadPoolExecutor執行緒池實現類

/**
 * 自動動手寫一個執行緒池
 */
public class MyThreadPoolExecutor implements Executor {

    /**
     * 執行緒池的名稱
     */
    private String name;
    /**
     * 執行緒序列號
     */
    private AtomicInteger sequence = new AtomicInteger(0);
    /**
     * 核心執行緒數
     */
    private int coreSize;
    /**
     * 最大執行緒數
     */
    private int maxSize;
    /**
     * 任務佇列
     */
    private BlockingQueue<Runnable> taskQueue;
    /**
     * 拒絕策略
     */
    private RejectPolicy rejectPolicy;
    /**
     * 當前正在執行的執行緒數【本篇文章由公眾號“彤哥讀原始碼”原創】
     * 需要修改時執行緒間立即感知,所以使用AtomicInteger
     * 或者也可以使用volatile並結合Unsafe做CAS操作(參考Unsafe篇章講解)
     */
    private AtomicInteger runningCount = new AtomicInteger(0);

    public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
        this.name = name;
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.taskQueue = taskQueue;
        this.rejectPolicy = rejectPolicy;
    }

    @Override
    public void execute(Runnable task) {
        // 正在執行的執行緒數
        int count = runningCount.get();
        // 如果正在執行的執行緒數小於核心執行緒數,直接加一個執行緒
        if (count < coreSize) {
            // 注意,這裡不一定新增成功,addWorker()方法裡面還要判斷一次是不是確實小
            if (addWorker(task, true)) {
                return;
            }
            // 如果新增核心執行緒失敗,進入下面的邏輯
        }

        // 如果達到了核心執行緒數,先嚐試讓任務入隊
        // 這裡之所以使用offer(),是因為如果佇列滿了offer()會立即返回false
        if (taskQueue.offer(task)) {
            // do nothing,為了邏輯清晰這裡留個空if
        } else {
            // 如果入隊失敗,說明佇列滿了,那就新增一個非核心執行緒
            if (!addWorker(task, false)) {
                // 如果新增非核心執行緒失敗了,那就執行拒絕策略
                rejectPolicy.reject(task, this);
            }
        }
    }

    private boolean addWorker(Runnable newTask, boolean core) {
        // 自旋判斷是不是真的可以建立一個執行緒
        for (; ; ) {
            // 正在執行的執行緒數
            int count = runningCount.get();
            // 核心執行緒還是非核心執行緒
            int max = core ? coreSize : maxSize;
            // 不滿足建立執行緒的條件,直接返回false
            if (count >= max) {
                return false;
            }
            // 修改runningCount成功,可以建立執行緒
            if (runningCount.compareAndSet(count, count + 1)) {
                // 執行緒的名字
                String threadName = (core ? "core_" : "") + name + sequence.incrementAndGet();
                // 建立執行緒並啟動
                new Thread(() -> {
                    System.out.println("thread name: " + Thread.currentThread().getName());
                    // 執行的任務【本篇文章由公眾號“彤哥讀原始碼”原創】
                    Runnable task = newTask;
                    // 不斷從任務佇列中取任務執行,如果取出來的任務為null,則跳出迴圈,執行緒也就結束了
                    while (task != null || (task = getTask()) != null) {
                        try {
                            // 執行任務
                            task.run();
                        } finally {
                            // 任務執行完成,置為空
                            task = null;
                        }
                    }
                }, threadName).start();

                break;
            }
        }

        return true;
    }

    private Runnable getTask() {
        try {
            // take()方法會一直阻塞直到取到任務為止
            return taskQueue.take();
        } catch (InterruptedException e) {
            // 執行緒中斷了,返回null可以結束當前執行緒
            // 當前執行緒都要結束了,理應要把runningCount的數量減一
            runningCount.decrementAndGet();
            return null;
        }
    }

}

RejectPolicy拒絕策略介面

public interface RejectPolicy {
    void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor);
}

DiscardRejectPolicy丟棄策略實現類

/**
 * 丟棄當前任務
 */
public class DiscardRejectPolicy implements RejectPolicy {
    @Override
    public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) {
        // do nothing
        System.out.println("discard one task");
    }
}

測試類

public class MyThreadPoolExecutorTest {
    public static void main(String[] args) {
        Executor threadPool = new MyThreadPoolExecutor("test", 5, 10, new ArrayBlockingQueue<>(15), new DiscardRejectPolicy());
        AtomicInteger num = new AtomicInteger(0);

        for (int i = 0; i < 100; i++) {
            threadPool.execute(()->{
                try {
                    Thread.sleep(1000);
                    System.out.println("running: " + System.currentTimeMillis() + ": " + num.incrementAndGet());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

    }
}

歡迎關注我的公眾號“彤哥讀原始碼”,檢視更多原始碼系列文章, 與彤哥一起暢遊原始碼的海洋。

相關推薦

java執行系列自己動手一個執行

歡迎關注我的公眾號“彤哥讀原始碼”,檢視更多原始碼系列文章, 與彤哥一起暢遊原始碼的海洋。 (手機橫屏看原始碼更方便) 問題 (1)自己動手寫一個執行緒池需要考慮哪些因素? (2)自己動手寫的執行緒池如何測試? 簡介 執行緒池是Java併發程式設計中經常使用到的技術,那麼自己如何動手寫一個執行緒池呢?本

java執行系列自己動手一個執行(續)

(手機橫屏看原始碼更方便) 問題 (1)自己動手寫的執行緒池如何支援帶返回值的任務呢? (2)如果任務執行的過程中丟擲異常了該

java同步系列自己動手一個鎖Lock

問題 (1)自己動手寫一個鎖需要哪些知識? (2)自己動手寫一個鎖到底有多簡單? (3)自己能不能寫出來一個完美的鎖? 簡介 本篇文章的目標一是自己動手寫一個鎖,這個鎖的功能很簡單,能進行正常的加鎖、解鎖操作。 本篇文章的目標二是通過自己動手寫一個鎖,能更好地理解後面章節將要學習的AQS及各種同步器實現的原理

Java 併發程式設計系列帶你瞭解多執行

早期的計算機不包含作業系統,它們從頭到尾執行一個程式,這個程式可以訪問計算機中的所有資源。在這種情況下,每次都只能執行一個程式,對於昂貴的計算機資源來說是一種嚴重的浪費。 作業系統出現後,計算機可以執行多個程式,不同的程式在單獨的程序中執行。作業系統負責為各個獨

java concurrent包系列(一)從樂觀鎖、悲觀鎖到AtomicInteger的CAS演算法

前言 Java中有各式各樣的鎖,主流的鎖和概念如下: 這篇文章主要是為了讓大家通過樂觀鎖和悲觀鎖出發,理解CAS演算法,因為CAS是整個Concurrent包的基礎。 樂觀鎖和悲觀鎖 首先,java和資料庫中都有這種概念,他是一種從執行緒同步的角度上看的一種廣義上的概念: 悲觀鎖:悲觀的認為自己在使用資料的

java concurrent包系列(三)基於ReentrantLock理解AQS的條件佇列

基於Codition分析AQS的條件佇列 前言 上一篇我們講了AQS中的同步佇列佇列,現在我們研究一下條件佇列。 在java中最常見的加鎖方式就是synchorinzed和Reentrantlock,我們都說Reentrantlock比synchorinzed更加靈活,其實就靈活在Reentrantlock中

java concurrent包系列(五)基於AQS的條件佇列把LinkedBlockingQueue“扒光”

LinkedBlockingQueue的基礎 LinkedBlockingQueue是一個基於連結串列的阻塞佇列,實際使用上與ArrayBlockingQueue完全一樣,我們只需要把之前烤雞的例子中的Queue物件替換一下即可。如果對於ArrayBlockingQueue不熟悉,可以去看看https://

java concurrent包系列(六)基於AQS解析訊號量Semaphore

Semaphore 之前分析AQS的時候,內部有兩種模式,獨佔模式和共享模式,前面的ReentrantLock都是使用獨佔模式,而Semaphore同樣作為一個基於AQS實現的併發元件,它是基於共享模式實現的,我們先看看它的使用場景 Semaphore共享鎖的基本使用 假設有20個人去銀行櫃面辦理業務,

java併發程式設計系列ReadWriteLock讀鎖的使用

前面我們講解了Lock的使用,下面我們來講解一下ReadWriteLock鎖的使用,顧明思義,讀寫鎖在讀的時候,上讀鎖,在寫的時候,上寫鎖,這樣就很巧妙的解決synchronized的一個性能問題:讀與讀之間互斥。 ReadWriteLock也是一個介面,原型如下: pub

3、《SSO CAS單點系列 自己動手實現一個屬於自己的SSO認證伺服器!

上篇《實現一個SSO認證伺服器是這樣的》中,我們詳細講述了實現SSO的基本思路,本篇我們按照這個思路,親自動手實現一個輕量級的SSO認證中心。除了認證中心,我們還要改造系統應用的登入登出部分,使之與認證中心互動,共同完成SSO。因此我們的實現分成兩大部分,一個是SSO Ser

自己動手SQL執行引擎

# 自己動手寫SQL執行引擎 ## 前言 在閱讀了大量關於資料庫的資料後,筆者情不自禁產生了一個造資料庫輪子的想法。來驗證一下自己對於資料庫底層原理的掌握是否牢靠。在筆者的github中給這個database起名為Freedom。 ## 整體結構 既然造輪子,那當然得從前端的網路協議互動到後端的檔

自己動手一個自動登錄腳本gg

簡單 只需要 自己 不同 enum -s class rep 使用 1.下載一個sshpass工具 2.安裝sshpass,安裝到tools文件夾 3.把tools文件夾的路徑加入到/etc/bashrc vim /etc/bashrc

自己動手一個單鏈表

兩個指針 isl linklist nextn mob 內部 數組 nds pty 文章有不當之處,歡迎指正,如果喜歡微信閱讀,你也可以關註我的微信公眾號:好好學java,獲取優質學習資源。 一、概述 單向鏈表(單鏈表)是鏈表的一種,其特點是鏈表的鏈接方向是單向的,對鏈表

【原創】自己動手一個服務網關

exception 負責 lis world 前置 create ble ddr load 引言 什麽是網關?為什麽需要使用網關? 如圖所示,在不使用網關的情況下,我們的服務是直接暴露給服務調用方。當調用方增多,勢必需要添加定制化訪問權限、校驗等邏輯。當添加API網關後,

自己動手一個小型的TCP/IP協議

TCP/IP協議大家都知道,但真正理解的人不多,不如動手寫一個小型的看看。 我知道看書很枯燥,看不懂,還打擊大家的信心,不是我們的腦袋不如人,是我們的方法錯了。 一切的技術都從應用中發展而來,所以要從下往上走,先動手完成一個任務吧。 需要準備的前提知識 linux驅動程式知識

自己動手一個能操作redis的客戶端

引言 redis大家在專案中經常會使用到。官網也提供了多語言的客戶端供大家操作redis,如下圖所示 但是,大家有思考過,這些語言操作redis背後的原理麼?其實,某些大神會說 只要按照redis的協議,傳送指定資料給redis,監聽返回值即可。 確實,

自己動手一個Vue外掛(MD.7)

造不完的輪子,封不完的外掛。網上什麼都有,但是有那找的功夫,自己都寫完了。漫島仍然在向前推進,只是你們看不到最新的更新內容了,剩餘的不會展示,等以後上線了再去看把。 講一下如何寫一個的Vue外掛,(以一個極其簡單的loading效果為例),會了這個其他不愁。 第一步,在compon

較底層程式設計:自己動手一個C語言編譯器

  今天呢,我們就來自己動手編寫一個編譯器,學習一下較為底層的程式設計方式,是一種學習計算機到底是如何工作的非常有效方法。 編譯器通常被看作是十分複雜的工程。事實上,編寫一個產品級的編譯器也確實是一個龐大的任務。但是寫一個小巧可用的編譯器卻不是這麼困難。 祕訣就是首先去找到一個

自己動手一個輕量級的Android網路請求框架

最近有空在看《App研發錄》一書,良心之作。書中第一部分第二章節講了不少關於網路底層封裝的知識,看後覺得學到了不少乾貨。 索性自己也動手完成了一個非常輕量級的網路請求框架,從該書中獲得了不少幫助。特此記錄,回顧一下思路,整理收穫。OK,一起來看。 就如書中所

自己動手一個“tomcat”

很多初學或將學java web的朋友總是被一系列異於常規java project的流程結構所困惑,搞不清事情的本質,這裡就以最簡單的方式來讓初出茅廬的新手對java web專案有個清晰明瞭的認識。 學java web的必定先行學過java基礎,眾所周知,java專案運行於一