1. 程式人生 > >基於SparkSQL的網站日誌分析實戰

基於SparkSQL的網站日誌分析實戰

基於SparkSQL的網站日誌分析實戰

使用者行為日誌概述

          使用者行為日誌:使用者每次訪問網站時所有的行為資料(訪問、瀏覽、搜尋、點選...)
                                   使用者行為軌跡、流量日誌
為什麼要記錄使用者訪問行為日誌
網站頁面的訪問量
網站的黏性
推薦
           2.使用者行為日誌生成渠道

Nginx
Ajax
3.使用者行為日誌內容

日誌資料內容:
1)訪問的系統屬性: 作業系統、瀏覽器等等
2)訪問特徵:點選的url、從哪個url跳轉過來的(referer)、頁面上的停留時間等
3)訪問資訊:session_id、訪問ip(訪問城市)等
日誌資訊如下所示:
2013-05-19 13:00:00     http://www.taobao.com/17/?tracker_u=1624169&type=1      B58W48U4WKZCJ5D1T3Z9ZY88RU7QA7B1        http://hao.360.cn/      1.196.34.243   
4.使用者行為日誌分析的意義
網站的眼睛(訪問者來自什麼地方,找什麼東西,那些頁面最受歡迎,訪問者的入口地址是什麼等)
網站的神經(網站佈局是否合理,導航層次是否清晰,功能是否存在問題,轉換路徑是否靠譜)
網站的大腦(如何分析目標,如何分配廣告預算(廣告推廣))
離線資料處理架構

資料處理流程
1)資料採集
Flume: web日誌寫入到HDFS


2)資料清洗
髒資料
Spark、Hive、MapReduce 或者是其他的一些分散式計算框架  
清洗完之後的資料可以存放在HDFS(Hive/Spark SQL)


3)資料處理
按照我們的需要進行相應業務的統計和分析
Spark、Hive、MapReduce 或者是其他的一些分散式計算框架


4)處理結果入庫
結果可以存放到RDBMS、NoSQL


5)資料的視覺化
通過圖形化展示的方式展現出來:餅圖、柱狀圖、地圖、折線圖
ECharts、HUE、Zeppelin
離線資料處理架構

專案需求

需求一:統計imooc主站最受歡迎的課程/手記的Top N訪問次數
需求二:按地市統計imooc主站最受歡迎的Top N課程
(1).根據IP地址提取出城市資訊
(2).視窗函式在Spark SQL中的使用
按流量統計imooc主站最受歡迎的Top N課程
功能實現

          網站主站日誌介紹
訪問時間
訪問URL
訪問過程耗費流量
訪問IP地址
資料清洗
原始日誌
 10.100.0.1 - - [10/Nov/2016:00:01:02 +0800] "HEAD / HTTP/1.1" 301 0 "117.121.101.40" "-" - "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.16.2.3 Basic ECC zlib/1.2.3 libidn/1.18 libssh2/1.4.2" "-" - - - 0.000
 第一次清洗後(訪問時間 主站URL耗費的流量ip地址)
2017-05-11 15:07:17     http://www.imooc.com/video/14322        245     202.96.134.133
2017-05-11 06:52:31     http://www.imooc.com/article/17891      535     222.129.235.182
2017-05-11 18:46:43     http://www.imooc.com/article/17898      807     218.75.35.226
程式碼實現參見gitee地址【https://gitee.com/robei/SparkSQLProject/blob/master/src/main/scala/com/imooc/log/SparkStatFormatJob.scala】
第二次資料清洗
輸入:訪問時間、訪問URL、耗費的流量、訪問IP地址資訊
輸出:URL、cmsType(video/article)、cmsId(編號)、流量、ip、城市資訊、訪問時間、天
程式碼實現參見gitee地址【https://gitee.com/robei/SparkSQLProject/blob/master/src/main/scala/com/imooc/log/SparkStatCleanJob.scala】

根據ip地址解析城市資訊

使用github上已有的開源專案
1)git clone https://github.com/wzhe06/ipdatabase.git
2)編譯下載的專案:mvn clean package -DskipTests
3)安裝jar包到自己的maven倉庫
mvn install:install-file -Dfile=~/source/ipdatabase/target/ipdatabase-1.0-SNAPSHOT.jar -DgroupId=com.ggstar -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar

資料清洗儲存到目標地址() 

