sql 重寫ipCount II
阿新 • 發佈:2018-12-13
package com.ws.sparksql

import com.ws.spark.IpFromUtils
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * Counts how often each IP's home province appears in an access log.
 *
 * The IP-range rule table is collected to the driver, broadcast to the
 * executors, and looked up via a registered SQL UDF that binary-searches
 * the broadcast rules.
 */
object SqlIpFromCount {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SqlIpFromCount")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Parse the rule file into (rangeStart, rangeEnd, province) triples.
    val ruleLines: Dataset[String] = spark.read.textFile("E:\\bigData\\testdata\\ip.txt")
    val parsedRules: Dataset[(Long, Long, String)] = ruleLines.map { line =>
      val cols = line.split("[|]")
      (cols(2).toLong, cols(3).toLong, cols(6))
    }

    // Broadcast the collected rules; only the SparkContext can broadcast.
    val collectedRules: Array[(Long, Long, String)] = parsedRules.collect()
    val rulesRef: Broadcast[Array[(Long, Long, String)]] =
      spark.sparkContext.broadcast(collectedRules)

    // Read the access log and convert each record's IP string to its decimal form.
    val logLines: Dataset[String] = spark.read.textFile("E:\\bigData\\testdata\\access.log")
    val ipAsLong: Dataset[Long] = logLines.map { line =>
      IpFromUtils.ipToLong(line.split("[|]")(1))
    }

    val ipTable: DataFrame = ipAsLong.toDF("ip_num")
    ipTable.createTempView("t_ips")

    // SQL UDF: resolve a numeric IP to its province via binary search over the
    // broadcast rule table; unmatched IPs fall back to "暫無" ("none yet").
    spark.udf.register("iptoProvince", (ipNum: Long) => {
      val rules = rulesRef.value
      val idx = IpFromUtils.binarySearch(rules, ipNum)
      if (idx != -1) rules(idx)._3 else "暫無"
    })

    // Group by resolved province and count, most frequent first.
    val result = spark.sql("select iptoProvince(ip_num) province , count(*) as times from t_ips group by province order by times desc")
    result.show()

    spark.stop()
  }
}
結果 :
+--------+-----+
|province|times|
+--------+-----+
| 陝西| 1824|
| 北京| 1535|
| 重慶| 868|
| 河北| 383|
| 雲南| 126|
+--------+-----+