1. 程式人生 > >SODBASE CEP學習(六):流式計算中的儲存和ETL

SODBASE CEP學習(六):流式計算中的儲存和ETL

許多流式計算應用離不開儲存,也就是把資料存在硬碟上,例如歷史資料的儲存。畢竟硬碟適合長期地儲存大量資料。在介紹具體方法之前,先講一個實際專案裡經常要用到的原則,那就是:儲存讀寫速度要和記憶體計算速度匹配。

怎麼理解呢?例如記憶體計算速度是10萬events/s,儲存讀寫速度是1萬events/s,那麼很容易引起資料不能及時寫到儲存中,而導致OOM等各種問題。還有,硬碟讀寫的速度、網路傳輸的速度經常受到外部因素的干擾,通常沒有記憶體計算速度穩定,這點也要考慮到。

所以流式計算中使用儲存,通常

(1)要設計快取佇列,CEP引擎中一般有自帶的快取佇列,實現IO和計算的非同步。當然,SODBASE CEP中進一步增強了避免OOM和預警的機制。

(2)儘可能的將記憶體計算和儲存操作分離。原因:

一來是為了更好地使用CEP引擎自帶的快取佇列。

二來是為了更好的管理。這樣,在儲存操作中可以更方便地使用批量寫入、通過計算減少寫入資料量,過濾掉不需要寫入的資料,從而提高儲存讀寫效率。

使用儲存的方法一般有兩類

第一類:在EPL中直接用。例如,在EPL中用Java函式,而Java函式又負責儲存讀寫。

第二類:使用輸入輸出介面卡。

本文主要介紹使用輸入輸出介面卡。最後介紹一下,為什麼輸入介面卡、CEP引擎、輸出介面卡恰好也能作很好地應用在Extract Transformation Load (ETL) 操作中。

1. 關係型資料庫輸入輸出介面卡

Mysql、Oracle、Sql server、postgresdb等都是關係型資料庫。不要小瞧關係資料庫,大部分資料儲存的企業應用可以用關係資料庫解決。Mysql叢集可以支撐億級使用者的特大型的網際網路應用。以mysql為例,常用的3個輸出介面卡:

(1)資料庫查詢

com.sodbase.outputadaptor.database.MysqlQueryAdaptor

作用:對於每個結果事件,查詢mysql資料庫,並將查詢的結果插入到新的流中

  0: stream name 查詢的結果插入到新的流的名稱
  1: database name
  2: user name
  3: password
  4: host name
  5: port
  6: sql  查詢sql,允許使用?{...}變數,通過輸出的事件屬性值,如:?{name}
  7: columnnames  e.g.   "name:string,age:double,ishappy:boolean" Three types "string,double,boolean" are supported

(2)資料庫更新

com.sodbase.outputadaptor.mysql.MysqlSqlExecutionAdaptor

作用:對於每個結果事件,執行sql語句,如DML、資料更新等
引數:

   0     databasename 
   1     dbusername 
   2     dbpassword 
   3     hostname 
   4     dbport 
   5    sql 

        inputStreamConnected = params[6];//sql執行結束後,發事件給流inputStreamConnected,這個事件只有時間戳屬性

(3)資料庫備份

com.sodbase.outputadaptor.mysql.MysqlBackupOutputAdaptor

作用:鎖表執行sql語句,類似於上面的adatpor,通常用於執行備份語句。

   0     databasename 
   1     dbusername
   2     dbpassword 
   3     hostname 
   4     dbport
   5     sql
   6     tablename //用於鎖定所需備份的表
   7     inputStreamConnected //sql執行結束後,發事件給流inputStreamConnected,這個事件只有時間戳屬性

其它Oracle、Sql server、postgresdb介面卡類似,也都支援。

另外,還有一個輸入介面卡com.sodbase.inputadaptor.database.MySqlInputAdaptor,EPL啟動時查詢資料庫的資料插入到流中。

應用場景,例如:定時、超時任務防止系統宕機後丟失,將沒有執行過的任務存在資料庫中,重啟伺服器時,就會把這些任務再加到事件流,也就是任務佇列裡。

2. Nosql介面卡和分散式快取介面卡

支援cassandra、hbase、monogodb, 巨杉等nosql資料庫,支援redis等分散式快取資料庫,使用者也可以方便地自定義介面卡。

