1. 程式人生 > >Disruptor——一種可替代有界隊列完成並發線程間數據交換的高性能解決方案

Disruptor——一種可替代有界隊列完成並發線程間數據交換的高性能解決方案

top ogl align 來講 好處 文件 最優化 什麽 內存碎片

  本文翻譯自LMAX關於Disruptor的論文,同時加上一些自己的理解和標註。Disruptor是一個高效的線程間交換數據的基礎組件,它使用柵欄(barrier)+序號(Sequencing)機制協調生產者與消費者,從而避免使用鎖和CAS,同時還組合使用預分配內存機制、緩存行機制(cache line)、批處理效應(batch effect)來達到高吞吐量和低時延的目標。目前Disruptor版本已經叠代至3.0,本論文是基於Disruptor1.0寫就,在新版本中,相對與1.0版本,其核心設計思想沒有變,只是實現細節有所調整和優化,因此,此論文仍然很有研讀意義。

  Disruptor論文原文地址:http://disruptor.googlecode.com/files/Disruptor-1.0.pdf

  Disruptor github地址:http://lmax-exchange.github.io/disruptor/

  【翻譯過程中,參考了“yfx416的博客”,部分生動的註解直接摘抄自他的blog,已經特別標記出來,他的blog文章鏈接點擊此處。如有侵犯,請及時告知。】

*****************************************以下為對Disruptor論文的翻譯*****************************************************

摘要

  Lmax計劃創建一個高性能的財務交易系統。作為我們工作的一部分,我們評估了多種方案去設計這個系統以求達到高性能目標,最後我們發現在傳統的解決方案中我們遇到了基礎上的瓶頸。

  許多應用程序通過隊列來在不同的處理階段之間交換數據。我們的性能測試顯示,如果按照傳統的方式來使用隊列,延時代價的量級和磁盤IO操作的延時量級是同一個量級-非常慢。如果在端到端的操作中采用多個隊列,又會增加了幾百個毫秒的額外開銷【註:用於協調這些隊列】。這裏有很大的優化空間。

  通過進一步研究和調研計算機科學,我們意識到傳統方案(如:隊列和處理節點)耦合了多個關註點,從而帶來了多線程之間的資源爭搶,這暗示我們肯定別有妙方。

  結合現代CPU的工作原理,也就是我們常說的“機制共鳴”(mechanical sympathy)【註:意為參考現代cpu的設計思想,順應底層設計思路,來做上層的應用設計,以便使用底層設計的優勢,從而達到一個最佳的設計結果,得到一個‘共鳴’。】

,通過隔離關註,我們提出了一個數據結構和基於該數據結構的模式,這就是disruptor。

  測試結果顯示,對於一個三階段的任務管道,Disruptor的平均延時的數量級要小於基於傳統隊列的方法三個數量級。另外disruptor的吞吐量是傳統方法的8倍。

  這些性能改進也意味著對於並發編程我們前進了一大步。對於高吞吐量和低延時的異步事件處理系統,這種新的模式是一個非常理想的基礎組件。

  在Lmax中,我們已經建立起一個訂單匹配引擎,實時風險管理系統,以及一個高可用性的內存事務處理系統,這些系統都是基於disruptor。這些系統的性能都是很牛的。

  Disruptor不只是專門為財務行業設計的,它具有相當的通用性,他能夠解決並發編程中的一個復雜問題:如何最大化性能。這個系統的實現非常簡單,盡管這裏面的有些概念不是那麽直觀,但相比於其他機制,基於這種模式的系統往往更加簡單。

  相比於其他方法,disruptor的寫競爭比較少,並發開銷更低,而且更加緩存友好,吞吐量更高,延時抖動更低。對於一個普通時鐘頻率的處理器,disruptor每秒處理的消息量為2500萬,延時低於50納秒。這個性能指標已經接近於現代處理器在多核之間交換數據的上限。

概述

  Disruptor是Lmax開發的世界上最快的財務交易系統的產物。早期的設計思路主要借鑒SEDA和Actors的實現,希望使用pipeline來提升吞吐量。通過測試各種實現,我們發現管道(pipeline)在不同s階段(stage)之間,事件排隊是性能的主要殺手。我們發現隊列帶入了劇烈的延時抖動。我們為了達到更好的性能於是花了很多精力來開發一個新的隊列實現。然而最終發現隊列有其局限性——耦合了生產者、消費者、數據存儲等多個關註點。Disruptor的實現很好地隔離上述關註點。

並發的復雜性

  本文遵循計算機科學的通用定義:並發不僅是說有兩個或者多個任務同時執行,還意味著對資源的競爭訪問。這些競爭的資源可能是數據庫,文件,socket或者內存中的某個地址。

  代碼的並發執行主要有兩個方面:互斥和變化的可見性。互斥主要用來管理對某些資源的競爭更新。變化的可見性主要是用來控制什麽時候這些變化對其他線程可見。如果你能夠在應用層面上限制並發更新那麽你就有可能避免互斥。比如,如果你的算法能夠確保任何資源只會被一個線程更新,那麽互斥就是不必要的。讀或者寫要求所有的變化對其他線程可見,但實際上只有競爭寫操作才真正需要互斥。

  並發環境中最耗費時間的操作其實就是並發寫操作。多線程對同一個資源的寫需要復雜昂貴的協調,通常會通過某種鎖來實現資源協調。

