1. 程式人生 > >Disruptor源碼閱讀筆記

Disruptor源碼閱讀筆記

數據轉換 詳細 沒有 容量 源碼閱讀筆記 性能 容易 包括 沖突

關於 Disruptor,網絡上有很多的解釋和說法。這裏簡單的概括下。Disruptor 是一個消費者生產者隊列框架,據官網介紹,可以提供非常強大的性能。Disruptor 與其說為我們帶來了一個框架,更多的是為我們帶來了一個獨特思路的編程實踐。總結來說大致有3點。

使用循環數組的方式代替隊列,使用預先填充數據的方式來避免 GC;
使用 CPU 緩存行填充的方式來避免極端情況下的數據爭用導致的性能下降;
多線程編程中盡量避免鎖爭用的編碼技巧。
上面的三點是在 Disruptor 中帶來的一些技巧。有些是常用的,有些是實現起來比較獨特的。

使用循環數組代替隊列

生產者消費者模型自然是離不開隊列的。但是使用傳統的隊列,面對並發等問題,在性能上是否已經足夠的高效?或者說是否有其他的辦法來進一步的提高性能。Disruptor 為我們提供了一個思路和實踐(這個思路不是 Disruptor 首創,但是他們提供了一個好的完整實踐)

基本的循環數組實現

定義一個數組,長度為2的次方冪(因為計算機是二進制的,所以2次方冪可以進行並運算來代替取模運算)。設定一個數字標誌表示當前的可用的位置(可以從0開始)。當這個數字標誌不斷增長到大於數組長度時進行與數組長度的並運算,得到的新數字依然在數組的長度範圍內,就又可以插入。這樣就好像一直插入直到數組末尾又再次從頭開始,故而稱之為循環數組。

一般的循環數組有頭尾兩個標誌位。這點和隊列很像。頭標誌位表示下一個可以插入的位置,尾標誌位表示下一個可以讀取的位置。頭標誌位不能大於尾標誌位一個數組長度(因為這樣就插入的位置和讀取的位置就重疊了會導致數據丟失),尾標誌位不能等於頭標誌位(因為這樣讀取的數據實際上是上一輪的舊數據)

預先填充提高性能

我們知道在java中如果創造大量的對象使用後棄用,JVM 會在適當的時候進行 GC 操作。大量的對象 GC 操作是很消耗時間的。所以如果能夠避免 GC 也可以提高性能,特別是在數據交互非常頻繁的時候。

在循環數組中,可以事先在數組中填充好數據。一旦有新數據的產生,要做的就是修改數組中某一位中的一些屬性值。這樣可以避免頻繁創建數據和棄用數據導致的 GC。這點比起隊列是要好的。

只保留一個標誌位

多線程在隊列也好,循環數組也好,必然存在對標誌位的競爭。無論是使用鎖來避免競爭,還是使用 CAS 來進行無鎖算法。只要爭用的情況存在,並且線程較多,都會出現對資源的不斷消耗。爭用的對象越多,爭用中消耗掉的資源也就越多。為了避免這樣的情況,減少爭用的資源就是一個手段。比如在循環數組中只保留一個標誌位,也就是下一個可以寫入數據位置的標誌位。而尾部標誌位則在各個消費者線程中保存(具體的編程手法後續細講)。

循環數組在單線程中的使用

如果確定只有一個生產者,也就是說只有一個寫線程。則在循環數組中的使用會更加簡化。具體來說單線程更新數組上的標誌位,那這種情況,標誌位就無需采用 CAS 寫的方式來確定下一個可寫入的位置,直接就是在單線程內進行普通的更新即可。

循環數組在多線程中的使用

如果存在多個生產者,則可寫入的標誌位需要用 CAS 算法來進行爭奪,避免鎖的使用。多個線程通過 CAS 得到唯一的不沖突的下一個可寫序號。由於需要獲得序號後才能進行寫入,而寫入完成才可以讓消費者線程進行消費。所以才獲得序號後,完成寫入前,必須有一種方式讓消費者檢測是否完成。以避免消費者拿到還未填入輸入的數組位。

為了達到這個目標,存在簡單—效率低和復雜—效率高兩種方式。

簡單但是可能效率低的方式

使用兩個標誌位。

  • prePut:表示下一個可以供生產者放入的位置;
  • put:表示最後一個生產者已經放入的位置。
    多個生產者通過 CAS 獲得 prePut 的不同的值。在獲得的序號並且完成數據寫入後,將 put 的值以 CAS 方式遞增(比如獲得的序號是7,只有 put 是6的時候才允許設置成功),稱之為發布。這種方式存在一個缺點,如果多個線程並發寫入,獲取 prePut 的值不會堵塞,假設其中一個生產者在寫入數據的時候稍慢,則其他的線程寫入完畢也無法完成發布。就會導致循環等待,浪費了 CPU 性能。

復雜但是可能效率高的方式

在上面的方式中,主要的爭奪環節集中在多線程發布中,序號大的線程發布需要等到序號小的線程發布完成後才能發布。那我們的優化的點也在這個地方。如果只有一個地方可以寫入完成信息,必然需要爭奪。為了避免爭奪,我們可以使用標誌數組(長度和內容數組相同,每一位表示相同下標的內容數組是否發布)來表示每一個位置是否寫入。這樣就可以避免發布的爭奪(大家的標誌位都不在一起了)。

