1. 程式人生 > >基於spark實現表的join操作

基於spark實現表的join操作

1. 自連線

假設存在如下檔案:

[root@bluejoe0 ~]# cat categories.csv 
1,生活用品,0
2,數碼用品,1
3,手機,2
4,華為Mate7,3

每一行的格式為:類別ID,類別名稱,父類ID

現在欲輸出每個類別的父類別的名稱,類似於SQL的自連線,注意到join的外來鍵其實是父類ID

首先生成“父類ID->子類ID,子類名稱”

val categories=sc.textFile("/root/categories.csv")

val left = categories.map(_.split(",")).map(x=>(x(2
)->Map("id"->x(0),"name"->x(1))))

left的內容為:

Array((0,Map(id -> 1, name -> 生活用品)), (1,Map(id -> 2, name -> 數碼用品)), (2,Map(id -> 3, name -> 手機)), (3,Map(id -> 4, name -> 華為Mate7)))

接著生成“父類ID->父類ID,父類名稱”

val right = categories.map(_.split(",")).map(x=>(x(0)->Map("pid"
->x(0),"pname"->x(1))))

right的內容為:

Array((1,Map(pid -> 1, pname -> 生活用品)), (2,Map(pid -> 2, pname -> 數碼用品)), (3,Map(pid -> 3, pname -> 手機)), (4,Map(pid -> 4, pname -> 華為Mate7)))

接下來,將這兩個RDD進行合併,並按照key(key都是父類ID)進行reduce:

val merged = (left++right).reduceByKey(_
++_)

merged的內容為:

Array((4,Map(pid -> 4, pname -> 華為Mate7)), (0,Map(id -> 1, name -> 生活用品)), (1,Map(id -> 2, name -> 數碼用品, pid -> 1, pname -> 生活用品)), (2,Map(id -> 3, name -> 手機, pid -> 2, pname -> 數碼用品)), (3,Map(id -> 4, name -> 華為Mate7, pid -> 3, pname -> 手機)))

搞定!!

可以採用flatMap來簡化以上的寫法:

val merged = categories.map(_.split(",")).flatMap((x)=>Array(x(2)->Map("id"->x(0),"name"->x(1)), x(0)->Map("pid"->x(0),"pname"->x(1)))).reduceByKey(_++_)

結果是一樣一樣的!!當然程式碼的可讀性大打折扣了~~~

2. 兩張表連線