1. 程式人生 > >【原創】案例分享(3)使用者行為分析--見證scala的強大

【原創】案例分享(3)使用者行為分析--見證scala的強大

場景分析

使用者行為分析應用的場景很多,像線上網站訪問統計,線下客流分析(比如影象人臉識別、wifi探針等),比較核心的指標有幾個:

PV | UV | SD | SC

指標說明:

PV(Page View):網站瀏覽量或者商場門店的訪問量
UV(Unique Visitor):獨立訪客數,即去重後的人數
SD(Session Duration):單次會話停留時間
SC(Session Count):會話次數

使用者行為分析的原始資料通常是一系列時間離散資料,比如網站訪問記錄:使用者在一個時間點訪問了一個網頁,然後又在下個時間點訪問了下個網頁;

 

這些原始資料可以抽象為:

User | Timestamp | Target

即使用者在什麼時間點訪問了什麼目標;

 

統計PV、UV比較簡單,但是在時間離散資料的基礎上,要計算SD、SC這兩個指標,常用的方式是設定過期時間閾值,如果使用者兩次訪問的時間間隔超過閾值,則認為是兩次Session;然後在一次Session的所有資料中取時間最早和最晚的資料來統計本次Session Duration;

統計示例

輸入資料

(user1, 2018-12-01 01:00:00, t1)
(user1, 2018-12-01 01:01:30, t1)
(user1, 2018-12-01 01:06:00, t1)
(user1, 2018-12-01 01:20:00, t1)
(user1, 2018-12-01 01:24:00, t1)

可以統計出

PV=5,UV=1

過期時間閾值設定為5分鐘,以上資料應該統計出來2次Session,分別是:

Session1: (2018-12-01 01:00:00 到 2018-12-01 01:06:00),Duration:6分鐘
Session2: (2018-12-01 01:20:00 到 2018-12-01 01:24:00),Duration:4分鐘

實際處理時還要資料亂序的問題,尤其是在實時計算中,你想好怎樣做了嗎?

 

Scala程式碼實現

下面給出scala實現,來見證scala的強大:

scala核心程式碼(一步foldLeft)

scala

  val expireInSecond = 300
  def mergeTimeArray(arr1 : ArrayBuffer[(Long, Long)], arr2 : ArrayBuffer[(Long, Long)]) : ArrayBuffer[(Long, Long)] 
= { if (arr1.head._1.equals(0l)) arr2 else if (arr2.head._1.equals(0l)) arr1 else (arr1 ++ arr2).sortBy(_._1).foldLeft(ArrayBuffer[(Long, Long)]())((result, item) => if (!result.isEmpty && result.last._2 + expireInSecond >= item._1) {result.update(result.length - 1, (result.last._1, math.max(result.last._2, item._2))); result} else result += item) }

spark核心程式碼(2步map 1步aggregateByKey)

scala

  /**
    * @param data (user, timestamp, target)
    * @return (user, target, session_count, session_duration)
    */
  def process(data : RDD[(String, Long, String)]) : RDD[(String, String, Integer, Double)] = {
    //((user, target), timestamp)
    data.map(item => ((item._1, item._3), item._2))
      //((user, target), Array[(startTime, endTime)])
      .aggregateByKey(ArrayBuffer((0l, 0l)))((result : ArrayBuffer[(Long, Long)], timestamp: Long) => mergeTimeArray(result, ArrayBuffer((timestamp, timestamp))), (result1 : ArrayBuffer[(Long, Long)], result2 : ArrayBuffer[(Long, Long)]) => mergeTimeArray(result1, result2))
      //(user, target, session_count, session_duration)
      .map(item => (item._1._1, item._1._2, item._2.length, item._2.foldLeft(0l)((result, item) => result + (item._2 - item._1)).toDouble / item._2.length))
  }

測試執行

  def main(args : Array[String]) : Unit = {
    val conf = new SparkConf().setAppName("UserAnalysis").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val arr = Array(("user1", 1546054000l, "t1"), ("user1", 1546054090l, "t1"), ("user1", 1546054360l, "t1"), ("user1", 1546055200l, "t1"), ("user1", 1546055440l, "t1"))
    //(user, timestamp, target)
    val data : RDD[(String, Long, String)] = sc.parallelize(arr)
    this.process(data).foreach(println)
  }

輸出

(user1,t1,2,300.0)