歡迎訪問我的GitHub

https://github.com/zq2599/blog_demos

內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;

《disruptor筆記》系列連結

  1. 快速入門
  2. Disruptor類分析
  3. 環形佇列的基礎操作(不用Disruptor類)
  4. 事件消費知識點小結
  5. 事件消費實戰
  6. 常見場景
  7. 等待策略
  8. 知識點補充(終篇)

本篇概覽

本篇是《disruptor筆記》的第五篇,前文《disruptor筆記之四:事件消費知識點小結》從理論上梳理分析了獨立消費和共同消費,留下了三個任務,今天就來成這些任務,即編碼實現以下三個場景:

  1. 100個訂單,簡訊和郵件系統獨立消費
  2. 100個訂單,郵件系統的兩個郵件伺服器共同消費;
  3. 100個訂單,簡訊系統獨立消費,與此同時,兩個郵件伺服器共同消費;

原始碼下載

名稱 連結 備註
專案主頁 https://github.com/zq2599/blog_demos 該專案在GitHub上的主頁
git倉庫地址(https) https://github.com/zq2599/blog_demos.git 該專案原始碼的倉庫地址,https協議
git倉庫地址(ssh) [email protected]:zq2599/blog_demos.git 該專案原始碼的倉庫地址,ssh協議
  • 這個git專案中有多個資料夾,本次實戰的原始碼在disruptor-tutorials資料夾下,如下圖紅框所示:

  • disruptor-tutorials是個父工程,裡面有多個module,本篇實戰的module是consume-mode,如下圖紅框所示:

編寫公共程式碼

  • 為了完成任務,編碼實現上面那三個場景,咱們需要先把公共程式碼寫好;
  • 首先是在父工程disruptor-tutorials下面新建名為consume-mode的module,其build.gradle內容如下:
plugins {
id 'org.springframework.boot'
} dependencies {
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'com.lmax:disruptor' testImplementation('org.springframework.boot:spring-boot-starter-test')
}
  • springboot啟動類:
package com.bolingcavalry;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication
public class ConsumeModeApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumeModeApplication.class, args);
}
}
  • 訂單事件定義:
package com.bolingcavalry.service;

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString; @Data
@ToString
@NoArgsConstructor
public class OrderEvent { private String value;
}
    • 訂單事件的工程類,定義事件例項如何建立:
package com.bolingcavalry.service;

import com.lmax.disruptor.EventFactory;

public class OrderEventFactory implements EventFactory<OrderEvent> {

    @Override
public OrderEvent newInstance() {
return new OrderEvent();
}
}
  • 訂單事件生產者類,定義如何將業務資訊通過事件釋出到環形佇列:
package com.bolingcavalry.service;

import com.lmax.disruptor.RingBuffer;

public class OrderEventProducer {
// 儲存資料的環形佇列
private final RingBuffer<OrderEvent> ringBuffer; public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
} public void onData(String content) {
// ringBuffer是個佇列,其next方法返回的是下最後一條記錄之後的位置,這是個可用位置
long sequence = ringBuffer.next(); try {
// sequence位置取出的事件是空事件
OrderEvent orderEvent = ringBuffer.get(sequence);
// 空事件新增業務資訊
orderEvent.setValue(content);
} finally {
// 釋出
ringBuffer.publish(sequence);
}
}
}
  • 消費訂單事件的簡訊服務,實現EventHandler介面,所以是用在獨立消費的場景:
package com.bolingcavalry.service;

import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer; @Slf4j
public class SmsEventHandler implements EventHandler<OrderEvent> { public SmsEventHandler(Consumer<?> consumer) {
this.consumer = consumer;
} // 外部可以傳入Consumer實現類,每處理一條訊息的時候,consumer的accept方法就會被執行一次
private Consumer<?> consumer; @Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
log.info("簡訊服務 sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event); // 這裡延時100ms,模擬消費事件的邏輯的耗時
Thread.sleep(100); // 如果外部傳入了consumer,就要執行一次accept方法
if (null!=consumer) {
consumer.accept(null);
}
}
}
  • 消費訂單事件的郵件服務,實現EventHandler介面,所以是用在獨立消費的場景:
