歡迎訪問我的GitHub

https://github.com/zq2599/blog_demos

內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;

《disruptor筆記》系列連結

  1. 快速入門
  2. Disruptor類分析
  3. 環形佇列的基礎操作(不用Disruptor類)
  4. 事件消費知識點小結
  5. 事件消費實戰
  6. 常見場景
  7. 等待策略
  8. 知識點補充(終篇)

本篇概覽

  • 本文是《disruptor筆記》系列的第六篇,主要內容是將一些常用的消費模式做彙總,後續日常開發中如果有需要就能拿來即用;
  • 以下是常用的模式:
  1. 多個消費者獨立消費,前文已實現,本篇跳過
  2. 多個消費者共同消費,前文已實現,本篇跳過
  3. 既有獨立消費,也有共同消費,前文已實現,本篇跳過
  4. 多個生產者和多個獨立消費者:

  1. C1、C2獨立消費,C3依賴C1和C2

  1. C1獨立消費,C2和C3也獨立消費,但依賴C1,C4依賴C2和C3:

  1. C1和C2獨立消費,C3和C4也是獨立消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4:

  1. C1和C2共同消費,C3和C4也是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4:

  1. C1和C2共同消費,C3和C4獨立消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4:

  1. C1和C2獨立消費,C3和C4是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4:

關於本篇程式碼

  • 為了省事兒,本次不會新建工程,而是直接使用前文的consume-mode模組,因此,下面這些類直接就直接使用了,無需重寫程式碼:
  1. 事件定義:OrderEvent
  2. 事件工廠:OrderEventFactory
  3. 事件生產者:OrderEventProducer
  4. 用在獨立消費場景的事件消費者:MailEventHandler
  5. 用在共同消費場景的事件消費者:MailWorkHandler

原始碼下載

名稱 連結 備註
專案主頁 https://github.com/zq2599/blog_demos 該專案在GitHub上的主頁
git倉庫地址(https) https://github.com/zq2599/blog_demos.git 該專案原始碼的倉庫地址,https協議
git倉庫地址(ssh) [email protected]:zq2599/blog_demos.git 該專案原始碼的倉庫地址,ssh協議
  • 這個git專案中有多個資料夾,本次實戰的原始碼在disruptor-tutorials資料夾下,如下圖紅框所示:

  • disruptor-tutorials是個父工程,裡面有多個module,本篇實戰的module是consume-mode,如下圖紅框所示:

多個生產者和多個獨立消費者

咱們即將實現下圖的邏輯:

  • 前面幾篇文章所有實戰的生產者都只有一個,到了本篇,為了讓consume-mode模組的程式碼能夠支援多生產者,咱們要對功能業務的抽象父類做以下兩處改動:
  1. init方法原本為private型,現在為了能讓子類重此方法,將其改為protected型別;

  2. 增加名為publishWithProducer2的方法,可見內部只有丟擲異常,要想其正常工作,需要子類自己來實現:

public void publishWithProducer2(String value) throws Exception {
throw new Exception("父類未實現此方法,請在子類中重寫此方法後再呼叫");
}
  • 為了實現多生產者功能,新增MultiProducerServiceImpl.java,有幾處要注意的地方稍後會提到:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.*;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.Setter;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct; @Service("multiProducerService")
public class MultiProducerServiceImpl extends ConsumeModeService { /**
* 第二個生產者
*/
@Setter
protected OrderEventProducer producer2; @PostConstruct
@Override
protected void init() {
// 例項化
disruptor = new Disruptor<>(new OrderEventFactory(),
BUFFER_SIZE,
new CustomizableThreadFactory("event-handler-"),
// 生產型別是多生產者
ProducerType.MULTI,
// BlockingWaitStrategy是預設的等待策略
new BlockingWaitStrategy()); // 留給子類實現具體的事件消費邏輯
disruptorOperate(); // 啟動
disruptor.start(); // 第一個生產者
setProducer(new OrderEventProducer(disruptor.getRingBuffer())); // 第二個生產者
setProducer2(new OrderEventProducer(disruptor.getRingBuffer()));
} @Override
protected void disruptorOperate() {
// 一號消費者
MailEventHandler c1 = new MailEventHandler(eventCountPrinter); // 二號消費者
MailEventHandler c2 = new MailEventHandler(eventCountPrinter); // 呼叫handleEventsWithWorkerPool,表示建立的多個消費者以共同消費的模式消費
disruptor.handleEventsWith(c1, c2);
} @Override
public void publishWithProducer2(String value) throws Exception {
producer2.onData(value);
}
}
  • 上述程式碼有以下幾處要注意:
  1. 重寫父類的init方法,主要是例項化Disruptor的時候,多傳入兩個引數:ProducerType.MULTI表示生產型別是多生產者,BlockingWaitStrategy是等待策略,之前的程式碼中咱們沒有傳此引數時,預設的就是BlockingWaitStrategy
  2. init方法中還執行了setProducer2方法,設定成員變數producer2
  3. 重寫publishWithProducer2方法,呼叫成員變數producer2發表事件
  4. 重寫disruptorOperate方法,裡面設定了兩個獨立消費者
  • 驗證上述程式碼的方式依舊是單元測試,開啟ConsumeModeServiceTest.java,新增以下程式碼,可見新增了兩個執行緒同時執行釋出事件的操作:
    @Autowired
