1. 程式人生 > >併發框架Disruptor場景應用

併發框架Disruptor場景應用

轉自 https://blog.csdn.net/qq_19558705/article/details/77247912

今天用一個停車場問題來加深對Disruptor的理解。一個有關汽車進入停車場的問題。當汽車進入停車場時,系統首先會記錄汽車資訊。同時也會發送訊息到其他系統處理相關業務,最後傳送簡訊通知車主收費開始。看了很多文章,裡面的程式碼都是大同小異的,可能程式碼真的是很經典。以下程式碼也是來源網路,只是自己手動敲的,加了一些註釋。

程式碼包含以下內容:
1) 事件物件Event
2)三個消費者Handler
3)一個生產者Processer
4)執行Main方法
Event類:汽車資訊

public class MyInParkingDataEvent {
 
	private String carLicense; // 車牌號
 
	public String getCarLicense() {
		return carLicense;
	}
 
	public void setCarLicense(String carLicense) {
		this.carLicense = carLicense;
	}
 
}

Handler類:一個負責儲存汽車資料,一個負責傳送kafka資訊到其他系統中,最後一個負責給車主發簡訊通知

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
 
/**
 * Handler 第一個消費者,負責儲存進場汽車的資訊
 *
 */
public class MyParkingDataInDbHandler implements EventHandler<MyInParkingDataEvent> , WorkHandler<MyInParkingDataEvent>{
 
	@Override
	public void onEvent(MyInParkingDataEvent myInParkingDataEvent) throws Exception {
		long threadId = Thread.currentThread().getId(); // 獲取當前執行緒id
		String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車牌號
		System.out.println(String.format("Thread Id %s 儲存 %s 到資料庫中 ....", threadId, carLicense));
	}
 
	@Override
	public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
			throws Exception {
		this.onEvent(myInParkingDataEvent);
	}
 
}

import com.lmax.disruptor.EventHandler;
 
/**
 * 第二個消費者,負責傳送通知告知工作人員(Kafka是一種高吞吐量的分散式釋出訂閱訊息系統)
 */
public class MyParkingDataToKafkaHandler implements EventHandler<MyInParkingDataEvent>{
 
	@Override
	public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
			throws Exception {
		long threadId = Thread.currentThread().getId(); // 獲取當前執行緒id
		String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車牌號
		System.out.println(String.format("Thread Id %s 傳送 %s 進入停車場資訊給 kafka系統...", threadId, carLicense));
	}
 
}

import com.lmax.disruptor.EventHandler;
 
/**
 * 第三個消費者,sms簡訊服務,告知司機你已經進入停車場,計費開始。
 */
public class MyParkingDataSmsHandler implements EventHandler<MyInParkingDataEvent>{
 
	@Override
	public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
			throws Exception {
		long threadId = Thread.currentThread().getId(); // 獲取當前執行緒id
		String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車牌號
		System.out.println(String.format("Thread Id %s 給  %s 的車主傳送一條簡訊,並告知他計費開始了 ....", threadId, carLicense));
	}
 
}

Producer類:負責上報停車資料

import java.util.concurrent.CountDownLatch;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.dsl.Disruptor;
 
/**
 * 生產者,進入停車場的車輛
 */
public class MyInParkingDataEventPublisher implements Runnable{
	
	private CountDownLatch countDownLatch; // 用於監聽初始化操作,等初始化執行完畢後,通知主執行緒繼續工作
	private Disruptor<MyInParkingDataEvent> disruptor;
	private static final Integer NUM = 1; // 1,10,100,1000
	
	public MyInParkingDataEventPublisher(CountDownLatch countDownLatch,
			Disruptor<MyInParkingDataEvent> disruptor) {
		this.countDownLatch = countDownLatch;
		this.disruptor = disruptor;
	}
	
	@Override
	public void run() {
		MyInParkingDataEventTranslator eventTranslator = new MyInParkingDataEventTranslator();
		try {
			for(int i = 0; i < NUM; i ++) {
				disruptor.publishEvent(eventTranslator);
				Thread.sleep(1000); // 假設一秒鐘進一輛車
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			countDownLatch.countDown(); // 執行完畢後通知 await()方法
			System.out.println(NUM + "輛車已經全部進入進入停車場!");
		}
	}
	
}
 
class MyInParkingDataEventTranslator implements EventTranslator<MyInParkingDataEvent> {
 
	@Override
	public void translateTo(MyInParkingDataEvent myInParkingDataEvent, long sequence) {
		this.generateData(myInParkingDataEvent);
	}
	
	private MyInParkingDataEvent generateData(MyInParkingDataEvent myInParkingDataEvent) {
		myInParkingDataEvent.setCarLicense("車牌號: 鄂A-" + (int)(Math.random() * 100000)); // 隨機生成一個車牌號
		System.out.println("Thread Id " + Thread.currentThread().getId() + " 寫完一個event");
		return myInParkingDataEvent;
	}
	
}

執行的Main方法:

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
 
/**
 * 執行的Main方法 ,
 * 一個生產者(汽車進入停車場);
 * 三個消費者(一個記錄汽車資訊,一個傳送訊息給系統,一個傳送訊息告知司機)
 * 前兩個消費者同步執行,都有結果了再執行第三個消費者
 */
public class MyInParkingDataEventMain {
	
	public static void main(String[] args) {
		long beginTime=System.currentTimeMillis();
		int bufferSize = 2048; // 2的N次方
		try {
			// 建立執行緒池,負責處理Disruptor的四個消費者
			ExecutorService executor = Executors.newFixedThreadPool(4);
			
			// 初始化一個 Disruptor
			Disruptor<MyInParkingDataEvent> disruptor = new Disruptor<MyInParkingDataEvent>(new EventFactory<MyInParkingDataEvent>() {
				@Override
				public MyInParkingDataEvent newInstance() {
					return new MyInParkingDataEvent(); // Event 初始化工廠
				}
			}, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
			
			// 使用disruptor建立消費者組 MyParkingDataInDbHandler 和 MyParkingDataToKafkaHandler
			EventHandlerGroup<MyInParkingDataEvent> handlerGroup = disruptor.handleEventsWith(
					new MyParkingDataInDbHandler(), new MyParkingDataToKafkaHandler());
			
			// 當上面兩個消費者處理結束後在消耗 smsHandler
			MyParkingDataSmsHandler myParkingDataSmsHandler = new MyParkingDataSmsHandler();
			handlerGroup.then(myParkingDataSmsHandler);
			
			// 啟動Disruptor
			disruptor.start();
			
			CountDownLatch countDownLatch = new CountDownLatch(1); // 一個生產者執行緒準備好了就可以通知主執行緒繼續工作了
			// 生產者生成資料
			executor.submit(new MyInParkingDataEventPublisher(countDownLatch, disruptor));
			countDownLatch.await(); // 等待生產者結束
			
			disruptor.shutdown();
			executor.shutdown();
		} catch (Exception e) {
			e.printStackTrace();
		}
		
		System.out.println("總耗時:"+(System.currentTimeMillis()-beginTime));
	}
	
}