3. 資料庫分批寫入

如果大家還記得kleen closure操作符,用它可以方便地做資料庫分批寫入,解決儲存讀寫瓶頸問題

CREATE QUERY tensecondsdata
SELECT tostring(T2.price) AS pricebatch, tostring(T2.name) AS namebatch 
FROM T1:timer,T2:stock,T3:timer 
PATTERN T1;T2^+;T3  
WHERE T1._start_time_=T3._start_time_-10000  
WITHIN 10000

這個EPL將資料分成10s為一批。timer是定時輸入介面卡,週期10s。如果要精確,可以用兩個定時輸入timer1和timer2,週期都是10s,但起點相差1個單位時間(一般單位是ms)。這是因為模式中T1._end_time_<T2._end_time_<T3._end_time_是一個開集合,我們用兩個timer1,可以做成一個半開半閉集合,不漏掉恰好在timer時間點上的資料。

CREATE QUERY tensecondsdata
SELECT tostring(T2.price) AS pricebatch, tostring(T2.name) AS namebatch 
FROM T1:timer1,T2:stock,T3:timer2
PATTERN T1;T2^+;T3  
WHERE T1._start_time_=T3._start_time_-10001 
WITHIN 10001

4. 應用示例

4.1 電壓監測

本節將介紹實時測量監測系統中電壓監測資料的監測、儲存示例。類似原理可以適用到發電資料、裝置狀態、水情、空氣、水情、水質、汙染源等監測應用中。資料儲存也可以推廣到各類關係資料庫、Nosql資料庫。

4.1.1儲存實時測量資料

注意這裡是為了讓大家更好了解細節,所以用給出EPL和XML。建議大家用SODBASE Studio建模。

CREATE QUERY VD0002 
SELECT T1.lineid AS lineid,T1.voltagevalue AS voltagevalue 
FROM T1:VD0000_output 
PATTERN T1 
WITHIN 0
VD0000_output是電壓測量資料流
    <outputAdaptors>
        <isOutputAsSelection>true</isOutputAsSelection>
        <outputAdaptorClassName>com.sodbase.outputadaptor.mysql.MysqlSqlExecutionAdaptor</outputAdaptorClassName>
        <adaptorParams>voltage</adaptorParams>
        <adaptorParams>user</adaptorParams>
        <adaptorParams>password</adaptorParams>
        <adaptorParams>192.168.1.3</adaptorParams>
        <adaptorParams>3306</adaptorParams>
        <adaptorParams>insert into historicalvoltage(lineid,voltagevalue,timestamp) values('?{lineid}',?{voltagevalue},'?{_end_time_}')</adaptorParams>
        <adaptorParams>operationcompletestream</adaptorParams>
        <isExternal>false</isExternal>
        <queryName>VD0002</queryName>
    </outputAdaptors>
?{}是在sql語句中使用事件欄位的值

4.1.2定時備份

有時使用者需要將歷史資料進行備份

CREATE QUERY VD0003 
SELECT JAVA:com.example.voltage.Voltage:getDate() AS date, JAVA:com.example.voltage.Voltage:getDayStarttime() AS starttime, JAVA:com.example.voltage.Voltage:getDayEndtime() AS endtime 
FROM T1:timer 
PATTERN T1 
WITHIN 0
timer資料流是有定時觸發輸入介面卡生成的,用法見前面介紹EPL的文章
    <outputAdaptors>
        <isOutputAsSelection>true</isOutputAsSelection>
        <outputAdaptorClassName>com.sodbase.outputadaptor.mysql.MysqlBackupOutputAdaptor</outputAdaptorClassName>
        <adaptorParams>voltage</adaptorParams>
        <adaptorParams>username</adaptorParams>
        <adaptorParams>password</adaptorParams>
        <adaptorParams>192.168.1.3</adaptorParams>
        <adaptorParams>3306</adaptorParams>
        <adaptorParams>select * into outfile 'D:/?{date}.txt' from historicalvoltage where timestamp>=?{starttime} and timestamp<?{endtime}</adaptorParams>
        <adaptorParams>historicalvoltage</adaptorParams>
        <adaptorParams>endbackupmessage</adaptorParams>
        <isExternal>false</isExternal>
        <queryName>VD0003</queryName>
    </outputAdaptors>

