
Lesson 80: Spark SQL Website Search Comprehensive Case Study

Contents:

    1. Case analysis
    2. Hands-on implementation

I. Case Analysis


    Project: using JD (京東) search-platform logs, find the hottest products in the search ranking.
    Metadata: date, userID, item, city, device
    Overall approach: mix Spark SQL and Spark Core (a sketch of the pipeline and of the result persistence follows this list).
        Step 1: Raw ETL. Filter the data to produce the target data set; in a real enterprise the filter conditions can be very complex (broadcast them), implemented with RDD operators such as filter.
        Step 2: Run the specified query over the filtered target data; the query conditions can also be very complex (broadcast them), again using the RDD filter operator.
        Step 3: Since products fall into categories, before producing the final result we first compute UV per product (we could also compute the PV of users' product visits). To compute UV per product we have to build a key-value RDD, e.g. keyed as (date#item, userID), so that groupByKey can be applied; after groupByKey, deduplicate the users and compute the UV of each product for each day.
        Step 4: Use the window function row_number to get the daily top-5 products by UV: row_number() OVER (PARTITION BY date ORDER BY UV DESC) rank. This produces a DataFrame whose Rows contain date, item and uv.
        Step 5: Convert the DataFrame back to an RDD, group by date, and work out the top-5 hot-search items of each day.
        Step 6: Swap key and value, then call sortByKey to rank by click popularity.
        Step 7: Swap key and value again, producing target data in the format (date#item, UV).
        Step 8: Write the result from the RDD directly into a production DB such as MySQL, and visualise it with Java EE server technology so that marketing staff, warehouse scheduling systems, delivery systems and warehouse decision makers can use the data to create value (see the persistence sketch after this list).
            The result can also be stored in Hive, with Java EE accessing it through JDBC;
            or in Spark SQL, accessed from Java EE through the Thrift server;
            for events such as Double 11, Redis is usually the first choice, since it gives flash-sale-level response times.
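
A minimal Scala sketch of steps 1 through 7, assuming a tab-separated log with the fields date, userID, item, city and device; the path, master setting and object name below are illustrative, not taken from the original project:

package SparkSQL

import org.apache.spark.sql.SparkSession

// Sketch of steps 1-7: filter with a broadcast condition, compute daily item UV,
// rank items per day with row_number, then sort by UV via sortByKey.
object HottestItemPipelineSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("HottestItemPipelineSketch")
      .master("local[4]")
      .getOrCreate()
    val sc = spark.sparkContext
    import spark.implicits._

    // Steps 1-2: ETL plus query, with the (possibly complex) condition broadcast.
    val device = sc.broadcast("iphone")
    val logPath = "/home/hadoop/IdeaScala/SparkSQLUserlogsHottest.log" // illustrative path
    val lines = sc.textFile(logPath).filter(_.contains(device.value))

    // Step 3: build (date#item, userID), deduplicate users, count UV per day per item.
    val uv = lines.map(_.split("\t"))
      .map(f => (f(0) + "#" + f(2), f(1)))    // (date#item, userID)
      .groupByKey()
      .map { case (k, users) =>
        val Array(date, item) = k.split("#")
        (date, item, users.toSet.size)        // UV = number of distinct users
      }

    // Step 4: daily top-5 items by UV with the row_number window function.
    uv.toDF("date", "item", "uv").createOrReplaceTempView("itemUV")
    val top5 = spark.sql(
      """SELECT date, item, uv FROM (
        |  SELECT date, item, uv,
        |         row_number() OVER (PARTITION BY date ORDER BY uv DESC) rank
        |  FROM itemUV) t
        |WHERE rank <= 5""".stripMargin)

    // Steps 5-7: back to an RDD, swap key/value, sort by UV, swap back to (date#item, UV).
    val ranked = top5.rdd
      .map(r => (r.getAs[Int]("uv"), r.getAs[String]("date") + "#" + r.getAs[String]("item")))
      .sortByKey(ascending = false)
      .map { case (cnt, key) => (key, cnt) }

    ranked.collect().foreach(println)
    spark.stop()
  }
}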

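For step 8, one common option is to push the ranked DataFrame into MySQL through Spark's JDBC writer. The sketch below assumes top5 is the DataFrame from the previous sketch and that the MySQL JDBC driver is on the classpath; the URL, table name and credentials are placeholders:

import java.util.Properties

// Placeholder connection settings for the marketing database.
val props = new Properties()
props.setProperty("user", "spark")
props.setProperty("password", "********")
props.setProperty("driver", "com.mysql.jdbc.Driver")

// Append today's ranking to the target table.
top5.write
  .mode("append")
  .jdbc("jdbc:mysql://dbhost:3306/market", "daily_hottest_items", props)

Storing the result in Hive instead would just be top5.write.saveAsTable(...) on a Hive-enabled SparkSession, and the Redis path would use a client library such as Jedis; both are left out of this sketch.
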
II. Code Examples

    1. Log-generation code example

package SparkSQL

import java.io.{File, FileWriter, PrintWriter}
import java.text.SimpleDateFormat
import java.util.{Calendar, Random}

import java.text.ParseException

/**
  * FileName: SparkSQLUserLogsManually
  * Author:   hadoop
  * Email:    [email protected]
  * Date:     18-11-24 9:48 PM
  * Description: manually generates the user search logs used in this lesson
  */
object SparkSQLUserLogsManually {

  def main(args: Array[String]): Unit = {
    // number of log records to generate
    val numberItems = 10000
    // directory the log file is written to
    val pathPath = "/home/hadoop/IdeaScala/"
    ganerateUserLogs(numberItems, pathPath)
  }

  /**
    * Writes the generated data to the specified file.
    * @param pathPath   directory the data is stored in
    * @param fileName   file the data is stored in
    * @param strUserLog the log content to write
    */
  def writeLog(pathPath: String, fileName: String, strUserLog: String): Unit = {
    var fw: FileWriter = null
    var out: PrintWriter = null
    try {
      val writeFile = new File(pathPath + fileName)
      if (!writeFile.exists()) {
        writeFile.createNewFile()
      } else {
        // remove the previous file so each call rewrites it from scratch
        writeFile.delete()
      }
      fw = new FileWriter(writeFile, true)
      out = new PrintWriter(fw)
      out.print(strUserLog)
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      try {
        if (out != null) out.close()
        if (fw != null) fw.close()
      } catch {
        case e: Exception => e.printStackTrace()
      }
    }
  }

  /**
    * Returns the date `step` days away from `value` (or from today when `value` is null),
    * formatted with the given pattern.
    * @param value   base date string, may be null
    * @param formate date pattern, e.g. "yyyy-MM-dd"
    * @param step    number of days to add (negative for past days)
    */
  def getCountDate(value: String, formate: String, step: Int): String = {
    val sdf = new SimpleDateFormat(formate)
    val cal = Calendar.getInstance()
    if (value != null) {
      try {
        cal.setTime(sdf.parse(value))
      } catch {
        case e: ParseException => e.printStackTrace()
      }
    }
    cal.add(Calendar.DAY_OF_MONTH, step)
    sdf.format(cal.getTime)
  }

  // picks one of 20 fixed user IDs at random
  def ganerateUserID(): String = {
    val random = new Random
    val userID = Array[String](
      "98415b9c-f3d4-45c3-bc7f-dce3126c6c0b", "7371b4bd-8535-461f-a5e2-c4814b2151e1",
      "49852bfa-a662-4060-bf68-0dddde5feea1", "8768f089-f736-4346-a83d-e23fe05b0ecd",
      "a76ff021-049c-4a1a-8372-02f9c51261d5", "8d5dc011-cbe2-4332-99cd-a1848ddfd65d",
      "a2bccbdf-f0e9-489c-8513-011644cb5cf7", "89c79413-a7d1-462c-ab07-01f0835696f7",
      "8d525daa-3697-455e-8f02-ab086cda7851", "c6f57c89-9871-4a92-9cbe-a2d76cd79cd0",
      "19951134-97e1-4f62-8d5c-134077d1f955", "3202a063-4ebf-4f3f-a4b7-5e542307d726",
      "40a0d872-45cc-46bc-b257-64ad898df281", "b891a528-4b5e-4ba7-949c-2a32cb5a75ec",
      "0d46d52b-75a2-4df2-b363-43874c9503a2", "c1e4b8cf-0116-46bf-8dc9-55eb074ad315",
      "6fd24ac6-1bb0-4ea6-a084-52cc22e9be42", "5f8780af-93e8-4907-9794-f8c960e87d34",
      "692b1947-8b2e-45e4-8051-0319b7f0e438", "dde46f46-ff48-4763-9c50-377834ce7137")
    userID(random.nextInt(20))
  }

  // picks a random item name
  def ganerateItemID(): String = {
    val random = new Random
    val itemID = Array("小米", "休閒鞋", "洗衣機", "顯示器", "顯示卡", "洗衣液", "行車記錄儀")
    itemID(random.nextInt(7))
  }

  // picks a random city name
  def ganerateCityID(): String = {
    val random = new Random
    val CityNames = Array("上海", "北京", "深圳", "廣州", "紐約", "倫敦", "東京", "首爾", "莫斯科", "巴黎")
    CityNames(random.nextInt(10))
  }

  // picks a random device type
  def ganerateDevice(): String = {
    val random = new Random
    val Devices = Array("android", "iphone", "ipad", "PC")
    Devices(random.nextInt(4))
  }

  // generates numberItems tab-separated records: date, userID, item, city, device
  def ganerateUserLogs(numberItems: Int, pathPath: String): Unit = {
    val userLogBuffer = new StringBuffer()
    val fileName = "SparkSQLUserlogsHottest.log"
    val formate = "yyyy-MM-dd"
    for (i <- 0 until numberItems) {
      val date = getCountDate(null, formate, -1)
      val userID = ganerateUserID()
      val itemID = ganerateItemID()
      val cityID = ganerateCityID()
      val device = ganerateDevice()
      userLogBuffer.append(date + "\t" + userID + "\t" + itemID + "\t" + cityID + "\t" + device + "\n")
      println(userLogBuffer.toString)
      // rewrites the whole accumulated buffer to the file on every iteration
      writeLog(pathPath, fileName, userLogBuffer + "")
    }
  }
}


    2. Log-analysis code example

package SparkSQL

import java.util

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

object SparkSQLUserLogsHosttest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkSQLUserLogsHosttest").setMaster("local[4]")
    val sc = new SparkContext(conf)
   // val sqlContext = new HiveContext(sc)
    val spark = SparkSession.builder().appName("SparkSQLUserLogsHosttest").master("local[4]").config("spark.sql.warehouse.dir", "/home/hadoop/IdeaScala/LearningBigdata/spark-warehouse").getOrCreate()

    var path = "/home/hadoop/IdeaScala/SparkSQLUserlogsHottest.log"
    val file = sc.textFile(path)

    val devicebd = "iphone"
    val broadcastdevice = sc.broadcast(devicebd)
    val lines = file.filter(line=>{
      line.contains(broadcastdevice.value)
    })
    val listRow = lines.collect()
    for(row <- listRow){
      println(row)
    }

    val pairs = lines.map(line=>{
      val splited = line.split("\t")
      val one = 1
      val dataanditemanduserid = splited(0)+"#"+splited(2)+"#"+splited(1)
      (dataanditemanduserid,one)
    })

    val pairRow = pairs.collect()
    for(pair <- pairRow){
      println(pair)
    }

    val reduceedPairs = pairs.reduceByKey(_+_)
    val reduceedRow = reduceedPairs.collect()
    val peopleInformation:util.ArrayList[String] = new util.ArrayList[String]()
    for(eachRow <- reduceedRow) {
      println(eachRow)
      val rowSplited = eachRow._1.split("#")
      val userID = rowSplited(2)
      val itemID = rowSplited(1)
      val dateID = rowSplited(0)
      //val jsonzip = "{ Date :" +dateID+", UserID :"+userID+",Username :"+userID+",Item : "+itemID+",count : "+ eachRow._2+"}"
      val jsonzip = "{\"Date\":\"" + dateID + "\", \"UserID\":\"" + userID + "\", \"Username\":\"" + userID + "\", \"Item\":\"" + itemID + "\", \"count\":" + eachRow._2 + "}"
      peopleInformation.add(jsonzip)
    }

    for (row <- peopleInformation.toArray()){
      println(row)
    }

    import scala.collection.JavaConverters._
    import spark.implicits._
    // parallelize the JSON strings and parse them into a DataFrame
    val peopleInformationRDD = sc.parallelize(peopleInformation.asScala)
    val peopleInformationDS = spark.read.json(peopleInformationRDD.toDS())
    peopleInformationDS.createOrReplaceTempView("peopleInformations")
    val sqlText =
      """SELECT UserID, Item, count
        |FROM (
        |  SELECT UserID, Item, count,
        |         row_number() OVER (PARTITION BY UserID ORDER BY count DESC) rank
        |  FROM peopleInformations
        |) sub_peopleInformations
        |WHERE rank <= 3""".stripMargin
    val execellentNameAgeDS = spark.sql(sqlText)
    execellentNameAgeDS.show()
    execellentNameAgeDS.write.format("json").save("Result")

  }

}
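
The query above ranks records within each UserID. A variant that matches step 4 of the analysis, the daily top-5 items by UV, could look like the sketch below; it assumes it runs in the same main method after the peopleInformations temp view has been registered, and it approximates UV as the number of distinct UserIDs per Date and Item:

// Sketch: daily top-5 items by UV over the same peopleInformations view.
val dailyTop5 = spark.sql(
  """SELECT Date, Item, uv FROM (
    |  SELECT Date, Item, uv,
    |         row_number() OVER (PARTITION BY Date ORDER BY uv DESC) rank
    |  FROM (
    |    SELECT Date, Item, COUNT(DISTINCT UserID) AS uv
    |    FROM peopleInformations
    |    GROUP BY Date, Item
    |  ) agg
    |) ranked
    |WHERE rank <= 5""".stripMargin)
dailyTop5.show()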