Spark SQL 分析 Nginx 訪問日誌
前言
專案地址
github: Spark SQL 分析 Imooc 訪問日誌
環境說明
- Java版本:1.8
- Scala版本:2.11.12
- Hadoop版本:hadoop-2.6.0-cdh5.14.0
- spark版本:spark-2.3.1-bin-2.6.0-cdh5.14.0(自己編譯)
- MySQL版本:5.7.22
- zeppelin版本:0.8
資料集
Imooc 訪問日誌檔案:access.20161111.log
資料量:一千多萬條訪問日誌、5G多
日誌格式:
60.165.39.1 - - [10/Nov/2016:00:01:53 +0800] "POST /course/ajaxmediauser HTTP/1.1" 200 54 "www.imooc.com" "http://www.imooc.com/code/1431" mid=1431&time=60 "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.122 Safari/537.36 SE 2.X MetaSr 1.0" "-" 10.100.136.64:80 200 0.014 0.014 14.145.74.175 - - [10/Nov/2016:00:01:53 +0800] "POST /course/ajaxmediauser/ HTTP/1.1" 200 54 "www.imooc.com" "http://www.imooc.com/video/678" mid=678&time=60&learn_time=551.5 "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36" "-" 10.100.136.64:80 200 0.014 0.014
百度雲盤下載地址:連結:https://pan.baidu.com/s/1VfOG14mGW4P4kj20nzKx8g 提取碼:uwjg
開發測試資料:access.1w.log(10000條)
需求
- 統計某天最受歡迎的TopN課程
- 統計某天各個省市各自的 TopN 課程
- 按照流量進行統計 TopN 課程
- 某天最受歡迎的文章
- 某天進行code最多的課程
- 統計某天最勤奮的 IP
- 歡迎補充…
統計結果視覺化(zeppelin展示)
開發步驟
資料清洗
根據需求,從日誌中解析出我們需要的資訊,譬如可能有:
- 訪問的系統屬性: 作業系統、瀏覽器等等
- 訪問特徵:url、referer (從哪個url跳轉過來的)、頁面上的停留時間等
- 訪問資訊:session_id、訪問ip(訪問城市)等
主程式
-
SparkStatFormatJob.scala 第一步,清洗出 ip, time, url, traffic
-
SparkStatCleanJob.scala 第二步,最終清洗轉換得到 url、cmsType、cmsId、traffic、ip、city、time、day
-
AccessConvertUtil.scala 定義DataFrame schema,將日誌資訊轉為物件,幫助RDD轉為DataFrame
-
DateUtils.scala 時間格式轉換
步驟
- 使用 Spark SQL 解析訪問日誌
- 解析出課程編號型別
- 根據IP解析出城市名稱
- 使用 Spark SQL 將訪問時間按天進行分割槽輸出
關鍵程式碼
清洗第一步
accessFile.map(line => {
val splits = line.split(" ") // 按空格分割
val ip = splits(0) // 第一個是IP
// 原始日誌的第三個和第四個欄位拼接起來就是完整的訪問時間: [10/Nov/2016:00:01:02 +0800] ==> yyyy-MM-dd HH:mm:ss
val time = splits(3) + " " + splits(4)
val url = splits(11).replaceAll("\"", "") // 第11個是 URL
val traffic = splits(9) // 第9個是流量
List(DateUtils.parse(time), url, traffic, ip)
})
// 過濾
.filter(item => !"10.100.0.1".equals(item(3)))
.filter(item => !"-".equals(item(1)))
// 拼成一個物件 (DateUtils.parse(time), url, traffic, ip)
.map(item => item(0) + "\t" + item(1) + "\t" + item(2) + "\t" + item(3))
// 儲存
.saveAsTextFile(Constants.protocol + Constants.tempOut)
清洗第二步
val filterRDD = accessRDD.map(line => AccessConvertUtil.parseLog(line))
val accessDF = spark.createDataFrame(filterRDD, AccessConvertUtil.struct)
// 儲存到 parquet
accessDF.coalesce(1).write.format("parquet").mode(SaveMode.Overwrite).partitionBy("day").save(Constants.protocol + Constants.cleanedOut)
清洗結果樣例
+--------------------------------------------+-------+-----+-------+---------------+----+-------------------+--------+
|url |cmsType|cmsId|traffic|ip |city|time |day |
+--------------------------------------------+-------+-----+-------+---------------+----+-------------------+--------+
|http://www.imooc.com/code/1852 |code |1852 |2345 |117.35.88.11 |陝西省 |2016-11-10 00:01:02|20161110|
|http://www.imooc.com/learn/85/?src=360onebox|learn |85 |14531 |115.34.187.133 |北京市 |2016-11-10 00:01:27|20161110|
|http://www.imooc.com/course/list?c=fetool |course |0 |66 |120.198.231.151|廣東省 |2016-11-10 00:01:27|20161110|
|http://www.imooc.com/code/10047 |code |10047|54 |101.36.73.155 |北京市 |2016-11-10 00:01:27|20161110|
+--------------------------------------------+-------+-----+-------+---------------+----+-------------------+--------+
Spark SQL 統計 TopN
主程式
- TopNStatJob.scala Spark SQL 統計主類
- StatDao.scala 將各個統計作業的統計結果寫到資料庫
- MySQLUtils.scala 管理 MySQL JDBC 連線
關鍵程式碼
/**
* 統計某天各個省市各自的 TopN 課程
*/
def cityAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {
import spark.implicits._
val cityAccessTopNDF = accessDF.filter($"day" === day && $"cmsType" === "video" && $"cmsId" =!= "0")
.groupBy("city", "day", "cmsId")
.agg(count("cmsId").as("times"))
// Window 函式在Spark SQL的使用: 視窗函式 row_number 的作用是根據表中欄位進行分組,然後根據表中的欄位排序,
// 給組中的每條記錄新增一個序號;且每組的序號都是從1開始,可利用它的這個特性進行分組取top-n
val top3DF = cityAccessTopNDF.select(
cityAccessTopNDF("day"), cityAccessTopNDF("city"),
cityAccessTopNDF("cmsId"), cityAccessTopNDF("times"),
row_number().over(Window.partitionBy(cityAccessTopNDF("city")) // 根據 city 分組,根據 times 降序排序
.orderBy(cityAccessTopNDF("times").desc)).as("times_rank")
).filter("times_rank <= 3")
// 儲存到 MySQL,需建立結果表 day_video_city_access_topn_stat
try {
top3DF.foreachPartition(partition => {
val list = new ListBuffer[DayCityVideoAccessStat]
partition.foreach(item => {
val day = item.getAs[String]("day")
val cmsId = item.getAs[Long]("cmsId")
val city = item.getAs[String]("city")
val times = item.getAs[Long]("times")
val timesRank = item.getAs[Int]("times_rank")
list.append(DayCityVideoAccessStat(day, cmsId, city, times, timesRank))
})
StatDao.insertDayCityVideoAccessTopN(list)
})
} catch {
case e: Exception => e.printStackTrace()
}
}
統計結果樣例
+--------+-------+-----+-----+----------+
|day |city |cmsId|times|times_rank|
+--------+-------+-----+-----+----------+
|20161110|北京市 |1309 |20 |1 |
|20161110|北京市 |3369 |16 |2 |
|20161110|北京市 |4018 |15 |3 |
|20161110|遼寧省 |1336 |2 |1 |
|20161110|遼寧省 |9028 |1 |2 |
|20161110|遼寧省 |8141 |1 |3 |
|20161110|浙江省 |3078 |19 |1 |
|20161110|浙江省 |12552|16 |2 |
|20161110|浙江省 |3237 |14 |3 |
+--------+-------+-----+-----+----------+
專案開發說明
1、 CDH相關的軟體下載地址:http://archive.cloudera.com/cdh5/cdh/5/,spark自己編譯的,看官方文件即可
2、IDEA需要安裝Scala外掛
3、 Windows上開發需解壓Hadoop和spark原始碼,然後在環境變數中配置HADOOP_HOME和SPARK_HOME
4、 windows上需下載相應版本的 winutils.exe 檔案放到 $HADOOP_HOME/bin
5、 解析IP地址使用 ipdatabase ,三個步驟:
1)git clone https://github.com/wzhe06/ipdatabase.git
2)編譯下載的專案:mvn clean package -DskipTests
3)安裝jar包到自己的maven倉庫
mvn install:install-file -Dfile=/home/whirly/source/ipdatabase/target/ipdatabase-1.0-SNAPSHOT.jar -DgroupId=com.ggstar -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar
6、 需要建立相應的資料庫和資料表,用於儲存統計結果,具體的表結構見 imooc_log.sql ,Navicat 匯入MySQL即可,建立庫表完畢後須修改 MySQLUtils.scala 中的配置資訊
7、 zeppelin 可匯入 最受歡迎的TopN課程.json 檔案檢視結果,也可以使用視覺化方案,譬如echarts、highcharts、D3.js、HUE等等…
更多內容可訪問我的個人部落格:http://laijianfeng.org
關注【小旋鋒】微信公眾號,及時接收博文推送
原文地址:Spark SQL 分析 Imooc 訪問日誌