Spark Streaming (Part 3): Reading Kafka Data and Incrementally Saving It to MySQL
Posted by 阿新 on 2018-12-31
1. Reading streaming data with Spark Streaming

The listing below is a stateful network word count: it reads lines from a TCP socket, keeps a running count per word with updateStateByKey, and writes each batch's state to MySQL. (A sketch of swapping the socket source for a Kafka one follows the listing.)
```scala
package org.apache.spark.examples.streaming

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCountStateful {
  def main(args: Array[String]) {
    // State update function: add this batch's counts to the running total for each word
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
    StreamingExamples.setStreamingLogLevels() // set the log4j log level
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
    val sc = new StreamingContext(conf, Seconds(5))
    // updateStateByKey requires a checkpoint directory; it also provides fault tolerance
    sc.checkpoint("file:///usr/local/spark/mycode/streaming/dstreamoutput/")
    val lines = sc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()

    // The statements below save the DStream to the MySQL database
    stateDstream.foreachRDD(rdd => {
      // Inner function: writes one partition's records over a single JDBC connection
      def func(records: Iterator[(String, Int)]) {
        var conn: Connection = null
        var stmt: PreparedStatement = null
        try {
          val url = "jdbc:mysql://localhost:3306/spark"
          val user = "root"
          val password = "hadoop" // the author's MySQL password; change it to your own
          conn = DriverManager.getConnection(url, user, password)
          records.foreach(p => {
            val sql = "insert into wordcount(word,count) values (?,?)"
            stmt = conn.prepareStatement(sql)
            stmt.setString(1, p._1.trim)
            stmt.setInt(2, p._2)
            stmt.executeUpdate()
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (stmt != null) {
            stmt.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      }
      val repartitionedRDD = rdd.repartition(3)
      repartitionedRDD.foreachPartition(func)
    })

    sc.start()
    sc.awaitTermination()
  }
}
```
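The title of this series entry mentions Kafka, but the listing above reads from a TCP socket. As a minimal sketch (not part of the original code), the source could be swapped for a Kafka direct stream as follows, assuming the spark-streaming-kafka-0-10_2.11 artifact (version 2.3.1) is added to the build; the broker address, topic name, and consumer group below are placeholders:

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",           // placeholder broker address
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "wordcount-group",                   // placeholder consumer group
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// Replaces `val lines = sc.socketTextStream("localhost", 9999)` in the listing above;
// everything downstream (flatMap, updateStateByKey, the MySQL write) stays the same.
val kafkaStream = KafkaUtils.createDirectStream[String, String](
  sc, PreferConsistent, Subscribe[String, String](Array("wordcount"), kafkaParams))
val lines = kafkaStream.map(record => record.value)
```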
2. Compiling with sbt:
- cd /usr/local/spark/mycode/streaming/dstreamoutput
- rm simple.sbt
- vim simple.sbt
```
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.1"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.3.1"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.3.1"
```
My Spark version is 2.3.1 and my Scala version is 2.11.8.
Package the project with sbt:
- cd /usr/local/spark/mycode/streaming/dstreamoutput
- /usr/local/sbt/sbt package
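Before running, the database and table that the program writes to must already exist in MySQL. Here is a minimal setup sketch using the names from the code above (database spark, table wordcount, columns word and count); the column width is my assumption:

```sql
-- Run once in the mysql client before submitting the job
CREATE DATABASE IF NOT EXISTS spark;
USE spark;
CREATE TABLE IF NOT EXISTS wordcount (
  word VARCHAR(100),   -- width is an assumed value
  count INT
);
```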
Run the application with spark-submit, passing the MySQL JDBC driver jar via --jars:
/usr/local/spark/bin/spark-submit --class "org.apache.spark.examples.streaming.NetworkWordCountStateful" --jars /usr/local/spark/jars/mysql-connector-java-5.1.42-bin.jar /usr/local/spark/mycode/streaming/dstreamoutput/target/scala-2.11/simple-project_2.11-1.0.jar
Open another terminal and start netcat as the data source:
nc -lk 9999
You can now type words into this window, pressing Enter after each one, for example:
hello
hadoop
spark
hello
spark
You can then query the MySQL database to check whether the data was inserted.
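For example, from the mysql client (note that this program re-inserts the full running state on every 5-second batch, so rows accumulate across batches rather than being updated in place):

```sql
USE spark;
SELECT word, count FROM wordcount;
```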
3. For reference, the Maven pom.xml configuration:
```xml
<properties>
<spark.version>2.3.1</spark.version>
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.apache.redis</groupId>-->
<!--<artifactId>redis</artifactId>-->
<!--<version>1.0</version>-->
<!--</dependency>-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.apache.spark</groupId>-->
<!--<artifactId>spark-mllib_${scala.version}</artifactId>-->
<!--<version>${spark.version}</version>-->
<!--</dependency>-->
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
```
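The pom above does not declare the MySQL JDBC driver; the spark-submit command in section 2 supplies it via --jars instead. If you prefer to manage it with Maven, the standard dependency, at the version matching the jar used earlier, is:

```xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.42</version>
</dependency>
```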
One note on file permissions here:
chmod -R 777 * recursively changes the permissions of each directory and everything beneath it.
r, w and x correspond to the values 4, 2 and 1. In an ls -l listing, the first character indicates a regular file (-), a directory (d), or a link (l).
Of the three permission triplets that follow, the first is the owner's permissions; the second applies to users in the file's group, who in this example can read and write but not execute; and the third applies to all other users, who here can read but neither write nor execute.
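As a concrete example, the pattern described above corresponds to mode 764 (the file name here is hypothetical):

```sh
chmod 764 run.sh   # owner rwx (4+2+1=7), group rw- (4+2=6), others r-- (4)
ls -l run.sh       # first column shows -rwxrw-r--
```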