大資料學習之路106-spark streaming統計結果寫入mysql
阿新 • • 發佈:2018-11-09
我們首先將資料庫的配置資訊寫到配置檔案中。
要使用配置檔案的話,首先我們要在pom檔案中匯入配置檔案讀取依賴:
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.3.3</version>
</dependency>
程式碼如下:
package com.test.sparkStreaming import java.util.Properties import com.typesafe.config.{Config, ConfigFactory} import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.catalyst.expressions.Second import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 將實時詞頻統計的資料寫入mysql數資料庫 */ object MyNetWorkWordCountMysql { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.OFF) //載入配置檔案,會去載入resources下面的配置檔案, // 預設規則:application.conf -> application.json -> application.properties val config: Config = ConfigFactory.load() //建立Streamingcontext物件 val conf = new SparkConf().setAppName("MyNetWorkWordCountMysql").setMaster("local[2]") //定義一個取樣時間,每隔2秒鐘採集一次資料,這個時間不能隨意設定 val ssc: StreamingContext = new StreamingContext(conf,Seconds(2)) //建立一個離散流 val lines = ssc.socketTextStream("marshal",5678) lines.foreachRDD( //插入當前批次計算出來的計算結果 rdd =>{ //建立一個spark Session物件 val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate() //將RDD轉換為DataFrame import spark.implicits._ //words是列的名字,表只有一列 val frame: DataFrame = rdd.flatMap(_.split(" ")).toDF("words") //建立臨時檢視 frame.createOrReplaceTempView("wordcount") //執行sql進行詞頻統計 val result: DataFrame = spark.sql("select words,count(*) as total from wordcount group by words") //封裝使用者名稱和口令 val props: Properties = new Properties() props.setProperty("user",config.getString("db.user")) props.setProperty("password",config.getString("db.password")) //建立一個mysql資料庫連線 result.write.mode("append").jdbc(config.getString("db.url"),config.getString("db.table"),props) } ) ssc.start() ssc.awaitTermination() } }
如果執行過程中出現:
Establishing SSL connection without server's identity verification is not re
則改一下配置檔案:
db.url = "jdbc:mysql://marshal:3306/lfr?useSSL=false"
db.user = "root"
db.password = "123456"
db.table = "wordcount"
執行結果如下:
我們還可以加一個過濾條件:
if(!result.rdd.isEmpty()) result.write.mode("append").jdbc(config.getString("db.url"),config.getString("db.table"),props)