Preface

In the upcoming .NET 6 runtime, the default thread pool implementation has moved from C++ to C#. This makes the thread pool's design much easier to study.

https://github.com/dotnet/runtime/tree/release/6.0/src/libraries/System.Threading.ThreadPool

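The new implementation lives in PortableThreadPool. The public APIs of the existing ThreadPool class now call straight into PortableThreadPool.

Setting the environment variable ThreadPool_UsePortableThreadPool to 0 switches back to the old thread pool implementation.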

https://github.com/dotnet/runtime/pull/43841/commits/b0d47b84a6845a70f011d1b0d3ce5adde9a4d7b7

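This article uses the .NET 6 runtime source code as study material to introduce the thread pool's design. As far as I can tell, the overall design does not differ much from the previous C++ implementation.

Notes:

  • This article does not go into implementation details and focuses on the overall design. The code shown is not the verbatim source. It is a simplified version meant to aid understanding.
  • The IOCP thread pool tied to completionPortThreads in ThreadPool.SetMaxThreads(int workerThreads, int completionPortThreads) is a leftover from the .NET Framework era. It manages callback threads for I/O completion ports (IOCP), which exist only on Windows. I have not seen it used anywhere anymore, and the completionPortThreads parameter no longer has any real effect, because the underlying I/O libraries maintain their own pool of I/O wait threads. This article only covers the worker thread pool.
  • My understanding is neither complete nor guaranteed to be correct. If you disagree with anything, feel free to leave a comment.
  • To illustrate some points, part of the code is run on versions earlier than .NET 6.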

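Task scheduling

Work items waiting to run in the thread pool are stored in a queue system. This system consists of one global queue plus a local queue bound to each Worker Thread. Every thread in the pool runs a while(true) loop that takes work items from this queue system and executes them.

The overload ThreadPool.QueueUserWorkItem<TState>(Action<TState> callBack, TState state, bool preferLocal) has a preferLocal parameter.

  • Overloads of ThreadPool.QueueUserWorkItem without a preferLocal parameter put the work item on the global queue.

  • When preferLocal is true and the thread calling ThreadPool.QueueUserWorkItem happens to be a thread pool thread, the work item goes into that thread's local queue. In every other case it goes onto the global queue, where some Worker Thread will pick it up later.

  • When the call comes from a thread outside the pool, the work item always goes onto the global queue, whatever value preferLocal has.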
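The three rules above reduce to a single condition. Below is a minimal sketch of that condition, written as a pure function so it can be checked. `TargetQueue`, `QueueRouting.Choose` and `QueueAndPredict` are hypothetical names and are not runtime APIs. `QueueAndPredict` also calls the real `ThreadPool.QueueUserWorkItem<TState>(Action<TState>, TState, bool preferLocal)` overload and uses `Thread.CurrentThread.IsThreadPoolThread` as the "is the caller a pool thread" check.

```csharp
using System;
using System.Threading;

// Hypothetical names: TargetQueue and QueueRouting are illustrative only.
public enum TargetQueue { Global, Local }

public static class QueueRouting
{
    // Encodes the three rules described above:
    // - overloads without preferLocal behave like preferLocal: false (global queue)
    // - preferLocal: true only matters when the caller is itself a pool thread
    // - callers outside the pool always end up on the global queue
    public static TargetQueue Choose(bool preferLocal, bool callerIsThreadPoolThread) =>
        preferLocal && callerIsThreadPoolThread ? TargetQueue.Local : TargetQueue.Global;

    // Queues work with the real preferLocal overload and returns the queue
    // that the rules above predict for the current caller.
    public static TargetQueue QueueAndPredict(Action<int> work, int state, bool preferLocal)
    {
        TargetQueue predicted = Choose(preferLocal, Thread.CurrentThread.IsThreadPoolThread);
        ThreadPool.QueueUserWorkItem(work, state, preferLocal);
        return predicted;
    }
}
```

When called from Main, `QueueAndPredict` predicts `Global` even with `preferLocal: true`, because the main thread is not a pool thread.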

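Basic scheduling unit

The element type of both the local queues and the global queue is object. Actual work items come in two kinds. After taking an item from the queue system, the thread checks its type and calls the corresponding method.

1. Instances of classes that implement IThreadPoolWorkItem.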

/// <summary>Represents a work item that can be executed by the ThreadPool.</summary>
public interface IThreadPoolWorkItem
{
    void Execute();
}

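Calling Execute is what running the work item means.

There are many IThreadPoolWorkItem implementations. For example, the callBack delegate passed to ThreadPool.QueueUserWorkItem(WaitCallback callBack) is wrapped in a QueueUserWorkItemCallback instance, and QueueUserWorkItemCallback is a class that implements IThreadPoolWorkItem.

2. Task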

class Task
{
    // Runs the delegate held by the Task (body omitted in this sketch)
    internal void InnerInvoke() { /* ... */ }
}

Calling InnerInvoke runs the delegate that the Task contains.
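Because the queues store plain object, the two kinds can be told apart with a type switch. Below is a hypothetical sketch; `WorkItemDispatcher` and `CallbackWorkItem` are made-up names. The runtime reaches Task.InnerInvoke through internal methods. Here the public `Task.RunSynchronously` on a task that has not been started stands in for that path. `IThreadPoolWorkItem` is the real public interface shown above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical dispatcher: shows how an item typed as object is executed,
// assuming Task and IThreadPoolWorkItem are the only two expected kinds.
public static class WorkItemDispatcher
{
    public static void Execute(object workItem)
    {
        switch (workItem)
        {
            case Task task:
                // Stand-in for the runtime's internal path to Task.InnerInvoke;
                // valid only for a Task that has not been started yet.
                task.RunSynchronously();
                break;
            case IThreadPoolWorkItem item:
                item.Execute();
                break;
            default:
                throw new ArgumentException("Unexpected work item type", nameof(workItem));
        }
    }
}

// A trivial IThreadPoolWorkItem, similar in spirit to the runtime's
// QueueUserWorkItemCallback wrapper around a WaitCallback and its state.
public sealed class CallbackWorkItem : IThreadPoolWorkItem
{
    private readonly WaitCallback _callback;
    private readonly object _state;

    public CallbackWorkItem(WaitCallback callback, object state)
    {
        _callback = callback;
        _state = state;
    }

    public void Execute() => _callback(_state);
}
```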

Global queue

The global queue is maintained by ThreadPoolWorkQueue. This class is also the entry point of the whole queue system, and ThreadPool references it directly.

public static class ThreadPool
{
    internal static readonly ThreadPoolWorkQueue s_workQueue = new ThreadPoolWorkQueue();

    public static bool QueueUserWorkItem(WaitCallback callBack, object state)
    {
        object tpcallBack = new QueueUserWorkItemCallback(callBack!, state);
        s_workQueue.Enqueue(tpcallBack, forceGlobal: true);
        return true;
    }
}

internal sealed class ThreadPoolWorkQueue
{
    // The global queue
    internal readonly ConcurrentQueue<object> workItems = new ConcurrentQueue<object>();

    // forceGlobal == true: push to the global queue; otherwise use the
    // calling pool thread's local queue, if there is one
    public void Enqueue(object callback, bool forceGlobal) { /* ... */ }
}

Local queue

Every thread in the pool is bound to a ThreadPoolWorkQueueThreadLocals instance. The instance's workStealingQueue field holds that thread's local queue.

internal sealed class ThreadPoolWorkQueueThreadLocals
{
    // Bound to the thread pool thread
    [ThreadStatic]
    public static ThreadPoolWorkQueueThreadLocals threadLocals;

    // Reference to the global queue, so work can be moved there when needed
    public readonly ThreadPoolWorkQueue workQueue;
    // The object that directly maintains the local queue
    public readonly ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue;
    public readonly Thread currentThread;

    public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq)
    {
        workQueue = tpq;
        workStealingQueue = new ThreadPoolWorkQueue.WorkStealingQueue();
        // WorkStealingQueueList keeps track of every workStealingQueue
        ThreadPoolWorkQueue.WorkStealingQueueList.Add(workStealingQueue);
        currentThread = Thread.CurrentThread;
    }

    // Moves the work items in the local queue to the global queue.
    // Called when the ThreadPool decides, through the HillClimbing algorithm
    // introduced later, that the current thread is surplus
    public void TransferLocalWork()
    {
        while (workStealingQueue.LocalPop() is object cb)
        {
            workQueue.Enqueue(cb, forceGlobal: true);
        }
    }

    ~ThreadPoolWorkQueueThreadLocals()
    {
        if (null != workStealingQueue)
        {
            // TransferLocalWork is not really meant to be called here; this is
            // only a fallback that makes sure no work item is lost
            TransferLocalWork();
            ThreadPoolWorkQueue.WorkStealingQueueList.Remove(workStealingQueue);
        }
    }
}
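The model below shows what TransferLocalWork does without the runtime internals. It is simplified and hypothetical (`ThreadLocalsModel` is a made-up name). A `Stack<object>` stands in for WorkStealingQueue, because LocalPop takes the newest item, and a `ConcurrentQueue<object>` stands in for the global queue.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical, simplified model of one pool thread's queues. The real
// WorkStealingQueue is a lock-free, array-based structure; a Stack is used
// here only to keep LocalPop's newest-first order visible.
public sealed class ThreadLocalsModel
{
    private readonly ConcurrentQueue<object> _globalQueue;
    private readonly Stack<object> _localQueue = new Stack<object>();

    public ThreadLocalsModel(ConcurrentQueue<object> globalQueue) => _globalQueue = globalQueue;

    public int LocalCount => _localQueue.Count;

    public void LocalPush(object item) => _localQueue.Push(item);

    // Mirrors TransferLocalWork: pop every local item (newest first) and
    // enqueue it on the shared global queue so that no work is lost.
    public int TransferLocalWork()
    {
        int moved = 0;
        while (_localQueue.TryPop(out object item))
        {
            _globalQueue.Enqueue(item);
            moved++;
        }
        return moved;
    }
}
```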

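Work stealing

Here is a question worth thinking about: why is the local queue called WorkStealingQueue?

The WorkStealingQueue of every Worker Thread is registered in WorkStealingQueueList, so it is visible to all other threads in the pool.

In its while(true) loop, a Worker Thread first takes work from its own WorkStealingQueue. Once its local queue is empty, it takes work from the global queue. In the figure below, for example, Thread1 takes a work item from the global queue.

At the same time Thread3 has also run out of work, but Thread1 has already grabbed the item from the global queue. Thread3 then steals work from Thread2's local queue.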
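Below is a simplified sketch of a local queue and the lookup order described above. `SimpleWorkStealingQueue` and `WorkFinder` are hypothetical. The runtime's WorkStealingQueue avoids locking on the owner's fast path, while this version simply takes a lock. The owner works at the tail (LIFO) and thieves take from the head (FIFO), so a thief gets the oldest item.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Simplified, lock-based model of a work-stealing queue (hypothetical).
// Owner: push/pop at the tail (LIFO). Thieves: take from the head (FIFO).
public sealed class SimpleWorkStealingQueue
{
    private readonly LinkedList<object> _items = new LinkedList<object>();
    private readonly object _lock = new object();

    public void LocalPush(object item)
    {
        lock (_lock) { _items.AddLast(item); }
    }

    public bool TryLocalPop(out object item) => TryTake(true, out item);

    public bool TrySteal(out object item) => TryTake(false, out item);

    private bool TryTake(bool fromTail, out object item)
    {
        lock (_lock)
        {
            LinkedListNode<object> node = fromTail ? _items.Last : _items.First;
            if (node == null) { item = null; return false; }
            _items.Remove(node);
            item = node.Value;
            return true;
        }
    }
}

public static class WorkFinder
{
    // Lookup order described above: own local queue, then the global queue,
    // then steal from the other threads' local queues.
    public static bool TryFindWork(
        SimpleWorkStealingQueue own,
        ConcurrentQueue<object> global,
        IReadOnlyList<SimpleWorkStealingQueue> allLocalQueues,
        out object item)
    {
        if (own.TryLocalPop(out item)) return true;
        if (global.TryDequeue(out item)) return true;
        foreach (SimpleWorkStealingQueue victim in allLocalQueues)
        {
            if (!ReferenceEquals(victim, own) && victim.TrySteal(out item)) return true;
        }
        item = null;
        return false;
    }
}
```

A common rationale for this split is that the owner keeps working on what it queued most recently, while thieves take from the opposite end and so contend less with the owner.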

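Worker Thread lifecycle management

Next, let's zoom out. Instead of the Worker Threads' day-to-day work, we look at how their lifecycle is managed.

To make the thread management mechanism easier to explain, the demo code below is used.

The code is adapted from https://devblogs.microsoft.com/dotnet/performance-improvements-in-net-6/#threading.

Thread injection experiments

Task.Run schedules a Task onto the thread pool. In the sample code below it is equivalent to ThreadPool.QueueUserWorkItem(WaitCallback callBack) and places the Task on the global queue of the queue system. (Incidentally, calling Task.Run on a thread pool thread schedules the Task onto that thread's local queue.)

.NET 5 experiment 1: default thread pool settings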

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var sw = Stopwatch.StartNew();
        // The non-generic TaskCompletionSource requires .NET 5 or later
        var tcs = new TaskCompletionSource();
        var tasks = new List<Task>();
        for (int i = 1; i <= Environment.ProcessorCount * 2; i++)
        {
            int id = i;
            Console.WriteLine($"Loop Id: {id:00} | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}");
            tasks.Add(Task.Run(() =>
            {
                Console.WriteLine($"Task Id: {id:00} | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}");
                // Block this pool thread until the last task calls SetResult
                tcs.Task.Wait();
            }));
        }

        // The last task unblocks all the others
        tasks.Add(Task.Run(() =>
        {
            Console.WriteLine($"Task SetResult | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}");
            tcs.SetResult();
        }));
        Task.WaitAll(tasks.ToArray());
        Console.WriteLine($"Done: | {sw.Elapsed.TotalSeconds:0.000}");
    }

    // Busy threads = max worker threads - available worker threads
    static int GetBusyThreads()
    {
        ThreadPool.GetAvailableThreads(out var available, out _);
        ThreadPool.GetMaxThreads(out var max, out _);
        return max - available;
    }
}

First, run this code on .NET 5 on a machine with 12 logical CPU cores:

Loop Id: 01    | 0.000 | Busy Threads: 0
Loop Id: 02 | 0.112 | Busy Threads: 1
Loop Id: 03 | 0.112 | Busy Threads: 2
Loop Id: 04 | 0.113 | Busy Threads: 4
Loop Id: 05 | 0.113 | Busy Threads: 7
Loop Id: 06 | 0.113 | Busy Threads: 10
Loop Id: 07 | 0.113 | Busy Threads: 10
Task Id: 01 | 0.113 | Busy Threads: 11
Task Id: 02 | 0.113 | Busy Threads: 12
Task Id: 03 | 0.113 | Busy Threads: 12
Task Id: 07 | 0.113 | Busy Threads: 12
Task Id: 04 | 0.113 | Busy Threads: 12
Task Id: 05 | 0.113 | Busy Threads: 12
Loop Id: 08 | 0.113 | Busy Threads: 10
Task Id: 08 | 0.113 | Busy Threads: 12
Loop Id: 09 | 0.113 | Busy Threads: 11
Loop Id: 10 | 0.113 | Busy Threads: 12
Loop Id: 11 | 0.114 | Busy Threads: 12
Loop Id: 12 | 0.114 | Busy Threads: 12
Loop Id: 13 | 0.114 | Busy Threads: 12
Loop Id: 14 | 0.114 | Busy Threads: 12
Loop Id: 15 | 0.114 | Busy Threads: 12
Loop Id: 16 | 0.114 | Busy Threads: 12
Loop Id: 17 | 0.114 | Busy Threads: 12
Loop Id: 18 | 0.114 | Busy Threads: 12
Loop Id: 19 | 0.114 | Busy Threads: 12
Loop Id: 20 | 0.114 | Busy Threads: 12
Loop Id: 21 | 0.114 | Busy Threads: 12
Loop Id: 22 | 0.114 | Busy Threads: 12
Loop Id: 23 | 0.114 | Busy Threads: 12
Loop Id: 24 | 0.114 | Busy Threads: 12
Task Id: 09 | 0.114 | Busy Threads: 12
Task Id: 06 | 0.114 | Busy Threads: 12
Task Id: 10 | 0.114 | Busy Threads: 12
Task Id: 11 | 0.114 | Busy Threads: 12
Task Id: 12 | 0.114 | Busy Threads: 12
Task Id: 13 | 1.091 | Busy Threads: 13
Task Id: 14 | 1.594 | Busy Threads: 14
Task Id: 15 | 2.099 | Busy Threads: 15
Task Id: 16 | 3.102 | Busy Threads: 16
Task Id: 17 | 3.603 | Busy Threads: 17
Task Id: 18 | 4.107 | Busy Threads: 18
Task Id: 19 | 4.611 | Busy Threads: 19
Task Id: 20 | 5.113 | Busy Threads: 20
Task Id: 21 | 5.617 | Busy Threads: 21
Task Id: 22 | 6.122 | Busy Threads: 22
Task Id: 23 | 7.128 | Busy Threads: 23
Task Id: 24 | 7.632 | Busy Threads: 24
Task SetResult | 8.135 | Busy Threads: 25
Done: | 8.136

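Task.Run schedules each Task onto the thread pool. The first 24 tasks all block until the 25th one runs. Each line prints how many threads in the pool are currently running work items, which here equals the number of threads created so far.

The following can be observed:

  • In the first few iterations the thread count grows with the number of Tasks. From then until the loop ends, it stays at 12.
  • Until the count reaches 12, threads are added with no delay. Going from the 12th to the 13th thread takes just under 1 s. After that, a thread is added roughly every 500 ms (occasionally about every 1 s).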

.NET 5 experiment 2: adjusting the ThreadPool settings

Add the following three lines at the very beginning of the code above, then run it on .NET 5 again.

ThreadPool.GetMinThreads(out int defaultMinThreads, out int completionPortThreads);
Console.WriteLine($"DefaultMinThreads: {defaultMinThreads}");
ThreadPool.SetMinThreads(14, completionPortThreads);

The output is as follows:

DefaultMinThreads: 12
Loop Id: 01 | 0.000 | Busy Threads: 0
Loop Id: 02 | 0.003 | Busy Threads: 1
Loop Id: 03 | 0.003 | Busy Threads: 2
Loop Id: 04 | 0.003 | Busy Threads: 5
Loop Id: 05 | 0.004 | Busy Threads: 8
Task Id: 01 | 0.004 | Busy Threads: 10
Task Id: 03 | 0.004 | Busy Threads: 10
Loop Id: 06 | 0.004 | Busy Threads: 10
Task Id: 02 | 0.004 | Busy Threads: 10
Task Id: 04 | 0.004 | Busy Threads: 10
Task Id: 05 | 0.004 | Busy Threads: 12
Loop Id: 07 | 0.004 | Busy Threads: 9
Loop Id: 08 | 0.004 | Busy Threads: 10
Loop Id: 09 | 0.004 | Busy Threads: 11
Loop Id: 10 | 0.004 | Busy Threads: 12
Task Id: 08 | 0.004 | Busy Threads: 14
Task Id: 06 | 0.004 | Busy Threads: 14
Task Id: 09 | 0.004 | Busy Threads: 14
Task Id: 10 | 0.004 | Busy Threads: 14
Loop Id: 11 | 0.004 | Busy Threads: 14
Loop Id: 12 | 0.004 | Busy Threads: 14
Loop Id: 13 | 0.004 | Busy Threads: 14
Loop Id: 14 | 0.004 | Busy Threads: 14
Loop Id: 15 | 0.004 | Busy Threads: 14
Loop Id: 16 | 0.004 | Busy Threads: 14
Loop Id: 17 | 0.004 | Busy Threads: 14
Loop Id: 18 | 0.004 | Busy Threads: 14
Loop Id: 19 | 0.004 | Busy Threads: 14
Loop Id: 20 | 0.004 | Busy Threads: 14
Loop Id: 21 | 0.004 | Busy Threads: 14
Loop Id: 22 | 0.004 | Busy Threads: 14
Task Id: 11 | 0.004 | Busy Threads: 14
Loop Id: 23 | 0.004 | Busy Threads: 14
Loop Id: 24 | 0.005 | Busy Threads: 14
Task Id: 07 | 0.005 | Busy Threads: 14
Task Id: 12 | 0.005 | Busy Threads: 14
Task Id: 13 | 0.005 | Busy Threads: 14
Task Id: 14 | 0.005 | Busy Threads: 14
Task Id: 15 | 0.982 | Busy Threads: 15
Task Id: 16 | 1.486 | Busy Threads: 16
Task Id: 17 | 1.991 | Busy Threads: 17
Task Id: 18 | 2.997 | Busy Threads: 18
Task Id: 19 | 3.501 | Busy Threads: 19
Task Id: 20 | 4.004 | Busy Threads: 20
Task Id: 21 | 4.509 | Busy Threads: 21
Task Id: 22 | 5.014 | Busy Threads: 22
Task Id: 23 | 5.517 | Busy Threads: 23
Task Id: 24 | 6.021 | Busy Threads: 24
Task SetResult | 6.522 | Busy Threads: 25
Done: | 6.523

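After the pool's minimum thread count is raised, the point where thread injection slows down moves from the 12th thread (the default min threads) to the 14th thread (the modified min threads).

The total time also drops from about 8.1 s to about 6.5 s.

.NET 5 experiment 3: replacing tcs.Task.Wait() with Thread.Sleep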

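using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var sw = Stopwatch.StartNew();
        var tasks = new List<Task>();
        for (int i = 1; i <= Environment.ProcessorCount * 2; i++)
        {
            int id = i;
            Console.WriteLine(
                $"Loop Id: {id:00} | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}");
            tasks.Add(Task.Run(() =>
            {
                Console.WriteLine(
                    $"Task Id: {id:00} | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}");
                // Block the pool thread with a plain sleep instead of Task.Wait
                Thread.Sleep(Environment.ProcessorCount * 1000);
            }));
        }

        Task.WhenAll(tasks.ToArray()).ContinueWith(_ =>
        {
            Console.WriteLine($"Done: | {sw.Elapsed.TotalSeconds:0.000}");
        });
        Console.ReadLine();
    }

    // Same helper as in experiment 1
    static int GetBusyThreads()
    {
        ThreadPool.GetAvailableThreads(out var available, out _);
        ThreadPool.GetMaxThreads(out var max, out _);
        return max - available;
    }
}

Output:

Loop Id: 01    | 0.000 | Busy Threads: 0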
Loop Id: 02 | 0.027 | Busy Threads: 1
Loop Id: 03 | 0.027 | Busy Threads: 2
Loop Id: 04 | 0.027 | Busy Threads: 3
Loop Id: 05 | 0.028 | Busy Threads: 4
Loop Id: 06 | 0.028 | Busy Threads: 10
Loop Id: 07 | 0.028 | Busy Threads: 9
Loop Id: 08 | 0.028 | Busy Threads: 9
Loop Id: 09 | 0.028 | Busy Threads: 10
Loop Id: 10 | 0.028 | Busy Threads: 12
Loop Id: 11 | 0.028 | Busy Threads: 12
Loop Id: 12 | 0.028 | Busy Threads: 12
Loop Id: 13 | 0.028 | Busy Threads: 12
Loop Id: 14 | 0.028 | Busy Threads: 12
Loop Id: 15 | 0.028 | Busy Threads: 12
Loop Id: 16 | 0.028 | Busy Threads: 12
Loop Id: 17 | 0.028 | Busy Threads: 12
Loop Id: 18 | 0.028 | Busy Threads: 12
Loop Id: 19 | 0.028 | Busy Threads: 12
Loop Id: 20 | 0.028 | Busy Threads: 12
Loop Id: 21 | 0.028 | Busy Threads: 12
Loop Id: 22 | 0.028 | Busy Threads: 12
Loop Id: 23 | 0.028 | Busy Threads: 12
Loop Id: 24 | 0.028 | Busy Threads: 12
Task Id: 01 | 0.029 | Busy Threads: 12
Task Id: 05 | 0.029 | Busy Threads: 12
Task Id: 03 | 0.029 | Busy Threads: 12
Task Id: 08 | 0.029 | Busy Threads: 12
Task Id: 09 | 0.029 | Busy Threads: 12
Task Id: 10 | 0.029 | Busy Threads: 12
Task Id: 06 | 0.029 | Busy Threads: 12
Task Id: 11 | 0.029 | Busy Threads: 12
Task Id: 12 | 0.029 | Busy Threads: 12
Task Id: 04 | 0.029 | Busy Threads: 12
Task Id: 02 | 0.029 | Busy Threads: 12
Task Id: 07 | 0.029 | Busy Threads: 12
Task Id: 13 | 1.018 | Busy Threads: 13
Task Id: 14 | 1.522 | Busy Threads: 14
Task Id: 15 | 2.025 | Busy Threads: 15
Task Id: 16 | 2.530 | Busy Threads: 16
Task Id: 17 | 3.530 | Busy Threads: 17
Task Id: 18 | 4.035 | Busy Threads: 18
Task Id: 19 | 4.537 | Busy Threads: 19
Task Id: 20 | 5.040 | Busy Threads: 20
Task Id: 21 | 5.545 | Busy Threads: 21
Task Id: 22 | 6.048 | Busy Threads: 22
Task Id: 23 | 7.049 | Busy Threads: 23
Task Id: 24 | 8.056 | Busy Threads: 24
Done: | 20.060

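Once min threads (12 by default) is reached, thread injection slows down noticeably, to at most one thread every 500 ms.

.NET 6 experiment 1: default ThreadPool settings

Run the code from .NET 5 experiment 1 on .NET 6: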

Loop Id: 01    | 0.001 | Busy Threads: 0
Loop Id: 02 | 0.018 | Busy Threads: 1
Loop Id: 03 | 0.018 | Busy Threads: 3
Loop Id: 04 | 0.018 | Busy Threads: 6
Loop Id: 05 | 0.018 | Busy Threads: 4
Loop Id: 06 | 0.018 | Busy Threads: 5
Loop Id: 07 | 0.018 | Busy Threads: 6
Loop Id: 08 | 0.018 | Busy Threads: 8
Task Id: 01 | 0.018 | Busy Threads: 11
Task Id: 04 | 0.018 | Busy Threads: 11
Task Id: 03 | 0.018 | Busy Threads: 11
Task Id: 02 | 0.018 | Busy Threads: 11
Task Id: 05 | 0.018 | Busy Threads: 11
Loop Id: 09 | 0.018 | Busy Threads: 12
Loop Id: 10 | 0.018 | Busy Threads: 12
Loop Id: 11 | 0.018 | Busy Threads: 12
Loop Id: 12 | 0.018 | Busy Threads: 12
Loop Id: 13 | 0.018 | Busy Threads: 12
Task Id: 09 | 0.018 | Busy Threads: 12
Loop Id: 14 | 0.018 | Busy Threads: 12
Loop Id: 15 | 0.018 | Busy Threads: 12
Loop Id: 16 | 0.018 | Busy Threads: 12
Loop Id: 17 | 0.018 | Busy Threads: 12
Task Id: 06 | 0.018 | Busy Threads: 12
Loop Id: 18 | 0.018 | Busy Threads: 12
Loop Id: 19 | 0.018 | Busy Threads: 12
Loop Id: 20 | 0.018 | Busy Threads: 12
Loop Id: 21 | 0.018 | Busy Threads: 12
Loop Id: 22 | 0.018 | Busy Threads: 12
Loop Id: 23 | 0.018 | Busy Threads: 12
Loop Id: 24 | 0.018 | Busy Threads: 12
Task Id: 10 | 0.018 | Busy Threads: 12
Task Id: 07 | 0.019 | Busy Threads: 12
Task Id: 11 | 0.019 | Busy Threads: 12
Task Id: 08 | 0.019 | Busy Threads: 12
Task Id: 12 | 0.019 | Busy Threads: 12
Task Id: 13 | 0.020 | Busy Threads: 16
Task Id: 14 | 0.020 | Busy Threads: 17
Task Id: 15 | 0.020 | Busy Threads: 18
Task Id: 16 | 0.020 | Busy Threads: 19
Task Id: 17 | 0.020 | Busy Threads: 20
Task Id: 18 | 0.020 | Busy Threads: 21
Task Id: 19 | 0.020 | Busy Threads: 22
Task Id: 20 | 0.020 | Busy Threads: 23
Task Id: 21 | 0.020 | Busy Threads: 24
Task Id: 23 | 0.020 | Busy Threads: 24
Task Id: 22 | 0.020 | Busy Threads: 24
Task Id: 24 | 0.020 | Busy Threads: 24
Task SetResult | 0.045 | Busy Threads: 25
Done: | 0.046

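Compared with .NET 5 experiment 1, the thread count still stays at 12 for a short while, but then it grows immediately. The whole run finishes in about 0.05 s instead of about 8.1 s. The .NET 6 improvement behind this is described later.

.NET 6 experiment 2: adjusting the ThreadPool settings

Run the code from .NET 5 experiment 2 on .NET 6: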

DefaultMinThreads: 12
Loop Id: 01 | 0.001 | Busy Threads: 0
Loop Id: 02 | 0.014 | Busy Threads: 1
Loop Id: 03 | 0.014 | Busy Threads: 2
Loop Id: 04 | 0.015 | Busy Threads: 5
Loop Id: 05 | 0.015 | Busy Threads: 4
Loop Id: 06 | 0.015 | Busy Threads: 5
Loop Id: 07 | 0.015 | Busy Threads: 7
Loop Id: 08 | 0.015 | Busy Threads: 8
Loop Id: 09 | 0.015 | Busy Threads: 11
Task Id: 06 | 0.015 | Busy Threads: 9
Task Id: 01 | 0.015 | Busy Threads: 9
Task Id: 02 | 0.015 | Busy Threads: 9
Task Id: 05 | 0.015 | Busy Threads: 9
Task Id: 03 | 0.015 | Busy Threads: 9
Task Id: 04 | 0.015 | Busy Threads: 9
Task Id: 07 | 0.015 | Busy Threads: 9
Task Id: 08 | 0.016 | Busy Threads: 9
Task Id: 09 | 0.016 | Busy Threads: 9
Loop Id: 10 | 0.016 | Busy Threads: 9
Loop Id: 11 | 0.016 | Busy Threads: 10
Loop Id: 12 | 0.016 | Busy Threads: 11
Loop Id: 13 | 0.016 | Busy Threads: 13
Task Id: 10 | 0.016 | Busy Threads: 14
Loop Id: 14 | 0.016 | Busy Threads: 14
Loop Id: 15 | 0.016 | Busy Threads: 14
Loop Id: 16 | 0.016 | Busy Threads: 14
Task Id: 11 | 0.016 | Busy Threads: 14
Loop Id: 17 | 0.016 | Busy Threads: 14
Loop Id: 18 | 0.016 | Busy Threads: 14
Loop Id: 19 | 0.016 | Busy Threads: 14
Loop Id: 20 | 0.016 | Busy Threads: 14
Loop Id: 21 | 0.016 | Busy Threads: 14
Loop Id: 22 | 0.016 | Busy Threads: 14
Loop Id: 23 | 0.016 | Busy Threads: 14
Loop Id: 24 | 0.016 | Busy Threads: 14
Task Id: 12 | 0.016 | Busy Threads: 14
Task Id: 13 | 0.016 | Busy Threads: 14
Task Id: 14 | 0.016 | Busy Threads: 14
Task Id: 15 | 0.017 | Busy Threads: 18
Task Id: 16 | 0.017 | Busy Threads: 19
Task Id: 17 | 0.017 | Busy Threads: 20
Task Id: 18 | 0.017 | Busy Threads: 21
Task Id: 19 | 0.017 | Busy Threads: 22
Task Id: 20 | 0.018 | Busy Threads: 23
Task Id: 21 | 0.018 | Busy Threads: 24
Task Id: 22 | 0.018 | Busy Threads: 25
Task Id: 23 | 0.018 | Busy Threads: 26
Task Id: 24 | 0.018 | Busy Threads: 26
Task SetResult | 0.018 | Busy Threads: 25
Done: | 0.019

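Some log lines in the first half are out of order. As in .NET 6 experiment 1, the thread count stays at min threads (14 here) only briefly, and then thread growth starts immediately.

.NET 6 experiment 3: replacing tcs.Task.Wait() with Thread.Sleep

Run the code from .NET 5 experiment 3 on .NET 6: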

Loop Id: 01    | 0.003 | Busy Threads: 0
Loop Id: 02 | 0.024 | Busy Threads: 1
Loop Id: 03 | 0.025 | Busy Threads: 2
Loop Id: 04 | 0.025 | Busy Threads: 3
Loop Id: 05 | 0.025 | Busy Threads: 7
Loop Id: 06 | 0.025 | Busy Threads: 5
Loop Id: 07 | 0.025 | Busy Threads: 6
Loop Id: 08 | 0.025 | Busy Threads: 7
Loop Id: 09 | 0.025 | Busy Threads: 9
Loop Id: 10 | 0.025 | Busy Threads: 10
Loop Id: 11 | 0.026 | Busy Threads: 10
Loop Id: 12 | 0.026 | Busy Threads: 11
Loop Id: 13 | 0.026 | Busy Threads: 12
Loop Id: 14 | 0.026 | Busy Threads: 12
Loop Id: 15 | 0.026 | Busy Threads: 12
Loop Id: 16 | 0.026 | Busy Threads: 12
Loop Id: 17 | 0.026 | Busy Threads: 12
Loop Id: 18 | 0.026 | Busy Threads: 12
Loop Id: 19 | 0.026 | Busy Threads: 12
Loop Id: 20 | 0.026 | Busy Threads: 12
Loop Id: 21 | 0.026 | Busy Threads: 12
Loop Id: 22 | 0.026 | Busy Threads: 12
Loop Id: 23 | 0.026 | Busy Threads: 12
Loop Id: 24 | 0.026 | Busy Threads: 12
Task Id: 01 | 0.026 | Busy Threads: 12
Task Id: 02 | 0.026 | Busy Threads: 12
Task Id: 05 | 0.026 | Busy Threads: 12
Task Id: 04 | 0.026 | Busy Threads: 12
Task Id: 06 | 0.026 | Busy Threads: 12
Task Id: 08 | 0.026 | Busy Threads: 12
Task Id: 09 | 0.026 | Busy Threads: 12
Task Id: 03 | 0.026 | Busy Threads: 12
Task Id: 11 | 0.026 | Busy Threads: 12
Task Id: 10 | 0.026 | Busy Threads: 12
Task Id: 07 | 0.026 | Busy Threads: 12
Task Id: 12 | 0.026 | Busy Threads: 12
Task Id: 13 | 1.026 | Busy Threads: 13
Task Id: 14 | 2.027 | Busy Threads: 14
Task Id: 15 | 3.028 | Busy Threads: 15
Task Id: 16 | 4.030 | Busy Threads: 16
Task Id: 17 | 5.031 | Busy Threads: 17
Task Id: 18 | 6.032 | Busy Threads: 18
Task Id: 19 | 6.533 | Busy Threads: 19
Task Id: 20 | 7.035 | Busy Threads: 20
Task Id: 21 | 8.036 | Busy Threads: 21
Task Id: 22 | 8.537 | Busy Threads: 22
Task Id: 23 | 9.538 | Busy Threads: 23
Task Id: 24 | 10.039 | Busy Threads: 24
Done: | 22.041

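The result is not much different from .NET 5 experiment 3. After 12 threads, new threads still arrive only every 0.5 to 1 s.

Thread injection

With the experiment results above in mind, let's use the C# ThreadPool implementation in .NET 6 as material to understand the stages of thread injection. The division into stages is my own and is only for reference.

1. The first thread appears

As work items are scheduled onto the queues, the first thread is created.

Below is a summary of the code the thread pool runs when it executes its first work item. Wherever counters are updated and then acted upon, the code controls concurrency with a while(...) loop plus Interlocked, which you can think of as optimistic locking. For this stage we really only need to look at ThreadPoolWorkQueue.EnsureThreadRequested.

Rider's decompile-and-debug feature is handy for studying this code.

Below is the execution path of the first Task.Run: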



Note: this code runs on the Main Thread.

public static partial class ThreadPool
{
    internal static readonly ThreadPoolWorkQueue s_workQueue = new ThreadPoolWorkQueue();

    public static bool QueueUserWorkItem(WaitCallback callBack, object state)
    {
        object tpcallBack = new QueueUserWorkItemCallback(callBack!, state);
        s_workQueue.Enqueue(tpcallBack, forceGlobal: true);
        return true;
    }
}

internal sealed class ThreadPoolWorkQueue
{
    [StructLayout(LayoutKind.Sequential)]
    private struct CacheLineSeparated
    {
        private readonly Internal.PaddingFor32 pad1;
        public volatile int numOutstandingThreadRequests;
        private readonly Internal.PaddingFor32 pad2;
    }

    private CacheLineSeparated _separated;

    public void Enqueue(object callback, bool forceGlobal)
    {
        // Work items in the thread pool are one of two kinds: IThreadPoolWorkItem or Task
        Debug.Assert((callback is IThreadPoolWorkItem) ^ (callback is Task));

        if (loggingEnabled && FrameworkEventSource.Log.IsEnabled())
            FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject(callback);

        ThreadPoolWorkQueueThreadLocals? tl = null;
        if (!forceGlobal)
            // Get the local queue. If the thread running this code is not a
            // thread pool thread there is none, so even with forceGlobal == false
            // the work item goes to the global queue
            tl = ThreadPoolWorkQueueThreadLocals.threadLocals;

        if (null != tl)
        {
            // Put it on the local queue
            tl.workStealingQueue.LocalPush(callback);
        }
        else
        {
            // Put it on the global queue
            workItems.Enqueue(callback);
        }

        EnsureThreadRequested();
    }

    internal void EnsureThreadRequested()
    {
        //
        // If we have not yet requested #procs threads, then request a new thread.
        //
        // CoreCLR: Note that there is a separate count in the VM which has already been incremented
        // by the VM by the time we reach this point.
        //
        int count = _separated.numOutstandingThreadRequests;
        while (count < Environment.ProcessorCount)
        {
            int prev = Interlocked.CompareExchange(ref _separated.numOutstandingThreadRequests, count + 1, count);
            if (prev == count)
            {
                ThreadPool.RequestWorkerThread();
                break;
            }
            count = prev;
        }
    }
}

public static partial class ThreadPool
{
    /// <summary>
    /// This method is called to request a new thread pool worker to handle pending work.
    /// </summary>
    internal static void RequestWorkerThread() => PortableThreadPool.ThreadPoolInstance.RequestWorker();
}

internal sealed class PortableThreadPool
{
    public static readonly PortableThreadPool ThreadPoolInstance = new PortableThreadPool();

    internal void RequestWorker()
    {
        // The order of operations here is important. MaybeAddWorkingWorker() and EnsureRunning() use speculative checks to
        // do their work and the memory barrier from the interlocked operation is necessary in this case for correctness.
        Interlocked.Increment(ref _separated.numRequestedWorkers);
        WorkerThread.MaybeAddWorkingWorker(this);
        // Initialize the GateThread
        GateThread.EnsureRunning(this);
    }

    /// <summary>
    /// The worker thread infrastructure for the CLR thread pool.
    /// </summary>
    private static class WorkerThread
    {
        internal static void MaybeAddWorkingWorker(PortableThreadPool threadPoolInstance)
        {
            ThreadCounts counts = threadPoolInstance._separated.counts;
            short numExistingThreads, numProcessingWork, newNumExistingThreads, newNumProcessingWork;
            // This while (true) makes sure the number of threads to create is computed correctly
            while (true)
            {
                numProcessingWork = counts.NumProcessingWork;
                if (numProcessingWork >= counts.NumThreadsGoal)
                {
                    return;
                }

                newNumProcessingWork = (short)(numProcessingWork + 1);
                numExistingThreads = counts.NumExistingThreads;
                newNumExistingThreads = Math.Max(numExistingThreads, newNumProcessingWork);

                ThreadCounts newCounts = counts;
                newCounts.NumProcessingWork = newNumProcessingWork;
                newCounts.NumExistingThreads = newNumExistingThreads;

                ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
                if (oldCounts == counts)
                {
                    break;
                }

                counts = oldCounts;
            }

            int toCreate = newNumExistingThreads - numExistingThreads;
            int toRelease = newNumProcessingWork - numProcessingWork;

            if (toRelease > 0)
            {
                s_semaphore.Release(toRelease);
            }

            while (toCreate > 0)
            {
                if (TryCreateWorkerThread())
                {
                    toCreate--;
                    continue;
                }

                // Thread creation failed: roll back the counts for the threads that were not created
                counts = threadPoolInstance._separated.counts;
                while (true)
                {
                    ThreadCounts newCounts = counts;
                    newCounts.SubtractNumProcessingWork((short)toCreate);
                    newCounts.SubtractNumExistingThreads((short)toCreate);

                    ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
                    if (oldCounts == counts)
                    {
                        break;
                    }
                    counts = oldCounts;
                }
                break;
            }
        }

        private static bool TryCreateWorkerThread()
        {
            try
            {
                // Thread pool threads must start in the default execution context without transferring the context, so
                // using UnsafeStart() instead of Start()
                Thread workerThread = new Thread(s_workerThreadStart);
                workerThread.IsThreadPoolThread = true;
                workerThread.IsBackground = true;
                // thread name will be set in thread proc
                workerThread.UnsafeStart();
            }
            catch (ThreadStartException)
            {
                return false;
            }
            catch (OutOfMemoryException)
            {
                return false;
            }

            return true;
        }
    }
}
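EnsureThreadRequested and MaybeAddWorkingWorker above both use the same optimistic `while` + `Interlocked.CompareExchange` pattern. The hypothetical helper below (`CappedCounter.TryIncrementBelow`) isolates the EnsureThreadRequested version. It increments a shared counter only while the counter is below a limit (Environment.ProcessorCount in the excerpt), and it retries whenever another thread changed the value in between.

```csharp
using System.Threading;

// Hypothetical helper isolating the optimistic-concurrency loop used by
// EnsureThreadRequested: a lock-free "increment while below limit".
public static class CappedCounter
{
    public static bool TryIncrementBelow(ref int location, int limit)
    {
        int count = Volatile.Read(ref location);
        while (count < limit)
        {
            // Only succeeds if nobody changed the value since we read it.
            int prev = Interlocked.CompareExchange(ref location, count + 1, count);
            if (prev == count)
            {
                return true; // our increment won; EnsureThreadRequested would request a worker here
            }
            count = prev;     // lost the race: retry with the value another thread wrote
        }
        return false;         // limit reached, nothing is requested
    }
}
```

However many threads race, at most `limit` calls succeed and the counter never goes past the limit.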

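2. Thread growth before min threads is reached

Attentive readers will notice that EnsureThreadRequested in the code above has a termination condition, _separated.numOutstandingThreadRequests == Environment.ProcessorCount, and that this count grows by one with every thread request. Does that mean at most Environment.ProcessorCount Worker Threads can ever be created?

In fact, ThreadPoolWorkQueue.Dispatch decrements the NumOutstandingThreadRequests value maintained by ThreadPoolWorkQueue once a thread pool thread actually starts running. So as soon as one thread is really running, thread number Environment.ProcessorCount + 1 can be requested. Nothing is lost even if the 13th Worker Thread (on the 12-core machine used here) cannot be requested at the moment the 13th work item is added to ThreadPoolWorkQueue. The item is already queued, and a Worker Thread that is running will pick it up.

The initial value of min threads is the number of CPU cores in the runtime environment. It can be changed with ThreadPool.SetMinThreads, and the valid range is [1, max threads].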

PortableThreadPool maintains a counter, PortableThreadPool.ThreadPoolInstance._separated.counts, which records three values related to Worker Threads:

  • NumProcessingWork: the number of Worker Threads currently executing work items.
  • NumExistingThreads: the number of Worker Threads that actually exist in the pool.
  • NumThreadsGoal: the maximum number of Worker Threads currently allowed. Its initial value is min threads.

internal class PortableThreadPool
{
    public static readonly PortableThreadPool ThreadPoolInstance = new PortableThreadPool();

    private CacheLineSeparated _separated;

    private struct CacheLineSeparated
    {
        public ThreadCounts counts;
    }

    /// <summary>
    /// Tracks information on the number of threads we want/have in different states in our thread pool.
    /// </summary>
    private struct ThreadCounts
    {
        /// <summary>
        /// Number of threads processing work items.
        /// </summary>
        public short NumProcessingWork { get; set; }

        /// <summary>
        /// Number of thread pool threads that currently exist.
        /// </summary>
        public short NumExistingThreads { get; set; }

        /// <summary>
        /// Max possible thread pool threads we want to have.
        /// </summary>
        public short NumThreadsGoal { get; set; }
    }
}
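The bookkeeping in MaybeAddWorkingWorker can be followed with these three numbers alone. Below is a single-threaded, hypothetical model (`ThreadCountsModel` and `WorkerGate` are made-up names). The real code performs the same arithmetic inside an Interlocked compare-exchange retry loop, as in the excerpt from stage 1.

```csharp
using System;

// Hypothetical model of the three counters (plain fields instead of the
// runtime's packed struct with properties).
public struct ThreadCountsModel
{
    public short NumProcessingWork;
    public short NumExistingThreads;
    public short NumThreadsGoal;
}

public static class WorkerGate
{
    // Same arithmetic as WorkerThread.MaybeAddWorkingWorker, without the CAS loop.
    // Returns false when NumProcessingWork has already reached NumThreadsGoal.
    // toCreate: new threads to start; toRelease: semaphore releases (1 per added worker).
    public static bool TryAddWorkingWorker(ref ThreadCountsModel counts, out int toCreate, out int toRelease)
    {
        toCreate = toRelease = 0;
        if (counts.NumProcessingWork >= counts.NumThreadsGoal)
        {
            return false;
        }

        short newProcessing = (short)(counts.NumProcessingWork + 1);
        // An idle existing thread can be reused, so only grow when needed.
        short newExisting = Math.Max(counts.NumExistingThreads, newProcessing);

        toCreate = newExisting - counts.NumExistingThreads;
        toRelease = newProcessing - counts.NumProcessingWork;

        counts.NumProcessingWork = newProcessing;
        counts.NumExistingThreads = newExisting;
        return true;
    }
}
```

Starting from (0, 0, 12), the first 12 calls each create one thread and the 13th call is refused. With 3 threads processing and 5 existing, a call creates no thread, because an idle one is available.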

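3. Starvation Avoidance

As described above, Worker Threads are added as work items enter the queue system, until NumThreadsGoal is reached.

In the experiments NumThreadsGoal is 12 and the first 12 threads are all blocked. As a result, none of those 12 threads can pick up the 13th work item added to the queue system.

This is where the thread pool's Starvation Avoidance mechanism comes in.

In the first stage described above, the GateThread is initialized alongside the pool's first thread. The excerpt from the first stage shows where this happens: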

internal sealed class PortableThreadPool
{
    public static readonly PortableThreadPool ThreadPoolInstance = new PortableThreadPool();

    internal void RequestWorker()
    {
        Interlocked.Increment(ref _separated.numRequestedWorkers);
        WorkerThread.MaybeAddWorkingWorker(this);
        // Initialize the GateThread
        GateThread.EnsureRunning(this);
    }
}

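GateThread is a separate thread that runs a check every 500 ms. Suppose NumProcessingWork >= NumThreadsGoal, which is the same condition under which WorkerThread.MaybeAddWorkingWorker refuses to add a Worker Thread. In that case GateThread sets NumThreadsGoal = NumProcessingWork + 1 and calls WorkerThread.MaybeAddWorkingWorker, which can then create a new Worker Thread.

This explains why, in .NET 5 experiments 1 and 2, Worker Threads are added only about once every 500 ms after the thread count reaches min threads (the default value of NumThreadsGoal).

Thread growth is slow in this third stage. For that reason, experienced developers set a larger min threads at application startup, so that the pool enters this stage later or not at all.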
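The 500 ms rule can be checked against the experiments with a back-of-the-envelope calculation. The hypothetical `StarvationAvoidanceModel` below assumes exactly one extra thread per GateThread tick and ignores Hill Climbing and scheduling delays, so it only gives a lower bound. Experiments 1 and 2 need 25 threads: 24 blocked tasks plus the SetResult task.

```csharp
using System;

// Hypothetical lower-bound model of Starvation Avoidance: each GateThread
// tick (500 ms) where NumProcessingWork >= NumThreadsGoal raises the goal by
// one, so one more blocked work item can get a thread.
public static class StarvationAvoidanceModel
{
    public const double GateIntervalSeconds = 0.5;

    // workItems: items that each occupy a thread until the last one has started.
    public static double SecondsUntilAllStarted(int workItems, int minThreads)
    {
        int goal = minThreads;                        // NumThreadsGoal starts at min threads
        int processing = Math.Min(workItems, goal);   // injected without delay
        double elapsed = 0;
        while (processing < workItems)
        {
            elapsed += GateIntervalSeconds;           // wait for the next GateThread check
            if (processing >= goal)
            {
                goal = processing + 1;                // NumThreadsGoal = NumProcessingWork + 1
            }
            processing = Math.Min(workItems, goal);   // MaybeAddWorkingWorker adds a thread
        }
        return elapsed;
    }
}
```

With min threads at 12 the model gives 13 × 0.5 s = 6.5 s, against about 8.1 s observed. With 14 it gives 11 × 0.5 s = 5.5 s, against about 6.5 s observed. The gap is consistent with the occasional 1 s intervals in the logs.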

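Improvements to thread injection in .NET 6

In experiments 1 and 2, .NET 6 adds threads much faster than .NET 5 once min threads is reached, while experiment 3 behaves about the same on both.

.NET 6 optimizes the case where thread pool threads are blocked by Task.Wait. When threads run short for any other reason, the Starvation Avoidance strategy still applies.

The new ThreadPool has an internal method, ThreadPool.NotifyThreadBlocked, which calls GateThread.Wake. This runs the GateThread logic immediately instead of waiting for its usual 500 ms cycle. The 500 ms interval is implemented with an AutoResetEvent, so GateThread.Wake is simple as well.

Simplified sketch of the key code (not the actual source):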

internal class PortableThreadPool
{
    public bool NotifyThreadBlocked()
    {
        // ...
        GateThread.Wake(this);
        return true;
    }

    private static class GateThread
    {
        private static readonly AutoResetEvent DelayEvent = new AutoResetEvent(initialState: false);

        // GateThread entry point
        private static void GateThreadStart()
        {
            while (true)
            {
                DelayEvent.WaitOne(500);
                // ...
            }
        }

        public static void Wake(PortableThreadPool threadPoolInstance)
        {
            DelayEvent.Set();
            EnsureRunning(threadPoolInstance);
        }
    }
}
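The wake-up trick relies only on `AutoResetEvent.WaitOne(int)` returning `true` when the event is signalled and `false` on timeout. Below is a hypothetical, self-contained sketch of the same pattern (`GateLoop` is a made-up name). It counts signalled and timed-out iterations separately, so an early wake-up is observable.

```csharp
using System;
using System.Threading;

// Hypothetical sketch of the GateThread pattern: a background loop that
// normally runs once per interval, but runs immediately after Wake().
public sealed class GateLoop : IDisposable
{
    private readonly AutoResetEvent _delayEvent = new AutoResetEvent(initialState: false);
    private readonly Thread _thread;
    private volatile bool _stopping;
    private int _signaledRuns;
    private int _timedRuns;

    public GateLoop(int intervalMs = 500)
    {
        _thread = new Thread(() =>
        {
            while (!_stopping)
            {
                // true: woken early by Wake(); false: the interval elapsed
                bool signaled = _delayEvent.WaitOne(intervalMs);
                if (signaled) Interlocked.Increment(ref _signaledRuns);
                else Interlocked.Increment(ref _timedRuns);
                // ... a real gate thread would run its starvation check here
            }
        })
        { IsBackground = true };
        _thread.Start();
    }

    public int SignaledRuns => Volatile.Read(ref _signaledRuns);
    public int TimedRuns => Volatile.Read(ref _timedRuns);

    // Compare GateThread.Wake: skip the rest of the current wait.
    public void Wake() => _delayEvent.Set();

    public void Dispose()
    {
        _stopping = true;
        _delayEvent.Set(); // let the loop observe _stopping promptly
        _thread.Join();
        _delayEvent.Dispose();
    }
}
```

If Wake is called before the loop starts waiting, the event stays signalled and the next WaitOne returns at once. This is why an AutoResetEvent fits here.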

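Hill Climbing

Since CLR 4.0 the thread pool has had a further mechanism besides the thread injection described above. It collects throughput data (recorded each time a work item completes) and derives from that data the number of thread pool threads the algorithm considers optimal.

The algorithm is implemented in HillClimbing.ThreadPoolHillClimber.Update, if you are interested: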

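public (int newThreadCount, int newSampleMs) Update(int currentThreadCount, double sampleDurationSeconds, int numCompletions)

  • currentThreadCount: the current number of threads

  • sampleDurationSeconds: the length of the sampling interval, in seconds

  • numCompletions: the number of work items completed during that interval

  • newThreadCount: the new thread count

  • newSampleMs: the next sampling interval, in milliseconds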
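The real Update is considerably more involved than a textbook hill climber. According to the source and the Matt Warren article in the references, it deliberately varies the thread count and analyses the resulting throughput signal. Purely for intuition, here is a toy hill-climbing step with the same parameter shape. `ToyHillClimber` is hypothetical and is not the runtime's algorithm. It keeps moving the thread count in one direction while throughput improves, and reverses direction when throughput drops.

```csharp
using System;

// Toy hill climber (hypothetical, NOT the runtime's algorithm). The Update
// signature only mimics (currentThreadCount, sampleDurationSeconds, numCompletions).
public sealed class ToyHillClimber
{
    private readonly int _minThreads;
    private readonly int _maxThreads;
    private readonly int _sampleMs;
    private double _lastThroughput = -1; // completions per second of the previous sample
    private int _direction = 1;          // +1: add threads, -1: remove threads

    public ToyHillClimber(int minThreads, int maxThreads, int sampleMs = 500)
    {
        _minThreads = minThreads;
        _maxThreads = maxThreads;
        _sampleMs = sampleMs;
    }

    public (int newThreadCount, int newSampleMs) Update(
        int currentThreadCount, double sampleDurationSeconds, int numCompletions)
    {
        double throughput = numCompletions / sampleDurationSeconds;
        if (_lastThroughput >= 0 && throughput < _lastThroughput)
        {
            _direction = -_direction; // the last move made things worse: turn around
        }
        _lastThroughput = throughput;

        int next = Math.Clamp(currentThreadCount + _direction, _minThreads, _maxThreads);
        return (next, _sampleMs);
    }
}
```

Fed a made-up throughput curve that peaks at 8 threads, this toy settles into cycling between 7 and 9 threads.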

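Retiring unneeded threads

If a thread is about to be removed while its local queue still holds pending work items, those items are moved to the global queue.

The thread pool destroys threads it no longer needs in the following scenarios. The list may not be complete and only reflects my current understanding.

  • When a thread cannot get any work item from the queue system.
  • When the Hill Climbing algorithm decides that the current thread is surplus.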
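Below is a hypothetical sketch of the first scenario. Each worker blocks on a semaphore with an idle timeout and simply returns from its thread procedure when the wait times out. `RetiringWorkerPool` and its members are made-up names. The runtime's own semaphore, timeout value and ThreadCounts bookkeeping differ in detail.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical sketch of idle-thread retirement: a worker that gets no work
// signal within idleTimeout concludes it is unneeded and exits.
public sealed class RetiringWorkerPool
{
    private readonly ConcurrentQueue<Action> _queue = new ConcurrentQueue<Action>();
    private readonly SemaphoreSlim _workAvailable = new SemaphoreSlim(0);
    private readonly TimeSpan _idleTimeout;
    private int _liveWorkers;

    public RetiringWorkerPool(TimeSpan idleTimeout) => _idleTimeout = idleTimeout;

    public int LiveWorkers => Volatile.Read(ref _liveWorkers);

    public void StartWorker()
    {
        Interlocked.Increment(ref _liveWorkers);
        new Thread(WorkerLoop) { IsBackground = true }.Start();
    }

    public void Enqueue(Action work)
    {
        _queue.Enqueue(work);
        _workAvailable.Release(); // wake one waiting worker
    }

    private void WorkerLoop()
    {
        // Wait returns false when the timeout elapses without a Release.
        while (_workAvailable.Wait(_idleTimeout))
        {
            if (_queue.TryDequeue(out Action work)) work();
        }
        Interlocked.Decrement(ref _liveWorkers); // retire: the thread simply returns
    }
}
```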

References

https://www.codeproject.com/Articles/3813/NET-s-ThreadPool-Class-Behind-The-Scenes

https://devblogs.microsoft.com/dotnet/performance-improvements-in-net-6/#threading

https://mattwarren.org/2017/04/13/The-CLR-Thread-Pool-Thread-Injection-Algorithm/

https://docs.microsoft.com/zh-CN/previous-versions/msp-n-p/ff963549(v=pandp.10)?redirectedfrom=MSDN#thread-injection