1. 程式人生 > >Spark SQL 分析 Nginx 訪問日誌

Spark SQL 分析 Nginx 訪問日誌

前言

專案地址

github: Spark SQL 分析 Imooc 訪問日誌

環境說明

  • Java版本:1.8
  • Scala版本:2.11.12
  • Hadoop版本:hadoop-2.6.0-cdh5.14.0
  • spark版本:spark-2.3.1-bin-2.6.0-cdh5.14.0(自己編譯)
  • MySQL版本:5.7.22
  • zeppelin版本:0.8

資料集

Imooc 訪問日誌檔案:access.20161111.log

資料量:一千多萬條訪問日誌、5G多

日誌格式

60.165.39.1 - - [10/Nov/2016:00:01:53 +0800] "POST /course/ajaxmediauser HTTP/1.1" 200 54 "www.imooc.com" "http://www.imooc.com/code/1431" mid=1431&time=60 "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.122 Safari/537.36 SE 2.X MetaSr 1.0" "-" 10.100.136.64:80 200 0.014 0.014
14.145.74.175 - - [10/Nov/2016:00:01:53 +0800] "POST /course/ajaxmediauser/ HTTP/1.1" 200 54 "www.imooc.com" "http://www.imooc.com/video/678" mid=678&time=60&learn_time=551.5 "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36" "-" 10.100.136.64:80 200 0.014 0.014

百度雲盤下載地址:連結:https://pan.baidu.com/s/1VfOG14mGW4P4kj20nzKx8g 提取碼:uwjg

開發測試資料:access.1w.log(10000條)

需求

  • 統計某天最受歡迎的TopN課程
  • 統計某天各個省市各自的 TopN 課程
  • 按照流量進行統計 TopN 課程
  • 某天最受歡迎的文章
  • 某天進行code最多的課程
  • 統計某天最勤奮的 IP
  • 歡迎補充…

統計結果視覺化(zeppelin展示)

開發步驟

資料清洗

根據需求,從日誌中解析出我們需要的資訊,譬如可能有:

  1. 訪問的系統屬性: 作業系統、瀏覽器等等
  2. 訪問特徵:url、referer (從哪個url跳轉過來的)、頁面上的停留時間等
  3. 訪問資訊:session_id、訪問ip(訪問城市)等

主程式

  • SparkStatFormatJob.scala 第一步,清洗出 ip, time, url, traffic

  • SparkStatCleanJob.scala 第二步,最終清洗轉換得到 url、cmsType、cmsId、traffic、ip、city、time、day

  • AccessConvertUtil.scala 定義DataFrame schema,將日誌資訊轉為物件,幫助RDD轉為DataFrame

  • DateUtils.scala 時間格式轉換

步驟

  1. 使用 Spark SQL 解析訪問日誌
  2. 解析出課程編號型別
  3. 根據IP解析出城市名稱
  4. 使用 Spark SQL 將訪問時間按天進行分割槽輸出

關鍵程式碼

清洗第一步

accessFile.map(line => {
      val splits = line.split(" ") // 按空格分割
      val ip = splits(0) // 第一個是IP
      // 原始日誌的第三個和第四個欄位拼接起來就是完整的訪問時間: [10/Nov/2016:00:01:02 +0800] ==> yyyy-MM-dd HH:mm:ss
      val time = splits(3) + " " + splits(4)
      val url = splits(11).replaceAll("\"", "") // 第11個是 URL
      val traffic = splits(9) // 第9個是流量
      List(DateUtils.parse(time), url, traffic, ip)
    })
      // 過濾
      .filter(item => !"10.100.0.1".equals(item(3)))
      .filter(item => !"-".equals(item(1)))
      // 拼成一個物件 (DateUtils.parse(time), url, traffic, ip)
      .map(item => item(0) + "\t" + item(1) + "\t" + item(2) + "\t" + item(3))
      // 儲存
      .saveAsTextFile(Constants.protocol + Constants.tempOut)

清洗第二步

val filterRDD = accessRDD.map(line => AccessConvertUtil.parseLog(line))
val accessDF = spark.createDataFrame(filterRDD, AccessConvertUtil.struct)

// 儲存到 parquet
accessDF.coalesce(1).write.format("parquet").mode(SaveMode.Overwrite).partitionBy("day").save(Constants.protocol + Constants.cleanedOut)

清洗結果樣例


