1. 程式人生 > >Spark非常實用的視窗函式

Spark非常實用的視窗函式

spark 累加歷史主要用到了視窗函式,而進行全部統計,則需要用到rollup函式

1 應用場景:

1、我們需要統計使用者的總使用時長(累加歷史)

2、前臺展現頁面需要對多個維度進行查詢,如:產品、地區等等

3、需要展現的表格頭如: 產品、2015-04、2015-05、2015-06

2 原始資料:

product_code event_date duration
1438 2016-05-13 165
1438 2016-05-14 595
1438 2016-05-15 105
1629 2016-05-13 12340
1629 2016-05-14 13850
1629 2016-05-15 227

3 業務場景實現

3.1 業務場景1:累加歷史:

如資料來源所示:我們已經有當天使用者的使用時長,我們期望在進行統計的時候,14號能累加13號的,15號能累加14、13號的,以此類推

3.1.1 spark-sql實現

//spark sql 使用視窗函式累加歷史資料

sqlContext.sql(
"""
  select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration
  from userlogs_date
""").show
+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+

3.1.2 dataframe實現

//使用Column提供的over 函式,傳入視窗操作
import org.apache.spark.sql.expressions._
val first_2_now_window = Window.partitionBy("pcode").orderBy("event_date")
df_userlogs_date.select(
    $"pcode",
    $"event_date",
    sum($"duration").over(first_2_now_window).as("sum_duration")
).show

+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+

3.1.3 擴充套件 累加一段時間範圍內

實際業務中的累加邏輯遠比上面複雜,比如,累加之前N天,累加前N天到後N天等等。以下我們來實現:

3.1.3.1 累加歷史所有:

select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration from userlogs_date
select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and current row) as sum_duration from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(Long.MinValue,0)
Window.partitionBy("pcode").orderBy("event_date")

上邊四種寫法完全相等

3.1.3.2 累加N天之前,假設N=3

//如果,不想要分割槽,想從每月的第一天累加的當前天 可以去掉partition
select pcode,event_date,sum(duration) over (partition by pcode order by 
event_date asc rows between 3 preceding and current row) as sum_duration
 from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(-3,0) 

3.1.3.3 累加前N天,後M天: 假設N=3 M=5

select pcode,event_date,sum(duration) over (partition by pcode order by
 event_date asc rows between 3 preceding and 5 following ) as sum_duration
 from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween(-3,5)

3.1.3.4 累加該分割槽內所有行

select pcode,event_date,sum(duration) over (partition by pcode order by 
event_date asc rows between unbounded preceding and unbounded following ) 
as sum_duration from userlogs_date
Window.partitionBy("pcode").orderBy("event_date").rowsBetween
(Long.MinValue,Long.MaxValue)

總結如下:
preceding:用於累加前N行(分割槽之內)。若是從分割槽第一行頭開始,則為 unbounded。 N為:相對當前行向前的偏移量
following :與preceding相反,累加後N行(分割槽之內)。若是累加到該分割槽結束,則為 unbounded。N為:相對當前行向後的偏移量
current row:顧名思義,當前行,偏移量為0
說明:上邊的前N,後M,以及current row均會累加該偏移量所在行

3.1.3.4 實測結果

累加歷史:分割槽內當天及之前所有 寫法
1:select pcode,event_date,sum(duration) over (partition by pcode order by 
event_date asc) as sum_duration from userlogs_date




+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+

累加歷史:分割槽內當天及之前所有 寫法2:
select pcode,event_date,sum(duration) over (partition by pcode order by 
event_date asc rows between unbounded preceding and current row) as 
sum_duration from userlogs_date



+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       26417|
+-----+----------+------------+

累加當日和昨天:
select pcode,event_date,sum(duration) over (partition by pcode order by 
event_date asc rows between 1 preceding and current row) as sum_duration
 from userlogs_date



+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         760|
| 1438|2016-05-15|         700|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       26190|
| 1629|2016-05-15|       14077|
+-----+----------+------------+
累加當日、昨日、明日:
select pcode,event_date,sum(duration) over (partition by pcode order by 
event_date asc rows between 1 preceding and 1 following ) as sum_duration
 from userlogs_date



+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         760|
| 1438|2016-05-14|         865|
| 1438|2016-05-15|         700|
| 1629|2016-05-13|       26190|
| 1629|2016-05-14|       26417|
| 1629|2016-05-15|       14077|
+-----+----------+------------+

累加分割槽內所有:當天和之前之後所有:
select pcode,event_date,sum(duration) over (partition by pcode order by 
event_date asc rows between unbounded preceding and unbounded following )
 as sum_duration from userlogs_date



+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| 1438|2016-05-13|         865|
| 1438|2016-05-14|         865|
| 1438|2016-05-15|         865|
| 1629|2016-05-13|       26417|
| 1629|2016-05-14|       26417|
| 1629|2016-05-15|       26417|
+-----+----------+------------+

3.2 業務場景2:統計全部

3.2.1 spark sql實現



//spark sql 使用rollup新增all統計
sqlContext.sql(
"""
  select pcode,event_date,sum(duration) as sum_duration
  from userlogs_date_1
  group by pcode,event_date with rollup
  order by pcode,event_date
""").show()

+-----+----------+------------+                                                 
|pcode|event_date|sum_duration|
+-----+----------+------------+
| null|      null|       27282|
| 1438|      null|         865|
| 1438|2016-05-13|         165|
| 1438|2016-05-14|         595|
| 1438|2016-05-15|         105|
| 1629|      null|       26417|
| 1629|2016-05-13|       12340|
| 1629|2016-05-14|       13850|
| 1629|2016-05-15|         227|
+-----+----------+------------