歡迎訪問我的GitHub

https://github.com/zq2599/blog_demos

內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;

《disruptor筆記》系列連結

  1. 快速入門
  2. Disruptor類分析
  3. 環形佇列的基礎操作(不用Disruptor類)
  4. 事件消費知識點小結
  5. 事件消費實戰
  6. 常見場景
  7. 等待策略
  8. 知識點補充(終篇)

本篇概覽

  • 本文是《disruptor筆記》系列的第三篇,主要任務是編碼實現訊息生產和消費,與《disruptor筆記之一:快速入門》不同的是,本次開發不使用Disruptor類,和Ring Buffer(環形佇列)相關的操作都是自己寫程式碼實現;
  • 這種脫離Disruptor類操作Ring Buffer的做法,不適合用在生產環境,但在學習Disruptor的過程中,這是種高效的學習手段,經過本篇實戰後,在今後使用Disruptor時,您在開發、除錯、優化等各種場景下都能更加得心應手;
  • 簡單的訊息生產消費已不能滿足咱們的學習熱情,今天的實戰要挑戰以下三個場景:
  1. 100個事件,單個消費者消費;
  2. 100個事件,三個消費者,每個都獨自消費這個100個事件;
  3. 100個事件,三個消費者共同消費這個100個事件;

前文回顧

為了完成本篇的實戰,前文《disruptor筆記之二:Disruptor類分析》已做了充分的研究分析,建議觀看,這裡簡單回顧以下Disruptor類的幾個核心功能,這也是咱們編碼時要實現的:

  1. 建立環形佇列(RingBuffer物件)
  2. 建立SequenceBarrier物件,用於接收ringBuffer中的可消費事件
  3. 建立BatchEventProcessor,負責消費事件
  4. 繫結BatchEventProcessor物件的異常處理類
  5. 呼叫ringBuffer.addGatingSequences,將消費者的Sequence傳給ringBuffer
  6. 啟動獨立執行緒,用來執行消費事件的業務邏輯
  • 理論分析已經完成,接下來開始編碼;

原始碼下載

名稱 連結 備註
專案主頁 https://github.com/zq2599/blog_demos 該專案在GitHub上的主頁
git倉庫地址(https) https://github.com/zq2599/blog_demos.git 該專案原始碼的倉庫地址,https協議
git倉庫地址(ssh) [email protected]:zq2599/blog_demos.git 該專案原始碼的倉庫地址,ssh協議
  • 這個git專案中有多個資料夾,本次實戰的原始碼在disruptor-tutorials資料夾下,如下圖紅框所示:

  • disruptor-tutorials是個父工程,裡面有多個module,本篇實戰的module是low-level-operate,如下圖紅框所示:

開發

  • 進入編碼階段,今天的任務是挑戰以下三個場景:
  1. 100個事件,單個消費者消費;
  2. 100個事件,三個消費者,每個都獨自消費這個100個事件;
  3. 100個事件,三個消費者共同消費這個100個事件;
  • 咱們先把工程建好,然後編寫公共程式碼,例如事件定義、事件工廠等,最後才是每個場景的開發;
  • 在父工程disruptor-tutorials新增名為low-level-operate的module,其build.gradle如下:
plugins {
id 'org.springframework.boot'
} dependencies {
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'com.lmax:disruptor' testImplementation('org.springframework.boot:spring-boot-starter-test')
}
  • 然後是springboot啟動類:
package com.bolingcavalry;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication
public class LowLevelOperateApplication {
public static void main(String[] args) {
SpringApplication.run(LowLevelOperateApplication.class, args);
}
}
  • 事件類,這是事件的定義:
package com.bolingcavalry.service;

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString; @Data
@ToString
@NoArgsConstructor
public class StringEvent {
private String value;
}
  • 事件工廠,定義如何在記憶體中建立事件物件:
package com.bolingcavalry.service;

import com.lmax.disruptor.EventFactory;

public class StringEventFactory implements EventFactory<StringEvent> {
@Override
public StringEvent newInstance() {
return new StringEvent();
}
}
  • 事件生產類,定義如何將業務邏輯的事件轉為disruptor事件釋出到環形佇列,用於消費:
package com.bolingcavalry.service;

import com.lmax.disruptor.RingBuffer;

public class StringEventProducer {

