
Transwarp Slipstream in Practice

Resource Isolation Between Applications

E.g.: while a user is working on a stream under a given application, they can only view the StreamJobs belonging to that application; after leaving that application, StreamJobs under other applications are no longer visible — only those under the current application can be seen.
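As a minimal sketch of this isolation, two applications can be created with the CREATE APPLICATION statement that appears later in the High Availability section (the syntax for switching between applications is not shown in the original and is omitted here):

```sql
-- Create two isolated applications; StreamJobs started under app_a
-- are not visible when working under app_b, and vice versa.
CREATE APPLICATION app_a;
CREATE APPLICATION app_b;
```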

Statistics over Streams

Emily receives her first task from her boss: count the number of visits to the website. Suppose the source data looks like this:
27.0.1.125,www.transwarp.io/home.html,2016-8-14 20:12:31.132
54.231.66.16,www.transwarp.io/product.html,2016-8-14 20:43:31.213
72.21.203.5,www.transwarp.io/case.html,2016-8-14 20:45:31.132
207.241.224.2,www.transwarp.io/product.html,2016-8-14 20:46:15.540
12.129.206.133,www.transwarp.io/product.html,2016-8-14 20:47:21.332
208.111.148.7,www.transwarp.io/home.html,2016-8-14 20:50:31.876

  • Emily creates a stream and a table:
  CREATE STREAM accesslog(ip STRING, url STRING, time TIMESTAMP)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    TBLPROPERTIES("topic"="accesslog",
                  "kafka.zookeeper"="172.16.1.128:2181",
                  "kafka.broker.list"="172.16.1.128:9092");
Count the visits per URL every 10 seconds by system time:
  CREATE TABLE result(url STRING, count INT);
  CREATE STREAM waccesslog AS SELECT * FROM accesslog
    STREAMWINDOW (LENGTH '10' SECOND SLIDE '10' SECOND);
  INSERT INTO result SELECT url, COUNT(*) FROM waccesslog GROUP BY url;
  • Count the visits per URL every 10 seconds by message (event) time:
  CREATE TABLE result2(url STRING, count INT);
  CREATE STREAM accesslog2(ip STRING, url STRING, time TIMESTAMP)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    TBLPROPERTIES("topic"="accesslog",
                  "kafka.zookeeper"="172.16.1.128:2181",
                  "kafka.broker.list"="172.16.1.128:9092",
                  "timefield"="time",
                  "timeformat"="yyyy-MM-dd HH:mm:ss.SSS");
  CREATE STREAM waccesslog2 AS SELECT * FROM accesslog2
    STREAMWINDOW sw AS (SEPARATED BY time LENGTH '10' SECOND SLIDE '10' SECOND);
  INSERT INTO result2 SELECT url, COUNT(*) FROM waccesslog2 GROUP BY url;
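Note that message-time windows (SEPARATED BY) only take effect when event time is enabled for the session; the same switch is used again before triggering the join example further below:

```sql
-- Enable event-time semantics so that SEPARATED BY time is honored
-- when the window is evaluated.
SET streamsql.use.eventtime=true;
```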
  • Multi-stream joins windowed by a time field
    Because building on Slipstream is quick, Emily receives a new task: migrate the existing SQL so that two streams and one dimension table are windowed by a time field in the data. There are two streams, futures prices (qihuo) and spot prices (xianhuo), each carrying a time, an ID, and a price, for example:
    qihuo (futures)
    2016-8-14 20:50:00,1,50.20
    2016-8-14 20:50:00,2,65.40
    2016-8-14 20:50:00,3,31.30
    2016-8-14 20:50:01,1,50.80
    2016-8-14 20:50:01,2,65.10
    2016-8-14 20:50:01,3,29.90

    xianhuo (spot)
    2016-8-14 20:50:00,1,55.10
    2016-8-14 20:50:00,2,67.20
    2016-8-14 20:50:00,3,33.10
    2016-8-14 20:50:01,1,55.20
    2016-8-14 20:50:01,2,66.70
    2016-8-14 20:50:01,3,30.30
    Emily first creates the two streams:
  CREATE STREAM qihuo2(timestamp STRING, id STRING, price DOUBLE)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    TBLPROPERTIES('topic'='qihuo', 'timefield'='timestamp',
                  'timeformat'='yyyy-MM-dd HH:mm:ss',
                  "kafka.zookeeper"="tw-node127:2181",
                  "kafka.broker.list"="172.16.1.128:9092");
  CREATE STREAM xianhuo2(timestamp STRING, id STRING, price DOUBLE)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    TBLPROPERTIES('topic'='xianhuo', 'timefield'='timestamp',
                  'timeformat'='yyyy-MM-dd HH:mm:ss',
                  "kafka.zookeeper"="tw-node127:2181",
                  "kafka.broker.list"="172.16.1.128:9092");

Join the two streams by message time and compute the difference between the spot and futures prices, producing a stream whose fields are (time, ID, spot-futures difference):

  CREATE STREAM transform_stream2 AS
    SELECT
      qihuo.timestamp,
      qihuo.id,
      (xianhuo.price - qihuo.price) AS diff
    FROM
      qihuo2 qihuo JOIN xianhuo2 xianhuo
        ON
        qihuo.timestamp=xianhuo.timestamp
        AND
        qihuo.id=xianhuo.id;

On the stream above, take a 3-minute window sliding every 1 second, turning it into a "last 3 minutes" stream:


 CREATE STREAM window_diff_stream AS
    SELECT * FROM transform_stream2
      STREAMWINDOW w1 AS (LENGTH '3' MINUTE SLIDE '1' SECOND);

Finally, over the last 3 minutes, compute for each ID the maximum and minimum spot-futures difference, as well as the gap between them:


  CREATE VIEW max_min_diff_window_stream AS
  SELECT maxTable.id,
         maxTable.diff AS maxDiff,
         maxTable.timestamp AS maxTime,
         minTable.diff AS minDiff,
         minTable.timestamp AS minTime,
         (maxTable.diff - minTable.diff) AS maxMinDiff,
         minTable.maxtime AS calTime
  FROM
    (SELECT id, timestamp, diff, maxtime
     FROM (SELECT id, timestamp, diff,
                  row_number() OVER (PARTITION BY id ORDER BY diff) AS minDiff,
                  max(timestamp) OVER (PARTITION BY id) AS maxtime
           FROM window_diff_stream) t1
     WHERE minDiff = 1) minTable
  JOIN
    (SELECT id, timestamp, diff
     FROM (SELECT id, timestamp, diff,
                  row_number() OVER (PARTITION BY id ORDER BY diff DESC) AS maxDiff
           FROM window_diff_stream) t2
     WHERE maxDiff = 1) maxTable
  ON minTable.id = maxTable.id;

Emily then triggers the computation:
  CREATE TABLE result(maxTableId STRING, maxDiff DOUBLE, maxTime STRING,
                      minDiff DOUBLE, minTime STRING, maxMinDiff DOUBLE,
                      calTime STRING);
  SET streamsql.use.eventtime=true;
  INSERT INTO result SELECT * FROM  max_min_diff_window_stream;
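Once the insert is running, the populated table can be inspected with an ordinary query; a minimal sketch:

```sql
-- Show the IDs with the widest spread between max and min difference
-- in the most recent window results.
SELECT maxTableId, maxMinDiff, calTime
FROM result
ORDER BY maxMinDiff DESC;
```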

PL/SQL over Streams

When her boss learned that Slipstream also supports PL/SQL, he asked her to investigate migrating the existing PL/SQL code to Slipstream. Besides the futures and spot tables above, there is now also a transaction table containing an ID, a time, and a transaction volume; Emily must accumulate, for each ID, the transaction volume from market open up to the current time. A further requirement: whenever maxmindiff in the max_min_diff_window_stream stream is greater than or equal to 50, trigger an alert by inserting the top 20 records by accumulated value from day_sum into a warning table:

  CREATE STREAM transaction_stream(timestamp STRING, id STRING, transaction DOUBLE)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    TBLPROPERTIES('topic'='transaction', "kafka.zookeeper"="tw-node127:2181");
  CREATE TABLE day_sum(id STRING, sd STRING, total DOUBLE)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
  CREATE TABLE warm_transaction(id STRING, timestamp STRING, total DOUBLE);
  CREATE VIEW transaction_sum AS
    SELECT id, timestamp, sum(s.transaction) total
    FROM transaction_stream s GROUP BY id, timestamp;
  SET plsql.show.sqlresults=true;
  SET stream.enabled=true;
  SET stream.tables=qihuo2,xianhuo2,transaction_stream;
DECLARE
  threshold_count INT := 0;
BEGIN
  INSERT INTO day_sum
    SELECT id, sd, CASE WHEN isnull(total2) THEN total1 ELSE (total1 + total2) END total
    FROM (SELECT t1.id id, t1.timestamp sd, t1.total total1, t2.total total2
          FROM transaction_sum t1 LEFT JOIN day_sum t2
            ON t1.id = t2.id
           AND (to_unix_timestamp(t2.sd, 'yyyy-MM-dd HH:mm:ss') + 1)
               = unix_timestamp(t1.timestamp, 'yyyy-MM-dd HH:mm:ss')) s;
  SELECT count(*) INTO threshold_count
    FROM max_min_diff_window_stream WHERE maxmindiff >= 50;
  IF threshold_count > 0 THEN
    INSERT INTO warm_transaction
      SELECT id, sd, total FROM day_sum ORDER BY total DESC LIMIT 20;
  END IF;
END;

Persisting StreamJobs

Emily was by now fluent with Slipstream, but one day, after she had set up some configuration, the application terminated unexpectedly. So she started using StreamJobs for persistence:

  CREATE STREAMJOB access_log_streamjob AS
    ("INSERT INTO result SELECT url, COUNT(*) FROM waccesslog GROUP BY url");
  START STREAMJOB access_log_streamjob;
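A persisted job keeps running until it is stopped explicitly. Assuming Slipstream provides the complementary STOP STREAMJOB statement (not shown in the original), the job can be stopped by name:

```sql
-- Stop the persisted job; the definition remains and it can be
-- started again later with START STREAMJOB.
STOP STREAMJOB access_log_streamjob;
```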

Email Alerts

Configure /etc/inceptor1/conf/alert4j.properties on every machine:

  alert4j.service=email
  email.server.host=smtp.exmail.qq.com
  email.server.port=465
  email.server.ssl=true
  email.validate=true
  email.sender.username=[email protected]
  email.sender.password=password
  email.from.address=[email protected]
  email.to.addresses=[email protected]

When a problem occurs, the system sends an email to the configured address ([email protected]) containing the key information about the problem.

High Availability

Configure an additional Inceptor Server through the Transwarp Manager UI, set its role to Standby, and start it once the setup is complete.
Use checkpointing and the WAL to guarantee that no data is lost:
Step 1: in the Transwarp Manager UI, edit the Inceptor Server configuration, add the "stream.driver.checkpoint.dir" item, and set it to a directory on HDFS. Restart Inceptor afterwards.
Step 2: when creating an application, specify the application's checkpoint directory, also a directory on HDFS:

  CREATE APPLICATION app1 WITH APPPROPERTIES("application.checkpoint.dir"="/tmp/app1/",
  "application.enable.wal"="true");
Or, for an existing application, set the two properties:
  ALTER APPLICATION app1 SET APPPROPERTIES("application.checkpoint.dir"="/tmp/app1/");
  ALTER APPLICATION app1 SET APPPROPERTIES("application.enable.wal"="true");

Holodesk Configuration

Emily's boss told her that the visit-count statistics were very useful and that several other teams were consuming the results. However, because the results were stored in a Hive table, queries were too slow, and he asked her to find a way to speed them up. Emily remembered that the Holodesk component is built for fast interactive analytics, so she stored the result table as Holodesk and set the relevant parameters:

  CREATE TABLE holo(url STRING, count INT) STORED AS HOLODESK
    TBLPROPERTIES("holodesk.window.length"="100000",
                  "holodesk.window.timeout"="100000");
  CREATE STREAM w_accesslog AS SELECT * FROM accesslog
    STREAMWINDOW sw AS (LENGTH '10' SECOND SLIDE '10' SECOND)
    TBLPROPERTIES("kafka.zookeeper"="tw-node127:2181");
  INSERT INTO holo SELECT url, COUNT(*) FROM w_accesslog GROUP BY url;
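The other teams can then query the Holodesk table directly; a minimal example:

```sql
-- Most-visited URLs in the latest windowed results.
SELECT url, count FROM holo ORDER BY count DESC;
```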

Slipstream Parameter Reference

Slipstream parameters are all set with SET <key>=<value>; (for example, SET streamsql.use.eventtime=true;).