1. 程式人生 > >利用Spark sql操作Hdfs資料與Mysql資料,sql視窗函式的使用

利用Spark sql操作Hdfs資料與Mysql資料,sql視窗函式的使用

需求說明:

                                                                  對熱門商品進行統計

        根據商品的點選資料,統計出各個區域的銷量排行TOPK 產品

        輸入:開始時間與結束時間

        輸出:每個城市的銷量排行TOP K 產品

地區級別

地區名稱

產品名稱

點選量

產品型別

A

西南片區

霧霾口罩

1000000

第三方

A

西南片區

霧霾口罩

1000000

第三方

A

西南片區

霧霾口罩

1000000

第三方

B

華中地區

蘋果

1000

自營

B

華中地區

蘋果

1000

自營

B

華中地區

蘋果

1000

自營

涉及表:

使用者行為表(檔案日誌)  city_id , product_id,點選量 

地區表(mysql)格式如下:


產品表(mysql)格式如下:


使用 spark core 與spark sql實現

主要技術點: Spark sql操作Hdfs資料與Mysql資料,sql視窗函式的使用,dataFrame的使用


   關於spark讀取mysql(地區表,產品表)程式碼如下:

/**
    *獲取mysql表資料,並註冊為spark臨時表
    */
  def loadMysqlData(): Unit = {
    //建立連線mysql連線
    val jdbcOptions = Map("url" -> "jdbc:mysql://192.168.100.212:3306/hxh?user=root&password=123456", "dbtable" -> "areas")
    val reader = sqlContext.read.format("jdbc")
    val jdbcOptions2 = Map("url" -> "jdbc:mysql://192.168.100.212:3306/hxh?user=root&password=123456", "dbtable" -> "product")
    val reader2 = sqlContext.read.format("jdbc")
    //把查詢出來的表註冊為臨時表
      reader.options(jdbcOptions).load().registerTempTable("spark_areas")
      reader2.options(jdbcOptions2).load().registerTempTable("spark_product")
  }

關於spark統計地區、點選量程式碼如下:

package com.hxh

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}


object UserAnalysis {
  val conf = new SparkConf().setAppName("test").setMaster("local[*]")
  val sparkContext = new SparkContext(conf)
  val sqlContext = new HiveContext(sparkContext)

  def main(args: Array[String]): Unit = {
    sqlContext.sql("use bigdata")
    sqlContext.sql("select * from t_pages_click ").registerTempTable("tPagesClick")
    loadMysqlData()
    areaNameCount()
    areaRowCount()
    sqlContext.sql("select areaLevel,areaName,productName,sumClick,extendName " +
      "from click_row_count " +
      "where numSum<=3 " +
      "order by areaLevel asc,sumClick desc" ).show(50)

  }
  def areaRowCount(): Unit ={
    sqlContext.sql("select " +
      "CASE WHEN areaName IN ( '華北地區', '東北地區') THEN 'A' "+
      " WHEN areaName IN ( '華東地區', '華中地區') THEN 'B' "+
      " WHEN areaName IN ( '華南地區', '西南地區') THEN 'C' "+
      "WHEN areaName IN ('西北地區') THEN 'D' "+
      "ELSE'資料錯誤' END as areaLevel,areaName,productName," +
      "sumClick," +
      "Row_Number() OVER (PARTITION BY areaName order by sumClick DESC) AS numSum," +
      "if(extendInfo='1','自營','第三方') extendName "+
      "from areaNameCount ").registerTempTable("click_row_count")
  }
  /**
    * 按地區統計點選量
    */
  def areaNameCount(): Unit ={
    sqlContext.sql("select areas.area_name areaName," +
      "product.product_name productName,count(1) sumClick," +
      "product.extend_info extendInfo from tPagesClick " +
      "join spark_areas areas " +
      "on  tPagesClick.city_id=areas.city_id " +
      "join spark_product  product " +
      "on  product.product_id=tPagesClick.click_product_id " +
      "group by areas.area_name,product.product_name,product.extend_info").registerTempTable("areaNameCount")
  }

結果執行結果如下: