聊一聊Disruptor的生產者寫入
本文主要講一下Disruptor的寫入部分,也就是生產者怎麼將資料成功的寫入RingBuffer
寫入RingBuffer需要關注的幾個問題
-
1:如何避免生產者的生產速度過快而造成的新訊息覆蓋了未被消費的舊訊息的問題
-
2:如何解決多個生產者搶佔生產位的問題
帶著問題去看程式碼,會更有目的性,所以本文主要會圍繞這兩個問題作出解答,在解答的過程中,我們都會分為單生產者和多生產者兩種情況進行解答。 首先看第一個問題: 這個問題是Disruptor特有的,因為他的核心RingBuffer是一個首尾相連的有界的連續陣列,所以如果生產者生產過快,一定會使得生產的訊息超過了未消費的訊息一圈,從而使未被消費的訊息被覆蓋而丟失。那Disruptor是如何解決這個問題的呢?
在這之前,要先插播一下生產者生產訊息寫入RingBuffer的主要流程。大致可以分為兩步。
-
第一,佔位
因為RingBuffer中的每個儲存單元都是帶序號的資料容器,所以生產者在往裡寫入資料時,第一步就要先搶佔到這個位置,這個搶佔包括兩部分,首先是如果是多生產者的情況,需要考慮多生產者同時搶佔的問題,然後要考慮消費者的消費速度問題,不能搶佔還未被消費的位置。
-
第二,填裝資料之後釋出。
填裝資料好理解,因為RingBuffer在初始化時已經分配好了所有資料容器,我們要做的就是往這個容器中填入我們的資料,做個比喻就是思考一下摩天輪這個場景。摩天輪的每個箱子都是一直存在的,遊客做的就是不停的往箱子裡坐人和下來給下一波人騰位置。那釋出又是什麼意思呢?這裡的釋出指的是當生產者成功填裝好資料之後,怎麼確認這一次的生產行為,這過程中單生產者和多生產者的處理方式是不一樣的,這個我們會在之後詳細講到。

