基於spark實現表的join操作
1. 自連線
假設存在如下檔案:
[root@bluejoe0 ~]# cat categories.csv
1,生活用品,0
2,數碼用品,1
3,手機,2
4,華為Mate7,3
每一行的格式為:類別ID,類別名稱,父類ID
現在欲輸出每個類別的父類別的名稱,類似於SQL的自連線,注意到join的外來鍵其實是父類ID。
首先生成“父類ID->子類ID,子類名稱”
val categories=sc.textFile("/root/categories.csv")
val left = categories.map(_.split(",")).map(x=>(x(2 )->Map("id"->x(0),"name"->x(1))))
left的內容為:
Array((0,Map(id -> 1, name -> 生活用品)), (1,Map(id -> 2, name -> 數碼用品)), (2,Map(id -> 3, name -> 手機)), (3,Map(id -> 4, name -> 華為Mate7)))
接著生成“父類ID->父類ID,父類名稱”
val right = categories.map(_.split(",")).map(x=>(x(0)->Map("pid" ->x(0),"pname"->x(1))))
right的內容為:
Array((1,Map(pid -> 1, pname -> 生活用品)), (2,Map(pid -> 2, pname -> 數碼用品)), (3,Map(pid -> 3, pname -> 手機)), (4,Map(pid -> 4, pname -> 華為Mate7)))
接下來,將這兩個RDD進行合併,並按照key(key都是父類ID)進行reduce:
val merged = (left++right).reduceByKey(_ ++_)
merged的內容為:
Array((4,Map(pid -> 4, pname -> 華為Mate7)), (0,Map(id -> 1, name -> 生活用品)), (1,Map(id -> 2, name -> 數碼用品, pid -> 1, pname -> 生活用品)), (2,Map(id -> 3, name -> 手機, pid -> 2, pname -> 數碼用品)), (3,Map(id -> 4, name -> 華為Mate7, pid -> 3, pname -> 手機)))
搞定!!
可以採用flatMap來簡化以上的寫法:
val merged = categories.map(_.split(",")).flatMap((x)=>Array(x(2)->Map("id"->x(0),"name"->x(1)), x(0)->Map("pid"->x(0),"pname"->x(1)))).reduceByKey(_++_)
結果是一樣一樣的!!當然程式碼的可讀性大打折扣了~~~