    // 儲存資料的環形佇列
private final RingBuffer<StringEvent> ringBuffer; public StringEventProducer(RingBuffer<StringEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
} public void onData(String content) { // ringBuffer是個佇列,其next方法返回的是下最後一條記錄之後的位置,這是個可用位置
long sequence = ringBuffer.next(); try {
// sequence位置取出的事件是空事件
StringEvent stringEvent = ringBuffer.get(sequence);
// 空事件新增業務資訊
stringEvent.setValue(content);
} finally {
// 釋出
ringBuffer.publish(sequence);
}
}
}
  • 事件處理類,收到事件後具體的業務處理邏輯:
package com.bolingcavalry.service;

import com.lmax.disruptor.EventHandler;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer; @Slf4j
public class StringEventHandler implements EventHandler<StringEvent> { public StringEventHandler(Consumer<?> consumer) {
this.consumer = consumer;
} // 外部可以傳入Consumer實現類,每處理一條訊息的時候,consumer的accept方法就會被執行一次
private Consumer<?> consumer; @Override
public void onEvent(StringEvent event, long sequence, boolean endOfBatch) throws Exception {
log.info("sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event); // 這裡延時100ms,模擬消費事件的邏輯的耗時
Thread.sleep(100); // 如果外部傳入了consumer,就要執行一次accept方法
if (null!=consumer) {
consumer.accept(null);
}
}
}
  • 定義一個介面,外部通過呼叫介面的方法來生產訊息,再放幾個常量在裡面後面會用到:
package com.bolingcavalry.service;

public interface LowLevelOperateService {
/**
* 消費者數量
*/
int CONSUMER_NUM = 3; /**
* 環形緩衝區大小
*/
int BUFFER_SIZE = 16; /**
* 釋出一個事件
* @param value
* @return
*/
void publish(String value); /**
* 返回已經處理的任務總數
* @return
*/
long eventCount();
}
  • 以上就是公共程式碼了,接下來逐個實現之前規劃的三個場景;

100個事件,單個消費者消費

