1. 程式人生 > >大資料學習之路106-spark streaming統計結果寫入mysql

大資料學習之路106-spark streaming統計結果寫入mysql

我們首先將資料庫的配置資訊寫到配置檔案中。

要使用配置檔案的話,首先我們要在pom檔案中匯入配置檔案讀取依賴:

 <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.3.3</version>
        </dependency>

程式碼如下:

package com.test.sparkStreaming

import java.util.Properties

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Second
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 將實時詞頻統計的資料寫入mysql數資料庫
  */
object MyNetWorkWordCountMysql {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    //載入配置檔案,會去載入resources下面的配置檔案,
    // 預設規則:application.conf -> application.json -> application.properties
    val config: Config = ConfigFactory.load()
    //建立Streamingcontext物件
    val conf = new SparkConf().setAppName("MyNetWorkWordCountMysql").setMaster("local[2]")
    //定義一個取樣時間,每隔2秒鐘採集一次資料,這個時間不能隨意設定
    val ssc: StreamingContext = new StreamingContext(conf,Seconds(2))
    //建立一個離散流
    val lines = ssc.socketTextStream("marshal",5678)
     lines.foreachRDD(
       //插入當前批次計算出來的計算結果
       rdd =>{
         //建立一個spark Session物件
         val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
         //將RDD轉換為DataFrame
         import spark.implicits._
         //words是列的名字,表只有一列
         val frame: DataFrame = rdd.flatMap(_.split(" ")).toDF("words")
         //建立臨時檢視
         frame.createOrReplaceTempView("wordcount")
         //執行sql進行詞頻統計
         val result: DataFrame = spark.sql("select words,count(*) as total from wordcount group by words")
         //封裝使用者名稱和口令
         val props: Properties = new Properties()
         props.setProperty("user",config.getString("db.user"))
         props.setProperty("password",config.getString("db.password"))
         //建立一個mysql資料庫連線
        result.write.mode("append").jdbc(config.getString("db.url"),config.getString("db.table"),props)

       }

     )
    ssc.start()
    ssc.awaitTermination()
  }

}

如果執行過程中出現:

Establishing SSL connection without server's identity verification is not re

則改一下配置檔案:

db.url = "jdbc:mysql://marshal:3306/lfr?useSSL=false"
db.user = "root"
db.password = "123456"
db.table = "wordcount"

執行結果如下:

我們還可以加一個過濾條件:

 if(!result.rdd.isEmpty())
            result.write.mode("append").jdbc(config.getString("db.url"),config.getString("db.table"),props)