鎖的代價

  鎖提供了互斥,並能夠確保變化能夠以一個確定的順序讓其它的線程看見。鎖其實是很昂貴的,因為他們在競爭的時候需要進行仲裁。這個仲裁會涉及到操作系統的上下文切換,操作系統會掛起所有在等待這把鎖的線程,直到鎖持有者釋放該鎖。上下文切換期間,執行線程會喪失對操作系統的控制,導致執行線程的執行上下文丟失之前緩存的數據和指令集,這會給現代處理器帶來嚴重的性能損耗。當然效率更高的用戶態鎖是另一種選擇,但用戶鎖只有在沒有競爭的時候才真正會帶來益處。【註:因為用戶態的鎖往往是通過自旋鎖來實現(或者帶休眠的自旋鎖),而自旋在競爭激烈的時候開銷是很大的(一直在消耗CPU資源)。】

  為了探究影響到底有多大,我們寫了一個程序,這個程序很簡單,就是調用一個循環5億次遞增操作的函數。這個java函數在單線程,2.4G Intel Westmere EP的CPU上只需要300ms。

  一旦引入鎖,即使沒有發生競爭,程序的執行時間也會發生顯著的增加。實驗結果如下:

Method

Time (ms)

Single thread

300

Single thread with lock

10,000

Two threads with lock

224,000

Single thread with CAS

5,700

Two threads with CAS

30,000

Single thread with volatile write

4,700

CAS的代價

  除了鎖之外,另外一種方法是CAS。CAS依賴於處理器的支持,當然大部分現代處理器都支持。CAS相對於鎖是非常高效的,因為它不需要涉及內核上下文切換進行仲裁。但cas並不是免費的,處理器需要對指令pipeline加鎖以確保原子性,並且會用到內存柵欄【註:內存柵欄的實現細節還不清楚,後續加強理解。】以確保對其他線程的可見性。JAVA中的-java.util.concurrent.Automaic*類用到了CAS操作。

  CAS的一個問題就是太復雜了,本來用鎖進行並發編程就已經很頭疼了,用CAS來實現復雜邏輯就更頭痛了。

  最理想的算法就是只有一個線程來負責對單個資源的所有寫,而其它所有的線程都是讀結果。在多核環境中讀取結果會要求memory barrier來使得變化對運行在另外的CPU核心上的線程可見。

內存柵欄(Memory Barriers)

  現代處理器為了獲得更高的性能會做指令重排,在內存和執行單元中,指令執行、數據的加載和存儲都會被進行指令重排。處理器只需要確保程序邏輯能得到相同的結果,它不會關心指令的執行順序。對於單線程程序,指令重排不會有問題,但對於共享狀態的多線程程序而言,內存有序變化就變得非常重要。處理器用內存柵欄來標識對內存更新順序敏感的代碼片段,它們確保確保指令的硬件執行順序和內存變化在線程間的可見性。編譯器可以在代碼的合適位置放置額外的軟件柵欄來確保被編譯代碼的執行順序,這些軟件柵欄是附加在處理器自身的硬件柵欄之上的。

  現代CPU相比內存系統來講速度是非常快的。為了橋接其各個CPU,現代處理器使用了復雜的緩存系統,這些緩存實際上是一些高效的獨立的硬件哈希表。不同CPU之間的緩存是通過消息傳輸協議來保證一致性。另外,處理器還會使用“存儲緩沖區”緩解對緩存的寫壓力,使用“失效隊列”確保——在寫操作發生時,緩存一致性協議能快速知道失效消息【註:實現細節不是很了解】

  對於此種實現方式,最近寫入的數據可能處於任何存儲中:在寄存器裏,在存儲緩沖區中,在各級緩存中,在主存中。如果多個線程要共享這個值,那麽這個值必須要按照一定的順序對其它線程可見,這種可見性是通過交換緩存一致性消息來協調完成的。內存柵欄可以控制這些消息的適時產生。

  讀內存柵欄(a read barrier)確保CPU上的加載指令有序,當緩存發生變化時,讀內存柵欄會在失效隊列上標記一個點。讀內存柵欄標記點之前的寫操作可以通過內存柵欄提供一致性視圖。【註:讀內存柵欄會在“失效隊列”中標記一個節點,這個節點意味著read barrier之前讀取的所有數據都已不可靠(這也就告訴我們可能有變化發生),之後的所有read操作都需要重新從內存中加載,因此之後的操作從而能夠看到數據的最新變化,barrier之前的所有線程的寫指令和barrier之後的讀指令就有了一個先後順序。Read memory barrier使得排在read memory barrier之前的寫操作成為一個整體,在這個整體內部寫操作的順序是不確定的,但這個整體形成了一個完整的視圖,整個整體的結果對後面的操作是可見。】

  寫內存柵欄(a Write barrier)用來確保CPU的存儲指令有序,寫內存柵欄會在存儲緩沖區(store buffer)中標記一個點,這個標記點之前的數據變化(write操作)會通過緩存flush到主存。寫內存柵欄標記點之前發生的存儲操作可以通過寫內存柵欄提供有序性視圖。

  讀寫(a full barrier)能同時確保加載/存儲操作的有序性。

  【註:綜上所述,實際上read barrier意味著read操作不能穿越這個read barrier,write barrier意味著write操作不能穿越這個write barrier。執行read barrier的CPU是多線程中的消費者的角色,它通過read barrier能夠盡快看到生產者線程的執行結果。而執行write barrier的CPU往往充當生產者的角色,它通過write barrier把自己執行的結果盡可能快得讓其它線程看見。】

  【註:內存柵欄的實現原理?】

  一些CPU在上述三個基礎柵欄基礎上引入了很多變化,但是通過這三個基礎柵欄足以理解內存柵欄期望解決問題的復雜性。在Java內存模型中,對volatile域的讀取和寫入實際上就是對應的read barrier和write barrier。Java內存模型規範對此有明確的定義。