@Qualifier("multiProducerService")
ConsumeModeService multiProducerService; @Test
public void testMultiProducerService() throws InterruptedException {
log.info("start testMultiProducerService");
CountDownLatch countDownLatch = new CountDownLatch(1); // 兩個生產者,每個生產100個事件,一共生產兩百個事件
// 兩個獨立消費者,每人消費200個事件,因此一共消費400個事件
int expectEventCount = EVENT_COUNT*4; // 告訴service,等消費到400個訊息時,就執行countDownLatch.countDown方法
multiProducerService.setCountDown(countDownLatch, expectEventCount); // 啟動一個執行緒,用第一個生產者生產事件
new Thread(() -> {
for(int i=0;i<EVENT_COUNT;i++) {
log.info("publich {}", i);
multiProducerService.publish(String.valueOf(i));
}
}).start(); // 再啟動一個執行緒,用第二個生產者生產事件
new Thread(() -> {
for(int i=0;i<EVENT_COUNT;i++) {
log.info("publishWithProducer2 {}", i);
try {
multiProducerService.publishWithProducer2(String.valueOf(i));
} catch (Exception e) {
e.printStackTrace();
}
}
}).start(); // 當前執行緒開始等待,前面的service.setCountDown方法已經告訴過service,
// 等消費到expectEventCount個訊息時,就執行countDownLatch.countDown方法
// 千萬注意,要呼叫await方法,而不是wait方法!
countDownLatch.await(); // 消費的事件總數應該等於釋出的事件數
assertEquals(expectEventCount, multiProducerService.eventCount());
}
  • 測試結果如下,測試通過,符合預期:

C1、C2獨立消費,C3依賴C1和C2

  • 邏輯圖如下:

  • 實現程式碼如下,非常簡單,依賴關係用then即可實現:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.MailWorkHandler;
import com.bolingcavalry.service.SmsEventHandler;
import org.springframework.stereotype.Service; @Service("scene5")
public class Scene5 extends ConsumeModeService { @Override
protected void disruptorOperate() {
MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
MailEventHandler c3 = new MailEventHandler(eventCountPrinter); disruptor
// C1、C2獨立消費
.handleEventsWith(c1, c2)
// C3依賴C1和C2
.then(c3);
}
}
  • 單元測試程式碼:
    @Autowired
@Qualifier("scene5")
Scene5 scene5; @Test
public void testScene5 () throws InterruptedException {
log.info("start testScene5");
testConsumeModeService(scene5,
EVENT_COUNT,
// 三個獨立消費者,一共消費300個事件
EVENT_COUNT * 3);
}
  • 為了節省篇幅,測試結果就不貼了,要注意的是,每個事件都一定是C1和C2先消費過,才會被C3消費到;

C1獨立消費,C2和C3也獨立消費,但依賴C1,C4依賴C2和C3

  • 邏輯圖如下:

  • 實現程式碼如下:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import org.springframework.stereotype.Service; @Service("scene6")
public class Scene6 extends ConsumeModeService { @Override
protected void disruptorOperate() {
MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
MailEventHandler c3 = new MailEventHandler(eventCountPrinter);
MailEventHandler c4 = new MailEventHandler(eventCountPrinter); disruptor
// C1
.handleEventsWith(c1)
// C2和C3也獨立消費
.then(c2, c3)
// C4依賴C2和C3
.then(c4);
}
}
  • 單元測試程式碼:
    @Autowired
@Qualifier("scene6")
Scene6 scene6; @Test
public void testScene6 () throws InterruptedException {
log.info("start testScene6");
testConsumeModeService(scene6,
EVENT_COUNT,
// 四個獨立消費者,一共消費400個事件
EVENT_COUNT * 4);
}

C1和C2獨立消費,C3和C4也是獨立消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4

  • 邏輯圖如下:

  • 實現程式碼如下:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import org.springframework.stereotype.Service; @Service("scene7")
public class Scene7 extends ConsumeModeService { @Override
protected void disruptorOperate() {
MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
MailEventHandler c3 = new MailEventHandler(eventCountPrinter);
MailEventHandler c4 = new MailEventHandler(eventCountPrinter);
MailEventHandler c5 = new MailEventHandler(eventCountPrinter); disruptor
// C1和C2獨立消費
.handleEventsWith(c1, c2)
// C3和C4也是獨立消費,但C3和C4都依賴C1和C2
.then(c3, c4)
// 然後C5依賴C3和C4
.then(c5);
}
}
  • 單元測試程式碼:
    @Autowired