+--------------------------------------------+-------+-----+-------+---------------+----+-------------------+--------+
|url                                         |cmsType|cmsId|traffic|ip             |city|time               |day     |
+--------------------------------------------+-------+-----+-------+---------------+----+-------------------+--------+
|http://www.imooc.com/code/1852              |code   |1852 |2345   |117.35.88.11   |陝西省 |2016-11-10 00:01:02|20161110|
|http://www.imooc.com/learn/85/?src=360onebox|learn  |85   |14531  |115.34.187.133 |北京市 |2016-11-10 00:01:27|20161110|
|http://www.imooc.com/course/list?c=fetool   |course |0    |66     |120.198.231.151|廣東省 |2016-11-10 00:01:27|20161110|
|http://www.imooc.com/code/10047             |code   |10047|54     |101.36.73.155  |北京市 |2016-11-10 00:01:27|20161110|
+--------------------------------------------+-------+-----+-------+---------------+----+-------------------+--------+

Spark SQL 統計 TopN

主程式

  • TopNStatJob.scala Spark SQL 統計主類
  • StatDao.scala 將各個統計作業的統計結果寫到資料庫
  • MySQLUtils.scala 管理 MySQL JDBC 連線

關鍵程式碼

/**
  * 統計某天各個省市各自的 TopN 課程
  */
def cityAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {
  import spark.implicits._
  val cityAccessTopNDF = accessDF.filter($"day" === day && $"cmsType" === "video" && $"cmsId" =!= "0")
    .groupBy("city", "day", "cmsId")
    .agg(count("cmsId").as("times"))

  // Window 函式在Spark SQL的使用: 視窗函式 row_number 的作用是根據表中欄位進行分組,然後根據表中的欄位排序,
  //  給組中的每條記錄新增一個序號;且每組的序號都是從1開始,可利用它的這個特性進行分組取top-n
  val top3DF = cityAccessTopNDF.select(
    cityAccessTopNDF("day"), cityAccessTopNDF("city"),
    cityAccessTopNDF("cmsId"), cityAccessTopNDF("times"),
    row_number().over(Window.partitionBy(cityAccessTopNDF("city")) // 根據 city 分組,根據 times 降序排序
      .orderBy(cityAccessTopNDF("times").desc)).as("times_rank")
  ).filter("times_rank <= 3")

  // 儲存到 MySQL,需建立結果表 day_video_city_access_topn_stat
  try {
    top3DF.foreachPartition(partition => {
      val list = new ListBuffer[DayCityVideoAccessStat]

      partition.foreach(item => {
        val day = item.getAs[String]("day")
        val cmsId = item.getAs[Long]("cmsId")
        val city = item.getAs[String]("city")
        val times = item.getAs[Long]("times")
        val timesRank = item.getAs[Int]("times_rank")
        list.append(DayCityVideoAccessStat(day, cmsId, city, times, timesRank))
      })
      StatDao.insertDayCityVideoAccessTopN(list)
    })
  } catch {
    case e: Exception => e.printStackTrace()
  }
}

統計結果樣例

+--------+-------+-----+-----+----------+
|day     |city   |cmsId|times|times_rank|
+--------+-------+-----+-----+----------+
|20161110|北京市    |1309 |20   |1         |
|20161110|北京市    |3369 |16   |2         |
|20161110|北京市    |4018 |15   |3         |
|20161110|遼寧省    |1336 |2    |1         |
|20161110|遼寧省    |9028 |1    |2         |
|20161110|遼寧省    |8141 |1    |3         |
|20161110|浙江省    |3078 |19   |1         |
|20161110|浙江省    |12552|16   |2         |
|20161110|浙江省    |3237 |14   |3         |
+--------+-------+-----+-----+----------+

專案開發說明

1、 CDH相關的軟體下載地址:http://archive.cloudera.com/cdh5/cdh/5/,spark自己編譯的,看官方文件即可

2、IDEA需要安裝Scala外掛

3、 Windows上開發需解壓Hadoop和spark原始碼,然後在環境變數中配置HADOOP_HOME和SPARK_HOME

4、 windows上需下載相應版本的 winutils.exe 檔案放到 $HADOOP_HOME/bin

5、 解析IP地址使用 ipdatabase ,三個步驟:

1)git clone https://github.com/wzhe06/ipdatabase.git
2)編譯下載的專案:mvn clean package -DskipTests
3)安裝jar包到自己的maven倉庫
mvn install:install-file -Dfile=/home/whirly/source/ipdatabase/target/ipdatabase-1.0-SNAPSHOT.jar -DgroupId=com.ggstar -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar

6、 需要建立相應的資料庫和資料表,用於儲存統計結果,具體的表結構見 imooc_log.sql ,Navicat 匯入MySQL即可,建立庫表完畢後須修改 MySQLUtils.scala 中的配置資訊

7、 zeppelin 可匯入 最受歡迎的TopN課程.json 檔案檢視結果,也可以使用視覺化方案,譬如echarts、highcharts、D3.js、HUE等等…


更多內容可訪問我的個人部落格:http://laijianfeng.org
關注【小旋鋒】微信公眾號,及時接收博文推送
原文地址:Spark SQL 分析 Imooc 訪問日誌

長按關注【小旋鋒】微信公眾號