【https://gitee.com/robei/SparkSQLProject/blob/e60788c9e0f3c2ffc446d9aa8acaa5a66ac006fc/src/main/scala/com/imooc/log/SparkStatCleanJob.scala】
需求一的實現:統計主站最受歡迎的課程/手記的Top N訪問次數
/**
    * 需求一:主站最受歡迎的TopN課程統計
    *
    * @param spark
    * @param cleanDF
    */
  def videoAccessTopNStat(spark: SparkSession, cleanDF: DataFrame, day:String): Unit = {
 
    //------------------使用DataFrame API完成統計操作--------------------------------------------
    /* import spark.implicits._
     val videoAccessTopNDF = cleanDF.filter($"day" === day && $"cmsType" === "video")
       .groupBy("day","cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc)*/
 
    //    videoAccessTopNDF.printSchema()
    /**
      * root
      * |-- day: string (nullable = true)
      * |-- cmsId: long (nullable = true)
      * |-- times: long (nullable = false)
      */
 
    //    videoAccessTopNDF.show(10,false)
    /**
      * +--------+-----+------+
      * |day     |cmsId|times |
      * +--------+-----+------+
      * |20170511|14540|111027|
      * |20170511|4000 |55734 |
      * |20170511|14704|55701 |
      * |20170511|14390|55683 |
      * |20170511|14623|55621 |
      * |20170511|4600 |55501 |
      * |20170511|4500 |55366 |
      * |20170511|14322|55102 |
      * +--------+-----+------+
      */
 
    //-------------------------使用SQL API完成操作-------------------------
    cleanDF.createOrReplaceTempView("access_logs")
    //建立臨時表 access_logs
    val videoAccessTopNDF = spark.sql("select day,cmsId,count(1) as times from access_logs" +
      " where day="+day+" and cmsType='video'" +
      " group by day,cmsId order by times desc")
 
    videoAccessTopNDF.show(10, false)
    /**
      * +--------+-----+------+
      * |day     |cmsId|times |
      * +--------+-----+------+
      * |20170511|14540|111027|
      * |20170511|4000 |55734 |
      * |20170511|14704|55701 |
      * |20170511|14390|55683 |
      * |20170511|14623|55621 |
      * |20170511|4600 |55501 |
      * |20170511|4500 |55366 |
      * |20170511|14322|55102 |
      * +--------+-----+------+
      */
    //-------------------將統計結果寫入資料庫-------------------
    try {
      videoAccessTopNDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayVideoAccessStat]
 
        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val times = info.getAs[Long]("times")
 
          list.append(DayVideoAccessStat(day, cmsId, times))
        })
        StatDAO.insertDayVideoAccessTopN(list)
      })
    }catch {
      case e:Exception => e.printStackTrace()
    }
    /**
      * 在mysql中建立day_video_access_topn_stat,主站最受歡迎的Top N課程
      *  create table day_video_access_topn_stat (
      *      day varchar(8) not null,
      *      cms_id bigint(10) not null,
      *      times bigint(10) not null,
      *      primary key (day, cms_id)
      *  );
      */
  }