緩存行(cache line)

  現代處理器中緩存的使用對高性能操作意義非凡。處理器將數據和指令持有在緩存中以達到高效處理的目的,對應的,如果數據和指令在在緩存中丟死,則會變得無效。

  硬件操作緩存並不是以字節或字為單位,為了效率考慮,緩存通常以緩存行(cache line)的形式進行組織,緩存行通常有32-256字節,最常見的是64字節。緩存行也是緩存一致性協議操作的最小粒度。這就意味著:如果兩個變量不幸在同一個緩存行裏,而且它們分別由不同的線程寫入,那麽這兩個變量的寫入會發生競爭,就好像多線程在競爭寫入同一個變量一樣。這種現象被稱之為“偽共享”(false sharing)。出於高性能的考慮,需要確保獨立但被並發寫入的變量之間不會共享同一個緩存行,以求將資源競爭降到最低。

  可預測式的CPU訪問主存【註:這裏的memory譯為主存,以便和緩存(cache)區分開來】時,通常會預測接下來將被訪問到的主存內容並在後臺將它加載到緩存中,從而將主存訪問產生的時延降至最低。主存預讀取發生的前提是:處理器能夠檢測到主存訪問的模式/規律,主存預讀取就像是以一個可預測的‘步幅’在主存中行走。比如說:當對一個數組的內容進行叠代時,‘步幅’是可預測的,這樣主存中的內容就能預讀取到緩存行中,最大化訪問主存的效率。在處理器能感知到的任何方向中,‘步幅’通常要小於2048字節。然而,像鏈表(linked lists)和樹(trees)這樣的數據結構在主存中擁有分布廣泛的節點,從而沒有可預測的訪問‘步幅’【註:由於鏈表或者樹各個節點之間分布並不是順序的,相鄰節點的存儲地址相隔很遠,所以處理器找不到對應的讀取規律,無法進行預讀取。】,由於主存中缺乏一致的模式限制了系統預取緩存行的能力,導致主存訪問的效率可能低了2個數量級。

隊列的問題

  典型的隊列要麽使用鏈表要麽使用數組作為底層的數據存儲。如果一個內存隊列是無界的,很大情況,他會變得不可控直到消耗大量內存導致嚴重錯誤。這通常發生在生產者處理能力好於消費者的場景下。無界隊列在生產者能保證不超過消費者消費能力的情況下非常有用,但是仍然有擔保失效的風險。為了避免災難發生,隊列通常是有界的,為了確保隊列有界就要求它是數組形式或者他的大小是被跟蹤的。

  常見的隊列形式下,在隊列的頭(消費者要頻繁使用),尾(生產者要頻繁使用)以及長度(生產者和消費者都需要頻繁更改)會產生資源競爭。當隊列在使用時,在大多數情況下隊列要麽是幾乎是滿的,要麽幾乎是空的,這是由於生產者和消費者不同的工作節奏造成。很少能取得平衡以使消費者和生產者的工作節奏完美匹配。幾乎為滿或者幾乎為空的隊列會導致嚴重的資源競爭和為了維護緩存一致性而帶來的高昂消耗。【註:以一個幾乎為空隊列為例,大量的消費者被阻塞,在可消費的entity上產生資源競爭,同時緩存對應的內容迅速變化,導致各個cpu之間維護緩存一致性的成本急劇上升】。即便為隊列的頭和尾使用不同的同步對象(比如:鎖或者CAS變量)也同樣存在問題,因為,這些不同的同步對象對應的主存內容通常位於同一個緩存行中,對這些同步對象的頻繁操作也會帶來資源競爭【註:這裏不太確定位於同一個緩存行的是同步對象還是隊列的頭和尾對象】

  相對於在隊列上加單個大粒度(large-grain)的鎖,管理分散(生產者關註隊列頭,消費者關註隊列尾和對頭尾之間的存儲節點的關註)。在put和take上添加單個大粒度鎖的使用時非常簡單的,但是對吞吐量影響實在太大。如果想要很好得解決隊列場景下的並發問題,那麽隊列的實現就必然很復雜,當然如果你是單生產者-單消費者的場景,那麽也許隊列實現沒有必要那麽復雜。  

  Java裏使用queue還有一個更大的問題:queue會成為很大的垃圾回收源。首先,隊列中的對象需要被分配和替換,再者,當鏈表重新尋址,對象需要重新分配。當隊列中的對象不再被引用時,所有這些對象都需要被回收。

