程式人生 > SQL 重寫 ipCount II

sql 重寫ipCount II

package com.ws.sparksql
import com.ws.spark.IpFromUtils
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * sql 統計日誌中ip歸屬地出現次數
  */
object SqlIpFromCount {

  /** Rules file read when no `args(0)` is given. */
  private val DefaultRulesPath = "E:\\bigData\\testdata\\ip.txt"

  /** Access log read when no `args(1)` is given. */
  private val DefaultAccessLogPath = "E:\\bigData\\testdata\\access.log"

  /**
    * Uses Spark SQL to count how many access-log entries map to each IP province.
    *
    * The IP-range rules are parsed, collected to the driver and broadcast. A SQL
    * UDF (`iptoProvince`) then looks each numeric IP up in the broadcast array.
    * Results are printed with `show()`, ordered by count (descending).
    *
    * @param args optional: `args(0)` = rules file path, `args(1)` = access log path.
    *             The original hard-coded paths are used when they are omitted.
    */
  def main(args: Array[String]): Unit = {
    val rulesPath = args.lift(0).getOrElse(DefaultRulesPath)
    val accessLogPath = args.lift(1).getOrElse(DefaultAccessLogPath)

    val sparkSession = SparkSession.builder().appName("SqlIpFromCount").master("local[*]").getOrCreate()
    // Always release the session, even if reading/parsing/querying fails.
    try {
      import sparkSession.implicits._

      // Read the rules: '|'-separated lines; field 2 = range start, 3 = range end, 6 = province.
      // Lines with too few fields (e.g. a trailing blank line) are skipped instead of
      // aborting the job with ArrayIndexOutOfBoundsException.
      val rules: Dataset[(Long, Long, String)] = sparkSession.read.textFile(rulesPath)
        .flatMap[(Long, Long, String)] { line =>
          val fields = line.split("[|]")
          if (fields.length > 6) Iterator.single((fields(2).toLong, fields(3).toLong, fields(6)))
          else Iterator.empty
        }

      // Broadcast variables are created through the SparkContext, not the SparkSession.
      // NOTE(review): IpFromUtils.binarySearch presumably requires the rules to be sorted
      // by range start (as in ip.txt) — confirm.
      val broadCast: Broadcast[Array[(Long, Long, String)]] =
        sparkSession.sparkContext.broadcast(rules.collect())

      // Read the access log: field 1 holds the client IP, converted to its decimal value.
      // Lines without that field are skipped.
      val ipNums: Dataset[Long] = sparkSession.read.textFile(accessLogPath)
        .flatMap[Long] { line =>
          val fields = line.split("[|]")
          if (fields.length > 1) Iterator.single(IpFromUtils.ipToLong(fields(1)))
          else Iterator.empty
        }

      val ipNumDataFrame: DataFrame = ipNums.toDF("ip_num")
      // Replace (rather than create) so re-running in the same session does not fail.
      ipNumDataFrame.createOrReplaceTempView("t_ips")

      // SQL function mapping a numeric IP to its province; runs per row on the executors,
      // reading the rules broadcast from the driver.
      sparkSession.udf.register("iptoProvince", (ipNum: Long) => {
        val rulesBroad: Array[(Long, Long, String)] = broadCast.value
        val index = IpFromUtils.binarySearch(rulesBroad, ipNum)
        // Any out-of-range index (binarySearch signals "not found" with -1) means no match.
        if (index >= 0 && index < rulesBroad.length) rulesBroad(index)._3 else "暫無"
      })

      // Lookup via the broadcast rules inside the UDF (no shuffle join). The subquery avoids
      // relying on GROUP BY select-list aliases; province breaks ties for a stable order.
      val result = sparkSession.sql(
        """select province, count(*) as times
          |from (select iptoProvince(ip_num) as province from t_ips) t
          |group by province
          |order by times desc, province""".stripMargin)

      result.show()
    } finally {
      sparkSession.stop()
    }
  }
}

結果（`result.show()` 的輸出）：

+--------+-----+
|province|times|
+--------+-----+
|      陝西| 1824|
|      北京| 1535|
|      重慶|  868|
|      河北|  383|
|      雲南|  126|
+--------+-----+