SODBASE CEP學習進階篇(六):實現反壓和流限速
前面文章介紹過流資料輸入速率要和處理能力相匹配,短時資料爆發由內部緩衝佇列來緩衝。如果確實存在某個時間點持續資料爆發,可以考慮採取反壓限流的方法。
1. 示例操作步驟
(1)下載SODBASE Studio2.0.22(sp1)以上版本,解壓,開啟configuration/global.properties,將引擎的緩衝佇列長度設的較小,作以下配置
maxqueuelength=100
warnqueuelength=20
(2)下載示例EPL模型backpress01.sod、backpress02.sod
backpress01功能:CPU滿負荷不停的產生模擬資料,列印到螢幕。因為嚴格地講,螢幕列印的速度也是跟不上CPU滿負荷產生資料速度的。因此容易達到佇列報警值20。
backpress02功能:將system.sys_warn的報警事件反接到backpress01的控制Socket埠上。
(3)開啟SODBASE Studio,匯入backpress01.sod、backpress02.sod
先測試執行backpress02,再將backpress01也測試執行起來。
(4)結果輸出
可以看到如下的輸出結果,並且發現隔一段時間,資料就慢下來,這就是反壓限速的原因。
2. 工作原理
2.1可控制輸入介面卡
上面示例能夠實現反壓限速,是因為編寫backpress01的輸入介面卡時,繼承了SODBASE CEP一類可控制輸入介面卡。這類可控制輸入介面卡,通過Socket監聽控制事件,並在接收到控制事件後呼叫回撥函式callback(PrimitiveEvent e),在回撥函式中可以讓輸入流sleep一段時間。
讀者要實現同樣的功能,只需整合實現com.sodbase.inputadaptor.controllable.ControllableInputAdaptorI類。前3個引數預設為資料流名、控制監聽埠、控制監聽埠超時時間(ms)。讀者要新增引數,可以從第4個引數開始新增。示例中的輸入介面卡程式碼如下
public class ControllableTestInputAdaptor extends ControllableInputAdaptorI { private boolean running=true; private long suspendtime=0; @Override public void setUp() { //必須呼叫super.setUp()啟動控制監聽埠 super.setUp(); } @Override public void callback(PrimitiveEvent primitiveEvent) { if(primitiveEvent.getAttributeValueType("cause").getValue().equals(Constants.causecode1)) suspendtime=5000; } @Override public boolean isRunning() { return running; } @Override public void stopInputStream() { //必須呼叫super.stopInputStream()關閉控制監聽埠 super.stopInputStream(); this.running=false; } public void run() { int count = 1; while (running) { try { if(suspendtime>0) { Thread.sleep(suspendtime); suspendtime=0; } } catch (InterruptedException e) { e.printStackTrace(); } PrimitiveEvent primitiveEvent = new PrimitiveEvent(); primitiveEvent.getAttributeMap().put("id", new ValueType(String.valueOf(count),"string")); count++; Date d = new Date(); long time = d.getTime(); primitiveEvent.setStart_ts(time); primitiveEvent.setEnd_ts(time); this.putEventToStream(primitiveEvent); } } }
2.2 系統報警事件
SODBASE CEP允許通過warnqueuelength配置緩衝佇列長度的報警長度,可針對報警採取一些運維管理措施。如果不配置,預設為最大緩衝佇列長度maxqueuelength的80%。
system.sys_warn和system.sys_error是系統內建報警流,通常會含3個欄位
(1)cause:報警和報錯的原因
(2)queryname:引起報警和報錯的EPL語句名稱
(3)message:訊息提示
如cause=’warnqueuelengthexceeded’時,即超過了緩衝佇列的報警閾值。
2.3 注意事項
(1)多使用者環境下,報警流queryname欄位會加字首"使用者名稱."。
(2)設計系統時,不要過度依賴反壓限速功能,因為反壓限速會增加輸入端負載,也會給系統帶來新的問題。正確方式是在系統架構初期,採用模擬資料和最大輸入速率配置好緩衝區大小,設計好處理方式並留出餘量,讓處理能力和輸入速率相匹配。
SODBASE CEP用於輕鬆、高效實施資料監測、監控類、實時交易類專案快取擴充套件參見。