Spark+Hbase 億級流量分析實戰( PV/UV )
作為一個百億級的流量實時分析統計系統怎麼能沒有PV /UV 這兩經典的超級瑪麗亞指標呢,話說五百年前它倆可是鼻祖,咳咳...,不好意思沒忍住,迴歸正文, 大豬 在上一篇已經介紹了 小巧高效能ETL程式設計與實現 了,到現在,我們的資料已經落地到Hbase 上了,而且日誌的時間也已經寫到 Mysql 了,萬事都已經具備了,接下來我們就要擼指標了,先從兩個經典的指標開始擼。
程式流程
我們先理一下整個程式的計算流程,請看大圖:

-
開始計算是我們的 Driver 程式入口
-
開始計算之前檢查監聽Redis 有沒有收到程式退出通知,如果有程式結束,否則往下執行
-
首先去查詢我們上篇文章的ETL loghub 日誌的進度的平均時間點
-
Switch 處是判斷loghub 的時間距離我們上次計算的指標時間是否相差足夠時間,一般定義為3分鐘時間之後,因為loghub 的時間會有少量的波動情況
-
不滿足則 Sleep 30秒,可以自己控制Sleep範圍。
-
滿足則計算
上次指標計算結束時間
~(loghub時間 - 3分鐘日誌波動)
-
計算完成更新指標結果並且更新指標計算時間,然後回到第 2 點。
程式實現
先從 DriverMain 入口開始擼起
//監聽redis退出訊息 while (appRunning) { val dbClient = new DBJdbc(props.getProperty("jdbcUrl")) //日誌offset val loghubTime = dbClient.query("loghub").toLocalDateTime.minusMinutes(3) //指標計算offset val indicatorTime =dbClient.query("indicator").toLocalDateTime //兩個時間相差(分) val betweenTimeMinutes = Duration.between(indicatorTime, loghubTime).toMinutes val format = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS") //相差足夠時間則進行指標執行,否則睡眠 if (betweenTimeMinutes >= 1) { app.run(spark, indicatorTime, loghubTime) //計算完成更新指標時間 dbClient.upsert(Map("offsetName" -> "indicator"), Update(sets = Map("time" -> loghubTime.toString)), "offset") } else { //讓我們的老大哥睡會,別太累了 TimeUnit.SECONDS.sleep(30) } } 複製程式碼
從註釋上看,整體思路還是比較清晰的。
接下來我們跟著往下看 run
裡面的方法做了什麼有意思的操作
conf.set(TableInputFormat.INPUT_TABLE, Tables.LOG_TABLE) conf.set("TableInputFormat.SCAN_ROW_START", start) conf.set("TableInputFormat.SCAN_ROW_START", end) val logDS = sc.newAPIHadoopRDD( conf, classOf[TableInputFormat2], classOf[ImmutableBytesWritable], classOf[Result] ) .map(tp2 => HbaseUtil.resultToMap(tp2._2)) .map(map => { LogCase( //子case類,存放多種格式的時間 dt = DT( map.get("time").toLocalDateTimeStr(), map.get("time").toLocalDate().toString ), `type` = map.get("type"), aid = map.get("aid"), uid = map.get("uid"), tid = map.get("tid"), ip = map.get("ip") ) }).toDS() logDS.cache() logDS.createTempView("log") //各類指標 new PV().run() new UV().run() 複製程式碼
start
跟 end
就是上面傳下來需要查詢的日誌時間範圍
簡要說明:就是把Hbase的時間範圍資料轉成SparkSQL中的一張 log
表
在UV 跟PV 指標計算裡面就可以使用這張 log
表了
我們看看這兩個經典的指標裡面到底有什麼乾坤:
spark.sql( """ |SELECT |aid, |dt.date, |COUNT(1) as pv |FROM |log |GROUP BY |aid, |dt.date """.stripMargin) .rdd .foreachPartition(rows => { val props = PropsUtils.properties("db") val dbClient = new DBJdbc(props.getProperty("jdbcUrl")) rows.foreach(row => { dbClient.upsert( Map( "time" -> row.getAs[String]("date"), "aid" -> row.getAs[String]("aid") ), Update(incs = Map("pv" -> row.getAs[Long]("pv").toString)), "common_report" ) }) dbClient.close() }) 複製程式碼
哇然一看,大哥你這也寫得太簡單了吧
不就是一個普通的 PV 演算法,再加上分割槽 foreachPartition
操作把更到的每一行聚合的結果資料 upsert
到我們的 common_report
指標表
group by後面跟上要聚合的維度,以上是想統計每篇文章每天的PV
從這個方法我們就能推算出 common_report
長什麼樣了,至少有 time
+ aid
這兩個唯一索引欄位,還有pv這個欄位,預設值肯定是 0
百聞不如一見,看看錶的DDL 是不是這樣子:
create table common_report ( id bigint auto_increment primary key, aid bigint not null, pv int default 0 null, uv int default 0 null, time date not null, constraint common_report_aid_time_uindex unique (aid, time) ); 複製程式碼
果然一點都沒錯。

再看 dbClient.upsert 裡面大概也能猜到是實現了mysql的upsert功能,大概的sql就會生成下面格式:
INSERT INTO common_report (time, aid, pv) VALUES ('2019-03-26', '10000', 1) ON DUPLICATE KEY UPDATE pv = pv + 1; 複製程式碼
大豬那 UV 是怎麼實現咧?一個使用者在今天來過第一次之後再來就不能重複計算了噢。
大豬答:這個簡單簡單,可以使用 Redis
去重嘛,但是我們使用的都是 Hbase
了,還使用它做啥子咧,具體我們看一下 UV 裡面到底是如何實現的:
val logDS = spark.table("log").as(ExpressionEncoder[LogCase]) import spark.implicits._ logDS .mapPartitions(partitionT => { val hbaseClient = DBHbaseHelper.getDBHbase(Tables.CACHE_TABLE) val md5 = (log: LogCase) => MD5Hash.getMD5AsHex(s"${log.dt.date}|${log.aid}|${log.uid}|uv".getBytes) partitionT .grouped(Consts.BATCH_MAPPARTITIONS) .flatMap { tList => tList .zip(hbaseClient.incrments(tList.map(md5))) .map(tp2 => { val log = tp2._1 log.copy(ext = EXT(tp2._2)) }) } }).createTempView("uvTable") spark.sql( """ |SELECT |aid, |dt.date, |COUNT(1) as uv |FROM |uvTable |WHERE |ext.render = 1 |GROUP BY |aid, |dt.date """.stripMargin) .rdd .foreachPartition(rows => { val props = PropsUtils.properties("db") val dbClient = new DBJdbc(props.getProperty("jdbcUrl")) rows.foreach(row => { dbClient.upsert( Map( "time" -> row.getAs[String]("date"), "aid" -> row.getAs[String]("aid") ), Update(incs = Map("uv" -> row.getAs[Long]("uv").toString)), "common_report" ) }) dbClient.close() }) 複製程式碼
spark.sql 這裡跟PV一樣嘛,就是多了一句條件 ext.render = 1
,但是上面那一大堆是啥子咧?
大豬CACHE_TABLE 是什麼來的,是Hbase一張中間表,使用者存使用者UV標記的,建表語句如下,因為維度都是按天,所以我們TTL設計3天就可以了,兩天也可以。
create 'CACHE_FOR_TEST',{NAME => 'info',TTL => '3 DAYS',CONFIGURATION => {'SPLIT_POLICY' => 'org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy','KeyPrefixRegionSplitPolicy.prefix_length'=>'2'},COMPRESSION=>'SNAPPY'},SPLITS => ['20', '40', '60', '80', 'a0', 'c0', 'e0'] 複製程式碼
那還有其它的呢?
莫慌莫慌, 大豬 這就慢慢解釋道:
val logDS = spark.table("log").as(ExpressionEncoder[LogCase]) 複製程式碼
上面這句的意思就是就是把log表給取出來,當然也可以通過引數傳遞。
下面的 mapPartitions
挺有意思的:
partitionT .grouped(1000) .flatMap { tList => tList .zip(hbaseClient.incrments(tList.map(md5))) .map(tp2 => { val log = tp2._1 log.copy(ext = EXT(tp2._2)) }) } 複製程式碼
實際上面是處理每個分割槽的資料,也就是轉換資料,我們每來一條資料就要去Hbase那 incrment
一次,返回來的結果就是 render ,使用者今天來多少次就 incrment
相應的次數。
那有什麼用?我直接從Hbase GET
取出資料,再判斷有沒有,如果沒有這個使用者就是今天第一次來,再把這個使用者 PUT
進Hbase打一個標記,so easy。
其實當初我們也是這麼做的,後面發現業務的東西還是放在SQL裡面一起寫比較好,容易維護,而且incrment好處多多,因為它是帶事務的,可以多執行緒進行修改。
而且你們也發現了 GET
跟 PUT
是兩次請求操作,保證不了事務的,指標幾千萬的資料少了那麼幾條,你們都不知道我當初找它們有辛苦。

你們有沒有發現 render = 1
的時候是代表UV(剛好等於1的時候為什麼是UV?這裡大家要慢慢地品嚐一下了,其實就是實現了 GET
跟 PUT
操作),如果 render = 2
的時候又可以代表今天來過兩次以上的使用者指標,隨時擴充套件,就問你擼這樣的程式碼結構爽不爽?

看看 incrments 方法實現了啥子
def incrments(incs: Seq[String], family: String = "info", amount: Int = 1): Seq[Long] = { if (incs.isEmpty) { Seq[Long]() } else { require(incs.head.length == 32, "pk require 32 length") val convertIncs = incs map { pk => new Increment(Bytes.toBytes(pk.take(8))).addColumn(Bytes.toBytes(family), Bytes.toBytes(pk.takeRight(24)), amount) } val results = new Array[Object](convertIncs.length) table.batch(convertIncs.asJava, results) results.array.indices.map( ind => Bytes.toLong( results(ind) .asInstanceOf[Result] .getValue( Bytes.toBytes(family), Bytes.toBytes(incs(ind).takeRight(24)) ) ) ) } } 複製程式碼
這個方法就是實現了 incrment 的批量處理,因為我們在線上生產環境的時候測試過,批量處理比單條處理效能高了上百倍,所以這也就是為什麼要寫在 mapPartitions
裡面的原因了,因為只有在這個方法裡面才有批量資料轉換操作, foreachPartition
是批量處理操作, foreach
,與 map
是一條一條操作不能使用,我們在輸出報表到Mysql的地方已經用到過了。
大豬不知不覺已經寫了那麼長的文章了

關閉計算程式只需要給redis發一條stop訊息就可以啦
RedisUtil().getResource.publish("computeListenerMessage", "stop") 複製程式碼
不能再複製程式碼了,不能顯得文章是靠程式碼撐起來的。

福利 完整專案原始碼
