
SparkStreaming (Part 3): Reading Kafka Data and Storing It Incrementally in MySQL

I. Reading the Kafka data with Spark Streaming

package org.apache.spark.examples.streaming
import java.sql.{PreparedStatement, Connection, DriverManager}
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel

object NetworkWordCountStateful {
  def main(args: Array[String]) {
    // Define the state update function
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
    StreamingExamples.setStreamingLogLevels()  // set the log4j log level
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
    val sc = new StreamingContext(conf, Seconds(5))
    sc.checkpoint("file:///usr/local/spark/mycode/streaming/dstreamoutput/")    // set a checkpoint directory; checkpointing provides fault tolerance
    val lines = sc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    // The statements below are new: they save the DStream to the MySQL database
    stateDstream.foreachRDD(rdd => {
      // inner function that writes the records of one partition to MySQL
      def func(records: Iterator[(String, Int)]) {
        var conn: Connection = null
        var stmt: PreparedStatement = null
        try {
          val url = "jdbc:mysql://localhost:3306/spark"
          val user = "root"
          val password = "hadoop"  //筆者設定的資料庫密碼是hadoop,請改成你自己的mysql資料庫密碼
          conn = DriverManager.getConnection(url, user, password)
          records.foreach(p => {
            val sql = "insert into wordcount(word,count) values (?,?)"
            stmt = conn.prepareStatement(sql)
            stmt.setString(1, p._1.trim)
            stmt.setInt(2, p._2)
            stmt.executeUpdate()
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (stmt != null) {
            stmt.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      }

      val repartitionedRDD = rdd.repartition(3)
      repartitionedRDD.foreachPartition(func)
    })

    sc.start()
    sc.awaitTermination()
  }
}
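
Note that the listing above actually reads its input from a socket via socketTextStream. To read from Kafka, as the title of this part suggests, the lines DStream can instead be created with the spark-streaming-kafka-0-10 direct API. The snippet below is only a minimal sketch: the broker address localhost:9092, the topic wordcount and the consumer group wordcount-group are placeholder assumptions, and the build must also include the Kafka integration dependency (see the note in the sbt section below).

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Kafka consumer configuration (all values here are assumptions; adjust to your cluster)
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "wordcount-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// Create a direct stream on the hypothetical topic "wordcount" and extract the message values,
// so that "lines" can replace the socketTextStream line in the program above.
val stream = KafkaUtils.createDirectStream[String, String](
  sc, PreferConsistent, Subscribe[String, String](Array("wordcount"), kafkaParams))
val lines = stream.map(record => record.value)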

II. Compiling with sbt:

  1. cd /usr/local/spark/mycode/streaming/dstreamoutput
  2. rm simple.sbt
  3. vim simple.sbt
  4. name := "Simple Project"
    version := "1.0"
    scalaVersion := "2.11.8"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.1"
    libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.3.1"
    libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.3.1"

    My Spark version is 2.3.1 and my Scala version is 2.11.8 (if you switch to a Kafka source, an extra dependency is noted after the packaging commands below).

  5. Package with sbt:

 

  1. cd /usr/local/spark/mycode/streaming/dstreamoutput
  2. /usr/local/sbt/sbt package
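
If you switch the program to read from Kafka (see the sketch after the program listing in section I), simple.sbt also needs the Kafka integration library. A hedged addition, assuming the spark-streaming-kafka-0-10 connector matching Spark 2.3.1 and Scala 2.11:

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.3.1"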

6. Run the job:

/usr/local/spark/bin/spark-submit --class "org.apache.spark.examples.streaming.NetworkWordCountStateful" --jars /usr/local/spark/jars/mysql-connector-java-5.1.42-bin.jar /usr/local/spark/mycode/streaming/dstreamoutput/target/scala-2.11/simple-project_2.11-1.0.jar

7. Open another terminal:

nc -lk 9999
// Now you can type words in this window, pressing Enter after each one, for example:
hello
hadoop
spark
hello
spark

At this point you can check the MySQL database to see whether the data has been written.
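With the sample input above, the cumulative counts printed by stateDstream.print() should end up as hello=2, spark=2 and hadoop=1, although how they are split across batches depends on typing speed. Also note that the program issues a plain INSERT for every batch, so the wordcount table accumulates a new row per word per batch instead of being updated in place. For truly incremental storage with one row per word, a minimal sketch of the foreach body, assuming the word column carries a UNIQUE or PRIMARY KEY constraint, is:

          records.foreach(p => {
            // upsert instead of plain insert: requires a unique key on the word column
            val sql = "insert into wordcount(word,count) values (?,?) " +
              "on duplicate key update count = values(count)"
            stmt = conn.prepareStatement(sql)
            stmt.setString(1, p._1.trim)
            stmt.setInt(2, p._2)
            stmt.executeUpdate()
          })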

III. The pom configuration file:

    <properties>
        <spark.version>2.3.1</spark.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!--<dependency>-->
         <!--<groupId>org.apache.redis</groupId>-->
        <!--<artifactId>redis</artifactId>-->
        <!--<version>1.0</version>-->
        <!--</dependency>-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- duplicate spark-core declaration at 2.0.1 conflicts with spark-core_${scala.version} ${spark.version} above -->
        <!--<dependency>-->
            <!--<groupId>org.apache.spark</groupId>-->
            <!--<artifactId>spark-core_2.11</artifactId>-->
            <!--<version>2.0.1</version>-->
            <!--<scope>provided</scope>-->
        <!--</dependency>-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!--<dependency>-->
            <!--<groupId>org.apache.spark</groupId>-->
            <!--<artifactId>spark-mllib_${scala.version}</artifactId>-->
            <!--<version>${spark.version}</version>-->
        <!--</dependency>-->


    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>


            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>

        </plugins>
    </build>

A note on permissions here:

chmod -R 777 *  recursively changes the permissions of a directory and everything under it.

rwx corresponds to 4, 2 and 1 (so, for example, 7 = 4+2+1 = rwx). In a long listing the first character indicates a regular file (-), a directory (d) or a link (l).

After that, the first triplet is the permission of the file or directory owner, and the second is the permission of the owner's group; for example, rw- means users in the same group as the owner can read and write but not execute.

The third triplet is the permission of other users outside the owner's group; for example, r-- means they can read but not write or execute.