1. 程式人生 > >Spark2.3(三十四):Spark Structured Streaming之withWaterMark和windows視窗是否可以實現最近一小時統計

Spark2.3(三十四):Spark Structured Streaming之withWaterMark和windows視窗是否可以實現最近一小時統計

WaterMark除了可以限定來遲資料範圍,是否可以實現最近一小時統計?

WaterMark目的用來限定引數計算資料的範圍:比如當前計算資料內max timestamp是12::00,waterMark限定資料分為是60 minutes,那麼如果此時輸入11:00之前的資料就會被捨棄不參與統計,視為來遲範圍超出了60minutes限定範圍。

那麼,是否可以藉助它實現最近一小時的資料統計呢?

程式碼示例:

package com.dx.streaming

import java.sql.Timestamp
import java.text.SimpleDateFormat

import
org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.{Encoders, SparkSession} import org.apache.log4j.{Level, Logger} case class MyEntity(id: String, timestamp: Timestamp, value: Integer) object Main { Logger.getLogger("org.apache.spark").setLevel(Level.WARN); Logger.getLogger(
"akka").setLevel(Level.ERROR); Logger.getLogger("kafka").setLevel(Level.ERROR); def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate() val lines = spark.readStream.format("socket").option("host", "192.168.0.141").option("port", 19999).load() var sdf
= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") import spark.implicits._ lines.as(Encoders.STRING) .map(row => { val fields = row.split(",") MyEntity(fields(0), new Timestamp(sdf.parse(fields(1)).getTime), Integer.valueOf(fields(2))) }) .createOrReplaceTempView("tv_entity") spark.sql("select id,timestamp,value from tv_entity") .withWatermark("timestamp", "60 minutes") .createOrReplaceTempView("tv_entity_watermark") val resultDf = spark.sql( s""" |select id,sum(value) as sum_value |from tv_entity_watermark |group id |""".stripMargin) val query = resultDf.writeStream.format("console").outputMode(OutputMode.Update()).start() query.awaitTermination() query.stop() } }

當通過nc -lk 19999中依次(每組輸入間隔幾秒時間即可)輸入如下資料時:

1,2018-12-01 12:00:01,100
2,2018-12-01 12:00:01,100

1,2018-12-01 12:05:01,100
2,2018-12-01 12:05:01,100

1,2018-12-01 12:15:01,100
2,2018-12-01 12:15:01,100

1,2018-12-01 12:25:01,100
2,2018-12-01 12:25:01,100

1,2018-12-01 12:35:01,100
2,2018-12-01 12:35:01,100

1,2018-12-01 12:45:01,100
2,2018-12-01 12:45:01,100

1,2018-12-01 12:55:01,100
2,2018-12-01 12:55:01,100

1,2018-12-01 13:05:02,100
2,2018-12-01 13:05:02,100

1,2018-12-01 13:15:01,100
2,2018-12-01 13:15:01,100

發現最終統計結果為:

id  , sum_value
1   ,  900
2   ,  900

而不是期望的

id  , sum_value
1   ,  600
2   ,  600

既然是不能限定資料統計範圍是60minutes,是否需要藉助於視窗函式window就可以實現呢?

是否需要藉助於watermark和視窗函式window就可以實現最近1小時資料統計呢?

    spark.sql("select id,timestamp,value from tv_entity")
      .withWatermark("timestamp", "60 minutes")
      .createOrReplaceTempView("tv_entity_watermark")

    val resultDf = spark.sql(
      s"""
         |select id,sum(value) as sum_value
         |from  tv_entity_watermark
         |group window(timestamp,'60 minutes','60 minutes'),id
         |""".stripMargin)

    val query = resultDf.writeStream.format("console").outputMode(OutputMode.Update()).start()

依然輸入上邊的測試資料,會發現超過1小時候資料會重新開闢(歸零後重新統計)一個統計結果,而不是滾動的一小時統計。

就是把上邊的測試資料分為了兩組來分別統計:

第一組(小時)參與統計資料:

1,2018-12-01 12:00:01,100
2,2018-12-01 12:00:01,100

1,2018-12-01 12:05:01,100
2,2018-12-01 12:05:01,100

1,2018-12-01 12:15:01,100
2,2018-12-01 12:15:01,100

1,2018-12-01 12:25:01,100
2,2018-12-01 12:25:01,100

1,2018-12-01 12:35:01,100
2,2018-12-01 12:35:01,100

1,2018-12-01 12:45:01,100
2,2018-12-01 12:45:01,100

1,2018-12-01 12:55:01,100
2,2018-12-01 12:55:01,100

第二組(小時)參與統計資料:

1,2018-12-01 13:05:02,100
2,2018-12-01 13:05:02,100

1,2018-12-01 13:15:01,100
2,2018-12-01 13:15:01,100

猜測總結:

根據上邊測試結果可以推出一個猜測結論:

在spark structured streaming中是不儲存引數統計的資料的,只是對資料進行了maxTimestamp.avgTimestamp,minTimestamp儲存,同時只是對資料的統計結果進行儲存,下次再次觸發統計時只是在原有的統計結果之上進行累加等操作,而參與統計的資料應該是沒有儲存,否則這類需求應該是可以實現。