1. 程式人生 > >使用SparkSQL 分析日誌中IP數、流量等資料

使用SparkSQL 分析日誌中IP數、流量等資料

寫在前面

前面文章中,我們使用Spark RDD從非結構化的日誌檔案中分析出了訪問獨立IP數,單個視訊訪問獨立IP數和每時CDN流量,這篇文章主要介紹使用Spark SQL從結構化的資料中完成這些資料的分析,如下圖所有,先將日誌檔案結構化成csv檔案,此檔案可從原始碼cdn.csv中獲取

結構化資料

Pom檔案中新增SparkSQL依賴

          <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId
>
<version>2.0.0</version> </dependency>

建立SparkSession物件

    //建立sparkSession
    val sparkSession = SparkSession.builder
      .config("spark.sql.warehouse.dir", "D:\\WorkSpace\\spark\\spark-learning\\spark-warehouse")
        .master("local")
      .appName("spark session example"
) .getOrCreate()

載入結構化資料

    //獲取檔案路徑
    val path=sqlWordCount.getClass.getClassLoader.getResource("cdn.csv").getPath
    //讀取檔案
    val df = sparkSession.read.csv(path)
    //將載入的資料臨時命名為log
    df.createOrReplaceTempView("log")

計算獨立IP總數和每個IP訪問數

程式碼

    val allIpCountSQL="select count(DISTINCT _c1)   from log "
val ipCountSQL="select _c1 as IP,count(_c1) as ipCount from log group by _c1 order by ipCOunt desc limit 10" //查詢獨立IP總數 sparkSession.sql(allIpCountSQL).foreach(row=>println("獨立IP總數:"+row.get(0))) //檢視IP數前10 sparkSession.sql(ipCountSQL).foreach(row=>println("IP:"+row.get(0)+" 次數:"+row.get(1)))

上面這段程式碼就是簡單的資料庫SQL統計查詢,好像比前面使用RDD計算簡單多了

結果

獨立ID總數:21012
IP:114.55.227.102 次數:481
IP:114.55.25.11 次數:481
IP:115.236.173.95 次數:378
IP:27.18.175.140 次數:333
IP:115.201.129.102 次數:288
IP:39.190.84.175 次數:277
IP:125.122.240.71 次數:258
IP:115.236.173.94 次數:257
IP:114.55.109.239 次數:231
IP:183.129.67.106 次數:223

計算每個視訊獨立IP總數

程式碼

   //查詢每個視訊獨立IP數
    val videoIpCount="select _c0,count(DISTINCT _c1) as count from log  group by _c0 order by count desc  limit 10 "
    sparkSession.sql(videoIpCount).foreach(row=>println("IP:"+row.get(0)+" 次數:"+row.get(1)))

看了這個程式碼,感覺也是很簡單的,就是按視訊ID分組,再統計每個分組中不同IP的數量

結果

視訊ID:149356 次數:3958
視訊ID:149064 次數:3885
視訊ID:149349 次數:1938
視訊ID:149341 次數:1631
視訊ID:149344 次數:1334
視訊ID:149328 次數:1237
視訊ID:89973 次數:945
視訊ID:149339 次數:826
視訊ID:149345 次數:578
視訊ID:149327 次數:545

計算每個小時CDN流量

計算思路

這裡面主要有一個時間段的問題,不然和上面的都是一樣 groupby 一下再 sum 一下就OK了,不過日誌中記錄的是Unix時間戳,只能按秒去分組統計每秒的流量,我們要按每小時分組去統計,所以核心就是將時間戳轉化成小時,總體過程如下
1. 通過SQL查出時間和大小
2. 將結果中的時間轉成小時
3. 將時間格式化好後的RDD轉成DataFrame,用於SQL查詢
4. 通過SQL按小時分組查出結果

程式碼


      def  getHour(time:String)={
        val date=new Date(Integer.valueOf(time)*1000);
        val sf=new SimpleDateFormat("HH");
        sf.format(date)
      }

    //查詢每個小時視訊流量
    val hourCdnSQL="select _c4,_c8 from log "
    //取出時間和大小將格式化時間,RDD中格式為 (小時,大小)
    val dataRdd= sparkSession.sql(hourCdnSQL).rdd.map(row=>Row(getHour(row.getString(0)),java.lang.Long.parseLong(row.get(1).toString)))

    val schema=StructType(
      Seq(
        StructField("hour",StringType,true)
        ,StructField("size",LongType,true)
      )
    )

    //將dataRdd轉成DataFrame
    val peopleDataFrame = sparkSession.createDataFrame(dataRdd,schema)
    peopleDataFrame.createOrReplaceTempView("cdn")
    //按小時分組統計
    val results = sparkSession.sql("SELECT hour , sum(size) as size  FROM cdn group by hour  order by hour ")
    results.foreach(row=>println(row.get(0)+"時 流量:"+row.getLong(1)/(1024*1024*1024)+"G"))

結果

00時 流量:4G
01時 流量:12G
02時 流量:18G
03時 流量:23G
04時 流量:26G
05時 流量:28G
06時 流量:23G
07時 流量:22G
08時 流量:4G
09時 流量:20G
10時 流量:24G
11時 流量:29G
12時 流量:36G
13時 流量:33G
14時 流量:29G
15時 流量:34G
16時 流量:42G
17時 流量:39G
18時 流量:29G
19時 流量:22G
20時 流量:27G
21時 流量:6G
22時 流量:4G
23時 流量:3G

完整專案程式碼及資料