
[8] Spark Streaming: writing results to MySQL with foreachRDD, Local mode (Scala)

DStream's foreachRDD lets you push data out to external systems (databases, file systems, and so on). Used carelessly, however, it causes all kinds of problems.

Anti-pattern 1: create the connection on the driver and use it on the workers. This fails with "connection object not serializable".

Anti-pattern 2: create a connection for every single record in the RDD. The per-record connection overhead is very expensive.

The right pattern: inside foreachRDD, call foreachPartition on the RDD, create one connection per partition, and preferably take it from a connection pool, as sketched below.
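A minimal sketch of the pooled pattern, assuming a hand-rolled ConnectionPool object (illustrative only; in practice a real pool such as DBCP or HikariCP would back it):

import java.sql.{Connection, DriverManager}
import java.util.concurrent.ConcurrentLinkedQueue

// One instance of this object lives in each executor JVM.
object ConnectionPool {
  Class.forName("com.mysql.jdbc.Driver")
  private val pool = new ConcurrentLinkedQueue[Connection]()

  // Reuse an idle connection if one exists, otherwise open a new one
  def getConnection(): Connection = {
    val cached = pool.poll()
    if (cached != null) cached
    else DriverManager.getConnection("jdbc:mysql://localhost:3306/sid", "root", "database password")
  }

  // Hand the connection back for the next partition instead of closing it
  def returnConnection(conn: Connection): Unit = { pool.offer(conn); () }
}

result.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => {
      // write each record to MySQL over the shared connection
    })
    ConnectionPool.returnConnection(connection)
  }
}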

Project layout (screenshot)

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.sid.spark</groupId>
  <artifactId>spark-train</artifactId>
  <version>1.0</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.11.8</scala.version>
    <kafka.version>0.9.0.0</kafka.version>
    <spark.version>2.2.0</spark.version>
    <hadoop.version>2.9.0</hadoop.version>
    <hbase.version>1.4.4</hbase.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>${kafka.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <!--<dependency>-->
      <!--<groupId>org.apache.hbase</groupId>-->
      <!--<artifactId>hbase-client</artifactId>-->
      <!--<version>${hbase.version}</version>-->
    <!--</dependency>-->

    <!--<dependency>-->
      <!--<groupId>org.apache.hbase</groupId>-->
      <!--<artifactId>hbase-server</artifactId>-->
      <!--<version>${hbase.version}</version>-->
    <!--</dependency>-->

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>net.jpountz.lz4</groupId>
      <artifactId>lz4</artifactId>
      <version>1.3.0</version>
    </dependency>

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.31</version>
    </dependency>

  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>

Code

package com.sid.spark

import java.sql.DriverManager

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by jy02268879 on 2018/7/17.
  *
  * Word count with Spark Streaming,
  * writing the results to MySQL via foreachRDD.
  *
  */
object ForeachRDDWriteMySQL {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("ForeachRDDWriteMySQL").setMaster("local[2]")

    val ssc = new StreamingContext(sparkConf,Seconds(5))

    val lines = ssc.socketTextStream("node1",6789)

    val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)


    // Write the results to MySQL: one connection per partition
    result.foreachRDD ( rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        val connection = createConnection()
        partitionOfRecords.foreach(record => {
          // Check whether this word is already in the table
          val querySql = "SELECT t.word_count FROM wordcount t WHERE t.word = '"+record._1+"'"
          val queryResultSet = connection.createStatement().executeQuery(querySql)
          val hasNext = queryResultSet.next()
          println("MySQL already has word '"+record._1+"': "+hasNext)
          if(!hasNext){
            // New word: insert it with this batch's count
            val insertSql = "insert into wordcount(word,word_count) values('" + record._1 + "'," + record._2 + ")"
            connection.createStatement().execute(insertSql)
          }else{
            // Known word: add this batch's count to the stored total
            val newWordCount = queryResultSet.getInt("word_count") + record._2
            val updateSql = "UPDATE wordcount SET word_count = "+newWordCount+" where word = '"+record._1+"'"
            connection.createStatement().execute(updateSql)
          }
        })
        connection.close()
      })
    })

    ssc.start()
    ssc.awaitTermination()

  }

  /**
    * Get a connection to MySQL
    * */
  def createConnection()={
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://localhost:3306/sid","root","database password")
  }

}
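Before running, the job assumes a wordcount table already exists in the sid database, with a word column and a word_count column. A minimal one-off setup sketch (the exact schema is an assumption inferred from the SQL above):

import java.sql.DriverManager

object CreateWordCountTable extends App {
  Class.forName("com.mysql.jdbc.Driver")
  val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/sid", "root", "database password")
  // word is what the job queries and updates on; word_count holds the running total
  connection.createStatement().execute(
    "CREATE TABLE IF NOT EXISTS wordcount (" +
      "word VARCHAR(64) NOT NULL PRIMARY KEY, " +
      "word_count INT NOT NULL)")
  connection.close()
}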

Start nc on node1:

nc -lk 6789

Run the project in IDEA.

Type some words into nc.

IDEA console (screenshot)

MySQL result (screenshot)

Type a a a into nc again; the total for a is now 5.

IDEA console (screenshot)

MySQL result (screenshot)

Type b b into nc again.

IDEA console (screenshot)

MySQL result (screenshot)

Error on project startup

java.lang.NoSuchMethodError: javax.servlet.http.HttpServletRequest.isAsyncSupported()Z
	at org.spark_project.jetty.servlet.DefaultServlet.sendData(DefaultServlet.java:936)
	at org.spark_project.jetty.servlet.DefaultServlet.doGet(DefaultServlet.java:525)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
	at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:845)
	at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583)
	at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
	at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)
	at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
	at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
	at org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:461)
	at org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
	at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
	at org.spark_project.jetty.server.Server.handle(Server.java:524)
	at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:319)
	at org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:253)
	at org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273)
	at org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:95)
	at org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
	at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
	at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
	at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
	at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
	at org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
	at java.lang.Thread.run(Thread.java:745)

Description: the isAsyncSupported method was not found on the javax.servlet.http.HttpServletRequest class.

Solution

Search the classpath for the HttpServletRequest class.

Remove every reference to servlet-api-2.X.jar: its javax.servlet.http.HttpServletRequest class has no isAsyncSupported method, and it shadows the servlet-api-3.X.jar that does.
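If the 2.x servlet-api is being pulled in transitively (hadoop-client is a common culprit; confirm with mvn dependency:tree), an exclusion in the pom is one way to drop it. A sketch, assuming hadoop-client is the source:

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
      <exclusions>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>servlet-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>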