管道和圖(Pipelines and Graphs)

  很多問題場景下,需要將多個處理階段綁定在一起組成管道,這個管道通常以並行地方式組織成圖的拓撲結構。各個階段之間通常使用隊列來連接,同時每個階段會有自己的處理線程。

  這種處理方式並不便宜——每個階段都會有入隊和出隊的開銷,當路徑必須分叉(fork)時,有多少個目標消費者,就會增加多少倍的成本【註:到每個消費者都會有入隊操作】;同時,分叉之後還需要合並,這時候會因為不可避免的資源競爭產生額外的成本。

  【註:舉個例子(這個例子來自於yfx416的譯文,非常淺顯易懂,因此貼在下面)

技術分享

  上面圖講的是如何組裝一臺小汽車,要組裝一臺汽車,我們第一階段要有個底座,接著裝引擎,裝駕駛員座椅,裝乘客座椅,裝後座座椅等等等等,最後是裝四個輪子。這裏每個圓圈都代表一個stage。箭頭代表依賴關系(或者路徑),這很容易理解,只有四個門和引擎蓋都裝好了(body complete)才能進行噴漆工作。每個箭頭都意味著一個隊列,箭頭兩端代表著生產者和消費者。每個消費者和消費者都意味著一個線程。比如對於paint而言,它擁有五個消費者線程來處理隊列。

  這麽做看起來很樸素,但其實Actor和SDEA等大名鼎鼎的架構都是源自類似的樸素簡陋的思想。這麽做固然好,但也是有代價的,讓我們來分析下代價。首先有隊列就意味著有出隊和入隊的操作,這就意味著開銷。對於chassis stage而言,它作為生產者,它的開銷 = 路徑數 * 入隊開銷(對於chassis stage,同樣的消息我需要重復傳播4次到4個不同的目的stage,浪費啊...)。對於paint stage而言,它有五個消費者線程來處理不同的隊列,這五個消費者線程互相競爭CPU,內存以及其他資源(比如為了給paint stage準備數據,大家一起往一個對象裏填充數據),如果線程數目不是5個,而是500個,這個壓力還是很大的。通過分析我們看到,由於各個stage在業務中的作用不同,造成了壓力分布的不均衡,比如bonnet stage就比較閑,paint stage就比較忙。而且對於類似chassis這樣的stage,同樣的消息要重復插入多次,帶來了開銷浪費。不均衡和浪費正是我們感覺不爽的地方。

  能改進麽?可以。同樣是這樣的依賴關系圖,如果我們能把各個stage之間聯系的隊列去掉,用一種統一的數據結構來代替,這樣就避免了重復,這該多麽理想。而且基於統一的數據結構,如果我們能夠用統一的可控的線程組來控制,就能使得壓力比較均勻,這不也是我們求之不得麽?實際上disruptor的思想就是來自於此。】

LMAX Disruptor的設計

  為了解決上面提出的問題,該設計嚴格地實現了關註分離(separation of the concerns)。該設計確保任何數據只被一個線程進行寫訪問,從而避免寫沖突。這個設計就是Disruptor,之所以叫這個名字,是因為它和java7中出現的“Phasers”有很多相似之處。 

  Disruptor的設計初衷就是為了解決上面提到的問題【註:鎖代價過大、CAS代價過大、緩存行導致的偽共享、隊列存在的問題】,以求最優化內存分配,使用緩存友好的方式來最佳使用現代硬件資源。

  Disruptor的核心機制在於:以RingBuffer的形式預分配有界的數據結構,單個或者多個生產者可以向RingBuffer中寫入數據,單個或者多個消費者可以從RingBuffer讀取數據。

