1. 程式人生 > >SparkSql執行自定義sql

SparkSql執行自定義sql

package cn.itcast.spark.day3

import java.sql.{Connection, Date, DriverManager, PreparedStatement}

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by root on 2016/5/18.
  * 廣播變數和累加器
  */
object IPLocation {

  val data2MySQL = (iterator: Iterator[(String, Int)]) => {
    var conn: Connection = null
var ps : PreparedStatement = null val sql = "INSERT INTO location_info (location, counts, accesse_date) VALUES (?, ?, ?)" try { conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "x5") iterator.foreach(line => { ps = conn.prepareStatement(sql) ps.setString(1
, line._1) ps.setInt(2, line._2) ps.setDate(3, new Date(System.currentTimeMillis())) ps.executeUpdate() }) } catch { case e: Exception => println("Mysql Exception") } finally { if (ps != null) ps.close() if (conn != null) conn.close() } } def
ip2Long(ip: String): Long = { val fragments = ip.split("[.]") var ipNum = 0L for (i <- 0 until fragments.length){ ipNum = fragments(i).toLong | ipNum << 8L } ipNum } def binarySearch(lines: Array[(String, String, String)], ip: Long) : Int = { var low = 0 var high = lines.length - 1 while (low <= high) { val middle = (low + high) / 2 if ((ip >= lines(middle)._1.toLong) && (ip <= lines(middle)._2.toLong)) return middle if (ip < lines(middle)._1.toLong) high = middle - 1 else { low = middle + 1 } } -1 } def main(args: Array[String]) { val conf = new SparkConf().setMaster("local[2]").setAppName("IpLocation") val sc = new SparkContext(conf) val ipRulesRdd = sc.textFile("c://ip.txt").map(line =>{ val fields = line.split("\\|") val start_num = fields(2) val end_num = fields(3) val province = fields(6) (start_num, end_num, province) }) //全部的ip對映規則 val ipRulesArrary = ipRulesRdd.collect() println(111); println(ipRulesArrary.toBuffer); //廣播規則 val ipRulesBroadcast = sc.broadcast(ipRulesArrary) println(222); println(ipRulesBroadcast.value.toBuffer) //載入要處理的資料 val ipsRDD = sc.textFile("c://access_log").map(line => { val fields = line.split("\\|") fields(1) }) val result = ipsRDD.map(ip => { val ipNum = ip2Long(ip) val index = binarySearch(ipRulesBroadcast.value, ipNum) val info = ipRulesBroadcast.value(index) //(ip的起始Num, ip的結束Num,省份名) info }).map(t => (t._3, 1)).reduceByKey(_+_) //向MySQL寫入資料 result.foreachPartition(data2MySQL(_)) //println(result.collect().toBuffer) sc.stop() } }