歡迎訪問我的GitHub
https://github.com/zq2599/blog_demos
內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;
《disruptor筆記》系列連結
本篇概覽
- 本文是《disruptor筆記》系列的第六篇,主要內容是將一些常用的消費模式做彙總,後續日常開發中如果有需要就能拿來即用;
- 以下是常用的模式:
- C1、C2獨立消費,C3依賴C1和C2
- C1獨立消費,C2和C3也獨立消費,但依賴C1,C4依賴C2和C3:
- C1和C2獨立消費,C3和C4也是獨立消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4:
- C1和C2共同消費,C3和C4也是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4:
- C1和C2共同消費,C3和C4獨立消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4:
- C1和C2獨立消費,C3和C4是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4:
關於本篇程式碼
- 為了省事兒,本次不會新建工程,而是直接使用前文的consume-mode模組,因此,下面這些類直接就直接使用了,無需重寫程式碼:
- 事件定義:OrderEvent
- 事件工廠:OrderEventFactory
- 事件生產者:OrderEventProducer
- 用在獨立消費場景的事件消費者:MailEventHandler
- 用在共同消費場景的事件消費者:MailWorkHandler
原始碼下載
- 本篇實戰中的完整原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos):
名稱 | 連結 | 備註 |
---|---|---|
專案主頁 | https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 |
git倉庫地址(https) | https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 |
git倉庫地址(ssh) | [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 |
- 這個git專案中有多個資料夾,本次實戰的原始碼在disruptor-tutorials資料夾下,如下圖紅框所示:
- disruptor-tutorials是個父工程,裡面有多個module,本篇實戰的module是consume-mode,如下圖紅框所示:
多個生產者和多個獨立消費者
咱們即將實現下圖的邏輯:
- 前面幾篇文章所有實戰的生產者都只有一個,到了本篇,為了讓consume-mode模組的程式碼能夠支援多生產者,咱們要對功能業務的抽象父類做以下兩處改動:
init方法原本為private型,現在為了能讓子類重此方法,將其改為protected型別;
增加名為publishWithProducer2的方法,可見內部只有丟擲異常,要想其正常工作,需要子類自己來實現:
public void publishWithProducer2(String value) throws Exception {
throw new Exception("父類未實現此方法,請在子類中重寫此方法後再呼叫");
}
- 為了實現多生產者功能,新增MultiProducerServiceImpl.java,有幾處要注意的地方稍後會提到:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.*;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.Setter;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
@Service("multiProducerService")
public class MultiProducerServiceImpl extends ConsumeModeService {
/**
* 第二個生產者
*/
@Setter
protected OrderEventProducer producer2;
@PostConstruct
@Override
protected void init() {
// 例項化
disruptor = new Disruptor<>(new OrderEventFactory(),
BUFFER_SIZE,
new CustomizableThreadFactory("event-handler-"),
// 生產型別是多生產者
ProducerType.MULTI,
// BlockingWaitStrategy是預設的等待策略
new BlockingWaitStrategy());
// 留給子類實現具體的事件消費邏輯
disruptorOperate();
// 啟動
disruptor.start();
// 第一個生產者
setProducer(new OrderEventProducer(disruptor.getRingBuffer()));
// 第二個生產者
setProducer2(new OrderEventProducer(disruptor.getRingBuffer()));
}
@Override
protected void disruptorOperate() {
// 一號消費者
MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
// 二號消費者
MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
// 呼叫handleEventsWithWorkerPool,表示建立的多個消費者以共同消費的模式消費
disruptor.handleEventsWith(c1, c2);
}
@Override
public void publishWithProducer2(String value) throws Exception {
producer2.onData(value);
}
}
- 上述程式碼有以下幾處要注意:
- 重寫父類的init方法,主要是例項化Disruptor的時候,多傳入兩個引數:ProducerType.MULTI表示生產型別是多生產者,BlockingWaitStrategy是等待策略,之前的程式碼中咱們沒有傳此引數時,預設的就是BlockingWaitStrategy
- init方法中還執行了setProducer2方法,設定成員變數producer2
- 重寫publishWithProducer2方法,呼叫成員變數producer2發表事件
- 重寫disruptorOperate方法,裡面設定了兩個獨立消費者
- 驗證上述程式碼的方式依舊是單元測試,開啟ConsumeModeServiceTest.java,新增以下程式碼,可見新增了兩個執行緒同時執行釋出事件的操作:
@Autowired
@Qualifier("multiProducerService")
ConsumeModeService multiProducerService;
@Test
public void testMultiProducerService() throws InterruptedException {
log.info("start testMultiProducerService");
CountDownLatch countDownLatch = new CountDownLatch(1);
// 兩個生產者,每個生產100個事件,一共生產兩百個事件
// 兩個獨立消費者,每人消費200個事件,因此一共消費400個事件
int expectEventCount = EVENT_COUNT*4;
// 告訴service,等消費到400個訊息時,就執行countDownLatch.countDown方法
multiProducerService.setCountDown(countDownLatch, expectEventCount);
// 啟動一個執行緒,用第一個生產者生產事件
new Thread(() -> {
for(int i=0;i<EVENT_COUNT;i++) {
log.info("publich {}", i);
multiProducerService.publish(String.valueOf(i));
}
}).start();
// 再啟動一個執行緒,用第二個生產者生產事件
new Thread(() -> {
for(int i=0;i<EVENT_COUNT;i++) {
log.info("publishWithProducer2 {}", i);
try {
multiProducerService.publishWithProducer2(String.valueOf(i));
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
// 當前執行緒開始等待,前面的service.setCountDown方法已經告訴過service,
// 等消費到expectEventCount個訊息時,就執行countDownLatch.countDown方法
// 千萬注意,要呼叫await方法,而不是wait方法!
countDownLatch.await();
// 消費的事件總數應該等於釋出的事件數
assertEquals(expectEventCount, multiProducerService.eventCount());
}
- 測試結果如下,測試通過,符合預期:
C1、C2獨立消費,C3依賴C1和C2
- 邏輯圖如下:
- 實現程式碼如下,非常簡單,依賴關係用then即可實現:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.MailWorkHandler;
import com.bolingcavalry.service.SmsEventHandler;
import org.springframework.stereotype.Service;
@Service("scene5")
public class Scene5 extends ConsumeModeService {
@Override
protected void disruptorOperate() {
MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
MailEventHandler c3 = new MailEventHandler(eventCountPrinter);
disruptor
// C1、C2獨立消費
.handleEventsWith(c1, c2)
// C3依賴C1和C2
.then(c3);
}
}
- 單元測試程式碼:
@Autowired
@Qualifier("scene5")
Scene5 scene5;
@Test
public void testScene5 () throws InterruptedException {
log.info("start testScene5");
testConsumeModeService(scene5,
EVENT_COUNT,
// 三個獨立消費者,一共消費300個事件
EVENT_COUNT * 3);
}
- 為了節省篇幅,測試結果就不貼了,要注意的是,每個事件都一定是C1和C2先消費過,才會被C3消費到;
C1獨立消費,C2和C3也獨立消費,但依賴C1,C4依賴C2和C3
- 邏輯圖如下:
- 實現程式碼如下:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import org.springframework.stereotype.Service;
@Service("scene6")
public class Scene6 extends ConsumeModeService {
@Override
protected void disruptorOperate() {
MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
MailEventHandler c3 = new MailEventHandler(eventCountPrinter);
MailEventHandler c4 = new MailEventHandler(eventCountPrinter);
disruptor
// C1
.handleEventsWith(c1)
// C2和C3也獨立消費
.then(c2, c3)
// C4依賴C2和C3
.then(c4);
}
}
- 單元測試程式碼:
@Autowired
@Qualifier("scene6")
Scene6 scene6;
@Test
public void testScene6 () throws InterruptedException {
log.info("start testScene6");
testConsumeModeService(scene6,
EVENT_COUNT,
// 四個獨立消費者,一共消費400個事件
EVENT_COUNT * 4);
}
C1和C2獨立消費,C3和C4也是獨立消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4
- 邏輯圖如下:
- 實現程式碼如下:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import org.springframework.stereotype.Service;
@Service("scene7")
public class Scene7 extends ConsumeModeService {
@Override
protected void disruptorOperate() {
MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
MailEventHandler c3 = new MailEventHandler(eventCountPrinter);
MailEventHandler c4 = new MailEventHandler(eventCountPrinter);
MailEventHandler c5 = new MailEventHandler(eventCountPrinter);
disruptor
// C1和C2獨立消費
.handleEventsWith(c1, c2)
// C3和C4也是獨立消費,但C3和C4都依賴C1和C2
.then(c3, c4)
// 然後C5依賴C3和C4
.then(c5);
}
}
- 單元測試程式碼:
@Autowired
@Qualifier("scene7")
Scene7 scene7;
@Test
public void testScene7 () throws InterruptedException {
log.info("start testScene7");
testConsumeModeService(scene7,
EVENT_COUNT,
// 五個獨立消費者,一共消費500個事件
EVENT_COUNT * 5);
}
C1和C2共同消費,C3和C4也是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4
- 邏輯圖如下:
- 實現程式碼如下:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.MailWorkHandler;
import org.springframework.stereotype.Service;
/**
* @author will ([email protected])
* @version 1.0
* @description: C1和C2共同消費,C3和C4也是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4
* @date 2021/5/23 11:05
*/
@Service("scene8")
public class Scene8 extends ConsumeModeService {
@Override
protected void disruptorOperate() {
MailWorkHandler c1 = new MailWorkHandler(eventCountPrinter);
MailWorkHandler c2 = new MailWorkHandler(eventCountPrinter);
MailWorkHandler c3 = new MailWorkHandler(eventCountPrinter);
MailWorkHandler c4 = new MailWorkHandler(eventCountPrinter);
MailWorkHandler c5 = new MailWorkHandler(eventCountPrinter);
disruptor
// C1和C2共同消費
.handleEventsWithWorkerPool(c1, c2)
// C3和C4也是獨立消費,但C3和C4都依賴C1和C2
.thenHandleEventsWithWorkerPool(c3, c4)
// 然後C5依賴C3和C4
.thenHandleEventsWithWorkerPool(c5);
}
}
- 單元測試程式碼:
@Autowired
@Qualifier("scene8")
Scene8 scene8;
@Test
public void testScene8 () throws InterruptedException {
log.info("start testScene8");
testConsumeModeService(scene8,
EVENT_COUNT,
// C1和C2共同消費,C3和C4共同消費,C5雖然只是一個,但也是共同消費模式,
// 也就是一共有三組消費者,所以一共消費300個事件
EVENT_COUNT * 3);
}
C1和C2共同消費,C3和C4獨立消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4
- 邏輯圖如下:
- 實現程式碼如下:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.MailWorkHandler;
import org.springframework.stereotype.Service;
@Service("scene9")
public class Scene9 extends ConsumeModeService {
@Override
protected void disruptorOperate() {
MailWorkHandler c1 = new MailWorkHandler(eventCountPrinter);
MailWorkHandler c2 = new MailWorkHandler(eventCountPrinter);
MailEventHandler c3 = new MailEventHandler(eventCountPrinter);
MailEventHandler c4 = new MailEventHandler(eventCountPrinter);
MailEventHandler c5 = new MailEventHandler(eventCountPrinter);
disruptor
// C1和C2共同消費
.handleEventsWithWorkerPool(c1, c2)
// C3和C4獨立消費,但C3和C4都依賴C1和C2
.then(c3, c4)
// 然後C5依賴C3和C4
.then(c5);
}
}
- 單元測試程式碼:
@Autowired
@Qualifier("scene9")
Scene9 scene9;
@Test
public void testScene9 () throws InterruptedException {
log.info("start testScene9");
testConsumeModeService(scene9,
EVENT_COUNT,
// C1和C2共同消費(100個事件),
// C3和C4獨立消費(200個事件),
// C5獨立消費(100個事件),
// 所以一共消費400個事件
EVENT_COUNT * 4);
}
C1和C2獨立消費,C3和C4是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4
- 邏輯圖如下:
- 實現程式碼如下:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.MailWorkHandler;
import org.springframework.stereotype.Service;
@Service("scene10")
public class Scene10 extends ConsumeModeService {
@Override
protected void disruptorOperate() {
MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
MailWorkHandler c3 = new MailWorkHandler(eventCountPrinter);
MailWorkHandler c4 = new MailWorkHandler(eventCountPrinter);
MailEventHandler c5 = new MailEventHandler(eventCountPrinter);
disruptor
// C1和C2共同消費
.handleEventsWith(c1, c2)
// C3和C4是共同消費,但C3和C4都依賴C1和C2
.thenHandleEventsWithWorkerPool(c3, c4)
// 然後C5依賴C3和C4
.then(c5);
}
}
- 單元測試程式碼:
@Test
public void testScene10 () throws InterruptedException {
log.info("start testScene10");
testConsumeModeService(scene10,
EVENT_COUNT,
// C1和C2獨立消費(200個事件),
// C3和C4共同消費(100個事件),
// C5獨立消費(100個事件),
// 所以一共消費400個事件
EVENT_COUNT * 4);
}
- 至此,一些常見場景的程式碼已完成,希望本文能給您一些參考,幫您更得心應手的用好這個優秀的工具;
你不孤單,欣宸原創一路相伴
歡迎關注公眾號:程式設計師欣宸
微信搜尋「程式設計師欣宸」,我是欣宸,期待與您一同暢遊Java世界...
https://github.com/zq2599/blog_demos