1. 程式人生 > >大資料學習之路107-spark streaming基於mysql歷史state統計

大資料學習之路107-spark streaming基於mysql歷史state統計

package com.test.sparkStreaming

import java.sql.{DriverManager, PreparedStatement}

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MyNetWorkWordCountMysqlState {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    //載入配置檔案,會去載入resources下面的配置檔案,
    // 預設規則:application.conf -> application.json -> application.properties
    val config: Config = ConfigFactory.load()
    //建立Streamingcontext物件
    val conf = new SparkConf().setAppName("MyNetWorkWordCountMysqlState").setMaster("local[2]")
    //定義一個取樣時間,每隔2秒鐘採集一次資料,這個時間不能隨意設定
    val ssc: StreamingContext = new StreamingContext(conf,Seconds(2))
    //建立一個離散流
    val lines = ssc.socketTextStream("marshal",5678)

    /**
      * 插入當前批次計算結果
      * foreachRDD在Driver端執行
      * foreachPartition,foreach在worker端執行
      */

    lines.foreachRDD(
      rdd =>{
        //計算當前批次結果
        val current_result: RDD[(String, Int)] = rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
        //插入當前批次計算出來的結果
        current_result.foreachPartition(
          partition => {
            //建立一個連線
            val url = config.getString("db.url")
            val user = config.getString("db.user")
            val password = config.getString("db.password")
            val conn = DriverManager.getConnection(url,user,password)
            //將當前分割槽裡面的所有資料都插入到mysql資料庫中
            partition.foreach(
              tp =>{
                val word = tp._1
                //判斷即將插入的資料是否之前已經插入過,如果已經插入過,則進行更新操作,否則就是插入
                val pst = conn.prepareStatement("select * from wordcount where words=?")
                pst.setString(1,word)
                val rs = pst.executeQuery()
                var flag = false
                while(rs.next()){
                  flag = true
                  //即將插入的單詞已經存在,可以進行更新操作
                  println("已經存在")
                  val i: Int = rs.getInt("total")
                  val i2 = i + tp._2
                  //更新
                  val update = conn.prepareStatement("update wordcount set total = ? where words = ?")
                  update.setInt(1,i2)
                  update.setString(2,word)
                  update.executeUpdate()
                  update.close()

                }
                if(!flag){
                  println("單詞不存在,需要插入")
                  //插入一條資料
                  val pst: PreparedStatement = conn.prepareStatement("insert into wordcount values(?,?)")
                  pst.setString(1,tp._1)
                  pst.setInt(2,tp._2)
                  pst.executeUpdate()
                  pst.close()
                }

              })
            if (conn != null)
              conn.close()
          })

      })
    ssc.start()
    ssc.awaitTermination()
  }
}

執行結果: