1. 程式人生 > >使用Spark Streaming SQL基於時間視窗進行資料統計

使用Spark Streaming SQL基於時間視窗進行資料統計

2.時間窗語法說明

Spark Streaming SQL支援兩類視窗操作:滾動視窗(TUMBLING)和滑動視窗(HOPPING)。

2.1滾動視窗

滾動視窗(TUMBLING)根據每條資料的時間欄位將資料分配到一個指定大小的視窗中進行操作,視窗以視窗大小為步長進行滑動,視窗之間不會出現重疊。例如:如果指定了一個5分鐘大小的滾動視窗,資料會根據時間劃分到 [0:00 - 0:05)、 [0:05, 0:10)[0:10, 0:15)等視窗。

  • 語法
GROUP BY TUMBLING ( colName, windowDuration ) 
  • 示例

對inventory表的inv_data_time時間列進行視窗操作,統計inv_quantity_on_hand的均值;視窗大小為1分鐘。

SELECT avg(inv_quantity_on_hand) qoh
FROM inventory
GROUP BY TUMBLING (inv_data_time, interval 1 minute)

2.2滑動視窗

滑動視窗(HOPPING),也被稱作Sliding Window。不同於滾動視窗,滑動視窗可以設定視窗滑動的步長,所以視窗可以重疊。滑動視窗有兩個引數:windowDuration和slideDuration。slideDuration為每次滑動的步長,windowDuration為視窗的大小。當slideDuration < windowDuration時視窗會重疊,每個元素會被分配到多個視窗中。
所以,滾動視窗其實是滑動視窗的一種特殊情況,即slideDuration = windowDuration則等同於滾動視窗。

  • 語法
GROUP BY HOPPING ( colName, windowDuration, slideDuration ) 
  • 示例

對inventory表的inv_data_time時間列進行視窗操作,統計inv_quantity_on_hand的均值;視窗為1分鐘,滑動步長為30秒。

SELECT avg(inv_quantity_on_hand) qoh
FROM inventory
GROUP BY HOPPING (inv_data_time, interval 1 minute, interval 30 second)

3.系統架構

業務日誌收集到Aliyun SLS後,Spark對接SLS,通過Streaming SQL對資料進行處理並將統計後的結果寫入HDFS中。後續的操作流程主要集中在Spark Streaming SQL接收SLS資料並寫入HDFS的部分,有關日誌的採集請參考

日誌服務

4.操作流程

4.1環境準備

  • 建立E-MapReduce 3.21.0以上版本的Hadoop叢集。
  • 下載並編譯E-MapReduce-SDK包
git clone [email protected]:aliyun/aliyun-emapreduce-sdk.git
cd aliyun-emapreduce-sdk
git checkout -b master-2.x origin/master-2.x
mvn clean package -DskipTests

編譯完後, assembly/target目錄下會生成emr-datasources_shaded_${version}.jar,其中${version}為sdk的版本。

4.2建立表

命令列啟動spark-sql客戶端

spark-sql --master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-${version}.jar --driver-class-path emr-datasources_shaded_2.11-${version}.jar

建立SLS和HDFS表

spark-sql> CREATE DATABASE IF NOT EXISTS default;
spark-sql> USE default;

-- 資料來源表
spark-sql> CREATE TABLE IF NOT EXISTS sls_user_log
USING loghub
OPTIONS (
sls.project = "${logProjectName}",
sls.store = "${logStoreName}",
access.key.id = "${accessKeyId}",
access.key.secret = "${accessKeySecret}",
endpoint = "${endpoint}");

--結果表
spark-sql> CREATE TABLE hdfs_user_click_count
USING org.apache.spark.sql.json
OPTIONS (path '${hdfsPath}');

4.3統計使用者點選數

spark-sql>SET streaming.query.name=user_click_count; 
spark-sql>SET spark.sql.streaming.checkpointLocation.user_click_count=hdfs:///tmp/spark/sql/streaming/test/user_click_count; 
spark-sql>insert into hdfs_user_click_count 
select sum(cast(action_click as int)) as click, userId, window from sls_user_log 
where delay(__time__)<"1 minute" 
group by TUMBLING(__time__, interval 5 second), userId;

其中,內建函式delay()用來設定Streaming SQL中的watermark,後續會有專門的文章介紹Streaming SQL watermark的相關內容。

4.4檢視結果

可以看到,產生的結果會自動生成一個window列,包含視窗的起止時間資訊。

5.結語

本文簡要介紹了流式處理中基於事件時間進行處理的場景,以及Spark Streaming SQL時間視窗的相關內容,並通過一個簡單案例介紹了時間視窗的使用。後續文章,我將介紹Spark Streaming SQL的更多內容。

作者:ligh-rain

原文連結

本文為雲棲社群原創內容,未經