DirectX11 With Windows SDK--27 計算著色器:雙調排序
前言
上一章我們用一個比較簡單的例子來嘗試使用計算著色器,但是在看這一章內容之前,你還需要了解一些緩衝區的建立和使用方式:
章節 |
---|
深入理解與使用緩衝區資源 |
這一章我們繼續用一個計算著色器的應用例項作為切入點,進一步瞭解相關知識。
DirectX11 With Windows SDK完整目錄
歡迎加入QQ群: 727623616 可以一起探討DX11,以及有什麼問題也可以在這裡彙報。
執行緒識別符號
對於執行緒組(大小 (ThreadDimX, ThreadDimY, ThreadDimZ)
)中的每一個執行緒,它們都有一個唯一的執行緒ID值。我們可以使用系統值 SV_GroupThreadID
來取得,它的索引範圍為 (0, 0, 0)
到 (ThreadDimX - 1, ThreadDimY - 1, ThreadDimZ - 1)
。
而對於整個執行緒組來說,由於執行緒組集合也是在3D空間中排布,它們也有一個唯一的執行緒組ID值。我們可以使用系統值 SV_GroupID
來取得,執行緒組的索引範圍取決於呼叫 ID3D11DeviceContext::Dispatch
時提供的執行緒組(大小 (GroupDimX, GroupDimY, GroupDimZ)
),範圍為 (0, 0, 0)
到 (GroupDimX - 1, GroupDimY - 1, GroupDimZ - 1)
。
緊接著就是系統值 SV_GroupIndex
,它是單個執行緒組內的執行緒三維索引的一維展開。若已知執行緒組的大小為 (ThreadDimX, ThreadDimY, ThreadDimZ)
,則可以確定 SV_GroupIndex
與 SV_GroupThreadID
滿足下面關係:
SV_GroupIndex = SV_GroupThreadID.z * ThreadDimX * ThreadDimY + SV_GroupThreadID.y * ThreadDimX + SV_GroupThreadID.x;
最後就是系統值 SV_DispatchThreadID
,執行緒組中的每一個執行緒在 ID3D11DeviceContext::Dispatch
提供的執行緒組集合中都有其唯一的執行緒ID值。若已知執行緒組的大小為 (ThreadDimX, ThreadDimY, ThreadDimZ)
,則可以確定 SV_DispatchThreadID
, SV_GroupThreadID
和 SV_GroupID
滿足以下關係:
SV_DispatchThreadID.xyz = SV_GroupID.xyz * float3(ThreadDimX, ThreadDimY, ThreadDimZ) + SV_GroupThreadID.xyz;
共享記憶體和執行緒同步
在一個執行緒組內,允許設定一片共享記憶體區域,使得當前執行緒組內的所有執行緒都可以訪問當前的共享記憶體。一旦設定,那麼每個執行緒都會各自配備一份共享記憶體。共享記憶體的訪問速度非常快,就像暫存器訪問CPU快取那樣。
共享記憶體的宣告方式如下:
groupshared float4 gCache[256];
對於每個執行緒組來說,它所允許分配的總空間最大為32kb(即8192個標量,或2048個向量)。內部執行緒通常應該使用 SV_ThreadGroupID
來寫入共享記憶體,這樣以保證每個執行緒不會出現重複寫入操作,而讀取共享記憶體一般是執行緒安全的。
分配太多的共享記憶體會導致效能問題。假如一個多處理器支援32kb的共享記憶體,然後你的計算著色器需要20kb的共享記憶體,這意味著一個多處理器只適合處理一個執行緒組,因為剩餘的共享記憶體不足以給新的執行緒組執行,這也會限制GPU的並行運算,當該執行緒組因為某些原因需要等待,會導致當前的多處理器處於閒置狀態。因此保證一個多處理器至少能夠處理兩個或以上的執行緒組(比如每個執行緒組分配16kb以下的共享記憶體),以儘可能減少該多處理器的閒置時間。
現在來考慮下面的程式碼:
Texture2D gInput : register(t0); RWTexture2D<float4> gOutput : register(u0); groupshared float4 gCache[256]; [numthreads(256, 1, 1)] void CS(uint3 GTid : SV_GroupThreadID, uint3 DTid : SV_DispatchThreadID) { // 將紋理畫素值快取到共享記憶體 gCache[GTid.x] = gInput[DTid.xy]; // 取出共享記憶體的值進行計算 // 注意!!相鄰的兩個執行緒可能沒有完成對紋理的取樣 // 以及儲存到共享記憶體的操作 float left = gCache[GTid.x - 1]; float right = gCache[GTid.x + 1]; // ... }
因為多個執行緒同時執行,同一時間各個執行緒當前執行的指令有所偏差,有的執行緒可能已經完成了共享記憶體的賦值操作,有的執行緒可能還在進行紋理取樣操作。如果當前執行緒正在讀取相鄰的共享記憶體片段,結果將是未定義的。為了解決這個問題,我們必須在讀取共享記憶體之前讓當前執行緒等待執行緒組內其它的所有執行緒完成寫入操作。這裡我們可以使用 GroupMemoryBarrierWithGroupSync
函式:
Texture2D gInput : register(t0); RWTexture2D<float4> gOutput : register(u0); groupshared float4 gCache[256]; [numthreads(256, 1, 1)] void CS(uint3 GTid : SV_GroupThreadID, uint3 DTid : SV_DispatchThreadID) { // 將紋理畫素值快取到共享記憶體 gCache[GTid.x] = gInput[DTid.xy]; // 等待所有執行緒完成寫入 GroupMemoryBarrierWithGroupSync(); // 現在讀取操作是執行緒安全的,可以開始進行計算 float left = gCache[GTid.x - 1]; float right = gCache[GTid.x + 1]; // ... }
雙調排序
雙調序列
所謂雙調序列(Bitonic Sequence),是指由一個非嚴格遞增序列X(允許相鄰兩個數相等)和非嚴格遞減序列Y構成的序列,比如序列 \((5, 3, 2, 1, 4, 6, 6, 12)\) 。
定義:一個序列 \(a_1 , a_2, ..., a_n\) 是雙調序列,需要滿足下面條件:
- 存在一個 \(a_k(1 <= k <= n)\) ,使得 \(a_1 >= ... >= a_k <= ... <= a_n\) 成立,或者 \(a_1 <= ... <= a_k >= ... >= a_n\) 成立;
- 序列迴圈移位後仍能夠滿足條件(1)
Batcher歸併網路
Batcher歸併網路是由一系列Batcher比較器組成的,Batcher比較器是指在兩個輸入端給定輸入值x和y,再在兩個輸出端輸出最大值 \(max(x, y)\) 和最小值 \(min(x, y)\) 。
雙調歸併網路
雙調歸併網路是基於Batch定理而構建的。該定理是說將任意一個長為2n的雙調序列分為等長的兩半X和Y,將X中的元素與Y中的元素按原序比較,即 \(a_i\) 與 \(a_{i+n}(i <= n)\) 比較,將較大者放入MAX序列,較小者放入MIN序列。則得到的MAX序列和MIN序列仍然是雙調序列,並且MAX序列中的任意一個元素不小於MIN序列中的任意一個元素。
根據這個原理,我們可以將一個n元素的雙調序列通過上述方式進行比較操作來得到一個MAX序列和一個MIN序列,然後對這兩個序列進行遞迴處理,直到序列不可再分割為止。最終歸併得到的為一個有序序列。
這裡我們用一張圖來描述雙調排序的全過程:
其中箭頭方向指的是兩個數交換後,箭頭段的數為較大值,圓點段的數為較小值。
我們可以總結出如下規律:
- 每一趟排序結束會產生連續的雙調序列,除了最後一趟排序會產生我們所需要的單調序列
- 對於2^k個元素的任意序列,需要進行k趟排序才能產生單調序列
- 對於由 \(2^{k-1}\) 個元素的單調遞增序列和 \(2^{k-1}\) 個元素的單調遞減序列組成的雙調序列,需要進行k趟交換才能產生2^k個元素的單調遞增序列
- 在第n趟排序中的第m趟交換,若兩個比較數中較小的索引值為i,那麼與之進行交換的數索引為 \(i+2^{n-m}\)
雙調排序的空間複雜度為 \(O(n)\) ,時間複雜度為 \(O(n{(lg(n))}^2)\) ,看起來比 \(O(nlg(n))\) 系列的排序演算法慢上一截,但是得益於GPU的平行計算,可以看作同一時間內有n個執行緒在執行,使得最終的時間複雜度可以降為 \(O({(lg(n))}^2)\) ,效率又上了一個檔次。
需要注意的是,雙調排序要求排序元素的數目為 \(2^k, (k>=1)\) ,如果元素個數為 \(2^k < n < 2^{k+1}\) ,則需要填充資料到 \(2^{k+1}\) 個。若需要進行升序排序,則需要填充足夠的最大值;若需要進行降序排序,則需要填充足夠的最小值。
排序核心程式碼實現
本HLSL實現參考了 directx-sdk-samples ,雖然裡面的實現看起來比較簡潔,但是理解它的演算法實現費了我不少的時間。個人以自己能夠理解的形式對它的實現進行了修改,因此這裡以我這邊的實現版本來講解。
首先是排序需要用到的資源和常量緩衝區,定義在 BitonicSort.hlsli
:
// BitonicSort.hlsli Buffer<uint> gInput : register(t0); RWBuffer<uint> gData : register(u0); cbuffer CB : register(b0) { uint gLevel;// 2^需要排序趟數 uint gDescendMask;// 下降序列掩碼 uint gMatrixWidth;// 矩陣寬度(要求寬度>=高度且都為2的倍數) uint gMatrixHeight; // 矩陣高度 }
然後是核心的排序演算法:
// BitonicSort_CS.hlsl #include "BitonicSort.hlsli" #define BITONIC_BLOCK_SIZE 512 groupshared uint shared_data[BITONIC_BLOCK_SIZE]; [numthreads(BITONIC_BLOCK_SIZE, 1, 1)] void CS(uint3 Gid : SV_GroupID, uint3 DTid : SV_DispatchThreadID, uint3 GTid : SV_GroupThreadID, uint GI : SV_GroupIndex) { // 寫入共享資料 shared_data[GI] = gData[DTid.x]; GroupMemoryBarrierWithGroupSync(); // 進行排序 for (uint j = gLevel >> 1; j > 0; j >>= 1) { uint smallerIndex = GI & ~j; uint largerIndex = GI | j; bool isDescending = (bool) (gDescendMask & DTid.x); bool isSmallerIndex = (GI == smallerIndex); uint result = ((shared_data[smallerIndex] <= shared_data[largerIndex]) == (isDescending == isSmallerIndex)) ? shared_data[largerIndex] : shared_data[smallerIndex]; GroupMemoryBarrierWithGroupSync(); shared_data[GI] = result; GroupMemoryBarrierWithGroupSync(); } // 儲存結果 gData[DTid.x] = shared_data[GI]; }
可以看到,我們實際上可以將遞迴過程轉化成迭代來實現。
現在我們先從核心排序演算法講起,由於收到執行緒組的執行緒數目、共享記憶體大小限制,這裡定義一個執行緒組包含512個執行緒,即一個執行緒組最大允許排序的元素數目為512。這裡共享記憶體的作用在這裡是臨時快取中間排序的結果。
首先,我們需要將資料寫入共享記憶體中:
// 寫入共享資料 shared_data[GI] = gData[DTid.x]; GroupMemoryBarrierWithGroupSync();
接著就是要開始遞迴排序的過程,其中 gLevel
的含義為單個雙調序列的長度,它也說明了需要對該序列進行 \(lg(gLevel)\) 趟遞迴交換。
在一個執行緒中,我們僅知道該執行緒對應的元素,但現在我們還需要做兩件事情:
- 找到需要與該執行緒對應元素進行Batcher比較的另一個元素
- 判斷當前執行緒對應元素與另一個待比較元素相比,是較小索引還是較大索引
這裡用到了位運算的魔法。先舉個例子,當前 j
為4,則待比較兩個元素的索引分別為2和6,這兩個索引值的區別在於索引2(二進位制010)和索引6(二進位制110),前者二進位制第三位為0,後者二進位制第三位為1.
但只要我們知道上述其中的一個索引,就可以求出另一個索引。較小索引值的索引可以通過遮蔽二進位制的第三位得到,而較大索引值的索引可以通過按位或運算使得第三位為1來得到:
uint smallerIndex = GI & ~j; uint largerIndex = GI | j; bool isSmallerIndex = (GI == smallerIndex);
然後就是判斷當前元素是位於當前趟排序完成後的遞增序列還是遞減序列,比如序列 \((4, 6, 4, 3, 5, 7, 2, 1)\) ,現在要進行第二趟排序,那麼前後4個數將分別生成遞增序列和遞減序列,我們可以設定 gDescendMask
的值為4(二進位制100),這樣二進位制索引範圍在100到111的值(對應十進位制4-7)處在遞減序列,如果這個雙調序列長度為16,那麼索引4-7和12-15的兩段序列都可以通過 gDescendMask
來判斷出處在遞減序列:
bool isDescending = (bool) (gDescendMask & DTid.x);
最後就是要確定當前執行緒對應的共享記憶體元素需要得到較小值,還是較大值了。這裡又以一個雙調序列 \((2, 5, 7, 4)\) 為例,待比較的兩個元素為5和4,當前趟排序會將它變為單調遞增序列,即所處的序列為遞增序列,當前執行緒對應的元素為5, shared_data[smallerIndex] <= shared_data[largerIndex]
的比較結果為 >
,那麼它將拿到(較小值)較大索引的值。經過第一趟交換後將變成 \((2, 4, 7, 5)\) ,第二趟交換就不討論了。
根據對元素所處序列、元素當前索引和比較結果的討論,可以產生出八種情況:
所處序列 | 當前索引 | 比較結果 | 取值結果 |
---|---|---|---|
遞減 | 小索引 | <= | (較大值)較大索引的值 |
遞減 | 大索引 | <= | (較小值)較大索引的值 |
遞增 | 小索引 | <= | (較小值)較小索引的值 |
遞增 | 大索引 | <= | (較大值)較小索引的值 |
遞減 | 小索引 | > | (較大值)較小索引的值 |
遞減 | 大索引 | > | (較小值)較小索引的值 |
遞增 | 小索引 | > | (較小值)較大索引的值 |
遞增 | 大索引 | > | (較大值)較大索引的值 |
顯然現有的變數判斷較大/較小索引值比判斷較大值/較小值容易得多。上述結果表可以整理成下面的程式碼:
uint result = ((shared_data[smallerIndex] <= shared_data[largerIndex]) == (isDescending == isSmallerIndex)) ? shared_data[largerIndex] : shared_data[smallerIndex]; GroupMemoryBarrierWithGroupSync(); shared_data[GI] = result; GroupMemoryBarrierWithGroupSync();
在C++中,現在有如下資源和著色器:
ComPtr<ID3D11Buffer> mConstantBuffer;// 常量緩衝區 ComPtr<ID3D11Buffer> mTypedBuffer1;// 有型別緩衝區1 ComPtr<ID3D11Buffer> mTypedBuffer2;// 有型別緩衝區2 ComPtr<ID3D11UnorderedAccessView> mDataUAV1;// 有型別緩衝區1對應的無序訪問檢視 ComPtr<ID3D11UnorderedAccessView> mDataUAV2;// 有型別緩衝區2對應的無序訪問檢視 ComPtr<ID3D11ShaderResourceView> mDataSRV1;// 有型別緩衝區1對應的著色器資源檢視 ComPtr<ID3D11ShaderResourceView> mDataSRV2;// 有型別緩衝區2對應的著色器資源檢視
然後就是對512個元素進行排序的部分程式碼(size為2的次冪):
void GameApp::SetConstants(UINT level, UINT descendMask, UINT matrixWidth, UINT matrixHeight); // // GameApp::GPUSort // md3dImmediateContext->CSSetShader(mBitonicSort_CS.Get(), nullptr, 0); md3dImmediateContext->CSSetUnorderedAccessViews(0, 1, mDataUAV1.GetAddressOf(), nullptr); // 按行資料進行排序,先排序level <= BLOCK_SIZE 的所有情況 for (UINT level = 2; level <= size && level <= BITONIC_BLOCK_SIZE; level *= 2) { SetConstants(level, level, 0, 0); md3dImmediateContext->Dispatch((size + BITONIC_BLOCK_SIZE - 1) / BITONIC_BLOCK_SIZE, 1, 1); }
給更多的資料排序
上述程式碼允許我們對元素個數為2到512的序列進行排序,但緩衝區的元素數目必須為2的次冪。由於在CS4.0中,一個執行緒組最多允許一個執行緒組包含768個執行緒,這意味著雙調排序僅允許在一個執行緒組中對最多512個元素進行排序。
這裡我們看一個例子,假如有一個16元素的序列,然而執行緒組僅允許包含最多4個執行緒,那我們將其放置在一個4x4的矩陣內:
然後對矩陣轉置:
可以看到,通過轉置後,列資料變換到行資料的位置,這樣我們就可以進行跨度更大的交換操作了。處理完大跨度的交換後,我們再轉置回來,處理行資料即可。
現在假定我們已經對行資料排完序,下圖演示了剩餘的排序過程:
但是線上程組允許最大執行緒數為4的情況下,通過二維矩陣最多也只能排序16個數。。。。也許可以考慮三維矩陣轉置法,這樣就可以排序64個數了哈哈哈。。。
不過還有一個情況我們要考慮,就是元素數目不為(2x2)的倍數,無法構成一個方陣,但我們也可以把它變成對兩個方陣轉置。這時矩陣的寬是高的兩倍:
由於元素個數為32,它的最大索引跨度為16,轉置後的索引跨度為2,不會越界到另一個方陣進行比較。但是當 gLevel
到32時,此時進行的是單調排序, gDescendMask
也必須設為最大值 32
(而不是4),避免產生雙調序列。
負責轉置的著色器實現如下:
// MatrixTranspose_CS.hlsl #include "BitonicSort.hlsli" #define TRANSPOSE_BLOCK_SIZE 16 groupshared uint shared_data[TRANSPOSE_BLOCK_SIZE * TRANSPOSE_BLOCK_SIZE]; [numthreads(TRANSPOSE_BLOCK_SIZE, TRANSPOSE_BLOCK_SIZE, 1)] void CS(uint3 Gid : SV_GroupID, uint3 DTid : SV_DispatchThreadID, uint3 GTid : SV_GroupThreadID, uint GI : SV_GroupIndex) { uint index = DTid.y * gMatrixWidth + DTid.x; shared_data[GI] = gInput[index]; GroupMemoryBarrierWithGroupSync(); uint2 outPos = DTid.yx % gMatrixHeight + DTid.xy / gMatrixHeight * gMatrixHeight; gData[outPos.y * gMatrixWidth + outPos.x] = shared_data[GI]; }
最後是GPU排序用的函式:
void GameApp::GPUSort() { UINT size = (UINT)mRandomNums.size(); md3dImmediateContext->CSSetShader(mBitonicSort_CS.Get(), nullptr, 0); md3dImmediateContext->CSSetUnorderedAccessViews(0, 1, mDataUAV1.GetAddressOf(), nullptr); // 按行資料進行排序,先排序level <= BLOCK_SIZE 的所有情況 for (UINT level = 2; level <= size && level <= BITONIC_BLOCK_SIZE; level *= 2) { SetConstants(level, level, 0, 0); md3dImmediateContext->Dispatch((size + BITONIC_BLOCK_SIZE - 1) / BITONIC_BLOCK_SIZE, 1, 1); } // 計算相近的矩陣寬高(寬>=高且需要都為2的次冪) UINT matrixWidth = 2, matrixHeight = 2; while (matrixWidth * matrixWidth < size) { matrixWidth *= 2; } matrixHeight = size / matrixWidth; // 排序level > BLOCK_SIZE 的所有情況 ComPtr<ID3D11ShaderResourceView> pNullSRV; for (UINT level = BITONIC_BLOCK_SIZE * 2; level <= size; level *= 2) { // 如果達到最高等級,則為全遞增序列 if (level == size) { SetConstants(level / matrixWidth, level, matrixWidth, matrixHeight); } else { SetConstants(level / matrixWidth, level / matrixWidth, matrixWidth, matrixHeight); } // 先進行轉置,並把資料輸出到Buffer2 md3dImmediateContext->CSSetShader(mMatrixTranspose_CS.Get(), nullptr, 0); md3dImmediateContext->CSSetShaderResources(0, 1, pNullSRV.GetAddressOf()); md3dImmediateContext->CSSetUnorderedAccessViews(0, 1, mDataUAV2.GetAddressOf(), nullptr); md3dImmediateContext->CSSetShaderResources(0, 1, mDataSRV1.GetAddressOf()); md3dImmediateContext->Dispatch(matrixWidth / TRANSPOSE_BLOCK_SIZE, matrixHeight / TRANSPOSE_BLOCK_SIZE, 1); // 對Buffer2排序列資料 md3dImmediateContext->CSSetShader(mBitonicSort_CS.Get(), nullptr, 0); md3dImmediateContext->Dispatch(size / BITONIC_BLOCK_SIZE, 1, 1); // 接著轉置回來,並把資料輸出到Buffer1 SetConstants(matrixWidth, level, matrixWidth, matrixHeight); md3dImmediateContext->CSSetShader(mMatrixTranspose_CS.Get(), nullptr, 0); md3dImmediateContext->CSSetShaderResources(0, 1, pNullSRV.GetAddressOf()); md3dImmediateContext->CSSetUnorderedAccessViews(0, 1, mDataUAV1.GetAddressOf(), nullptr); md3dImmediateContext->CSSetShaderResources(0, 1, mDataSRV2.GetAddressOf()); md3dImmediateContext->Dispatch(matrixWidth / TRANSPOSE_BLOCK_SIZE, matrixHeight / TRANSPOSE_BLOCK_SIZE, 1); // 對Buffer1排序剩餘行資料 md3dImmediateContext->CSSetShader(mBitonicSort_CS.Get(), nullptr, 0); md3dImmediateContext->Dispatch(size / BITONIC_BLOCK_SIZE, 1, 1); } }
最後是 std::sort
和雙調排序的比較結果:
可以初步看到雙調排序的排序用時比較穩定,而快速排序明顯隨元素數目增長而變慢。
DirectX11 With Windows SDK完整目錄