package com.bolingcavalry.service;

import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer; @Slf4j
public class MailEventHandler implements EventHandler<OrderEvent> { public MailEventHandler(Consumer<?> consumer) {
this.consumer = consumer;
} // 外部可以傳入Consumer實現類,每處理一條訊息的時候,consumer的accept方法就會被執行一次
private Consumer<?> consumer; @Override
public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
log.info("郵件服務 sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event); // 這裡延時100ms,模擬消費事件的邏輯的耗時
Thread.sleep(100); // 如果外部傳入了consumer,就要執行一次accept方法
if (null!=consumer) {
consumer.accept(null);
}
}
}
  • 消費訂單事件的郵件服務,實現WorkHandler介面,所以是用在共同消費的場景:
package com.bolingcavalry.service;

import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer; @Slf4j
public class MailWorkHandler implements WorkHandler<OrderEvent> { public MailWorkHandler(Consumer<?> consumer) {
this.consumer = consumer;
} // 外部可以傳入Consumer實現類,每處理一條訊息的時候,consumer的accept方法就會被執行一次
private Consumer<?> consumer; @Override
public void onEvent(OrderEvent event) throws Exception {
log.info("共同消費模式的郵件服務 : {}", event); // 這裡延時100ms,模擬消費事件的邏輯的耗時
Thread.sleep(100); // 如果外部傳入了consumer,就要執行一次accept方法
if (null!=consumer) {
consumer.accept(null);
}
}
}
  • 最後,將釋出和消費事件的邏輯寫在一個抽象類裡,但是具體如何消費事件並不在此類中實現,而是留給子類,這個抽象類中有幾處要注意的地方稍後會提到:
package com.bolingcavalry.service;

import com.lmax.disruptor.dsl.Disruptor;
import lombok.Setter;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import javax.annotation.PostConstruct;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer; public abstract class ConsumeModeService {
/**
* 獨立消費者數量
*/
public static final int INDEPENDENT_CONSUMER_NUM = 2; /**
* 環形緩衝區大小
*/
protected int BUFFER_SIZE = 16; protected Disruptor<OrderEvent> disruptor; @Setter
private OrderEventProducer producer; /**
* 統計訊息總數
*/
protected final AtomicLong eventCount = new AtomicLong(); /**
* 這是輔助測試用的,
* 測試的時候,完成事件釋出後,測試主執行緒就用這個countDownLatch開始等待,
* 在消費到指定的數量(countDownLatchGate)後,消費執行緒執行countDownLatch的countDown方法,
* 這樣測試主執行緒就可以結束等待了
*/
private CountDownLatch countDownLatch; /**
* 這是輔助測試用的,
* 測試的時候,完成事件釋出後,測試主執行緒就用這個countDownLatch開始等待,
* 在消費到指定的數量(countDownLatchGate)後,消費執行緒執行countDownLatch的countDown方法,
* 這樣測試主執行緒就可以結束等待了
*/
private int countDownLatchGate; /**
* 準備一個匿名類,傳給disruptor的事件處理類,
* 這樣每次處理事件時,都會將已經處理事件的總數打印出來
*/
protected Consumer<?> eventCountPrinter = new Consumer<Object>() {
@Override
public void accept(Object o) {
long count = eventCount.incrementAndGet(); /**
* 這是輔助測試用的,
* 測試的時候,完成事件釋出後,測試主執行緒就用這個countDownLatch開始等待,
* 在消費到指定的數量(countDownLatchGate)後,消費執行緒執行countDownLatch的countDown方法,
* 這樣測試主執行緒就可以結束等待了
*/
if (null!=countDownLatch && count>=countDownLatchGate) {
countDownLatch.countDown();
}
}
}; /**
* 釋出一個事件
* @param value
* @return
*/
public void publish(String value) {
producer.onData(value);
} /**
* 返回已經處理的任務總數
* @return
*/
public long eventCount() {
return eventCount.get();
} /**
* 這是輔助測試用的,
* 測試的時候,完成事件釋出後,測試主執行緒就用這個countDownLatch開始等待,
* 在消費到指定的數量(countDownLatchGate)後,消費執行緒執行countDownLatch的countDown方法,
* 這樣測試主執行緒就可以結束等待了
* @param countDownLatch
* @param countDownLatchGate
*/
public void setCountDown(CountDownLatch countDownLatch, int countDownLatchGate) {
this.countDownLatch = countDownLatch;
this.countDownLatchGate = countDownLatchGate;
} /**
* 留給子類實現具體的事件消費邏輯
*/
protected abstract void disruptorOperate(); @PostConstruct
private void init() {
// 例項化
disruptor = new Disruptor<>(new OrderEventFactory(),
BUFFER_SIZE,
new CustomizableThreadFactory("event-handler-")); // 留給子類實現具體的事件消費邏輯
disruptorOperate(); // 啟動
disruptor.start(); // 生產者
setProducer(new OrderEventProducer(disruptor.getRingBuffer()));
}
}
  • 上述程式碼,有以下幾處需要注意:
  1. init方法是spring bean例項化後要執行的方法,這裡面例項化Disruptor,還啟動了消費執行緒,並且例項化了事件生產者,具體的事件消費邏輯,由子類在disruptorOperate方法中實現;
  2. eventCountPrinter是個匿名類例項,傳給事件消費的handler後,每消費一個事件都會執行一次eventCountPrinter.accept方法,這樣就把消費事件的總數準確的儲存在eventCount變數中了;
  3. countDownLatch和countDownLatchGate是為了輔助單元測試而準備的,測試的時候,完成事件釋出後,測試主執行緒就用這個countDownLatch開始等待,在消費到指定的數量(countDownLatchGate)後,消費執行緒執行countDownLatch的countDown方法,這樣測試主執行緒就可以結束等待了
  • 至此,公用程式碼就寫完了,可見抽象父類已經做好了大部分事情,咱們的子類可以聚焦事件消費的邏輯編排了,開始挨個實現那三個場景;

