Scala Spark: getting each user's most recent day of data (key point: join)
阿新 • Published: 2018-12-25
0. Data
val data=
"""
user date item1 item2
1 2015-12-01 14 5.6
1 2015-12-01 10 0.6
1 2015-12-02 8 9.4
1 2015-12-02 90 1.3
2 2015-12-01 30 0.3
2 2015-12-01 89 1.2
2 2015-12-30 70 1.9
2 2015-12-31 20 2.5
3 2015-12-01 19 9.3
3 2015-12-01 40 2.3
3 2015-12-02 13 1.4
3 2015-12-02 50 1.0
3 2015-12-02 19 7.8
"""
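Both approaches below start by splitting each row into user, date, item1 and item2. As a minimal sketch of that parsing on a plain Scala collection (no Spark needed), assuming a hypothetical Record case class and parse helper that are not part of the original post:

```scala
// Hypothetical helper types for illustration; not part of the original post.
object ParseSketch {
  final case class Record(user: String, date: String, item1: Int, item2: Double)

  // Split the multi-line string into rows, skip blank lines and the header,
  // and convert each whitespace-separated field to its type.
  def parse(text: String): Seq[Record] =
    text.trim.split("\\r?\\n").toSeq
      .map(_.trim)
      .filter(l => l.nonEmpty && !l.startsWith("user"))
      .map { line =>
        val f = line.split("\\s+")
        Record(f(0), f(1), f(2).toInt, f(3).toDouble)
      }
}
```

Calling ParseSketch.parse(data) on the string above should give one Record per data row, with the header row left out.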
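1. Approach 1
import scala.collection.mutable.ListBuffer
// Skip the header row, then build (user, ListBuffer(date, item1, item2)) pairs
val data2 = data.trim.split("\\n").drop(1).map(_.split("\\s+")).map{
f=>{
(f(0),ListBuffer[Any](f(1),f(2).toInt,f(3).toDouble))
}
}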
val data3 = sc.parallelize(data2)
val dataReduce = data3.reduceByKey((x,y) =>
if(x(0).toString >= y(0).toString) x else y)
val dataUserAndDateKey = data3.map{
rec =>((rec._1,rec._2(0)),rec)
}
//dataUserAndDateKey:((3,2015-12-02),(...))
val dataReduceUserAndDateKey = dataReduce.map{
rec => ((rec._1,rec._2(0)),rec)
}
//dataReduceUserAndDateKey:((3,2015-12-02),(...)),
//but it holds only one record per user, taken from that user's latest day.
//What we want is every record from each user's latest day.
val joinData = dataUserAndDateKey.join(dataReduceUserAndDateKey)
joinData.foreach(println)
((3,2015-12-02),((3,ListBuffer(2015-12-02, 13, 1.4)),
(3,ListBuffer(2015-12-02, 13, 1.4))))
((2,2015-12-31),((2,ListBuffer(2015-12-31, 20, 2.5)),
(2,ListBuffer(2015-12-31, 20, 2.5))))
((3,2015-12-02),((3,ListBuffer(2015-12-02, 50, 1.0)),
(3,ListBuffer(2015-12-02, 13, 1.4))))
((3,2015-12-02),((3,ListBuffer(2015-12-02, 19, 7.8)),
(3,ListBuffer(2015-12-02, 13, 1.4))))
((1,2015-12-02),((1,ListBuffer(2015-12-02, 8, 9.4)),
(1,ListBuffer(2015-12-02, 8, 9.4))))
((1,2015-12-02),((1,ListBuffer(2015-12-02, 90, 1.3)),
(1,ListBuffer(2015-12-02, 8, 9.4))))
//What we want is the left side of each joined pair
joinData.map(rec=>rec._2._1).foreach(println)
(3,ListBuffer(2015-12-02, 13, 1.4))
(2,ListBuffer(2015-12-31, 20, 2.5))
(3,ListBuffer(2015-12-02, 50, 1.0))
(3,ListBuffer(2015-12-02, 19, 7.8))
(1,ListBuffer(2015-12-02, 8, 9.4))
(1,ListBuffer(2015-12-02, 90, 1.3))
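The logic of Approach 1 (reduce to the latest date per user, then join back on (user, date)) can be checked without a cluster. The following is a minimal sketch on local Scala collections, not the Spark code itself; the LatestDaySketch object and the Row alias are hypothetical names used only for illustration:

```scala
// Local-collection sketch of Approach 1; names are illustrative assumptions.
object LatestDaySketch {
  type Row = (String, String, Int, Double) // (user, date, item1, item2)

  def latestDayRows(rows: Seq[Row]): Seq[Row] = {
    // Counterpart of the reduceByKey step: the maximum date per user.
    // Zero-padded yyyy-MM-dd strings sort chronologically, so String max is enough.
    val latest: Map[String, String] =
      rows.groupBy(_._1).map { case (user, rs) => user -> rs.map(_._2).max }
    // Counterpart of the join on (user, date): keep the rows whose date
    // equals that user's latest date.
    rows.filter(r => latest(r._1) == r._2)
  }
}
```

On a local Seq the result keeps the input order; the Spark join gives no such ordering guarantee, which is why the printed output above is not sorted by user.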
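2. Approach 2
1. Prepare the data
// trim first so the leading newline in data does not produce an empty line
val inputLines = sc.parallelize(data.trim.split("\\r?\\n"))
// Remove the header
val data2 = inputLines.filter(l => !l.startsWith("user"))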
data2.foreach(println)
2. Find each user's most recent day
val keyByUser = data2.map(
line => {
val a = line.split("\\s+")
(a(0),line)
})
//For each user, keep the line with the latest date
val latestByUser = keyByUser.reduceByKey(
(x,y)=>
if(x.split("\\s+")(1) > y.split("\\s+")(1)) x else y)
latestByUser.foreach(println)
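The reducer above compares the date column as plain strings. That orders dates correctly only because every date here is zero-padded yyyy-MM-dd; "2015-2-1", for example, compares greater than "2015-12-01" as a string. If the input format is less strict, one possible variant is to parse the dates first. A minimal sketch, assuming hypothetical helpers dateOf and later that are not part of the original post:

```scala
import java.time.LocalDate

// Illustrative helpers; the names are assumptions, not from the original post.
object DateCompareSketch {
  // Extract the date column (second field) from a raw line and parse it.
  def dateOf(line: String): LocalDate =
    LocalDate.parse(line.trim.split("\\s+")(1))

  // Same shape as the function passed to reduceByKey:
  // keep whichever line carries the later date.
  def later(x: String, y: String): String =
    if (dateOf(x).isAfter(dateOf(y))) x else y
}
```

A function like later could be passed to reduceByKey in place of the string comparison. LocalDate.parse expects ISO yyyy-MM-dd input, so other formats would need an explicit DateTimeFormatter.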
3. Join the original data with the most recent day to get the result
val latestKeyedByUserAndDate = latestByUser.map(
x=>(x._1 +":"+x._2.split("\\s+")(1),x._2))
val originalKeyedByUserAndDate = data2.map(
line =>{
val a = line.split("\\s+")
(a(0) + ":"+a(1),line)
})
val result = latestKeyedByUserAndDate.join(originalKeyedByUserAndDate)
result.foreach(println)
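Each element of result has the shape (key, (latestLine, originalLine)): the join emits one pair for every combination of left and right values that share a key. A minimal sketch of that inner-join behaviour on local sequences, using a hypothetical JoinSketch.join helper rather than the Spark API:

```scala
// Local emulation of an inner join; illustrative only, not the Spark implementation.
object JoinSketch {
  // One (key, (left, right)) pair per matching combination of keys.
  def join[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] =
    for {
      (k, a)  <- left
      (k2, b) <- right
      if k == k2
    } yield (k, (a, b))
}
```

Because latestKeyedByUserAndDate holds exactly one entry per user, every original line from that user's latest day appears once on the right side, and lines from earlier days find no matching key and drop out.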
4. Transform the result into the form you want
def createCombiner(v:(String,String)):
List[(String,String)] = List[(String,String)](v)
def mergeValue(acc:List[(String,String)],
value:(String,String)):List[(String,String)]
= value::acc
def mergeCombiners(acc1:List[(String,String)],
acc2:List[(String,String)]):List[(String,String)]
= acc2 ::: acc1
//use combineByKey
val transformedResult = result.mapValues(
l=>{
val a = l._2.split(" +")
(a(2),a(3))
}).combineByKey(createCombiner,mergeValue,mergeCombiners)
transformedResult.foreach(println)
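To see what the three functions passed to combineByKey do, it can help to emulate a single partition locally: the first value seen for a key goes through createCombiner and every later value through mergeValue. A minimal sketch under that single-partition assumption (so mergeCombiners, which merges results across partitions, is never called here); the CombineSketch object and its combine method are hypothetical:

```scala
// Single-partition emulation of combineByKey; illustrative only.
object CombineSketch {
  type Pair = (String, String) // (item1, item2) kept as strings, as in step 4

  def createCombiner(v: Pair): List[Pair] = List(v)
  def mergeValue(acc: List[Pair], v: Pair): List[Pair] = v :: acc

  def combine(pairs: Seq[(String, Pair)]): Map[String, List[Pair]] =
    pairs.foldLeft(Map.empty[String, List[Pair]]) { case (m, (k, v)) =>
      // Existing key: prepend with mergeValue. New key: start a list with createCombiner.
      m.updated(k, m.get(k).map(mergeValue(_, v)).getOrElse(createCombiner(v)))
    }
}
```

Since mergeValue prepends, each list ends up in reverse encounter order. In Spark the encounter order itself depends on partitioning, so the order inside each list should not be relied on.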