4.1.3 做每日統計

假設每天定時備份完,需要做日統計

CREATE QUERY VD0003_2 
SELECT JAVA:com.example.voltage.Voltage:getYesterdayDate() AS date 
FROM T1:endbackupmessage 
PATTERN T1 
WITHIN 0

輸出介面卡

    <outputAdaptors>
        <isOutputAsSelection>true</isOutputAsSelection>
        <outputAdaptorClassName>com.sodbase.outputadaptor.mysql.MysqlSqlExecutionAdaptor</outputAdaptorClassName>
        <adaptorParams>voltage</adaptorParams>
        <adaptorParams>user</adaptorParams>
        <adaptorParams>password</adaptorParams>
        <adaptorParams>192.168.1.3</adaptorParams>
        <adaptorParams>3306</adaptorParams>
        <adaptorParams>insert into daystatistic(lineid,date,upbound,bottombound) select lineid as lineid,?{date} as date, max(voltagevalue) as upbound,min(voltagevalue) as bottombound from historicalvoltage group by lineid</adaptorParams>
        <adaptorParams>endstatisticmessage</adaptorParams>
        <isExternal>false</isExternal>
        <queryName>VD0003_2</queryName>
    </outputAdaptors>

4.2 船舶活動區域監測

本節介紹GIS實時監測領域的應用示例,即監測船舶是否偏離了規定航線或規定活動區域。原理也可以應用到其他GIS實時監測應用中。示例場景為作業的船隻通常有一定的活動範圍,如果船隻出現在異常的海域,應該向海岸管理人員預警提示。

具體EPL就不給出了,參考附件中的“時間處理拓撲圖”。

filter1負責當船舶實時位置上報時,查詢正常區域模型資料庫。

filter2負責根據船舶實時位置更新正常區域模型,即此船隻正常活動的區域(四個座標點形成的矩形區域)。

ARA負責判斷船舶是否偏離了正常航行區域,並進行預警。

整個拓撲圖中的事件驅動架構則是由級聯輸入輸出介面卡完成的。級聯輸入介面卡com.sodbase.inputadaptor.StubInputAdaptor級聯輸出介面卡com.sodbase.outputadaptor.connection.ConnectToSodInputOutputAdaptor前文有介紹。

4.3 ETL、資料庫監測、檔案監測

ETL說通俗一些,其實都是是定時監測資料庫(關係型或nosql資料庫),然後根據EPL規則將資料進行轉化。結果輸出到另一個數據庫中,一般是資料倉庫中,那就是ETL。

如果結果輸出到實時圖表顯示,那就是一種資料監測。有很多場合,資料庫作為了中間媒介來觀察資料的變化,也是一種資料監控的方法。

4.3.1 ETL示例

例:在一些專案中,需要把資料放到關係型資料庫更好地做OLAP分析,如將資料從Nosql資料庫同步到關係型資料庫叢集


例:資料質量管理、ETL、經營分析系統中將兩個資料庫表中的資料匯入到DataWarehouse,統一gender欄位的編碼。

CREATE QUERY DQ0002 
SELECT * 
FROM T1:mysql資料輸入,T2:mysql資料輸入2 
PATTERN T1|T2 
WITHIN 0 
DQ0002的輸出介面卡用 級聯節點(no watermark),輸出到流DQ0002_output
mysql資料輸入、mysql資料輸入2用的是mysql輸入介面卡。
CREATE QUERY DQ0002_output 
SELECT T1.name AS name,JAVA:dq.Code:standardizeGender(T1.gender) AS gender,T1.age AS age 
FROM T1:DQ0002_output 
PATTERN T1 
WITHIN 0 
DQ0002_output可以接資料儲存寫入的介面卡。

4.3.2 資料庫、檔案監測

有些情況,也會定時監測是其他資料來源,比如檔案、資料夾、各型別URI,和監測資料庫資料的原理是一樣的,在經營分析系統、資料質量管理等專案中也經常會用到。

SODBASE CEP用於輕鬆、高效實施資料監測、監控類專案微笑。EPL語法見SODSQL寫法與示例。圖形化建模請使用SODBASE Studio

使用SODBASE產品的程式設計師現在可領禮品啦