需求二:按地市統計主站最受歡迎的Top N課程
/**
    * 需求二:按地市統計主站最受歡迎的Top N課程
    * @param spark
    * @param cleanDF
    */
  def cityAccessTopSata(spark: SparkSession, cleanDF: DataFrame, day:String): Unit = {
    //------------------使用DataFrame API完成統計操作--------------------------------------------
     import spark.implicits._
 
     val cityAccessTopNDF = cleanDF.filter($"day" === day && $"cmsType" === "video")
       .groupBy("day","city","cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc)
 
//    cityAccessTopNDF.printSchema()
    /**
      * root
      *    |-- day: string (nullable = true)
      *    |-- city: string (nullable = true)
      *    |-- cmsId: long (nullable = true)
      *    |-- times: long (nullable = false)
      */
 
//    cityAccessTopNDF.show(false)
    /**
      *  +--------+----+-----+-----+
      *  |day     |city|cmsId|times|
      *  +--------+----+-----+-----+
      *  |20170511|浙江省 |14540|22435|
      *  |20170511|北京市 |14540|22270|
      *  |20170511|安徽省 |14540|22149|
      *  |20170511|廣東省 |14540|22115|
      *  |20170511|上海市 |14540|22058|
      *  |20170511|北京市 |4600 |11271|
      *  |20170511|安徽省 |14390|11229|
      *  |20170511|廣東省 |14623|11226|
      *  |20170511|上海市 |14704|11219|
      *  |20170511|廣東省 |14704|11216|
      *  |20170511|廣東省 |4600 |11215|
      *  |20170511|上海市 |4000 |11182|
      *  |20170511|北京市 |14390|11175|
      *  |20170511|廣東省 |4000 |11169|
      *  |20170511|上海市 |4500 |11167|
      *  |20170511|安徽省 |14704|11162|
      *  |20170511|北京市 |4000 |11156|
      *  |20170511|浙江省 |14322|11151|
      *  |20170511|上海市 |14623|11149|
      *  |20170511|廣東省 |4500 |11136|
      *  +--------+----+-----+-----+
      */
    //-----------Window函式在Spark SQL中的使用--------------------
    val cityTop3DF = cityAccessTopNDF.select(
      cityAccessTopNDF("day"),
      cityAccessTopNDF("city"),
      cityAccessTopNDF("cmsId"),
      cityAccessTopNDF("times"),
      row_number().over(Window.partitionBy(cityAccessTopNDF("city"))
        .orderBy(cityAccessTopNDF("times").desc)).as("times_rank")
    ).filter("times_rank <= 3").orderBy($"city".desc,$"times_rank".asc)
 
    cityTop3DF.show(false)//展示每個地市的Top3
    /**
      *    +--------+----+-----+-----+----------+
      *    |day     |city|cmsId|times|times_rank|
      *    +--------+----+-----+-----+----------+
      *    |20170511|浙江省 |14540|22435|1         |
      *    |20170511|浙江省 |14322|11151|2         |
      *    |20170511|浙江省 |14390|11110|3         |
      *    |20170511|廣東省 |14540|22115|1         |
      *    |20170511|廣東省 |14623|11226|2         |
      *    |20170511|廣東省 |14704|11216|3         |
      *    |20170511|安徽省 |14540|22149|1         |
      *    |20170511|安徽省 |14390|11229|2         |
      *    |20170511|安徽省 |14704|11162|3         |
      *    |20170511|北京市 |14540|22270|1         |
      *    |20170511|北京市 |4600 |11271|2         |
      *    |20170511|北京市 |14390|11175|3         |
      *    |20170511|上海市 |14540|22058|1         |
      *    |20170511|上海市 |14704|11219|2         |
      *    |20170511|上海市 |4000 |11182|3         |
      *    +--------+----+-----+-----+----------+
      */
    //-------------------將統計結果寫入資料庫-------------------
    try {
      cityTop3DF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayCityVideoAccessStat]
 
        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val city = info.getAs[String]("city")
          val times = info.getAs[Long]("times")
          val timesRank = info.getAs[Int]("times_rank")
 
          list.append(DayCityVideoAccessStat(day, cmsId,city, times,timesRank))
        })
        StatDAO.insertDayCityVideoAccessTopN(list)
      })
    }catch {
      case e:Exception => e.printStackTrace()
    }
    /**
      *   create table day_video_city_access_topn_stat (
      *      day varchar(8) not null,
      *      cms_id bigint(10) not null,
      *      city varchar(20) not null,
      *      times bigint(10) not null,
      *      times_rank int not null,
      *      primary key (day, cms_id, city)
      *   );
      */
  }
需求三:按照流量統計主站最受歡迎的Top N課程
 /**
    * 需求三:按照流量統計主站最受歡迎的Top N課程
    * @param spark
    * @param cleanDF
    */
  def videoTraffsTopStat(spark: SparkSession, cleanDF: DataFrame, day:String): Unit = {
    //------------------使用DataFrame API完成統計操作--------------------------------------------
    import spark.implicits._
 
    val trafficsTopNDF = cleanDF.filter($"day" === day && $"cmsType" === "video")
      .groupBy("day","cmsId").agg(sum("traffic").as("traffics")).orderBy($"traffics".desc)
 
     trafficsTopNDF.show()
    /**
      *  +--------+-----+--------+
      *  |     day|cmsId|traffics|
      *  +--------+-----+--------+
      *  |20170511|14540|55454898|
      *  |20170511|14390|27895139|
      *  |20170511| 4500|27877433|
      *  |20170511| 4000|27847261|
      *  |20170511|14623|27822312|
      *  |20170511| 4600|27777838|
      *  |20170511|14704|27737876|
      *  |20170511|14322|27592386|
      *  +--------+-----+--------+
      */
    //-------------------將統計結果寫入資料庫-------------------
    try {
      trafficsTopNDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayVideoTrafficsStat]
 
        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val traffics = info.getAs[Long]("traffics")
 
          list.append(DayVideoTrafficsStat(day, cmsId,traffics))
        })
        StatDAO.insertDayVideoTrafficsTopN(list)
      })
    }catch {
      case e:Exception => e.printStackTrace()
    }
    /**
      *  create table day_video_traffics_topn_stat (
      *      day varchar(8) not null,
      *      cms_id bigint(10) not null,
      *      traffics bigint(20) not null,
      *      primary key (day, cms_id)
      *   );
      */
  }
【https://gitee.com/robei/SparkSQLProject】