內存分配

  Ring buffer的內存是在啟動時預先分配的。Ring buffer要麽是一個引用數組,每個元素是指向對象的引用【註:針對C#, java這樣的語言】,要麽是一個結構數組,每個元素(entry)代表的就是對象實體【註:針對C,C++這樣的語言】。由於java語言的限制,Java實現的Ring buffer實際上只能是一個引用數組。每個元素只能是一個數據的容器,而不是數據本身。這種預先分配的策略就能夠避免Java內存回收引起的一些性能問題,因為這些元素對象(enries)在能夠在Disruptor實例中的整個生命周期存活和被復用【註:這些enties一直被RingBuffer對象持有,而RingBuffer實例對象又被Disruptor持有,故只要Disruptor存在,則這些enties便不會被GC掉】。由於這些對象是在開始階段同時分配的,很大程度上被連續分布在主存中,即使不連續它們在內存中得間隔也有可能是固定的,從而支持緩存步幅,非常有利於緩存的數據預取【註:見前文中‘緩存行’中的描述】。John Rose提出了一個草案希望未來JAVA能夠支持所謂的“value type”,就像C語言那樣,如果這樣的話,disruptor就能夠確保對象在內存中一定是連續的,而不僅僅只是有很大可能性了。

  對於JAVA這樣的運行環境來講,垃圾回收對於低延時系統是一個嚴重的挑戰。內存越大,垃圾回收造成的性能壓力也就越大。垃圾回收喜歡的對象要麽壽命非常短,要麽對象幹脆是不死的。Ring buffer的預先分配使得我們的對象變成了不死的對象,這是大大減輕垃圾回收的壓力。

  在高負載的情況下,基於隊列的系統可以back up【註:不清楚這裏的back up是什麽意思,故使用原文表示】,這會降低處理效率,進而導致被分配的對象比原本存活得更長,在使用分代垃圾收集器的vm中,這些對象將會從年輕代進入老年代。這就意味著:

  • 1) 這些對象被不斷地在各代之間【註:年輕代eden gen 和 suviving erea,以及young gen和old gen之間】進行拷貝,造成延時波動;
  • 2) 進入老年代的對象回收成本會更高,GC帶來的內存碎片也會更多,引起‘stop the world’的概率會更大;越多的臨時對象進入到老年代,也更可能帶來性能上的損耗。

  【註:如果對象是永生不死的,1)的代價無法避免,但2)的代價就會大大降低,因為這些對象一直會存在,那麽老年代觸發major collection時它掃描一下所有對象,發現這些對象都無法釋放,什麽都不用幹,那就直接收工。由於對象不會被釋放,那也就不會有碎片,那麽stop the world的概率也就大大降低了。永生對象在2)中所做的只不過是進行一次掃描,這個代價非常小。】

  【註:下面通過分析ArrayBlockingQueue和RingBuffer來加深Disruptor內存預分配所帶來的好處

  在ArrayBlockingQueue中,數組Object[] items負責存儲隊列中的所有元素,如下圖所示,當消費者消費完items[0]元素,緊接著生產者向items[0]放入新的元素entity1,這時候items[0]存儲的是對象entity1的引用,items[0]到entity0對象的引用被切斷,entity0等待被GC。生產者不斷地向items[0]中寫入消息,則老的entity將不斷地需要被GC,一旦隊列阻塞,items可能熬過多次minor GC,幸存下來,並進入到老年代,帶來更嚴重的性能隱患。

技術分享

  再來看看RingBuffer預分配內存方式的精妙之處。RingBuffer同樣使用數組Object[] entries作為存儲元素,如下圖所示,初始化RingBuffer時,會將所有的entries的每個元素指定為特定的Event,這時候event中的detail屬性是null;後面生產者向RingBuffer中寫入消息時,RingBuffer不是直接將enties[7]指向其他的event對象,而是先獲取event對象,然後更改event對象的detail屬性;消費者在消費時,也是從RingBuffer中讀取出event,然後取出其detail屬性。可以看出,生產/消費過程中,RingBuffer的entities[7]元素並未發生任何變化,未產生臨時對象,entities及其元素對象一直存活,知道RingBuffer消亡。故而可以最小化GC的頻率,提升性能。

技術分享註:圖中對象Entry寫錯,應當為Event。


  思考一個問題:如果RingBuffer的enties[7]指向的event對象中含有對象屬性,會不會有queue類似的GC問題呢?】

