1. 程式人生 > >transwarp Slipstream 簡介之高階功能

transwarp Slipstream 簡介之高階功能

1. 監控告警

Slipstream整合監控告警工具Alert4J,用於在流應用出問題的時候報錯,支援郵件推送,也可以與微信、其 他監控工具整合。
Alert4J當前版本沒有專門的配置介面,將在下個版本時支援。當前版本可以通過增加alert4j.properties文 件支援。下面是該檔案的一般配置內容,以郵件推送為例:

 alert4j.service=email
  email.server.host=smtp.exmail.qq.com
  email.server.port=465
  email.server.ssl=true
  email.validate=true
  email.sender
.username[email protected].io email.sender.password=test email.from.address[email protected].io email.to.addresses[email protected].io

2.複雜事件處理(CEP)

Slipstream 支援在流上做複雜事件處理(Complex Event Processing)。複雜事件的處理分為事件和操作兩 個部分,事件就是流中的資料,通常會在事件上加上過濾條件。操作即事件的邏輯順序及其生命週期的控 制,例如兩次取款操作發生在一個特定的時間間隔內且滿足第二次取款操作發生在第一次取款操作之後。我 們將這種通過操作組合在一起的一系列事件稱為複雜事件模式。
使用CEP需要在任務級別開啟NGMR_ENGINE_MODE為morphling,使引擎執行在事件驅動模式下。Slipstream 的CEP支援系統時間和事件時間兩種方式,如果需要使用事件事件,還需要在任務級別開啟事件時間的引數選 項。
CEP的基本語法如下:

SELECT EVENT1.[column], 1 EVNET2.[column],
......,
EVNETn.[column] FROM PATTERN 2
( EVENT1=[stream][condition] [FOLLOWEDBY | ,] 3
EVENT2= [stream][condition] [FOLLOWEDBY | ,] ....
EVENTn= [stream][condition] ) 4 WITHIN (time interval); 5
1 EVENT: 模式中定義的事件名,可以任意定義
2 PATTERN: 指定模式的關鍵字
3 表示事件之間的關係,FOLLOWEDBY表示只要事件B發生在事件A之後,那麼事件B也應該參與計算;如果此 處是“,”表示事件B必須是事件A之後發生的第一個事件。
4 condition:即該事件的發生條件,即SQL中的條件表示式。
5 WITHIN: 指定該次複雜事件處理的時間區間。

案例1(銀行)
例 14. 檢測盜刷行為
銀行需要在10分鐘之內檢測出當前某筆取款交易是否存在盜刷銀行卡的行為。
1. 建立輸入流

        CREATE APPLICATION cep_example;
        USE APPLICATION cep_example;
        SET streamsql.use.eventmode=true;
        CREATE STREAM transaction(
            location_id STRING, card_id STRING, behavior STRING
            )
        tblproperties(
            "topic"="transaction_t1",
            "kafka.zookeeper"="localhost:2188",
            "kafka.broker.list"="localhost:9098"
);
        CREATE TABLE exception_ret(
            location_id_1 STRING,
            location_id_2 STRING,
            behavior STRING,
            card_id STRING
);

2 建立規則啟動流任務

       INSERT INTO exception_ret
        SELECT e1.location_id, e1.card_id, e1.behavior,
               e2.location_id, e2.card_id, e2.behavior
        FROM PATTERN (
--同一張卡10分鐘內在兩個不同地點發生了取款行為,意味著有盜刷可能 e1=transaction[e1.behavior='withdraw']
FOLLOWEDBY
e2=transaction[
                e2.card_id = e1.card_id AND
                e2.behavior='withdraw' AND
                e2.location_id != e1.location_id]
        ) WITHIN ('10' minute);