我總是喜歡把這個寫入的過程比作摩天輪,希望這個圖能讓你有一個大致的印象
好,現在回到之前沒說完的這個問題,生產者在寫入時,怎麼感知消費者的消費速度是否允許我這一次新資料的寫入呢?這個就和我們上面提到的第一步,佔位有關。 為了方便讀者快速的先有一個畫面感,所以先講到單生產者的情況. 單生產者的時候,因為不會有其他生產者跟他搶佔位置,所以他只需要知道他的消費者的消費到哪裡了就行。Disruptor處理這個問題的方法是每個消費者加入消費時,都會呼叫生產者的以下這個方法
/** * Add the specified gating sequences to this instance of the Disruptor.They will * safely and atomically added to the list of gating sequences. * * @param gatingSequences The sequences to add. */ void addGatingSequences(Sequence... gatingSequences); 複製程式碼
從這個方法的名字也能看出來,這個方法是向生產者新增一個門控序列,那什麼門控序列呢,這個可以從生產者搶佔位子的程式碼裡獲得靈感,對此的解釋我都會按照註釋的方式也在程式碼裡,其餘也不過多贅述了。 /** * @see Sequencer#next(int) */ @Override public long next(int n) //生產者進行搶佔時呼叫的方法,next(1)即為搶佔已生產過的序號的下一個位置 { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); }
long nextValue = this.nextValue; long nextSequence = nextValue + n;// 例如已經生產到了8號,則呼叫next(1)即為搶佔生產9號的權利 long wrapPoint = nextSequence - bufferSize; long cachedGatingSequence = this.cachedValue;// 這個值即為我們記錄的 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { cursor.setVolatile(nextValue);// StoreLoad fence long minSequence; while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) { LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? } this.cachedValue = minSequence; } this.nextValue = nextSequence; return nextSequence; } 複製程式碼
方法裡的cachedGatingSequence即為之前說的通過addGatingSequence方法記錄的所有門控序列的最小值,所以門控序列在這裡起到的作用就是讓生產者可以感知到所有消費者中最慢的消費者現在所在的序號,若生產速度已經趕上了這個最慢消費者,那生產者就要park一段時間來等待消費者進行消費 單生產者的裝填資料和釋出過程,
單生產者的裝填資料和釋出過程需要注意的點沒有很多,像我之前說的,裝填資料主要就是往資料容器中裝填資料的過程,這邊主要想提一下發布的過程。釋出的過程,其實就是移動生產者已生產的遊標和通知消費者的過程。
/** * @see Sequencer#publish(long) */ @Override public void publish(long sequence) { cursor.set(sequence); waitStrategy.signalAllWhenBlocking(); } 複製程式碼
上述程式碼中的cursor記錄了該生產者目前生產到的序號,例如若為8,則可以認為此時生產者已經成功生產了8個序號,接下來要佔位生產的是9號序號,為什麼著重講一下移動遊標的這句程式碼呢,因為這邊的處理邏輯和多生產者的時候有一些不同,所以先在這邊提一句,之後在講多生產者的時候,會再進行對比。移動遊標之後要通過waitStrategy通知等待資料的消費者,waitStrategy這個類在我們下一篇講消費者的時候會講到,此時可以先理解為消費者沒資料時會一直等候資料,而生產者在生產完新資料後會去通知消費者。 好的,現在來看一下稍微複雜一點的多生產者是怎麼進行佔位處理的 之前說過佔位需要考慮兩方面因素,消費者的消費速度和生產者之間的搶佔關係。第一個因素在單生產者和多生產者之間都是類似的,主要就是根據門控序列來感知當前最慢消費者的位置。所以現在來分析一下多生產者在搶佔同一個生產位的時候是怎麼做的。 當有兩個或者兩個生產者以上的時候,難免會出現搶佔的情況,而在原來的阻塞佇列裡面,面對這種情況下處理的方法就是進行加鎖操作,而加鎖是必然會降低效能的,所以在之前的文章也提到過當阻塞佇列中有多生產者或者多消費者的時候,效能會急劇下降。那Disruptor是怎麼做的呢?
/** * @see Sequencer#next(int) */ @Override public long next(int n) { ... do { current = cursor.get();// #1 next = current + n;// #2 long wrapPoint = next - bufferSize; long cachedGatingSequence = gatingSequenceCache.get(); if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) { ... //和單生產者的程式碼類似,不展開講了 } else if (cursor.compareAndSet(current, next)) { break; } } while (true); return next; } 複製程式碼
因為和單生產者的情況有部分程式碼類似,為了節省篇幅我省去了這一部分,主要看else if()的那句話,可以看到裡面用了CAS操作,這邊描述起來可能有點麻煩,所以我會用一個例子進行說明。
假設,此時有兩個生產者,如果現在整個RingBuffer的生產遊標已經到了12這個位置,那兩個生產者都會呼叫next(1)這個方法,去競爭13這個位置,先看生產者A,進入這段程式碼時在程式碼#1位置,拿到current遊標為12,next遊標為13,那經過上面和最慢消費者位置的比較後,如果發現這個位置是可以生產的,那麼會進入CAS這句程式碼,那這句話實際執行就是cursor.compareAndSet(12,13),與此同時,生產者B也進行了剛才同樣的操作,也進行到了這一步也執行cursor.compareAndSet(12,13),那此時我們會發現根據CAS的特性,只可能有一個生產者修改遊標成功,假設是生產者A好了,那生產者B的這句話就會執行失敗,再次進入永真迴圈的#1程式碼處,而此時cusor已經變成了13,所以生產者B會執行cursor.compareAndSet(13,14),將遊標改到14。所以結果就是生產者A獲得了13號位置的生產權,生產者B獲得了14號位置的生產權。
根據我這個例子,我想大家應該能理解我想表達的意思,多生產者搶佔到了各自的位置,但是回想我們單生產者釋出的時候,我們移動遊標後就相當於向消費者宣告這個位子是有訊息可消費的,但是我們現在兩個生產者只是搶到位子還沒將真正資料放入就已經移動了遊標,那是不是會有問題呢?
關於這個問題,Disruptor的作者也想到,所以他在多生產者的時候,另外設計了一個和RingBuffer大小一致陣列,作為標誌位陣列。只有當生產者真正將資料寫入到相應的位子後,才會將這個標誌位陣列相應的位置置為可消費狀態,所以在多生產者的情況下,消費者在尋找可消費的位置時,除了要看cusor這個遊標移動的情況,還有看對應的這個標誌位是否已經被修改成功。這個標誌位修改的邏輯我們看下一個步驟
多生產者的裝填資料和釋出 裝填資料不多說了,和單生產一樣。主要講講釋出的過程。多生產者的釋出主要是設定這個標誌位陣列。
/** * @see Sequencer#publish(long) */ @Override public void publish(final long sequence) { setAvailable(sequence); waitStrategy.signalAllWhenBlocking(); } private void setAvailable(final long sequence) { setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence)); } private void setAvailableBufferValue(int index, int flag) { long bufferAddress = (index * SCALE) + BASE; UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag); } private int calculateAvailabilityFlag(final long sequence)// indexShift = log2(bufferSize) { return (int) (sequence >>> indexShift); } private int calculateIndex(final long sequence) { return ((int) sequence) & indexMask;//indexMask = bufferSize -1 } 複製程式碼
主要想講講這個設定標誌位的過程,一開始我也不太理解這個過程這麼寫是為什麼。
例如假設我們的bufferSize是8,現在消費者A已經將資料成功寫進了13號位置,然後呼叫了publish這個方法宣告寫入成功,然後我們按整個邏輯走一下,計算index和flag的結果分別為5,1,所以會按照第5個int的計算地址方法,用UNSAFE類將該地址的值設定為1,這個值主要記錄的是現在是第幾圈。那麼之後,在消費者去查詢13號是不是可以消費時,若按照一樣的演算法去查詢第5個位置的值是不是還是1,如果還是1,則證明可以消費,若不是1,也就是生產者還沒修改這個標誌位,則證明資料還沒有準備好,這個位置不能消費。
以上,我們大致分析了一下生產者將資料寫入RingBuffer的過程,不管是單生產者還是多生產者,主要過程還是離不開佔位和裝填資料之後釋出這兩個步驟。回到開頭我們提出的兩個問題,現在試著給出一個答案。