歡迎訪問我的GitHub
https://github.com/zq2599/blog_demos
內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;
《disruptor筆記》系列連結
本篇概覽
本篇是《disruptor筆記》的第五篇,前文《disruptor筆記之四:事件消費知識點小結》從理論上梳理分析了獨立消費和共同消費,留下了三個任務,今天就來成這些任務,即編碼實現以下三個場景:
- 100個訂單,簡訊和郵件系統獨立消費
- 100個訂單,郵件系統的兩個郵件伺服器共同消費;
- 100個訂單,簡訊系統獨立消費,與此同時,兩個郵件伺服器共同消費;
原始碼下載
- 本篇實戰中的完整原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos):
名稱 | 連結 | 備註 |
---|---|---|
專案主頁 | https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 |
git倉庫地址(https) | https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 |
git倉庫地址(ssh) | [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 |
- 這個git專案中有多個資料夾,本次實戰的原始碼在disruptor-tutorials資料夾下,如下圖紅框所示:
- disruptor-tutorials是個父工程,裡面有多個module,本篇實戰的module是consume-mode,如下圖紅框所示:
編寫公共程式碼
- 為了完成任務,編碼實現上面那三個場景,咱們需要先把公共程式碼寫好;
- 首先是在父工程disruptor-tutorials下面新建名為consume-mode的module,其build.gradle內容如下:
plugins {
id 'org.springframework.boot'
}
dependencies {
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'com.lmax:disruptor'
testImplementation('org.springframework.boot:spring-boot-starter-test')
}
- springboot啟動類:
package com.bolingcavalry;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ConsumeModeApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumeModeApplication.class, args);
}
}
- 訂單事件定義:
package com.bolingcavalry.service;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@Data
@ToString
@NoArgsConstructor
public class OrderEvent {
private String value;
}
- 訂單事件的工程類,定義事件例項如何建立:
package com.bolingcavalry.service;
import com.lmax.disruptor.EventFactory;
public class OrderEventFactory implements EventFactory<OrderEvent> {
@Override
public OrderEvent newInstance() {
return new OrderEvent();
}
}
- 訂單事件生產者類,定義如何將業務資訊通過事件釋出到環形佇列:
package com.bolingcavalry.service;
import com.lmax.disruptor.RingBuffer;
public class OrderEventProducer {
// 儲存資料的環形佇列
private final RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(String content) {
// ringBuffer是個佇列,其next方法返回的是下最後一條記錄之後的位置,這是個可用位置
long sequence = ringBuffer.next();
try {
// sequence位置取出的事件是空事件
OrderEvent orderEvent = ringBuffer.get(sequence);
// 空事件新增業務資訊
orderEvent.setValue(content);
} finally {
// 釋出
ringBuffer.publish(sequence);
}
}
}
- 消費訂單事件的簡訊服務,實現EventHandler介面,所以是用在獨立消費的場景:
package com.bolingcavalry.service;
import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;
@Slf4j
public class SmsEventHandler implements EventHandler<OrderEvent> {
public SmsEventHandler(Consumer<?> consumer) {
this.consumer = consumer;
}
// 外部可以傳入Consumer實現類,每處理一條訊息的時候,consumer的accept方法就會被執行一次
private Consumer<?> consumer;
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
log.info("簡訊服務 sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);
// 這裡延時100ms,模擬消費事件的邏輯的耗時
Thread.sleep(100);
// 如果外部傳入了consumer,就要執行一次accept方法
if (null!=consumer) {
consumer.accept(null);
}
}
}
- 消費訂單事件的郵件服務,實現EventHandler介面,所以是用在獨立消費的場景:
package com.bolingcavalry.service;
import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;
@Slf4j
public class MailEventHandler implements EventHandler<OrderEvent> {
public MailEventHandler(Consumer<?> consumer) {
this.consumer = consumer;
}
// 外部可以傳入Consumer實現類,每處理一條訊息的時候,consumer的accept方法就會被執行一次
private Consumer<?> consumer;
@Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
log.info("郵件服務 sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);
// 這裡延時100ms,模擬消費事件的邏輯的耗時
Thread.sleep(100);
// 如果外部傳入了consumer,就要執行一次accept方法
if (null!=consumer) {
consumer.accept(null);
}
}
}
- 消費訂單事件的郵件服務,實現WorkHandler介面,所以是用在共同消費的場景:
package com.bolingcavalry.service;
import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;
@Slf4j
public class MailWorkHandler implements WorkHandler<OrderEvent> {
public MailWorkHandler(Consumer<?> consumer) {
this.consumer = consumer;
}
// 外部可以傳入Consumer實現類,每處理一條訊息的時候,consumer的accept方法就會被執行一次
private Consumer<?> consumer;
@Override
public void onEvent(OrderEvent event) throws Exception {
log.info("共同消費模式的郵件服務 : {}", event);
// 這裡延時100ms,模擬消費事件的邏輯的耗時
Thread.sleep(100);
// 如果外部傳入了consumer,就要執行一次accept方法
if (null!=consumer) {
consumer.accept(null);
}
}
}
- 最後,將釋出和消費事件的邏輯寫在一個抽象類裡,但是具體如何消費事件並不在此類中實現,而是留給子類,這個抽象類中有幾處要注意的地方稍後會提到:
package com.bolingcavalry.service;
import com.lmax.disruptor.dsl.Disruptor;
import lombok.Setter;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import javax.annotation.PostConstruct;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
public abstract class ConsumeModeService {
/**
* 獨立消費者數量
*/
public static final int INDEPENDENT_CONSUMER_NUM = 2;
/**
* 環形緩衝區大小
*/
protected int BUFFER_SIZE = 16;
protected Disruptor<OrderEvent> disruptor;
@Setter
private OrderEventProducer producer;
/**
* 統計訊息總數
*/
protected final AtomicLong eventCount = new AtomicLong();
/**
* 這是輔助測試用的,
* 測試的時候,完成事件釋出後,測試主執行緒就用這個countDownLatch開始等待,
* 在消費到指定的數量(countDownLatchGate)後,消費執行緒執行countDownLatch的countDown方法,
* 這樣測試主執行緒就可以結束等待了
*/
private CountDownLatch countDownLatch;
/**
* 這是輔助測試用的,
* 測試的時候,完成事件釋出後,測試主執行緒就用這個countDownLatch開始等待,
* 在消費到指定的數量(countDownLatchGate)後,消費執行緒執行countDownLatch的countDown方法,
* 這樣測試主執行緒就可以結束等待了
*/
private int countDownLatchGate;
/**
* 準備一個匿名類,傳給disruptor的事件處理類,
* 這樣每次處理事件時,都會將已經處理事件的總數打印出來
*/
protected Consumer<?> eventCountPrinter = new Consumer<Object>() {
@Override
public void accept(Object o) {
long count = eventCount.incrementAndGet();
/**
* 這是輔助測試用的,
* 測試的時候,完成事件釋出後,測試主執行緒就用這個countDownLatch開始等待,
* 在消費到指定的數量(countDownLatchGate)後,消費執行緒執行countDownLatch的countDown方法,
* 這樣測試主執行緒就可以結束等待了
*/
if (null!=countDownLatch && count>=countDownLatchGate) {
countDownLatch.countDown();
}
}
};
/**
* 釋出一個事件
* @param value
* @return
*/
public void publish(String value) {
producer.onData(value);
}
/**
* 返回已經處理的任務總數
* @return
*/
public long eventCount() {
return eventCount.get();
}
/**
* 這是輔助測試用的,
* 測試的時候,完成事件釋出後,測試主執行緒就用這個countDownLatch開始等待,
* 在消費到指定的數量(countDownLatchGate)後,消費執行緒執行countDownLatch的countDown方法,
* 這樣測試主執行緒就可以結束等待了
* @param countDownLatch
* @param countDownLatchGate
*/
public void setCountDown(CountDownLatch countDownLatch, int countDownLatchGate) {
this.countDownLatch = countDownLatch;
this.countDownLatchGate = countDownLatchGate;
}
/**
* 留給子類實現具體的事件消費邏輯
*/
protected abstract void disruptorOperate();
@PostConstruct
private void init() {
// 例項化
disruptor = new Disruptor<>(new OrderEventFactory(),
BUFFER_SIZE,
new CustomizableThreadFactory("event-handler-"));
// 留給子類實現具體的事件消費邏輯
disruptorOperate();
// 啟動
disruptor.start();
// 生產者
setProducer(new OrderEventProducer(disruptor.getRingBuffer()));
}
}
- 上述程式碼,有以下幾處需要注意:
- init方法是spring bean例項化後要執行的方法,這裡面例項化Disruptor,還啟動了消費執行緒,並且例項化了事件生產者,具體的事件消費邏輯,由子類在disruptorOperate方法中實現;
- eventCountPrinter是個匿名類例項,傳給事件消費的handler後,每消費一個事件都會執行一次eventCountPrinter.accept方法,這樣就把消費事件的總數準確的儲存在eventCount變數中了;
- countDownLatch和countDownLatchGate是為了輔助單元測試而準備的,測試的時候,完成事件釋出後,測試主執行緒就用這個countDownLatch開始等待,在消費到指定的數量(countDownLatchGate)後,消費執行緒執行countDownLatch的countDown方法,這樣測試主執行緒就可以結束等待了
- 至此,公用程式碼就寫完了,可見抽象父類已經做好了大部分事情,咱們的子類可以聚焦事件消費的邏輯編排了,開始挨個實現那三個場景;
100個訂單,簡訊和郵件系統獨立消費
- 兩個消費者獨立消費的邏輯非常簡單,就一行程式碼,呼叫handleEventsWith方法把所有消費者例項傳進去,就完事了:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.SmsEventHandler;
import org.springframework.stereotype.Service;
@Service("independentModeService")
public class IndependentModeServiceImpl extends ConsumeModeService {
@Override
protected void disruptorOperate() {
// 呼叫handleEventsWith,表示建立的多個消費者,每個都是獨立消費的
// 這裡建立兩個消費者,一個是簡訊的,一個是郵件的
disruptor.handleEventsWith(new SmsEventHandler(eventCountPrinter), new MailEventHandler(eventCountPrinter));
}
}
- 單元測試程式碼如下,要注意的地方是釋出完100事件後,呼叫countDownLatch.await()方法開始等待,直到消費者執行緒呼叫countDownLatch.countDown()方法解除等待,還有就是預期的消費訊息總數等於200:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.ConsumeModeService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.CountDownLatch;
import static org.junit.Assert.assertEquals;
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class ConsumeModeServiceTest {
@Autowired
@Qualifier("independentModeService")
ConsumeModeService independentModeService;
/**
* 測試時生產的訊息數量
*/
private static final int EVENT_COUNT = 100;
private void testConsumeModeService(ConsumeModeService service, int eventCount, int expectEventCount) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
// 告訴service,等消費到expectEventCount個訊息時,就執行countDownLatch.countDown方法
service.setCountDown(countDownLatch, expectEventCount);
for(int i=0;i<eventCount;i++) {
log.info("publich {}", i);
service.publish(String.valueOf(i));
}
// 當前執行緒開始等待,前面的service.setCountDown方法已經告訴過service,
// 等消費到expectEventCount個訊息時,就執行countDownLatch.countDown方法
// 千萬注意,要呼叫await方法,而不是wait方法!
countDownLatch.await();
// 消費的事件總數應該等於釋出的事件數
assertEquals(expectEventCount, service.eventCount());
}
@Test
public void testIndependentModeService() throws InterruptedException {
log.info("start testIndependentModeService");
testConsumeModeService(independentModeService,
EVENT_COUNT,
EVENT_COUNT * ConsumeModeService.INDEPENDENT_CONSUMER_NUM);
}
}
- 單元測試執行結果如下,符合預期:
100個訂單,郵件系統的兩個郵件伺服器共同消費
- 兩個消費者共同消費的程式碼也很簡單,呼叫handleEventsWithWorkerPool方法即可,把共同消費的MailWorkHandler例項作為引數傳入:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailWorkHandler;
import org.springframework.stereotype.Service;
@Service("shareModeService")
public class ShareModeServiceImpl extends ConsumeModeService {
@Override
protected void disruptorOperate() {
// mailWorkHandler1模擬一號郵件伺服器
MailWorkHandler mailWorkHandler1 = new MailWorkHandler(eventCountPrinter);
// mailWorkHandler2模擬一號郵件伺服器
MailWorkHandler mailWorkHandler2 = new MailWorkHandler(eventCountPrinter);
// 呼叫handleEventsWithWorkerPool,表示建立的多個消費者以共同消費的模式消費
disruptor.handleEventsWithWorkerPool(mailWorkHandler1, mailWorkHandler2);
}
}
- 單元測試是在ConsumeModeServiceTest.java中新增如下程式碼,注意由於是共同消費,因此預期的消費事件數等於訊息數,都是100:
@Autowired
@Qualifier("shareModeService")
ConsumeModeService shareModeService;
@Test
public void testShareModeService() throws InterruptedException {
log.info("start testShareModeService");
testConsumeModeService(shareModeService, EVENT_COUNT, EVENT_COUNT);
}
- 執行單元測試,結果如下圖:
100個訂單,簡訊系統獨立消費,與此同時,兩個郵件伺服器共同消費
- 最後一個場景,依舊很簡單,handleEventsWith呼叫一次,再呼叫一次handleEventsWithWorkerPool即可:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailWorkHandler;
import com.bolingcavalry.service.SmsEventHandler;
import org.springframework.stereotype.Service;
@Service("independentAndShareModeService")
public class IndependentAndShareModeServiceImpl extends ConsumeModeService {
@Override
protected void disruptorOperate() {
// 呼叫handleEventsWith,表示建立的多個消費者,每個都是獨立消費的
// 這裡建立一個消費者,簡訊服務
disruptor.handleEventsWith(new SmsEventHandler(eventCountPrinter));
// mailWorkHandler1模擬一號郵件伺服器
MailWorkHandler mailWorkHandler1 = new MailWorkHandler(eventCountPrinter);
// mailWorkHandler2模擬一號郵件伺服器
MailWorkHandler mailWorkHandler2 = new MailWorkHandler(eventCountPrinter);
// 呼叫handleEventsWithWorkerPool,表示建立的多個消費者以共同消費的模式消費
disruptor.handleEventsWithWorkerPool(mailWorkHandler1, mailWorkHandler2);
}
}
- 單元測試是在ConsumeModeServiceTest.java中新增如下程式碼,預期的消費事件數應該是200,因為整體上是兩個獨立消費,只不過其中的一個內部有兩個消費者共同消費:
@Autowired
@Qualifier("independentAndShareModeService")
ConsumeModeService independentAndShareModeService;
@Test
public void independentAndShareModeService() throws InterruptedException {
log.info("start independentAndShareModeService");
testConsumeModeService(independentAndShareModeService,
EVENT_COUNT,
EVENT_COUNT * ConsumeModeService.INDEPENDENT_CONSUMER_NUM);
}
- 單元測試結果如下,符合預期:
- 至此,獨立消費和共同消費的實戰就完成了,藉助disruptor,三個常見場景都可以輕鬆完成,如果您正在做這些場景的開發,希望本文能給您一些參考;
你不孤單,欣宸原創一路相伴
歡迎關注公眾號:程式設計師欣宸
微信搜尋「程式設計師欣宸」,我是欣宸,期待與您一同暢遊Java世界...
https://github.com/zq2599/blog_demos