案例2:工業機床
例 15. 工業機床異常警報
需要對工業機床以下兩種情況進行監測:
• Case1: 機械臂需要在1分鐘內對某節點溫度急劇變化的事件做出預警
• Case2: 機械臂需要在1分鐘內對某次裝置加工操作的異常位置資訊做出預警,可能是該機械臂存在 裝置故障。
1. 建立輸入流

  CREATE APPLICATION cep_example;
            USE APPLICATION cep_example;
            SET streamsql.use.eventmode=true;
            CREATE STREAM robotarm(armid STRING, temperature FLOAT)
            tblproperties(
                "topic"="arm_t1",
                "kafka.zookeeper"="localhost:2188",
                "kafka.broker.list"="localhost:9098"
            );
            CREATE TABLE tem_ret(armid STRING, temperature FLOAT);
            CREATE TABLE coords_ret(armid STRING, coords FLOAT);

2.建立規則啟動流任務

    INSERT INTO tem_ret
            SELECT e2.armid, e2.temperature
            FROM PATTERN(
-- 溫度在1分鐘內變化超過20度,給出預警 e1=robotarm[e1.temperature > 0 && < 80] FOLLOWEDBY e2=robotarm[e2.armid=e1.armid &&
                          e2.temperature - e1.temperature > 20]
            ) WITHIN ('1' minute);
            INSERT INTO coords_ret
            SELECT e2.armid, e2.x, e2.y
            FROM PATTERN(
-- 前後兩次位置座標變化大於閥值時,給出預警 e1=robotarm[x * x + y * y <= 1], e2=robotarm[
                    e2.armid=e1.armid &&
                    e2.x * e2.x + e2.y * e2.y - (e1.x * e1.x + e1.y * e1.y) >=0.3
                ]) WITHIN ('1' minute);

案例3:交通行業

例 16. 對交通車流量和套牌車進行預警監測
需要對車輛交通的以下兩種情況進行監測:
• Case1: 每分鐘統計一次卡口車流量,若10分鐘內某卡口的過車流量超過閥值,需要及時預警並反饋 現場進行交通疏導
• Case2: 10分鐘兩個跨地市的行政區域出現同一個車牌,有理由懷疑是套牌車,需要及時預警反饋。 1. 建立輸入流

      CREATE APPLICATION cep_example;
            USE APPLICATION cep_example;
            SET streamsql.use.eventmode=true;
            CREATE STREAM traffic(veh_id STRING, veh_type STRING, speed FLOAT, location_id STRING)
            tblproperties(
                "topic"="traffic_t1",
                "kafka.zookeeper"="localhost:2188",
                "kafka.broker.list"="localhost:9098"
            );
            CREATE TABLE traffic_flow_ret (location_id STRING, traffic_flow INT);
            CREATE TABLE traffic_susp(loc_id1 STRING, loc_id2 STRING, veh_id STRING);
  1. 建立規則啟動流任務
      CREATE STREAM traffic_flow AS
            SELECT location_id, count(*) as veh_flow
            FROM traffic STREAMWINDOW w1 as (length '1' minute slide '1' minute)
            GROUP BY location_id;
            INSERT INTO traffic_flow_ret
            SELECT e2.location_id, e2.veh_flow
            FROM PATTERN(
-- 10分鐘內某同一卡口車流量增幅超過60 e1=traffic_flow[e1.veh_flow > 0], e2=traffic_flow[
                    e2.veh_flow - e1.veh_flow > 60 AND
                    e2.location_id = e1.location_id]
            ) WITHIN ('10' minute);
            INSERT INTO traffic_susp
            SELECT e1.location_id, e2.location_id, e2.veh_id
            FROM PATTERN(
-- 車輛型別為A1類,10分鐘內不同地區出現同一個車牌號 e1=traffic[e1.veh_type="A1"] FOLLOWEDBY
e2=traffic[e2.veh_id = e1.veh_id AND e2.location_id != e1.location_id]
            ) WITHIN ('10' minute);

