1. 程式人生 > Spark中IP對映資料應用庫，二分查詢省份，將結果寫入MySQL

Spark中IP對映資料應用庫，二分查詢省份，將結果寫入MySQL

/**
 * Entry point: resolves each access-log IP to a province via an IP-range rule
 * table, counts hits per province, and writes the counts to MySQL.
 *
 * Inputs (read with `sc.textFile`, relative paths):
 *  - `ip.txt`: `|`-separated rules; field index 2 / 3 are parsed as the numeric
 *    start / end of a range, field index 6 is the province name.
 *  - `access.log`: `|`-separated records; field index 1 is the IP in text form.
 *
 * Output: one `(province, count)` row per province inserted into table
 * `access_log` of database `car` (the table is created if missing).
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  import scala.util.control.NonFatal

  val conf = new SparkConf()
    .setMaster("local")
    .setAppName(IpLocation3.getClass.getName)
  val sc = new SparkContext(conf)
  // Stop the context even if the job fails, so resources are released.
  try {
    val ipData: RDD[String] = sc.textFile("ip.txt")
    val ipRule: RDD[(Long, Long, String)] = ipData.map { line =>
      val fields = line.split("[|]")
      (fields(2).toLong, fields(3).toLong, fields(6))
    }
    // Collect the rules to the driver and broadcast them, so tasks on an executor
    // share one read-only copy instead of each task shipping its own.
    // NOTE(review): `search` is described as a binary search, which presumably
    // requires the rules to be ordered by start address — confirm ip.txt is sorted.
    val ipbc: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(ipRule.collect())

    val accessData: RDD[String] = sc.textFile("access.log")
    val proAndOne: RDD[(String, Int)] = accessData.map { line =>
      val ipStr: String = line.split("[|]")(1)
      // Textual IP -> numeric form, so it can be compared against rule ranges.
      val ip = ip2Long(ipStr)
      val ipArray: Array[(Long, Long, String)] = ipbc.value
      // Index of the matching rule, or -1 when no range contains the address.
      val index = search(ip, ipArray)
      val province = if (index != -1) ipArray(index)._3 else "unknow"
      (province, 1)
    }

    // Per-province hit counts, ordered by descending count.
    val res: RDD[(String, Int)] = proAndOne.reduceByKey(_ + _).sortBy(-_._2)

    // One JDBC connection per partition rather than per record.
    res.foreachPartition { it =>
      var conn: Connection = null
      var pst: PreparedStatement = null
      var pst1: PreparedStatement = null
      try {
        val url = "jdbc:mysql://localhost:3306/car?characterEncoding=utf-8"
        // NOTE(review): credentials are hard-coded; consider moving them to configuration.
        conn = DriverManager.getConnection(url, "root", "123456")
        pst = conn.prepareStatement("create table if not exists access_log(province varchar(20),count int)")
        pst.execute()
        pst1 = conn.prepareStatement("insert into access_log values(?,?)")
        it.foreach { case (province, count) =>
          pst1.setString(1, province)
          pst1.setInt(2, count)
          pst1.addBatch()
        }
        // Send all rows of this partition in one round trip instead of one per row.
        pst1.executeBatch()
      } catch {
        // Best-effort write: report and continue; fatal JVM errors still propagate.
        case NonFatal(e) => e.printStackTrace()
      } finally {
        // Null checks: any of these may be unset if an earlier step threw.
        if (pst1 != null) pst1.close()
        if (pst != null) pst.close()
        if (conn != null) conn.close()
      }
    }
  } finally {
    sc.stop()
  }
}
}