  • 這是最簡單的功能了,實現釋出訊息和單個消費者消費的功能,程式碼如下,有幾處要注意的地方稍後提到:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.*;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer; @Service("oneConsumer")
@Slf4j
public class OneConsumerServiceImpl implements LowLevelOperateService { private RingBuffer<StringEvent> ringBuffer; private StringEventProducer producer; /**
* 統計訊息總數
*/
private final AtomicLong eventCount = new AtomicLong(); private ExecutorService executors; @PostConstruct
private void init() {
// 準備一個匿名類,傳給disruptor的事件處理類,
// 這樣每次處理事件時,都會將已經處理事件的總數打印出來
Consumer<?> eventCountPrinter = new Consumer<Object>() {
@Override
public void accept(Object o) {
long count = eventCount.incrementAndGet();
log.info("receive [{}] event", count);
}
}; // 建立環形佇列例項
ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE); // 準備執行緒池
executors = Executors.newFixedThreadPool(1); //建立SequenceBarrier
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); // 建立事件處理的工作類,裡面執行StringEventHandler處理事件
BatchEventProcessor<StringEvent> batchEventProcessor = new BatchEventProcessor<>(
ringBuffer,
sequenceBarrier,
new StringEventHandler(eventCountPrinter)); // 將消費者的sequence傳給環形佇列
ringBuffer.addGatingSequences(batchEventProcessor.getSequence()); // 在一個獨立執行緒中取事件並消費
executors.submit(batchEventProcessor); // 生產者
producer = new StringEventProducer(ringBuffer);
} @Override
public void publish(String value) {
producer.onData(value);
} @Override
public long eventCount() {
return eventCount.get();
}
}
  • 上述程式碼有以下幾處需要注意:
  1. 自己建立環形佇列RingBuffer例項
  2. 自己準備執行緒池,裡面的執行緒用來獲取和消費訊息
  3. 自己動手建立BatchEventProcessor例項,並把事件處理類傳入
  4. 通過ringBuffer建立sequenceBarrier,傳給BatchEventProcessor例項使用
  5. 將BatchEventProcessor的sequence傳給ringBuffer,確保ringBuffer的生產和消費不會出現混亂
  6. 啟動執行緒池,意味著BatchEventProcessor例項在一個獨立執行緒中不斷的從ringBuffer中獲取事件並消費;
  • 為了驗證上述程式碼能否正常工作,我這裡寫了個單元測試類,如下所示,邏輯很簡單,呼叫OneConsumerServiceImpl.publish方法一百次,產生一百個事件,再檢查OneConsumerServiceImpl記錄的消費事件總數是不是等於一百:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.LowLevelOperateService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.assertEquals; @RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class LowLeverOperateServiceImplTest { @Autowired
@Qualifier("oneConsumer")
LowLevelOperateService oneConsumer; private static final int EVENT_COUNT = 100; private void testLowLevelOperateService(LowLevelOperateService service, int eventCount, int expectEventCount) throws InterruptedException {
for(int i=0;i<eventCount;i++) {
log.info("publich {}", i);
service.publish(String.valueOf(i));
} // 非同步消費,因此需要延時等待
Thread.sleep(10000); // 消費的事件總數應該等於釋出的事件數
assertEquals(expectEventCount, service.eventCount());
} @Test
public void testOneConsumer() throws InterruptedException {
log.info("start testOneConsumerService");
testLowLevelOperateService(oneConsumer, EVENT_COUNT, EVENT_COUNT);
}
  • 注意,如果您是直接在IDEA上點選圖示來執行單元測試,記得勾選下圖紅框中選項,否則可能出現編譯失敗:

  • 執行上述單元測試類,結果如下圖所示,訊息的生產和消費都符合預期,並且消費邏輯是在獨立執行緒中執行的:

  • 繼續挑戰下一個場景;

100個事件,三個消費者,每個都獨自消費這個100個事件

  • 這個場景在kafka中也有,就是三個消費者的group不同,這樣每一條訊息,這兩個消費者各自消費一次;
  • 因此,100個事件,3個消費者每人都會獨立消費這100個事件,一共消費300次;
  • 程式碼如下,有幾處要注意的地方稍後提到:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.*;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import javax.annotation.PostConstruct;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer; @Service("multiConsumer")
@Slf4j
public class MultiConsumerServiceImpl implements LowLevelOperateService { private RingBuffer<StringEvent> ringBuffer; private StringEventProducer producer; /**
* 統計訊息總數
*/
private final AtomicLong eventCount = new AtomicLong(); /**
* 生產一個BatchEventProcessor例項,並且啟動獨立執行緒開始獲取和消費訊息
* @param executorService
*/
private void addProcessor(ExecutorService executorService) {
// 準備一個匿名類,傳給disruptor的事件處理類,
// 這樣每次處理事件時,都會將已經處理事件的總數打印出來
Consumer<?> eventCountPrinter = new Consumer<Object>() {
@Override
public void accept(Object o) {
long count = eventCount.incrementAndGet();
log.info("receive [{}] event", count);
}
}; BatchEventProcessor<StringEvent> batchEventProcessor = new BatchEventProcessor<>(
ringBuffer,
ringBuffer.newBarrier(),
new StringEventHandler(eventCountPrinter)); // 將當前消費者的sequence例項傳給ringBuffer
ringBuffer.addGatingSequences(batchEventProcessor.getSequence()); // 啟動獨立執行緒獲取和消費事件
executorService.submit(batchEventProcessor);
} @PostConstruct
private void init() {
ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE); ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM); // 建立多個消費者,並在獨立執行緒中獲取和消費事件
for (int i=0;i<CONSUMER_NUM;i++) {
addProcessor(executorService);
} // 生產者
producer = new StringEventProducer(ringBuffer);
} @Override
public void publish(String value) {
producer.onData(value);
} @Override
public long eventCount() {
return eventCount.get();
}
}
  • 上述程式碼和前面的OneConsumerServiceImpl相比差別不大,主要是建立了多個BatchEventProcessor例項,然後分別線上程池中提交;

  • 驗證方法依舊是單元測試,在剛才的LowLeverOperateServiceImplTest.java中增加程式碼即可,注意testLowLevelOperateService的第三個引數是EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM,表示預期的被消費訊息數為300

 	@Autowired
@Qualifier("multiConsumer")
LowLevelOperateService multiConsumer; @Test
public void testMultiConsumer() throws InterruptedException {
log.info("start testMultiConsumer");
testLowLevelOperateService(multiConsumer, EVENT_COUNT, EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM);
}
  • 執行單元測試,如下圖所示,一共消費了300個事件,並且三個消費者在不動執行緒:

100個事件,三個消費者共同消費這個100個事件

  • 本篇的最後一個實戰是釋出100個事件,然後讓三個消費者共同消費100個(例如A消費33個,B消費33個,C消費34個);

  • 前面用到的BatchEventProcessor是用來獨立消費的,不適合多個消費者共同消費,這種多個消費共同消費的場景需要藉助WorkerPool來完成,這個名字還是很形象的:一個池子裡面有很多個工作者,把任務放入這個池子,工作者們每人處理一部分,大家合力將任務完成;

  • 傳入WorkerPool的消費者需要實現WorkHandler介面,於是新增一個實現類:

package com.bolingcavalry.service;

import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer; @Slf4j
public class StringWorkHandler implements WorkHandler<StringEvent> { public StringWorkHandler(Consumer<?> consumer) {
this.consumer = consumer;
} // 外部可以傳入Consumer實現類,每處理一條訊息的時候,consumer的accept方法就會被執行一次
private Consumer<?> consumer; @Override
public void onEvent(StringEvent event) throws Exception {
log.info("work handler event : {}", event); // 這裡延時100ms,模擬消費事件的邏輯的耗時
Thread.sleep(100); // 如果外部傳入了consumer,就要執行一次accept方法
if (null!=consumer) {
consumer.accept(null);
}
}
}
  • 新增服務類,實現共同消費的邏輯,有幾處要注意的地方稍後會提到:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.*;
import com.lmax.disruptor.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer; @Service("workerPoolConsumer")
@Slf4j
public class WorkerPoolConsumerServiceImpl implements LowLevelOperateService { private RingBuffer<StringEvent> ringBuffer; private StringEventProducer producer; /**
* 統計訊息總數
*/
private final AtomicLong eventCount = new AtomicLong(); @PostConstruct
private void init() {
ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE); ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM); StringWorkHandler[] handlers = new StringWorkHandler[CONSUMER_NUM]; // 建立多個StringWorkHandler例項,放入一個數組中
for (int i=0;i < CONSUMER_NUM;i++) {
handlers[i] = new StringWorkHandler(o -> {
long count = eventCount.incrementAndGet();
log.info("receive [{}] event", count);
});
} // 建立WorkerPool例項,將StringWorkHandler例項的陣列傳進去,代表共同消費者的數量
WorkerPool<StringEvent> workerPool = new WorkerPool<>(ringBuffer, ringBuffer.newBarrier(), new IgnoreExceptionHandler(), handlers); // 這一句很重要,去掉就會出現重複消費同一個事件的問題
ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); workerPool.start(executorService); // 生產者
producer = new StringEventProducer(ringBuffer);
} @Override
public void publish(String value) {
producer.onData(value);
} @Override
public long eventCount() {
return eventCount.get();
}
}
  • 上述程式碼中,要注意的有以下兩處:
  1. StringWorkHandler陣列傳入給WorkerPool後,每個StringWorkHandler例項都放入一個新的WorkProcessor例項,WorkProcessor實現了Runnable介面,在執行workerPool.start時,會將WorkProcessor提交到執行緒池中;

  2. 和前面的獨立消費相比,共同消費最大的特點在於只調用了一次ringBuffer.addGatingSequences方法,也就是說三個消費者共用一個sequence例項;

  • 驗證方法依舊是單元測試,在剛才的LowLeverOperateServiceImplTest.java中增加程式碼即可,注意testWorkerPoolConsumer的第三個引數是EVENT_COUNT,表示預期的被消費訊息數為100
 	@Autowired
@Qualifier("workerPoolConsumer")
LowLevelOperateService workerPoolConsumer; @Test
public void testWorkerPoolConsumer() throws InterruptedException {
log.info("start testWorkerPoolConsumer");
testLowLevelOperateService(workerPoolConsumer, EVENT_COUNT, EVENT_COUNT);
}
  • 執行單元測試如下圖所示,三個消費者一共消費100個事件,且三個消費者在不同執行緒:

  • 至此,咱們在不用Disruptor類的前提下完成了三種常見場景的訊息生產消費,相信您對Disruptor的底層實現也有了深刻認識,今後不論是使用還是優化Disruptor,一定可以更加得心應手;

你不孤單,欣宸原創一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 資料庫+中介軟體系列
  6. DevOps系列

歡迎關注公眾號:程式設計師欣宸

微信搜尋「程式設計師欣宸」,我是欣宸,期待與您一同暢遊Java世界...

https://github.com/zq2599/blog_demos