@Qualifier("scene7")
Scene7 scene7; @Test
public void testScene7 () throws InterruptedException {
log.info("start testScene7");
testConsumeModeService(scene7,
EVENT_COUNT,
// 五個獨立消費者,一共消費500個事件
EVENT_COUNT * 5);
}

C1和C2共同消費,C3和C4也是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4

  • 邏輯圖如下:

  • 實現程式碼如下:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.MailWorkHandler;
import org.springframework.stereotype.Service; /**
* @author will ([email protected])
* @version 1.0
* @description: C1和C2共同消費,C3和C4也是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4
* @date 2021/5/23 11:05
*/
@Service("scene8")
public class Scene8 extends ConsumeModeService { @Override
protected void disruptorOperate() {
MailWorkHandler c1 = new MailWorkHandler(eventCountPrinter);
MailWorkHandler c2 = new MailWorkHandler(eventCountPrinter);
MailWorkHandler c3 = new MailWorkHandler(eventCountPrinter);
MailWorkHandler c4 = new MailWorkHandler(eventCountPrinter);
MailWorkHandler c5 = new MailWorkHandler(eventCountPrinter); disruptor
// C1和C2共同消費
.handleEventsWithWorkerPool(c1, c2)
// C3和C4也是獨立消費,但C3和C4都依賴C1和C2
.thenHandleEventsWithWorkerPool(c3, c4)
// 然後C5依賴C3和C4
.thenHandleEventsWithWorkerPool(c5);
}
}
  • 單元測試程式碼:
    @Autowired
@Qualifier("scene8")
Scene8 scene8; @Test
public void testScene8 () throws InterruptedException {
log.info("start testScene8");
testConsumeModeService(scene8,
EVENT_COUNT,
// C1和C2共同消費,C3和C4共同消費,C5雖然只是一個,但也是共同消費模式,
// 也就是一共有三組消費者,所以一共消費300個事件
EVENT_COUNT * 3);
}

C1和C2共同消費,C3和C4獨立消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4

  • 邏輯圖如下:

  • 實現程式碼如下:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.MailWorkHandler;
import org.springframework.stereotype.Service; @Service("scene9")
public class Scene9 extends ConsumeModeService { @Override
protected void disruptorOperate() {
MailWorkHandler c1 = new MailWorkHandler(eventCountPrinter);
MailWorkHandler c2 = new MailWorkHandler(eventCountPrinter);
MailEventHandler c3 = new MailEventHandler(eventCountPrinter);
MailEventHandler c4 = new MailEventHandler(eventCountPrinter);
MailEventHandler c5 = new MailEventHandler(eventCountPrinter); disruptor
// C1和C2共同消費
.handleEventsWithWorkerPool(c1, c2)
// C3和C4獨立消費,但C3和C4都依賴C1和C2
.then(c3, c4)
// 然後C5依賴C3和C4
.then(c5);
}
}
  • 單元測試程式碼:
    @Autowired
@Qualifier("scene9")
Scene9 scene9; @Test
public void testScene9 () throws InterruptedException {
log.info("start testScene9");
testConsumeModeService(scene9,
EVENT_COUNT,
// C1和C2共同消費(100個事件),
// C3和C4獨立消費(200個事件),
// C5獨立消費(100個事件),
// 所以一共消費400個事件
EVENT_COUNT * 4);
}

C1和C2獨立消費,C3和C4是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4

  • 邏輯圖如下:

  • 實現程式碼如下:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.MailWorkHandler;
import org.springframework.stereotype.Service; @Service("scene10")
public class Scene10 extends ConsumeModeService { @Override
protected void disruptorOperate() {
MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
MailWorkHandler c3 = new MailWorkHandler(eventCountPrinter);
MailWorkHandler c4 = new MailWorkHandler(eventCountPrinter);
MailEventHandler c5 = new MailEventHandler(eventCountPrinter); disruptor
// C1和C2共同消費
.handleEventsWith(c1, c2)
// C3和C4是共同消費,但C3和C4都依賴C1和C2
.thenHandleEventsWithWorkerPool(c3, c4)
// 然後C5依賴C3和C4
.then(c5);
}
}
  • 單元測試程式碼:
    @Test
public void testScene10 () throws InterruptedException {
log.info("start testScene10");
testConsumeModeService(scene10,
EVENT_COUNT,
// C1和C2獨立消費(200個事件),
// C3和C4共同消費(100個事件),
// C5獨立消費(100個事件),
// 所以一共消費400個事件
EVENT_COUNT * 4);
}
  • 至此,一些常見場景的程式碼已完成,希望本文能給您一些參考,幫您更得心應手的用好這個優秀的工具;

你不孤單,欣宸原創一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 資料庫+中介軟體系列
  6. DevOps系列

歡迎關注公眾號:程式設計師欣宸

微信搜尋「程式設計師欣宸」,我是欣宸,期待與您一同暢遊Java世界...

https://github.com/zq2599/blog_demos