1. 程式人生 > >SODBASE CEP學習進階篇(六):實現反壓和流限速

SODBASE CEP學習進階篇(六):實現反壓和流限速

前面文章介紹過流資料輸入速率要和處理能力相匹配,短時資料爆發由內部緩衝佇列來緩衝。如果確實存在某個時間點持續資料爆發,可以考慮採取反壓限流的方法。

1. 示例操作步驟

(1)下載SODBASE Studio2.0.22(sp1)以上版本,解壓,開啟configuration/global.properties,將引擎的緩衝佇列長度設的較小,作以下配置

maxqueuelength=100
warnqueuelength=20

(2)下載示例EPL模型backpress01.sod、backpress02.sod

backpress01功能:CPU滿負荷不停的產生模擬資料,列印到螢幕。因為嚴格地講,螢幕列印的速度也是跟不上CPU滿負荷產生資料速度的。因此容易達到佇列報警值20。

backpress02功能:將system.sys_warn的報警事件反接到backpress01的控制Socket埠上。

(3)開啟SODBASE Studio,匯入backpress01.sod、backpress02.sod

先測試執行backpress02,再將backpress01也測試執行起來。

(4)結果輸出

可以看到如下的輸出結果,並且發現隔一段時間,資料就慢下來,這就是反壓限速的原因。


2. 工作原理

2.1可控制輸入介面卡

上面示例能夠實現反壓限速,是因為編寫backpress01的輸入介面卡時,繼承了SODBASE CEP一類可控制輸入介面卡。這類可控制輸入介面卡,通過Socket監聽控制事件,並在接收到控制事件後呼叫回撥函式callback(PrimitiveEvent e),在回撥函式中可以讓輸入流sleep一段時間。

讀者要實現同樣的功能,只需整合實現com.sodbase.inputadaptor.controllable.ControllableInputAdaptorI類。前3個引數預設為資料流名、控制監聽埠、控制監聽埠超時時間(ms)。讀者要新增引數,可以從第4個引數開始新增。示例中的輸入介面卡程式碼如下

public class ControllableTestInputAdaptor extends ControllableInputAdaptorI
{
	private boolean running=true;
	private long suspendtime=0;
	@Override
	public void setUp()
	{
		//必須呼叫super.setUp()啟動控制監聽埠
		super.setUp();
	}
	
	@Override
	public void callback(PrimitiveEvent primitiveEvent)
	{
		if(primitiveEvent.getAttributeValueType("cause").getValue().equals(Constants.causecode1))
			suspendtime=5000;
	}

	@Override
	public boolean isRunning()
	{
		return running;
	}
	
	@Override
	public void stopInputStream()
	{
		//必須呼叫super.stopInputStream()關閉控制監聽埠
		super.stopInputStream();
		this.running=false;
	}
	public void run()
	{
		int count = 1;
		while (running)
		{
			try
			{
				if(suspendtime>0)
				{
					Thread.sleep(suspendtime);
					suspendtime=0;
				}
			}
			catch (InterruptedException e)
			{
				e.printStackTrace();
			}
			PrimitiveEvent primitiveEvent = new PrimitiveEvent();
			primitiveEvent.getAttributeMap().put("id", new ValueType(String.valueOf(count),"string"));
			count++;
			Date d = new Date();
			long time = d.getTime();
			primitiveEvent.setStart_ts(time);
			primitiveEvent.setEnd_ts(time);
			this.putEventToStream(primitiveEvent);
		}
	}

}

2.2 系統報警事件

SODBASE CEP允許通過warnqueuelength配置緩衝佇列長度的報警長度,可針對報警採取一些運維管理措施。如果不配置,預設為最大緩衝佇列長度maxqueuelength的80%。

system.sys_warn和system.sys_error是系統內建報警流,通常會含3個欄位

(1)cause:報警和報錯的原因

(2)queryname:引起報警和報錯的EPL語句名稱

(3)message:訊息提示

如cause=’warnqueuelengthexceeded’時,即超過了緩衝佇列的報警閾值。


2.3 注意事項

(1)多使用者環境下,報警流queryname欄位會加字首"使用者名稱."。

(2)設計系統時,不要過度依賴反壓限速功能,因為反壓限速會增加輸入端負載,也會給系統帶來新的問題。正確方式是在系統架構初期,採用模擬資料和最大輸入速率配置好緩衝區大小,設計好處理方式並留出餘量,讓處理能力和輸入速率相匹配。

SODBASE CEP用於輕鬆、高效實施資料監測、監控類、實時交易類專案微笑快取擴充套件參見。