100個訂單,簡訊和郵件系統獨立消費

  • 兩個消費者獨立消費的邏輯非常簡單,就一行程式碼,呼叫handleEventsWith方法把所有消費者例項傳進去,就完事了:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.SmsEventHandler;
import org.springframework.stereotype.Service; @Service("independentModeService")
public class IndependentModeServiceImpl extends ConsumeModeService { @Override
protected void disruptorOperate() {
// 呼叫handleEventsWith,表示建立的多個消費者,每個都是獨立消費的
// 這裡建立兩個消費者,一個是簡訊的,一個是郵件的
disruptor.handleEventsWith(new SmsEventHandler(eventCountPrinter), new MailEventHandler(eventCountPrinter));
}
}
  • 單元測試程式碼如下,要注意的地方是釋出完100事件後,呼叫countDownLatch.await()方法開始等待,直到消費者執行緒呼叫countDownLatch.countDown()方法解除等待,還有就是預期的消費訊息總數等於200
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.CountDownLatch;
import static org.junit.Assert.assertEquals; @RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class ConsumeModeServiceTest { @Autowired
@Qualifier("independentModeService")
ConsumeModeService independentModeService; /**
* 測試時生產的訊息數量
*/
private static final int EVENT_COUNT = 100; private void testConsumeModeService(ConsumeModeService service, int eventCount, int expectEventCount) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1); // 告訴service,等消費到expectEventCount個訊息時,就執行countDownLatch.countDown方法
service.setCountDown(countDownLatch, expectEventCount); for(int i=0;i<eventCount;i++) {
log.info("publich {}", i);
service.publish(String.valueOf(i));
} // 當前執行緒開始等待,前面的service.setCountDown方法已經告訴過service,
// 等消費到expectEventCount個訊息時,就執行countDownLatch.countDown方法
// 千萬注意,要呼叫await方法,而不是wait方法!
countDownLatch.await(); // 消費的事件總數應該等於釋出的事件數
assertEquals(expectEventCount, service.eventCount());
} @Test
public void testIndependentModeService() throws InterruptedException {
log.info("start testIndependentModeService");
testConsumeModeService(independentModeService,
EVENT_COUNT,
EVENT_COUNT * ConsumeModeService.INDEPENDENT_CONSUMER_NUM);
}
}
  • 單元測試執行結果如下,符合預期:

100個訂單,郵件系統的兩個郵件伺服器共同消費

  • 兩個消費者共同消費的程式碼也很簡單,呼叫handleEventsWithWorkerPool方法即可,把共同消費的MailWorkHandler例項作為引數傳入:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailWorkHandler;
import org.springframework.stereotype.Service; @Service("shareModeService")
public class ShareModeServiceImpl extends ConsumeModeService {
@Override
protected void disruptorOperate() {
// mailWorkHandler1模擬一號郵件伺服器
MailWorkHandler mailWorkHandler1 = new MailWorkHandler(eventCountPrinter); // mailWorkHandler2模擬一號郵件伺服器
MailWorkHandler mailWorkHandler2 = new MailWorkHandler(eventCountPrinter); // 呼叫handleEventsWithWorkerPool,表示建立的多個消費者以共同消費的模式消費
disruptor.handleEventsWithWorkerPool(mailWorkHandler1, mailWorkHandler2);
}
}
  • 單元測試是在ConsumeModeServiceTest.java中新增如下程式碼,注意由於是共同消費,因此預期的消費事件數等於訊息數,都是100:
    @Autowired
@Qualifier("shareModeService")
ConsumeModeService shareModeService; @Test
public void testShareModeService() throws InterruptedException {
log.info("start testShareModeService");
testConsumeModeService(shareModeService, EVENT_COUNT, EVENT_COUNT);
}
  • 執行單元測試,結果如下圖:

100個訂單,簡訊系統獨立消費,與此同時,兩個郵件伺服器共同消費

  • 最後一個場景,依舊很簡單,handleEventsWith呼叫一次,再呼叫一次handleEventsWithWorkerPool即可:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailWorkHandler;
import com.bolingcavalry.service.SmsEventHandler;
import org.springframework.stereotype.Service; @Service("independentAndShareModeService")
public class IndependentAndShareModeServiceImpl extends ConsumeModeService {
@Override
protected void disruptorOperate() {
// 呼叫handleEventsWith,表示建立的多個消費者,每個都是獨立消費的
// 這裡建立一個消費者,簡訊服務
disruptor.handleEventsWith(new SmsEventHandler(eventCountPrinter)); // mailWorkHandler1模擬一號郵件伺服器
MailWorkHandler mailWorkHandler1 = new MailWorkHandler(eventCountPrinter); // mailWorkHandler2模擬一號郵件伺服器
MailWorkHandler mailWorkHandler2 = new MailWorkHandler(eventCountPrinter); // 呼叫handleEventsWithWorkerPool,表示建立的多個消費者以共同消費的模式消費
disruptor.handleEventsWithWorkerPool(mailWorkHandler1, mailWorkHandler2);
}
}
  • 單元測試是在ConsumeModeServiceTest.java中新增如下程式碼,預期的消費事件數應該是200,因為整體上是兩個獨立消費,只不過其中的一個內部有兩個消費者共同消費:
    @Autowired
@Qualifier("independentAndShareModeService")
ConsumeModeService independentAndShareModeService; @Test
public void independentAndShareModeService() throws InterruptedException {
log.info("start independentAndShareModeService");
testConsumeModeService(independentAndShareModeService,
EVENT_COUNT,
EVENT_COUNT * ConsumeModeService.INDEPENDENT_CONSUMER_NUM);
}
  • 單元測試結果如下,符合預期:

  • 至此,獨立消費和共同消費的實戰就完成了,藉助disruptor,三個常見場景都可以輕鬆完成,如果您正在做這些場景的開發,希望本文能給您一些參考;

你不孤單,欣宸原創一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 資料庫+中介軟體系列
  6. DevOps系列

歡迎關注公眾號:程式設計師欣宸

微信搜尋「程式設計師欣宸」,我是欣宸,期待與您一同暢遊Java世界...

https://github.com/zq2599/blog_demos