使用 Spark 輕鬆做資料透視(Pivot)
spark從1.6開始引入,到現在2.4版本,pivot運算元有了進一步增強,這使得後續無論是交給pandas繼續做處理,還是交給R繼續分析,都簡化了不少。大家無論在使用pandas、numpy或是R的時候,首先會做的就是處理資料,尤其是將列表,轉成成合適的形狀。
列表
在說透視表之前,我們先看看,什麼是列表,在傳統觀念上,列表的每一行代表一條記錄,而每一列代表一個屬性。
+-------+-------+-----+
| date|project|value|
+-------+-------+-----+
|2018-01| p1| 100|
|2018-01| p2| 200|
|2018-01| p3| 300|
|2018-02| p1| 1000|
|2018-02| p2| 2000|
|2018-03| px| 999|
+-------+-------+-----+
舉個簡單的例子,如上表,一條記錄可能代表某個專案,在某個年月創造的價值。而在這個表裡面,某一列,就代表一個屬性,比如date代表日期,project代表專案名稱。而這裡每一行,代表一條獨立,完整的記錄,一條與另外一條記錄,沒有直接的關係。
這種結構,也是一般關係型資料庫的資料結構。
透視表
透視表沒有一個明確的定義,一般是觀念上是指,為了方便進行資料分析,而對資料進行一定的重排,方便後續分析,計算等操作。透視表每一個元素及其對應的“座標”一起形成一條完整的記錄。
+-------+------+------+-----+-----+
| date| p1| p2| p3| px|
+-------+------+------+-----+-----+
|2018-01| 100.0| 200.0|300.0| 0.0|
|2018-02|1000.0|2000.0| 0.0| 0.0|
|2018-03| 0.0| 0.0| 0.0|999.0|
+-------+------+------+-----+-----+
上面的表,是將列表進行重排後的透視表,其第一行和第一列可以理解成索引,而在表中根據索引可以確定一條唯一的值,他們一起組成一條相當於列表裡的資料。
通過一般的定義,我們能看出,透視表主要用於分析,所以,一般的場景我們都會先對資料進行聚合,以後再對資料分析,這樣也更有意義。就好像,將話費清單,做成透視表,儘管邏輯上沒有任何問題,但是結果是可能比現在的清單列表更難查閱。
PS:一些可以借鑑的名詞,目前維基百科並沒有收錄,也只能權且理解一下吧
建模擬資料
先來模擬個數據吧,按照前面的例子,建個csv,這裡多加了一列s2,是為了做多透視列的,
date,project,value,s2 2018-01,p1,100,12 2018-01,p2,200,33 2018-01,p3,300,44 2018-02,p1,1000,22 2018-02,p2,2000,41 2018-03,px,999,22
spark API
我們先來看下DEMO程式
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local"); SparkContext sc = SparkContext.getOrCreate(sparkConf); SparkSession ss = new SparkSession(sc); Dataset<Row> ds = ss.read() //csv分隔符 .option("sep", ",") //是否包含header .option("header", "true") //載入csv路徑 .csv("E:\\devlop\\workspace\\sparkdemo\\src\\main\\java\\com\\dafei1288\\spark\\data1.csv"); Dataset<Row>r = //設定分組 ds.groupBy(col("date")) //設定pivot .pivot("project") //設定聚合 .agg(sum("value")); r.show();
在載入csv的時候,我們設定了分隔符,以及讀取表頭。
對載入後的dataset只需要進行3步設定
-
groupBy 設定分組列
-
pivot 設定pivot列
-
agg 設定聚合方式,可以是求和、平均等聚合函式
我們得到的輸出結果如下:
+-------+------+------+-----+-----+
| date| p1| p2| p3| px|
+-------+------+------+-----+-----+
|2018-03| null| null| null|999.0|
|2018-02|1000.0|2000.0| null| null|
|2018-01| 100.0| 200.0|300.0| null|
+-------+------+------+-----+-----+
請注意,這裡和sql有些區別,就是groupBy的時候,不需要將project列寫入了,如果寫入成了
groupBy(col("date"),col("project"))
那麼結果就是這樣了
+-------+-------+------+------+-----+-----+
| date|project| p1| p2| p3| px|
+-------+-------+------+------+-----+-----+
|2018-01| p3| null| null|300.0| null|
|2018-01| p2| null| 200.0| null| null|
|2018-01| p1| 100.0| null| null| null|
|2018-03| px| null| null| null|999.0|
|2018-02| p1|1000.0| null| null| null|
|2018-02| p2| null|2000.0| null| null|
+-------+-------+------+------+-----+-----+
sparkSQL
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local"); SparkContext sc = SparkContext.getOrCreate(sparkConf); SparkSession ss = new SparkSession(sc); Dataset<Row> ds = ss.read() .option("sep", ",") .option("header", "true").csv("E:\\devlop\\workspace\\sparkdemo\\src\\main\\java\\com\\dafei1288\\spark\\data1.csv"); ds.registerTempTable("f"); Dataset<Row>r = ds.sqlContext().sql( "select * from ( select date,project as p,sum(value) as ss from f group by date,project ) pivot ( sum(ss) for p in ( 'p1','p2','p3','px' ) ) order by date"); r.na().fill(0).show();
可以看到,這裡我們將讀取的csv註冊成了表f,使用spark sql語句,這裡和oracle的透視語句類似
pivot語法: pivot( 聚合列 for 待轉換列 in (列值) )
其語法還是比較簡單的。
為了展示資料好看一點,我特意使用語句
r.na().fill(0)
將空值`null`替換成了0。
+-------+------+------+-----+-----+
| date| p1| p2| p3| px|
+-------+------+------+-----+-----+
|2018-01| 100.0| 200.0|300.0| 0.0|
|2018-02|1000.0|2000.0| 0.0| 0.0|
|2018-03| 0.0| 0.0| 0.0|999.0|
+-------+------+------+-----+-----+
多聚合列
上文提到了,多做了一列,就是為了這個DEMO準備的,使用如下SparkSQL語句,設定多聚合列透視表
select * from ( select date,project as p,sum(value) as ss,sum(s2) as ss2 from f group by date,project ) pivot ( sum(ss),sum(ss2) for p in ( 'p1','p2','p3','px' ) ) order by date
這裡為例方便看,我就截圖了
為了防止OOM的情況,spark對pivot的資料量進行了限制,其可以通過 spark.sql.pivotMaxValues 來進行修改,預設值為10000,這裡是指piovt後的列數。
好了,關於spark pivot就介紹到這了,其實這裡與矩陣的行列轉換類似,pivot對應的也有unpivot,下次我們再聊。
參考資料:
https://stackoverflow.com/questions/30244910/how-to-pivot-dataframe
https://databricks.com/session/pivoting-data-with-sparksql