Scala Spark: getting the most recent day's data (key point: join)

0. Data

val data=
"""
user date      item1 item2
1    2015-12-01 14  5.6
1    2015-12-01 10  0.6
1    2015-12-02 8   9.4
1    2015-12-02 90  1.3
2    2015-12-01 30  0.3
2    2015-12-01 89  1.2
2    2015-12-30 70  1.9
2    2015-12-31 20  2.5
3    2015-12-01 19  9.3
3    2015-12-01 40  2.3
3    2015-12-02 13  1.4
3    2015-12-02 50  1.0
3    2015-12-02 19  7.8
"""

1. Solution 1

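import scala.collection.mutable.ListBuffer

// Parse each line into (user, ListBuffer(date, item1, item2)).
val data2 = data.trim.split("\\n")
  .drop(1)                      // skip the header line, which cannot be parsed as numbers
  .map(_.trim.split("\\s+"))
  .map { f =>
    (f(0), ListBuffer[Any](f(1), f(2).toInt, f(3).toDouble))
  }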

val data3 = sc.parallelize(data2)

val dataReduce = data3.reduceByKey((x,y) =>
    if(x(0).toString >= y(0).toString) x else y)

val dataUserAndDateKey = data3.map { rec =>
  ((rec._1, rec._2(0)), rec)
}
// dataUserAndDateKey: ((3,2015-12-02),(...))

val dataReduceUserAndDateKey = dataReduce.map { rec =>
  ((rec._1, rec._2(0)), rec)
}
// dataReduceUserAndDateKey: ((3,2015-12-02),(...)),
// but it holds only one record per user, taken from that user's latest day.
// What we want is all records from each user's latest day.

val joinData = dataUserAndDateKey.join(dataReduceUserAndDateKey)
joinData.foreach(println)

((3,2015-12-02),((3,ListBuffer(2015-12-02, 13, 1.4)), (3,ListBuffer(2015-12-02, 13, 1.4))))
((2,2015-12-31),((2,ListBuffer(2015-12-31, 20, 2.5)), (2,ListBuffer(2015-12-31, 20, 2.5))))
((3,2015-12-02),((3,ListBuffer(2015-12-02, 50, 1.0)), (3,ListBuffer(2015-12-02, 13, 1.4))))
((3,2015-12-02),((3,ListBuffer(2015-12-02, 19, 7.8)), (3,ListBuffer(2015-12-02, 13, 1.4))))
((1,2015-12-02),((1,ListBuffer(2015-12-02, 8, 9.4)), (1,ListBuffer(2015-12-02, 8, 9.4))))
((1,2015-12-02),((1,ListBuffer(2015-12-02, 90, 1.3)), (1,ListBuffer(2015-12-02, 8, 9.4))))

// What we want is the left side of each joined pair
joinData.map(rec => rec._2._1).foreach(println)

(3,ListBuffer(2015-12-02, 13, 1.4))
(2,ListBuffer(2015-12-31, 20, 2.5))
(3,ListBuffer(2015-12-02, 50, 1.0))
(3,ListBuffer(2015-12-02, 19, 7.8))
(1,ListBuffer(2015-12-02, 8, 9.4))
(1,ListBuffer(2015-12-02, 90, 1.3))
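
The two-step idea in Solution 1 (find each user's latest date, then match records on the (user, date) pair) can be checked locally without a cluster. The following is only a sketch with plain Scala collections, not Spark code: the object name `LatestDayLocal` and its fields are hypothetical, the records are a subset of the data above, and a `filter` stands in for the RDD `join`.

```scala
object LatestDayLocal {
  // (user, date, item1, item2) records for users 2 and 3, copied from the data above.
  val records: List[(String, String, Int, Double)] = List(
    ("2", "2015-12-01", 30, 0.3),
    ("2", "2015-12-30", 70, 1.9),
    ("2", "2015-12-31", 20, 2.5),
    ("3", "2015-12-01", 19, 9.3),
    ("3", "2015-12-02", 13, 1.4),
    ("3", "2015-12-02", 50, 1.0)
  )

  // Step 1: latest date per user (the role reduceByKey plays in Solution 1).
  val latestDate: Map[String, String] =
    records.groupBy(_._1).map { case (user, rs) => user -> rs.map(_._2).max }

  // Step 2: keep every record whose (user, date) pair matches the latest pair
  // (the role the join on the composite key plays in Solution 1).
  val latestRecords: List[(String, String, Int, Double)] =
    records.filter(r => latestDate.get(r._1).contains(r._2))

  def main(args: Array[String]): Unit = latestRecords.foreach(println)
}
```

User 2 keeps a single record and user 3 keeps both records from 2015-12-02, which mirrors the shape of the joined output above.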

2. Solution 2

1. Prepare the data

// trim first so the blank line after the opening quotes does not become a record
val inputLines = sc.parallelize(data.trim.split("\\r?\\n"))
// Drop the header line
val data2 = inputLines.filter(l => !l.startsWith("user"))
data2.foreach(println)

2. Find each user's most recent day

// Key each line by user; map over the header-free RDD data2, not the raw string data
val keyByUser = data2.map(
line => {
    val a = line.split("\\s+")
    (a(0),line)
    })

// For each user, keep the line with the latest date
val latestByUser = keyByUser.reduceByKey(
(x,y)=>
if(x.split("\\s+")(1) > y.split("\\s+")(1)) x else y)
latestByUser.foreach(println)
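
The reduce step compares dates as plain strings. That is safe here because zero-padded `yyyy-MM-dd` strings sort lexicographically in the same order as the dates they represent. A minimal sketch to confirm this, assuming the dates always follow that format; the object name `DateOrderCheck` is hypothetical:

```scala
import java.time.LocalDate

object DateOrderCheck {
  // Sample dates taken from the data above, deliberately out of order.
  val dates: List[String] =
    List("2015-12-30", "2015-12-02", "2015-12-31", "2015-12-01")

  // Latest date by plain string comparison, as in the reduceByKey step.
  val latestByString: String =
    dates.reduce((x, y) => if (x > y) x else y)

  // Latest date by parsing into LocalDate and comparing chronologically.
  val latestByDate: String =
    dates.map(s => LocalDate.parse(s))
      .reduce((x, y) => if (x.isAfter(y)) x else y)
      .toString

  def main(args: Array[String]): Unit = {
    println(latestByString)
    println(latestByDate)
  }
}
```

If the input could contain dates such as `2015-12-1` or `1/12/2015`, the string comparison would no longer be reliable and parsing first would be the safer choice.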

3. Join the original data with the most recent day to get the result

val latestKeyedByUserAndDate = latestByUser.map(
x=>(x._1 +":"+x._2.split("\\s+")(1),x._2))

// Key every original line by "user:date"; map over data2, not the raw string data
val originalKeyedByUserAndDate = data2.map(
line =>{
        val a = line.split("\\s+")
        (a(0) + ":"+a(1),line)
})
val result = latestKeyedByUserAndDate.join(originalKeyedByUserAndDate)
result.foreach(println)

4. Transform the result into the shape you want

def createCombiner(v:(String,String)):
    List[(String,String)] = List[(String,String)](v)

def mergeValue(acc:List[(String,String)],
value:(String,String)):List[(String,String)]
 = value::acc

def mergeCombiners(acc1:List[(String,String)],
acc2:List[(String,String)]):List[(String,String)]
= acc2 ::: acc1

//use combineByKey
// l is (latestLine, originalLine); take item1 and item2 from the original line
val transformedResult = result.mapValues(
l=>{
    val a = l._2.split(" +")
    (a(2),a(3))
}).combineByKey(createCombiner,mergeValue,mergeCombiners)
transformedResult.foreach(println)
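
To see what the first two `combineByKey` functions do, the sketch below replays them over a small local list with `foldLeft`. It is an illustration in plain Scala, not Spark's implementation: the object name `CombineByKeyLocal` and the sample pairs are assumptions, and `mergeCombiners` is left out because it only matters when partial results from several partitions are merged.

```scala
object CombineByKeyLocal {
  type Pair = (String, String)

  // Same bodies as the two functions defined in step 4.
  def createCombiner(v: Pair): List[Pair] = List(v)
  def mergeValue(acc: List[Pair], value: Pair): List[Pair] = value :: acc

  // Keyed (item1, item2) pairs in the shape step 4 produces before combining.
  val keyed: List[(String, Pair)] = List(
    ("3:2015-12-02", ("13", "1.4")),
    ("2:2015-12-31", ("20", "2.5")),
    ("3:2015-12-02", ("50", "1.0"))
  )

  // Within one partition: the first value seen for a key goes through
  // createCombiner, every later value for that key goes through mergeValue.
  val combined: Map[String, List[Pair]] =
    keyed.foldLeft(Map.empty[String, List[Pair]]) { case (acc, (k, v)) =>
      acc.updated(k, acc.get(k).map(mergeValue(_, v)).getOrElse(createCombiner(v)))
    }

  def main(args: Array[String]): Unit = combined.foreach(println)
}
```

Because `mergeValue` prepends, the values for each key end up in reverse order of arrival; on a real RDD the order also depends on partitioning, so it should not be relied on.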