Computing how long a phone stays at each base station with Spark RDDs

lac_info.txt

9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6
CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6
16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6

user.log

18611132889,20160327075000,9F36407EAD0629FC166F14DDE7970F68,1
18688888888,20160327075100,9F36407EAD0629FC166F14DDE7970F68,1
18611132889,20160327081000,9F36407EAD0629FC166F14DDE7970F68,0
18688888888,20160327081300,9F36407EAD0629FC166F14DDE7970F68,0
18688888888,20160327175000,9F36407EAD0629FC166F14DDE7970F68,1
18611132889,20160327182000,9F36407EAD0629FC166F14DDE7970F68,1
18688888888,20160327220000,9F36407EAD0629FC166F14DDE7970F68,0

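Using the log records generated when users' phones connect to signal towers (also called base stations), determine each user's home address and work address; in other words, find the two locations where each user's phone stays the longest. There are two tables.
The access log generated by users (user.log) has the following format: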

Phone number  Event timestamp  Base station ID  Event type (1 = entering the base station, 0 = leaving it)
18611132889 20160327075000 9F36407EAD0629FC166F14DDE7970F68 1
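
The timestamp field has the form yyyyMMddHHmmss, so subtracting two such values as plain numbers does not give seconds: 20160327081000 - 20160327075000 is 6000, while the real gap is 20 minutes (1200 seconds). Below is a minimal sketch, not part of the original post, that converts the field to epoch seconds before applying a sign. The names `TimestampSketch`, `toEpochSeconds` and `parseLine` are hypothetical, UTC is an assumption, and the comma separator follows the sample rows of user.log.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

object TimestampSketch {
  // Pattern for the 14-digit timestamps in user.log, e.g. 20160327075000
  private val fmt = DateTimeFormatter.ofPattern("yyyyMMddHHmmss")

  // Hypothetical helper: log timestamp -> epoch seconds (UTC is an assumption)
  def toEpochSeconds(ts: String): Long =
    LocalDateTime.parse(ts, fmt).toEpochSecond(ZoneOffset.UTC)

  // Hypothetical helper: one comma-separated user.log line ->
  // ((phone number, base station ID), signed epoch seconds).
  // Enter events ("1") are negated so that a per-key sum yields leave - enter.
  // A malformed line (not exactly four fields) throws a MatchError.
  def parseLine(line: String): ((String, String), Long) = {
    val Array(mobile, ts, lac, eventType) = line.split(",")
    val seconds = toEpochSeconds(ts)
    ((mobile, lac), if (eventType == "1") -seconds else seconds)
  }
}
```

With this helper, `TimestampSketch.toEpochSeconds("20160327081000") - TimestampSketch.toEpochSeconds("20160327075000")` evaluates to 1200, and `parseLine` could stand in for the body of a `map` over the lines of user.log.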

The base station table (lac_info.txt) has the following format; the fourth field in the sample rows above is not used:

Base station ID  Longitude  Latitude
9F36407EAD0629FC166F14DDE7970F68 116.304864 40.050645

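Implement this requirement with Spark RDDs. The result should look like this:

Phone number :coordinates of location 1  dwell time  coordinates of location 2  dwell time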
18611132889 :(116.296302,40.032296) 97500|(116.304864,40.050645) 54000|
18688888888 :(116.296302,40.032296) 87600|(116.304864,40.050645) 51200|
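
The layout above can be produced from a phone number and its top stations with a small formatting helper. This is only a sketch: `OutputFormatSketch` and `formatRow` are hypothetical names, and each station is assumed to arrive as a (longitude, latitude, dwell time) tuple already sorted by dwell time.

```scala
object OutputFormatSketch {
  // Hypothetical helper: render one result row as
  // "<phone> :(<lon>,<lat>) <time>|(<lon>,<lat>) <time>|"
  // It accepts any number of stations, so a phone seen at a single
  // station still produces a valid (shorter) row.
  def formatRow(mobile: String, stations: Seq[(String, String, Long)]): String = {
    val body = stations
      .map { case (lon, lat, time) => s"($lon,$lat) $time|" }
      .mkString
    s"$mobile :$body"
  }
}
```

For example, `OutputFormatSketch.formatRow("18611132889", List(("116.296302", "40.032296", 97500L), ("116.304864", "40.050645", 54000L)))` yields the first row shown above.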
The full implementation is as follows:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object demo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("month demo").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val rdd_Info = sc.textFile("d://user.log").map(line => {
      val fields = line.split(",")
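      // Event type: "1" = enter, "0" = leave
      val eventType = fields(3)
      // Timestamp in yyyyMMddHHmmss form
      val time = fields(1)
      // Negate enter times so that summing per key yields (leave - enter).
      // Note: differences of raw yyyyMMddHHmmss values are not true seconds.
      val timeLong = if (eventType == "1") -time.toLong else time.toLong
      // ((phone number, base station ID), signed timestamp)
      ((fields(0), fields(2)), timeLong)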
    })
    val rdd_lacInfo = rdd_Info.reduceByKey(_ + _).map(t => {
      val mobile = t._1._1 // phone number
      val lac = t._1._2 // base station ID
      val time = t._2 // total dwell time (sum of signed timestamps)
      (lac, (mobile, time))
    })
    val rdd_coordinate = sc.textFile("d://lac_info.txt").map(line => {
      val f = line.split(",")
      // (base station ID, (longitude, latitude))
      (f(0), (f(1), f(2)))
    })
    val rdd_all = rdd_lacInfo.join(rdd_coordinate).map(t => {
      val lac = t._1
      val mobile = t._2._1._1
      val time = t._2._1._2
      val x = t._2._2._1
      val y = t._2._2._2
      (mobile, lac, time, x, y)
    })
    // Group by phone number
    val rdd_mobile = rdd_all.groupBy(_._1)
    // Take the two base stations with the longest dwell time
    val rdd_topTwo: RDD[(String, List[(String, String, Long, String, String)])] = rdd_mobile.mapValues(it => {
      it.toList.sortBy(_._3).reverse.take(2)
    })
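    val re = rdd_topTwo.map(t => {
      // Coordinates and dwell time of the longest stay
      val lon1 = t._2(0)._4
      val lat1 = t._2(0)._5
      val timelong1 = t._2(0)._3
      // Coordinates and dwell time of the second-longest stay
      // (assumes every phone appears at two or more base stations)
      val lon2 = t._2(1)._4
      val lat2 = t._2(1)._5
      val timelong2 = t._2(1)._3
      (t._1, (lon1, lat1, timelong1), (lon2, lat2, timelong2))
    })
    re.foreach(println)
    sc.stop()
  }
}

// Output
(18611132889,(116.296302,40.032296,97500),(116.304864,40.050645,54000))
(18688888888,(116.296302,40.032296,87600),(116.304864,40.050645,51200))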
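
Grouping by phone number and sorting each phone's whole list works, but it moves every record for a phone to one place before any are discarded. As a possible alternative, not taken from the original post, the top-two selection can be written as two small merge functions that keep at most two entries at every step. `TopTwoSketch`, `Stay`, `insert` and `merge` are hypothetical names; a `Stay` is assumed to be a (longitude, latitude, dwell time) tuple.

```scala
object TopTwoSketch {
  // (longitude, latitude, dwell time); an assumed shape for illustration
  type Stay = (String, String, Long)

  // Add one stay to a running top-two list, longest dwell time first
  def insert(top: List[Stay], s: Stay): List[Stay] =
    (s :: top).sortBy(x => -x._3).take(2)

  // Combine two partial top-two lists into one
  def merge(a: List[Stay], b: List[Stay]): List[Stay] =
    (a ++ b).sortBy(x => -x._3).take(2)
}
```

Given an `RDD[(String, TopTwoSketch.Stay)]` keyed by phone number, these functions have the shapes expected by `aggregateByKey`, for instance `rdd.aggregateByKey(List.empty[TopTwoSketch.Stay])(TopTwoSketch.insert, TopTwoSketch.merge)`. The resulting lists may hold fewer than two entries, so downstream code should not index position 1 unconditionally.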