隔離關註(Teasing Apart the Concerns

  如下幾個分開的關註點是所有隊列實現方式都需要綜合實現的,這些也是為隊列實現定義的接口:

  •   1) 隊列元素的存儲;
  •   2) 隊列協調生產者聲明下一個需要交換的隊列元素的序號;
  •   3) 隊列協調並告知消費者等待的元素已經就緒。

  在使用帶有垃圾回收特性語言來設計金融用的交換機(exchange)時,過多的內存分配會帶來麻煩,所以,我們基於鏈表的隊列不是一個好的解決方案。如果用於存儲各個階段交換數據的節點(entries)可以被預先分配內存,那麽垃圾回收可以被最小化;更進一步地,如果節點被統一分配為相同大小的塊,那麽在遍歷節點時,將會以一種緩存友好的方式進行,效率會更高。預分配內存的數組滿足上述的要求,在創建RingBuffer時,DIsruptor使用抽象工廠模式預分配了所有節點,當一個節點被聲明時,生產者只需要將它的數據拷貝到這個預分配的數據空間中即可。【註:可參考上節中的註解分析。】

  對於現代處理器而言,取余操作是一種比較昂貴的操作。但在RingBuffer中取余是一個使用頻率很高的操作,因為需要計算某一個序號在RingBuffer中的位置需要用到取余。一個替代的方法是將RingBuffer的長度設置為2的冪,這樣通過簡單的位操作就可以獲取余數。  

  我們前面提到,有界隊列會在隊列頭和隊列尾形成激烈的競爭。但是RingBuffer使用的數據結構則沒有這種競爭和並發源於,因為RingBuffer將這些競爭的焦點(concerns)轉移到了生產者/消費者柵欄(barriers)上去,接下來我們將詳細闡述這一邏輯。

  Disruptor的典型應用場景通常只有一個生產者,典型的生產者是文件讀取或者網絡偵聽。如果只有一個生產者,那麽隊列元素分配或者序號分配不會存在競爭。

  但在一些特別的場景,Disruptor會有多個生產者,這種情況下生產者們可能會彼此競爭來獲取RingBuffer中下一個可用的位置,這裏的競爭問題可以通過CAS操作來處理。

  當生產者將相關的數據拷貝到RingBuffer的位置(entry)中後,生產者提交這個序號,告知消費者這個位置的數據可以消費了。這裏可以不使用CAS操作,而是用簡單地自旋直到等待的其他生產者都到達了這個序號便可提交。【註:RingBuffer通過一個cursor來告知消費者當前那些位置可以被消費,多個生產者時,需要確保cursor遊標之前的seq都已經提交,故而這裏需要協調各個生產者】。為了避免覆蓋情況發生【註:覆蓋是指生產者熟讀快於消費者速度,導致生產者寫入消費者還未來得及消費的位置】,生產者在寫入前會檢查所有消費者最小的seq,確保寫入的seq不會大於這個最小的消費者seq。

  消費者在讀取元素之前需要等待一個序號,該序號指示了有效的可以讀取的元素。怎麽樣等待這個序號,有很多種策略。如果CPU資源比較寶貴,那麽消費者可以等待某一個鎖的條件變量,由生產者來喚醒消費者。這種方式明顯會帶來競爭,只適用於CPU資源的稀缺性比系統的延時/吞吐量重要的場景。另外一種策略是所有消費者線程循環檢查遊標(cursor),該遊標表示RingBuffer中當前有效的可供讀取的元素的位置,這種策略使用更多消耗CPU資源來換取低時延。這種方法由於沒有用鎖和條件變量,因此打破了生產者和消費者之間的競爭依賴關系。如果你想支持多生產者-多消費者的場景,你就不得不采用很多CAS操作,這些CAS操作作用在頭,尾,隊列長度等等,這就帶來了復雜性。但disruptor避免了這種復雜的CAS競爭。

序列化(Sequencing)

  順序化是disruptor管理並發的核心概念。每個生產者和消費者都維護自己的序號,這個序號用來和RingBuffer交互。當一個生產者希望在RingBuffer中添加一個元素時,它首先要做的是聲明一個序號【註:這個序號被稱之為生產者的聲明序號,該序號用來指示下一個空閑的slot的位置,一旦序號被聲明,那麽該序號就不會被其它生產者重復操作,生產者就可以操作該聲明序號的slot數據】。單生產者的場景下,這個聲明序號可以是一個簡單的整數;多生產者的場景,這個序號必須是一個支持CAS的原子變量。當一個生產者序號被聲明,這個序號對應的位置就可以被聲明該序號的生產者寫入了;當生產者完成更新元素後,它就通過更新一個單獨的序號來提交變化,這個單獨的序號是一個遊標(cursor),用來指示消費者可以消費的最新的元素。生產者可以通過一種自旋的方式來讀取和更新RingBuffer的遊標,這裏只需要使用內存柵欄而不需要使用CAS操作,如下:

        long expectedSequence = claimedSequence – 1;
        while (cursor != expectedSequence)  //this is a memory Barrier
        {
            // busy spin
        }

        cursor = claimedSequence;

  消費者等待指定的消費者序號變得可用,它通過內存柵欄(memory barrier)來讀取遊標(cursor),一旦遊標的值被更新,內存柵欄會確保RingBuffer中的這一變化會被所有在遊標上等待的消費者可見。【註:通過這種機制,消費者能夠及時知道生產者提交了新的消息,並嘗試進行消費】

  每個消費者都各自維護一個序號來表示自己最新消費的位置序號。生產者通過跟蹤這些序號來確保不會覆蓋消費者還未來得及消費的位置。同時這些序號也可以用於協調消費者之間的執行順序【註:Disruptor中的多消費者實際上是指對於同一個event,有多個處理階段,每個階段被認為是一個獨立的消費者,各個階段的執行通常是有順序要求的】

  單生產者場景下,不管消費者有多麽復雜的依賴,Disruptor都無需使用鎖和CAS操作,它通過多個序號(Sequences)上的內存柵欄就可以協調整個並發場景。【註:Disruptor中有兩類Sequence——生產者序號(又叫cursor)和消費者序號,通過在這兩個序號上建立內存柵欄,達到協調並發的目的】

