【原創】案例分享(3)使用者行為分析--見證scala的強大
場景分析
使用者行為分析應用的場景很多,像線上網站訪問統計,線下客流分析(比如影象人臉識別、wifi探針等),比較核心的指標有幾個:
PV | UV | SD | SC
指標說明:
PV(Page View):網站瀏覽量或者商場門店的訪問量
UV(Unique Visitor):獨立訪客數,即去重後的人數
SD(Session Duration):單次會話停留時間
SC(Session Count):會話次數
使用者行為分析的原始資料通常是一系列時間離散資料,比如網站訪問記錄:使用者在一個時間點訪問了一個網頁,然後又在下個時間點訪問了下個網頁;
這些原始資料可以抽象為:
User | Timestamp | Target
即使用者在什麼時間點訪問了什麼目標;
統計PV、UV比較簡單,但是在時間離散資料的基礎上,要計算SD、SC這兩個指標,常用的方式是設定過期時間閾值,如果使用者兩次訪問的時間間隔超過閾值,則認為是兩次Session;然後在一次Session的所有資料中取時間最早和最晚的資料來統計本次Session Duration;
統計示例
輸入資料
(user1, 2018-12-01 01:00:00, t1)
(user1, 2018-12-01 01:01:30, t1)
(user1, 2018-12-01 01:06:00, t1)
(user1, 2018-12-01 01:20:00, t1)
(user1, 2018-12-01 01:24:00, t1)
可以統計出
PV=5,UV=1
過期時間閾值設定為5分鐘,以上資料應該統計出來2次Session,分別是:
Session1: (2018-12-01 01:00:00 到 2018-12-01 01:06:00),Duration:6分鐘
Session2: (2018-12-01 01:20:00 到 2018-12-01 01:24:00),Duration:4分鐘
實際處理時還要資料亂序的問題,尤其是在實時計算中,你想好怎樣做了嗎?
Scala程式碼實現
下面給出scala實現,來見證scala的強大:
scala核心程式碼(一步foldLeft)
scala
val expireInSecond = 300 def mergeTimeArray(arr1 : ArrayBuffer[(Long, Long)], arr2 : ArrayBuffer[(Long, Long)]) : ArrayBuffer[(Long, Long)]= { if (arr1.head._1.equals(0l)) arr2 else if (arr2.head._1.equals(0l)) arr1 else (arr1 ++ arr2).sortBy(_._1).foldLeft(ArrayBuffer[(Long, Long)]())((result, item) => if (!result.isEmpty && result.last._2 + expireInSecond >= item._1) {result.update(result.length - 1, (result.last._1, math.max(result.last._2, item._2))); result} else result += item) }
spark核心程式碼(2步map 1步aggregateByKey)
scala
/** * @param data (user, timestamp, target) * @return (user, target, session_count, session_duration) */ def process(data : RDD[(String, Long, String)]) : RDD[(String, String, Integer, Double)] = { //((user, target), timestamp) data.map(item => ((item._1, item._3), item._2)) //((user, target), Array[(startTime, endTime)]) .aggregateByKey(ArrayBuffer((0l, 0l)))((result : ArrayBuffer[(Long, Long)], timestamp: Long) => mergeTimeArray(result, ArrayBuffer((timestamp, timestamp))), (result1 : ArrayBuffer[(Long, Long)], result2 : ArrayBuffer[(Long, Long)]) => mergeTimeArray(result1, result2)) //(user, target, session_count, session_duration) .map(item => (item._1._1, item._1._2, item._2.length, item._2.foldLeft(0l)((result, item) => result + (item._2 - item._1)).toDouble / item._2.length)) }
測試執行
def main(args : Array[String]) : Unit = { val conf = new SparkConf().setAppName("UserAnalysis").setMaster("local[2]") val sc = new SparkContext(conf) val arr = Array(("user1", 1546054000l, "t1"), ("user1", 1546054090l, "t1"), ("user1", 1546054360l, "t1"), ("user1", 1546055200l, "t1"), ("user1", 1546055440l, "t1")) //(user, timestamp, target) val data : RDD[(String, Long, String)] = sc.parallelize(arr) this.process(data).foreach(println) }
輸出
(user1,t1,2,300.0)