CEP的使用注意點:
1. 事件之間的分隔符有逗號(,)以及FOLLOWEDBY,兩者最主要的區別在於,逗號表示前後兩個事件一定是相 鄰的兩個事件,而FOLLOWEDBY則沒有這種要求。例如,(e1,e2)表示事件e2是事件e1之後的第一個事件; (e1 FOLLOWEDBY e2) 表示e2可以發生在e1之後的任意時刻(WITHIN的時間間隔內)。使用逗號分隔符的 時候,第一個事件在發生後會被丟棄,即PATTERN(e1,e2),當e3到達時,此時PATTERN變為(e2,e3)。 而FOLLOWEDBY不會丟棄事件。
2. 在實際場景中,往往需要針對每個使用者的事件流進行分析而不是全域性的事件,可以通過distribute by的 方式,按照指定的欄位(例如使用者ID)劃分到不同的資料流,並在這些不同的流上進行CEP的模式匹配分 析。例如,源資料流中包含欄位id,如果希望根據id來劃分流,並進行模式分析,那麼就需要對源資料 流做如下處理:

CREATE STREAM derived_stream
AS SELECT * FROM raw DISTRIBUTE BY id;
-- 對由id劃分的每個流進行模式分析
INSERT INTO ret SELECT e1.* FROM PATTERN(e1=derived_stream[e1.cnt > 0]) WITHIN ('1' minute);

高可用性

  1. 微批模式的流處理高可用性

• 開啟CheckPoint
1. 通過Transwarp Manager管理介面修改InceptorServer配置,新增”stream.driver.checkpoint.dir” 配置項,並將其設定為HDFS上的某個目錄。配置完畢後,重啟Inceptor。
2. 建立Application時需要指定Application的CheckPoint目錄,設定為HDFS上的某個目錄。

        CREATE APPLICATION app1 WITH APPPROPERTIES("application.checkpoint.dir"="/tmp/app1/")

• 啟動Standby InceptorServer
通過Transwarp Manager管理介面配置額外的InceptorServer,設定它的role為Standby。設定完畢後將其啟動。

事件驅動模式下的高可用性

