1. 程式人生 > >spark的JDBC連線池(Scala版)

spark的JDBC連線池(Scala版)

一個Scala版的連線池,並在使用Spark Streaming進行Word Count時,把每批資料都存到mySql中

import java.sql.{Connection, DriverManager}
import java.util

object JDBCConnectePools02 {
  private val max = 10  //設定連線最大數
  private val ConnectionNum = 10   //設定 每次可以獲取幾個Connection
  private var conNum =   0//連線數
  private val pool = new util.LinkedList
[Connection]() //連線池 def getDriver() : Unit = { //載入Driver //載入 if(conNum < max && pool.isEmpty){ Class.forName("com.mysql.jdbc.Driver") }else if(conNum>=max && pool.isEmpty){ print("當前暫無可用Connection") Thread.sleep(2000) getDriver() } } def getConn
(): Connection ={ if(pool.isEmpty){ getDriver() for(i <- 1 to ConnectionNum){ //建立10個連線 val conn = DriverManager.getConnection("jdbc:mysql://hadoop01:3306/updatewordcount","root","root") pool.push(conn) // 把連線放到連線池中,push是LinkedList中的方法 conNum += 1 } } val conn:
Connection = pool.pop()//從執行緒池所在LinkedList中彈出一個Connection,pop 是LinkedList的方法 conn //返回一個Connection } def returnConn( conn :Connection): Unit ={ //還連線 pool.push(conn) } }

一個簡單的使用,使用sparkStreaming,盡心wordCount,每次把結果放到MySql 中

import java.sql.{Connection, PreparedStatement}

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Duration, StreamingContext}

object JDBCWordCont02 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wc").setMaster("local[2]")
    //新建一個StreamingContext,每個5s是一個批次
    val ssc = new StreamingContext(conf,new Duration(5000))
   //接受hadoop01主機的 8888埠的資料
    val data: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop01",8888)
    //進行切分壓平
    val split: DStream[String] = data.flatMap(_.split(" "))
    //單詞和1組合
    val wordAndOne: DStream[(String, Int)] = split.map((_,1))
    //對rdd進行遍歷,想要使用foreachPartition 需要foreachRDD ,
    //對裡面的rdd進行操作,DStream中沒有foreachpartition方法,
    //如果直接使用foreach方法不好,會大量的去連線,還連線,對效能有影響
    wordAndOne.foreachRDD(rdd=>{
    //對RDD中的資料進行聚合
      val reduced: RDD[(String, Int)] = rdd.reduceByKey(_+_)
      reduced.foreachPartition(item =>{
      //獲取連線
        val conn: Connection = JDBCConnectePools02.getConn()
     
        for(one <- item){ //把聚合後的資料存到mysql 中
          val pstm: PreparedStatement = conn.prepareStatement("insert into  wordcount(word,count) values(?,?)")
          pstm.setString(1,one._1)
          pstm.setInt(2,one._2)
          pstm.executeUpdate()
        }
      //還連線
        JDBCConnectePools02.returnConn(conn)



      })
    })

    ssc.start()
    ssc.awaitTermination()
  }


}