但是又來帶來一個問題,用什麽數字來表示是否已經發布完成?如果只是0和1,那麽寫過1輪以後,標誌數組位上就都是1了。又無法區分。所以標誌數組上的數字應該在循環數組的每一輪循環的值都不同。比如一開始都是-1,第一輪中是0的表示已發布,第二輪中是0表示沒發布,是1的表示已發布。下面的是發布的算法步驟:

將序號除以標誌數組長度(因為長度是2的次方冪,這一步可以通過右移來完成)得到填入值 x;
將序號和標誌數組長度減一進行並運算得到填入位置 index;
將index位置寫入 x。
CPU緩存行填充技術

一般在軟件編程中,很少有工程師會關註一些硬件的信息。不過如果追求性能達到極致,那麽對於一些硬件知識的了解就成了必要。這其中CPU 緩存的知識會神奇的提高我們的程序性能。

CPU緩存行

在編程上,網絡關於 CPU 緩存的知識介紹很多。這裏簡單說下。在硬件中,CPU 存在著多級緩存的結果,越接近 CPU 的緩存容量越小,速度越快。每一個物理內核都有自己的緩存體系。不同的 CPU 之間通過緩存嗅探協議來確定緩存中的數據是否已經失效。如果失效了,CPU 會去內存中讀取數據,並且將最新的數據在特定指令的幫助下寫入到內存中。

CPU 緩存是以行為單位進行存取的。以前的 CPU 是32個字節一行,現在則是64個字節一行。因為這種行存取的方式,所以稱之為緩存行。如果一個對象中不同屬性在多線程中被頻繁更新,會導致一個問題:由於在同一個緩存行中的不相關變量的更新導致整個緩存行失效。緩存行失效後 CPU 就只好到主存中重新讀取數據。這個問題在並發隊列中特別明顯。為了修正這個問題,JDK 7 中特意提供了 transferqueue 來解決這個問題。

緩存行填充

既然問題的發生是因為同一個緩存行中有不相關的變量被更新導致緩存行需要的數據一起失效,那麽解決的辦法就是讓這個頻繁被更新的變量獨占一個緩存行即可。也就是剩下的位置就用無關數據填充。這樣就保證了關鍵變量不會因為其他變量的更新而失效。具體的填充方式,就是在一個 Java 對象中設定無意義的變量,根據變量的長度來計算需要的個數。以下是示例代碼:

//現在一般的cpu架構都是64個字節的緩存行,針對這個情況,緩存行填充可以如下進行
class LeftPad
{
protected long p1, p2, p3, p4, p5, p6, p7;
}

class RealValue extends LeftPad
{
protected volatile long point = -1; // 前後都有7個元素填充,可以保證該核心變量獨自在一個緩存行中
}

class RightPad extends RealValue
{
protected long p9, p10, p11, p12, p13, p14, p15;
}

public class CpuCachePadingValue extends RightPad
{
}
多線程編程中減少對鎖的使用

Disruptor 整個框架的實現過程都在盡量的減少對鎖的使用。比如生產者消費者中最容易出現爭奪的,其實就是其中的消息隊列。那麽對於這個消息隊列,我們可以采用的優化手段包括

使用循環數組代替隊列,使用 CAS 算法來代替鎖爭奪;
消費者各自保存自己當前已經處理過的序號,而不是將這個序號的信息在隊列中來存儲,避免多線程爭用。
針對上面的第二點詳細展開說一下。一般來說,隊列中信息的處理有兩種不同的形式,第一種是這個消息需要所有消費者都處理完畢,才能認為是被使用好了。第二種是爭奪到使用權的消費者線程進行消費,其他消費者線程爭奪下一個。無論哪一種,都可以將消費者已經處理的序號保存在消費者線程內。而如果信息只允許被一個線程消費,可以在內部使用 CAS 來爭奪。而生產者線程則需要持有消費者的類的信息,好用來判斷所有消費者中消費的最小的序號,以避免在數據寫入時覆蓋了某個消費者尚未處理的數據信息。

指定消費者不同的處理順序

Disruptor 可以讓不同的消費者按照一定的順序進行消息處理。比如一個消息,必須先經過日誌處理 A1 保存日誌,數據轉換處理器 A2 清理才能最終被業務處理器 A3 進行實際的業務處理。而 A1 和 A2 並沒有任何前後關系,但是 A3 必須等 A1 和 A2 都完成後才能進行。那麽在實際編碼時,可以讓 A3 追蹤 A1 和 A2 的處理序號。所有的消費者都在等待隊列中可用序號達到自己需要的序號,一旦到達,排位靠後的處理器就循環檢測排位靠前的處理器是否已經將數據處理完畢,處理完畢之後自己開始對數據的處理。

總結

Disruptor 這個框架在整個的編碼過程中一直都在體現本地緩存數據,使用 CAS 來代替鎖,盡可能無鎖甚至無 CAS 這樣的一種編程思想。根據官網的說明,這樣的編碼思想是在他們追求多線程以提高性能遇到失敗後(項目復雜性、可測試性、維護性等),回過頭思考在單線程下的性能可能性(單線程無鎖必然是性能最高的,但是吞吐量就有待商榷)。

Disruptor源碼閱讀筆記