1. 程式人生 > >大資料學習筆記(spark日誌分析案例)

大資料學習筆記(spark日誌分析案例)

前提:500w條記錄環境下(可以更多,視計算機效能而定),統計每天最熱門的top3板塊。

1、PV和UV

我們要統計的是最熱門的top3板塊,而熱門如果只是簡單地通過頁面瀏覽量(PV)或者使用者瀏覽量(UV)來決定都顯得比較片面,這裡我們綜合這兩者(0.3PV+0.7UV)來獲取我們的需求。

1.1、PV

PageView:瀏覽量。(有幾次瀏覽就算幾次)   在這裡插入圖片描述

1.2、UV

UserView:使用者量。(同一個使用者同一天瀏覽一個模組多次,只能算一次)   在這裡插入圖片描述 1.3、PV+UV

通過上面的分析已經解釋了PV和UV的含義,以及獲取這兩個值的具體操作思路。下面探討一下,如何在這兩個值的基礎上,求出每天最熱門的top3板塊。

按照前面的操作已經獲得了兩個RDD,PVRDD、UVRDD。在這兩個RDD上使用join連線,在join運算元裡面通過(0.3PV+0.7UV)可以獲得每天的各個模組的一個熱度值。將這個值排序。取前三名,就是我們要求的每天最熱top3板塊了。 在這裡插入圖片描述

2、生成資料

由於沒有獲取大量資料的條件,這裡我們通過程式碼自己製造一部分資料來進行相關操作。我模仿的資料結構是:UUID  使用者id  時間戳  頁面id   模組名(中間用\t製表符分隔)

package com.hpe.data;

import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.UUID;

public class MakeLogData {
	public static void main(String[] args) throws Exception {
		BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("d:/logdata")));
		Random random = new Random();
		int logNUm = 5000000;
		StringBuilder stringBuilder = new StringBuilder();
		List<String> channelList = Arrays.asList("spark","hdfs","mr","yarn","hive","scala","python");
		 
		for (int i = 0; i < logNUm; i++) {
			String sessionId = UUID.randomUUID().toString();
			int userId = random.nextInt(10000);
			int year = 2018;
			int month = random.nextInt(12) + 1;
			int day = random.nextInt(30) + 1;
			int hour = random.nextInt(24);
			int minute = random.nextInt(60);
			int second = random.nextInt(60);
			String dateTime = year + "-" + month + "-" + day + " " + hour + ":" + minute + ":" + second;
			SimpleDateFormat form = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");	
			long time = form.parse(dateTime).getTime();
			int pageId = random.nextInt(100);
			String channel = channelList.get(i % channelList.size() );
			stringBuilder.append(sessionId + "\t" + userId + "\t" + time + "\t" + pageId + "\t" + channel + "\n");
			bw.write(stringBuilder.toString());
			stringBuilder.delete(0, stringBuilder.length());
		}
		
		bw.flush();
		bw.close();
	}
}

3、程式碼

package com.hpe.spark.loganalyse

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.util.Date
import java.text.SimpleDateFormat
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.rdd.RDD

object PVAndUV {
  def main(args: Array[String]): Unit = {
    //配置資訊
    val conf = new SparkConf();
    conf.setAppName("UV + PV")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    
    //載入資料
    val rdd = sc.textFile("d:/data/logdata")
    
    
    //呼叫方法
     val rdd2 = first(rdd)
     
     //rdd2.saveAsTextFile("d:/data/Log2")
     
     sc.stop()
  }
  
  //封裝方法
  def first(rdd:RDD[String]) = {
    
    
    //切割字串
    val splitRDD = rdd.map { _.split("\t") }
    
    //過濾,去除髒資料
    val filterRDD = splitRDD.filter { _.length == 5 }
   
    //PV model
    val reduceRDD = pv(filterRDD)
   
    //UV model
    val reduceRDD2 = uv(filterRDD)

    //jion 合併兩個RDD
    val unionRDD = reduceRDD.join(reduceRDD2)
    
    //返回  時間_模組
    //(時間_模組,(a,b))
    val endRDD = unionRDD
      .map(x =>{
        val value = x._2._1 *0.3 + x._2._2 *0.7
        (x._1,value)
      })
      .sortBy(_._2,false)
      .map(x =>{
        val day = x._1.split("_")(0)
        val model = x._1.split("_")(1)
        (day,model)
      })
      .groupByKey()
      .map(x => {
        val list = x._2.take(3)
        (x._1,list)
      }).foreach { println }
    
    
    endRDD
  }
  
  //pv操作
  def pv(filterRDD:RDD[Array[String]]) = {
    val mapRDD = filterRDD.map { x => {
      val time = x(2).toLong
      val date = new Date(time)
      val format = new SimpleDateFormat("yyyy-MM-dd")
      val dateStr = format.format(date)
      x(2) = dateStr
      //返回  時間_模組
      (x(2) + "_" + x(4),1)
    } }
   
    val reduceRDD=mapRDD.reduceByKey(_+_)
    reduceRDD
  }
  
  //uv操作
  def uv(filterRDD:RDD[Array[String]]) = {
    val mapRDD2 = filterRDD.map { x => {
      val time = x(2).toLong
      val date = new Date(time)
      val format = new SimpleDateFormat("yyyy-MM-dd")
      val dateStr = format.format(date)
      x(2) = dateStr
      //返回  使用者id_模組_時間
      (x(1) + "_" + x(2) + "_" + x(4),null)
    } }
   
    //去重
    val disRDD = mapRDD2.distinct()
    
    //只需要key,組裝二元組
    val tupleRDD = disRDD.map(x =>{
      val key = x._1
      //key:會員id_時間_板塊id
      //把會員id切掉
      val newKey = key.substring(key.indexOf("_")+1, key.length())
      (newKey,1)
    })
    
    //累加
    val reduceRDD2=tupleRDD.reduceByKey(_+_)
    reduceRDD2
  }
}