// Spark job: match access-log IPs against an IP-range rule base (binary search
// for the province), aggregate per-province counts, and write them to MySQL.
// (Originally published 2019-02-17.)
def main(args: Array[String]): Unit = {
import scala.util.control.NonFatal
val conf = new SparkConf()
.setMaster("local")
.setAppName(IpLocation3.getClass.getName)
val sc = new SparkContext(conf)
// Load the IP rule file. Each line is pipe-delimited: numeric range start at
// field 2, range end at field 3, province name at field 6.
val ipData: RDD[String] = sc.textFile("ip.txt")
val ipRule:RDD[(Long,Long,String)] = ipData.map({
line=>
val fields = line.split("[|]")
val start = fields(2).toLong
val end = fields(3).toLong
val province = fields(6)
(start,end,province)
})
// Collect the rules to the driver, then broadcast so every task on an
// executor shares one read-only copy instead of re-fetching it.
val ipRules = ipRule.collect()
val ipbc: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(ipRules)
// Map each access-log line to (province, 1): convert the dotted IP string
// (field 1) to its decimal form, then binary-search the broadcast rules.
val accessData: RDD[String] = sc.textFile("access.log")
val proAndOne:RDD[(String,Int)] = accessData.map({
line =>
val ipLong:String = line.split("[|]")(1)
// Convert the dotted-quad IP string to its decimal (Long) form.
val ip = ip2Long(ipLong)
// Read the shared rule table from the broadcast variable.
val ipArray: Array[(Long, Long, String)] = ipbc.value
// Binary search; returns -1 when the IP falls outside every known range.
val index = search(ip,ipArray)
// Express the fallback as an expression instead of a var + conditional
// mutation. "unknow" is the original sentinel value written to the DB;
// kept byte-for-byte for compatibility.
val province = if (index != -1) ipArray(index)._3 else "unknow"
(province,1)
})
// Aggregate counts per province, sorted by count descending.
val res: RDD[(String, Int)] = proAndOne.reduceByKey(_+_).sortBy(-_._2)
// Write results to MySQL: one connection per partition to limit connection
// churn. Resources are closed in the finally block.
res.foreachPartition({
it =>
var conn: Connection = null
var pst: PreparedStatement = null
var pst1: PreparedStatement = null
try{
val url = "jdbc:mysql://localhost:3306/car?characterEncoding=utf-8"
conn = DriverManager.getConnection(url,"root","123456")
pst = conn.prepareStatement("create table if not exists access_log(province varchar(20),count int)")
pst.execute()
pst1 = conn.prepareStatement("insert into access_log values(?,?)")
it.foreach({
v =>
pst1.setString(1,v._1)
pst1.setInt(2,v._2)
pst1.execute()
})
}catch{
// NonFatal instead of Exception: let OOM/interrupts propagate.
case NonFatal(e) => e.printStackTrace()
}finally{
// BUG FIX: the original compared each statement against `conn`
// (`pst1 != conn`), which is true when a statement is null but the
// connection is not — producing an NPE on close() that masks the real
// failure. Null-check each resource individually.
if(pst1 != null) pst1.close()
if(pst != null) pst.close()
if(conn != null) conn.close()
}
})
// Release the SparkContext.
sc.stop()
}
}