批量效應(Batching Effect)

  當消費者等待RingBuffer中可用的前進遊標序號時,如果消費者發現RingBuffer遊標自上次檢查以來已經前進了多個序號,消費者可以直接處理所有可用的多個序號,而不用引入並發機制, 這樣滯後的消費者能夠迅速跟上生產者的步伐,從而平衡系統,這一特性是隊列(queues)不具有的。這種類型的批處理增加了吞吐量,同時減少和平滑了延遲。 根據我們的觀察結果,無論負載如何,時延都會維持在一個常數時間值上,直到存儲子系統飽和,然後根據小定律(Little’s Law),該曲線是線性的。 這與在負載增加時觀察隊列所得到時延“J”曲線效應非常不同。

依賴圖(Dependency Graphs)

  隊列從本質上來講表示一個簡單的消費者和生產者之間的只具有一步的管道(pipeline)。如果消費者形成了一個鏈條,或者一個圖狀的依賴關系,那麽圖中的每個階段之間都會需要一個隊列。大量的隊列就帶來了開銷。在設計LMAX財務交易系統的過程中,我們發現基於隊列的設計方法會導致大量的隊列開銷,而這些為數眾多的隊列所帶來的開銷耗費了事務處理的大部分時間。

  在Disruptor設計模式中,生產者和消費者的競爭被很好得隔離開了,因此通過使用一個簡單的RingBuffer也可以在消費者之間構造復雜的依賴關系。這樣降低了執行時延,從而提高了吞吐量。

  一個RingBuffer可以用來處理一個具有復雜的依賴關系圖的流程。設計RingBuffer的時候需要特別註意,需要避免消費者之間造成的jvm偽共享。

Disruptor類圖

  下圖是Disruptor框架的核心類圖。這個圖裏遺漏了一些簡化編程模型的類。一旦業務流程的依賴關系圖構建完畢,那麽編程模型就變簡單了。生產者通過ProducerBarrier來順序申請entry,同時將數據變化寫入entry中,然後再通過ProducerBarrier來提交數據變化並使得這些變化對消費者可見。作為一個消費者,它所需要做的只不過是提供一個BatchHandler實現,當一個新的entry可見時,這個回調會被觸發。這使得Disruptor編程模型是一個基於事件的模型,和Actor模型類似。

  為了更靈活的設計,隊列通常將關註點組合起來考慮。RingBuffer是Disruptor模式的核心,它為數據交換提供了存儲,同時又避免了競爭。通過RingBuffer,生產者和消費者之間的並發問題被隔離開了。ProducerBarrier就是用來管理RingBuffer中的位置槽(slot)聲明,同時跟蹤相關的消費者從而避免沖突覆蓋。而ConsumerBarrier在有新的元素有效時會負責通知消費者。通過這些barrier,消費者之間就構造成了一個依賴關系圖,這個依賴關系關系圖實際上代表了流程處理過程中的各個階段。

  【註:一下類圖是Disruptor1.0的實現類圖,現在已經更新到3.0版本,類圖也有了很大的改變,後面會給出最新的3.0版本的類圖】

技術分享

代碼示例

  下面的代碼示例是一個單生產者和單消費者的場景,它通過BatchHandler來實現消費者。消費者運行在一個單獨的線程上,當元素可用時,它被用來接收元素。

// Callback handler which can be implemented by consumers
final BatchHandler<ValueEntry> batchHandler = new BatchHandler<ValueEntry>()
{
    public void onAvailable(final ValueEntry entry) throws Exception
    {
        // process a new entry as it becomes available.
    }

    public void onEndOfBatch() throws Exception
    {
        // useful for flushing results to an IO device if necessary.
    }

    public void onCompletion()
    {
        // do any necessary clean up before shutdown
    }
};

RingBuffer<ValueEntry> ringBuffer =
    new RingBuffer<ValueEntry>(ValueEntry.ENTRY_FACTORY, SIZE,
                               ClaimStrategy.Option.SINGLE_THREADED,
                               WaitStrategy.Option.YIELDING);
ConsumerBarrier<ValueEntry> consumerBarrier = ringBuffer.createConsumerBarrier();       
BatchConsumer<ValueEntry> batchConsumer = 
    new BatchConsumer<ValueEntry>(consumerBarrier, batchHandler);
ProducerBarrier<ValueEntry> producerBarrier = ringBuffer.createProducerBarrier(batchConsumer);   

// Each consumer can run on a separate thread
EXECUTOR.submit(batchConsumer);

// Producers claim entries in sequence
ValueEntry entry = producerBarrier.nextEntry();

// copy data into the entry container

// make the entry available to consumers
producerBarrier.commit(entry);  

性能測試——吞吐量

我們選取了Doug Lea的ArrayBlockingQueue的實現作為參考目標進行測試,ArrayBlockingQueue是所有有界隊列中性能最好的,測試是按照阻塞的方式進行的。

技術分享

技術分享

技術分享

技術分享

技術分享

  上述配置中,ArrayBlockingQueue被用於每一個數據流箭頭的位置,相當於Disruptor中的柵欄所處的位置。下表展示了總共處理5億條消息時每秒吞吐量的性能測試結果,測試環境為:沒有HT的1.6.0_25 64-bit Sun JVM, Windows 7, Intel Core i7 860 @ 2.8 GHz ,以及Intel Core i7-2720QM, Ubuntu 11.04。 我們取了最好的前三條結果,這個結果使用於任何JVM運行環境,表中顯示的結果並不是我們發現最好的結果。

Nehalem 2.8Ghz – Windows 7 SP1 64-bit

Sandy Bridge 2.2Ghz – Linux 2.6.38 64-bit

ABQ

Disruptor

ABQ

Disruptor

Unicast: 1P – 1C

5,339,256

25,998,336

4,057,453

22,381,378

Pipeline: 1P – 3C

2,128,918

16,806,157

2,006,903

15,857,913

Sequencer: 3P – 1C

5,539,531

13,403,268

2,056,118

14,540,519

Multicast: 1P – 3C

1,077,384

9,377,871

260,733

10,860,121

Diamond: 1P – 3C

2,113,941

16,143,613

2,082,725

15,295,197

性能測試——時延

  為了測量延時,我們采用3個階段的pipeline作為測試場景,為了能夠測出系統的最佳狀態,我們讓吞吐量壓力維持在一個合適的水準,這個壓力不至於耗盡隊列資源。這個壓力是通過每插入一個事件就等待1ms的方式來實現的,然後一直這樣重復5000萬次。為了精確測量延時,我們需要精確考量CPU的時間戳計數器(TSC)。我們采用了那些TSC恒定的CPU來作為測試機器,因為老的CPU為了節省功耗,往往會自動調節TSC。Intel Nehalem之後的CPU都支持恒定的TSC。可以使用運行於Ubuntu 11.04上的Oracle最新版本的JVM進行測試,本測試未做CPU綁定。

  我們依然采用ArrayBlockingQueue用來對比,而沒有選取ConcurrentLinkedQueue,原因在於我們期望使用一個有界隊列來確保生產者不會超過消費者,從而避免產生過載,對測試產生影響。【註:有人可能會問:為什麽不用ConcurrentLinkedQueue來做對比呢?ConcurrentLinkedQueue的性能不是更好麽?的確是這樣,但是由於Disruptor模式的ringbuffer是一個長度固定的隊列系統,而ConcurrentLinkedQueue是一個長度沒有限制的隊列,兩者對於長度無限制的隊列,生產者如果效率過高,且又無法阻塞,這樣會搶占消費者的CPU資源,從而影響最後的測試結果。】

  Disruptor每一輪【註:一次完整的流水線】的平均延時為52納秒,相比之下ArrayBlockQueue每一輪的平均延時為32757納秒。跟蹤顯示ArrayBlockQueue性能損失主要是由條件變量的加鎖/通知引起的。下表中測試結果是在配置為2.2Ghz Core i7-2720QM 的Ubuntu 11.04操作系統上運行版本為Java 1.6.0_25 64-bit 的jvm虛擬機得到的。

Array Blocking Queue (ns)

Disruptor (ns)

Min Latency

145

29

Mean Latency

32,757

52

99% observations less than

2,097,152

128

99.99% observations less than

4,194,304

8,192

Max Latency

5,069,086

175,567

技術分享

結論

  Disruptor在"提高吞吐量-降低時延"的領域裏邁出了重要一步,這在許多應用中都是非常重要的考慮。我們的測試顯示,與其它線程間交換數據的方法相比,disruptor的性能是最好的。我們相信disruptor應該是所有數據交換方法中最好的。通過隔離關註,通過限制寫競爭,通過最小化讀競爭,通過代碼的緩存友好化(cache-friendly),我們創造了一種在線程之間交換數據的高效的方法。【註:這裏是指線程間交換數據,並未包含具體業務邏輯處理。】

  批處理效應(Batch Effect)允許消費者能夠一次性沒有競爭的處理大量的元素,這為高性能系統提供了一個新的特性。對於大多數系統而言,隨著吞吐量壓力的增加,延時也會呈指數級增加,形成一個類似J的曲線。然而在Disruptor系統中,隨著吞吐量壓力的增加,延時依然會很平滑,直到內存被耗盡。

  我們相信Disruptor建立了高性能計算的新基準,並且非常適合繼續利用處理器和計算機設計的發展趨勢。

參考文獻

[1] Staged Event Driven Architecture – http://www.eecs.harvard.edu/~mdw/proj/seda/

[2] Actor model – http://dspace.mit.edu/handle/1721.1/6952

[3] Java Memory Model - http://www.ibm.com/developerworks/library/j-jtp02244/index.html

[4] Phasers - http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166ydocs/jsr166y/Phaser.html

[5] Value Types - http://blogs.oracle.com/jrose/entry/tuples_in_the_vm

[6] Little’s Law - http://en.wikipedia.org/wiki/Little%27s_law

[7] ArrayBlockingQueue - http://download.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ArrayBlockingQueue.html

[8] ConcurrentLinkedQueue - http://download.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html

Disruptor——一種可替代有界隊列完成並發線程間數據交換的高性能解決方案