(Slipstream事件驅動模式下的高可用是在Slipstream流任務級別,即當一個流任務異常退出,或者 是Slipstream Server異常終止恢復後,流任務的計算結果還能夠保證與正常情況下的計算結果一致。該過程 對於使用者是透明的,使用者一般只關心流任務的計算結果。

兩種模式(standalone/zookeeper)
Slipstream當前支援兩種模式的高可用性的配置,其中Standalone模式能夠保證計算結果At-Least-Once,而 要想保證Exactly-once,則需要使用Zookeeper模式。Slipstream流任務高可用性必須在事件驅動模式下,即 任務中要設定streamsql.use.eventmode=true。使用Slipstream HA時必須通過使用建立任務(CREATE STREAMJOB)的方式來定義流任務,所對應的任務級別的引數則是指定在jobproperties屬性中。

Standalone 模式
Standalone是開啟Slipstream HA的預設模式。在該模式下,任務的元資訊儲存在Driver端的記憶體中,因此該 模式只能支援在Driver不重啟的前提下流任務的HA,當Driver重啟時,對應的資訊會丟失。在Standalone模 式下只能保證資料At-Least-Once的語義而無法保證Exactly-Once語義。
Standalone模式下需要在Transwarp Manager上配置以下引數:

1. spark.morphling.recovery.mode
HA的模式
2. spark.morphling.taskstate.backend
Task狀態資訊的儲存系統
3. spark.morphling.taskstate.checkpoint.directory
HDFS上儲存的任務狀態資訊

預設情況下Slipstream為使用者生成這些引數的預設值,可以通過Transwarp Manager介面的設定介面檢視,因 此預設情況下無需做其他的配置即可啟動叢集,如果希望修改HDFS的目錄,則需要在修改這些配置以後重 啟Slipstream。

例 17. Standalone模式下HA的使用示例
//1. 定義輸入流
        create stream plainetl_checkpoint(classid string, content string)
        tblproperties("topic"="plainetlha","kafka.zookeeper"="NodeA:2188","kafka.broker.list"="Node
        A:9098");
        create table plainTable(classid string, name string);
//2. 建立流任務
        create streamjob etl1 as ("insert into plainTable select * from plainetl_checkpoint")
        jobproperties(
"streamsql.use.eventmode"="true", "morphling.job.checkpoint.interval"="5000", morphling.job.enable.checkpoint=true, --定義該任務是否啟用HA morphling.task.max.failures=3); --任務失敗重試次數
//3. 啟動流任務
        start streamjob etl1;
//現在已經成功的啟動了一個名為etl1的流任務,若在任務執行的過程中出現異常例如任務被異常kill, 該任務會重新排程上線,重試次數為3。在Standalone模式下,一旦任務出現重新排程的情況,使用者可 能觀察到計算結果的值出現重複,這是由於Standalone模式下的HA只能保證資料至少被輸出一次,而無 法保證資料不重複輸出。如果使用者希望得到更精確的計算結果,Slipstream推薦使用者使用Zookeeper模 式。

Zookeeper 模式
Zookeeper模式是Slipstream所推薦使用者使用的模式,該模式下的任務元資訊儲存在Zookeeper上,此外該模 式下還會儲存某個任務每次已經完成的checkpoint資訊到HDFS上。這樣就保證即使整個Slipstream叢集發生 異常退出重啟恢復的時候,流任務還能保證計算的準確性。Zookeeper模式下的流任務可以保證資料Exactly- once的語義。
WAL-sink是一個可以保證資料結果只被寫一次的功能, 通過開啟WAL-sink功能使得流任務即使發生異常退 出,自動重啟提交後,資料結果也不會出現重複,即Exactly-Once。要使用WAL-sink需要外接資料庫的配 置,當前支援的資料庫有VoltDB以及MySQL。Slipstream預設配置的資料庫為MySQL。
Zookeeper模式需要使用者在Transwarp Manager介面上配置spark.morphling.recovery.mode為zookeeper,並指定在HDFS上儲存完整checkpoint資訊的目錄。Slipstream預設為使用者生成一個目錄。可以通過Transwarp Manager介面的配置項檢視該目錄的值。
注意:Zookeeper上的目錄以及HDFS的目錄為Slipstream啟動時生成的,若不慎刪除可能導致開啟checkpoint 的流任務無法正常啟動,此時需要手動建立該目錄並付給響應的寫許可權。

例 18. Zookeeper模式下HA的使用示例
該例將資料寫入到Hyperbase表中:
// 1. 建立輸入流
        create stream raw(classid string, content string)
        tblproperties("topic"="raw_t1","kafka.zookeeper"="NodeA:2188","kafka.broker.list"="NodeA:90
        98");
//2. 建立Hyperbase外部表
create external table hbase_ret(k string, v string)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties("hbase.columns.mapping"=":key,content:value") tblproperties("hbase.table.name"="haret"); -- haret 需在{hyperbase}中事先建立
//3. 建立流任務
create streamjob zkmode as ("insert into hbase_ret select * from joinA") jobproperties(streamsql.use.eventmode=true,
morphling.task.max.failures=5,
 --任務失敗重試次數 "morphling.job.checkpoint.interval"="5000", "morphling.job.enable.checkpoint"="true", "morphling.job.enable.wal.sink"="true", 
--開啟WAL Sink功能 "morphling.wal.sink.committer.type"="mysql",
 --開啟WAL Sink的meta資訊儲存方式 "morphling.wal.sink.voltdb.jdbc.url"="jdbc:mysql://localhost/hainfo"); --WAL
//Sink所配置的資料庫的JDBC URL 4. 啟動流任務
        start streamjob zkmode;
//至此已經完成啟動一個Zookeeper模式下支援HA的流任務,在Slipstream叢集狀態正常的情況下,如果 任務遇到異常退出恢復後,可以保證資料計算的完整準確,就好像任務並沒有發生異常一樣。 當Slipstream叢集出現異常需要重啟,在重啟之後需要使用者重新提交這些流任務,一旦使用者提交流任務 後,任務會從上一次失敗的地方重新開始計算出正確的結果。再次說明的是,Slipstream的HA的功能指 的的任務級別的高可用性,即保證一個流任務的高可用,而非指的是一個Slipstream叢集級別的高可 用。

相關引數配置

Slipstream HA的配置引數分為server級別和任務級別,其中server級別的引數應該在啟動Slipstream Server時配置好,任務級別的引數是在建立流任務的時候指定給流任務屬性的引數。server級別的引數 中,Zookeeper模式對應的引數只需要在使用Zookeeper模式的時候配置即可,其他引數都是必須配置的參 數。

At-Least-Once支援

在開啟CheckPoint之後,使用者只需要在Application中設定Kafka的WAL,即可實現At-Least-Once。

  ALTER APPLICATION app1 SET APPPROPERTIES("application.enable.wal"="true")

自定義引數設定

  • Batch duration

Slipstream處理每個批次資料的間隔時間,通過以下方式設定:
系統時間模式

 CREATE APPLICATION app1 WITH APPPROPERTIES("stream.batch.duration.ms"="2000")
//事件時間模式下可用 TBLPROPERTIES 中的 "batchduration" 對每個stream設定。
  • Kafka receiver個數
Kafka receiver個數的多少會影響接收資料的併發度,通過以下方式設定在StreamJob level:
  ALTER STREAMJOB s1 SET JOBPROPERTIES("stream.number.receivers"="4")
  • Holodesk視窗設定

Holodesk是星環特有的基於記憶體的列式儲存表,專用於高速資料分析業務。Slipstream支援往Holodesk的數 據插入,由於資料持續不斷的插入可能會將記憶體耗盡,可在建立Holodesk表時設定以下引數調節Holodesk的 最大視窗大小:

 CREATE TABLE holo(id INT, name STRING) STORED AS HOLODESK TBLPROPERTIES("holodesk.window.length"
  ="60000","holodesk.window.timeout"="10000");
//"holodesk.window.length"用來限制最大視窗長度。"holodesk.window.timeout"是最大超時。兩個的單位都 是毫秒。例如,假設將"holodesk.window.length"設定為60000, holodesk.window.timeout設定為10000。 那麼holedesk建立後,向其插入資料時,會儲存60秒(視窗長度)內的資料,從第61秒開始,以後的資料被 標記為刪除,在第71秒時被真正刪除(超時長度為10秒)。

開啟流上的PLSQL

  SET stream.enabled= true;
  SET stream.tables=default.abc,default.efg;
使用者需要顯式地開啟Stream,才能執行流上的Slipstream。此外,在當前版本中,使用者需要顯式地指定哪些 表是作為Stream存在。
開啟PLSQL的編譯優化
SET plsql.optimize.dml.precompile=true; PLSQL解析執行,因此需要通過這個引數將編譯快取,可大大提升執行效率。

巢狀SELECT需要加 Ad-hoc的hint

這是因為在Slipstream中禁用了普通的SELECT,在PLSQL中這些SELECT可能巢狀於遊標中,在當前版本中如果 不加Ad-hoc可能出現誤判。
簡單函式示例


  set stream.enabled=true;
  set stream.tables=testa;
  create stream testa(id string, a string) TBLPROPERTIES("kafka.zookeeper"="tw-node127:2181");
  create table testb(id string, a string);
  create or replace procedure simple_test()
  is
  begin
    insert into testb select * from testa
  end;
begin
    simple_test()
  end;

遊標示例

set stream.enabled=true;
  set stream.tables=t1;
  create stream t1(id int, value int) TBLPROPERTIES("kafka.zookeeper"="tw-node127:2181");
  create table t2(id int,value int);
  declare
   v_id int
   v_value int
   cursor cur(cur_arg int)
is
select * from (select/*+ adhoc*/ * from t1 where id < cur_arg) order by id
  begin
   open cur(5)
   loop
   fetch cur into v_id, v_value
   exit when cur%notfound
   end loop
end;