【8】Spark Streaming: Writing Results to MySQL with foreachRDD in Local Mode (Scala)
阿新 • Published: 2019-01-03
DStream's foreachRDD lets you push data to external storage systems. Used incorrectly, however, it causes a variety of problems.
Anti-pattern 1: create the connection on the driver and use it on the workers. This fails with "connection object not serializable".
Anti-pattern 2: create a connection for every record in the RDD. The per-record connection overhead is very high.
Correct approach: inside foreachRDD, call foreachPartition and create one connection per partition, ideally drawn from a database connection pool.
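The driver/worker distinction is why the pool matters: the connection must be created where it is used, never shipped from the driver. The pooling idea can be sketched without Spark as a lazily filled pool held in a singleton object, since a singleton is initialized once per JVM, i.e. once per executor. This is a toy illustration with a stand-in connection class; in practice a pooling library such as DBCP or HikariCP would back it:

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Toy connection stand-in; a real pool would hold java.sql.Connection.
class Conn { var open = true; def close(): Unit = { open = false } }

// A singleton object is created lazily on each JVM that touches it,
// so the pool lives on the executor and never needs to be serialized.
object ConnectionPool {
  private val pool = new ConcurrentLinkedQueue[Conn]()

  // Reuse an idle connection if one exists, otherwise open a new one.
  def borrow(): Conn = Option(pool.poll()).getOrElse(new Conn)

  // Instead of closing, hand the connection back for the next partition.
  def giveBack(c: Conn): Unit = pool.offer(c)
}
```

Each foreachPartition body would then call borrow() at the start, write its records, and giveBack() at the end, so consecutive batches on the same executor reuse connections instead of reopening them.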
Project layout
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.sid.spark</groupId>
  <artifactId>spark-train</artifactId>
  <version>1.0</version>
  <inceptionYear>2008</inceptionYear>

  <properties>
    <scala.version>2.11.8</scala.version>
    <kafka.version>0.9.0.0</kafka.version>
    <spark.version>2.2.0</spark.version>
    <hadoop.version>2.9.0</hadoop.version>
    <hbase.version>1.4.4</hbase.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <!--
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>${hbase.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>${hbase.version}</version>
    </dependency>
    -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>net.jpountz.lz4</groupId>
      <artifactId>lz4</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.31</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>
Code
package com.sid.spark

import java.sql.{Connection, DriverManager}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by jy02268879 on 2018/7/17.
  *
  * Word count with Spark Streaming,
  * writing the results to MySQL via foreachRDD.
  */
object ForeachRDDWriteMySQL {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ForeachRDDWriteMySQL").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("node1", 6789)
    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

    // Write the results to MySQL: one connection per partition,
    // created on the executor so nothing needs to be serialized.
    result.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        val connection = createConnection()
        partitionOfRecords.foreach(record => {
          val querySql = "SELECT t.word_count FROM wordcount t WHERE t.word = '" + record._1 + "'"
          val queryResultSet = connection.createStatement().executeQuery(querySql)
          val hasNext = queryResultSet.next()
          print("MySQL already had word " + record._1 + ": " + hasNext)
          if (!hasNext) {
            // First time we see this word: insert a new row.
            val insertSql = "insert into wordcount(word, word_count) values('" + record._1 + "', " + record._2 + ")"
            connection.createStatement().execute(insertSql)
          } else {
            // Word already exists: add this batch's count to the stored count.
            val newWordCount = queryResultSet.getInt("word_count") + record._2
            val updateSql = "UPDATE wordcount SET word_count = " + newWordCount + " WHERE word = '" + record._1 + "'"
            connection.createStatement().execute(updateSql)
          }
        })
        connection.close()
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * Get a MySQL connection.
    */
  def createConnection(): Connection = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://localhost:3306/sid", "root", "database password")
  }
}
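The SELECT-then-INSERT/UPDATE round trip above also builds SQL by string concatenation, which is both chatty (two statements per record) and open to SQL injection. A tighter variant, assuming the wordcount table has a unique key on the word column, is a single parameterized upsert batched per partition. This is a sketch of the idea, not the original code; the table and column names follow the example above:

```scala
import java.sql.Connection

object UpsertSketch {
  // One statement replaces the SELECT + INSERT/UPDATE pair: MySQL resolves
  // the duplicate-key conflict and adds the new count to the stored one.
  val upsertSql: String =
    "INSERT INTO wordcount(word, word_count) VALUES(?, ?) " +
    "ON DUPLICATE KEY UPDATE word_count = word_count + VALUES(word_count)"

  // Bind parameters instead of concatenating them into the SQL string,
  // and send the whole partition as one batch.
  def writePartition(connection: Connection, records: Iterator[(String, Int)]): Unit = {
    val stmt = connection.prepareStatement(upsertSql)
    records.foreach { case (word, count) =>
      stmt.setString(1, word)
      stmt.setInt(2, count)
      stmt.addBatch()
    }
    stmt.executeBatch()
    stmt.close()
  }
}
```

The foreachPartition body would then shrink to a single writePartition(connection, partitionOfRecords) call between opening and closing the connection.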
Start nc on node1
nc -lk 6789
Run the project in IDEA
Input in nc
IDEA console
MySQL result
Typed a a a into nc again; the count for a is now 5
IDEA console
MySQL result
Typed b b into nc again
IDEA console
MySQL result
Error when starting the project
java.lang.NoSuchMethodError: javax.servlet.http.HttpServletRequest.isAsyncSupported()Z
    at org.spark_project.jetty.servlet.DefaultServlet.sendData(DefaultServlet.java:936)
    at org.spark_project.jetty.servlet.DefaultServlet.doGet(DefaultServlet.java:525)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
    at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:845)
    at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583)
    at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
    at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)
    at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
    at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:461)
    at org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
    at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
    at org.spark_project.jetty.server.Server.handle(Server.java:524)
    at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:319)
    at org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:253)
    at org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273)
    at org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:95)
    at org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
    at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
    at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
    at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
    at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
    at org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
    at java.lang.Thread.run(Thread.java:745)
Description: the isAsyncSupported method of the javax.servlet.http.HttpServletRequest class was not found.
Fix
Search the classpath for the HttpServletRequest class to see which jars provide it
Remove every referenced servlet-api-2.X.jar: its javax.servlet.http.HttpServletRequest class has no isAsyncSupported method, and it shadows the servlet-api-3.X.jar that does have it
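Rather than deleting jars by hand, the old servlet API can be kept out of the classpath in the pom itself when it arrives transitively. The carrier should first be confirmed with mvn dependency:tree; hadoop-client is a common one in setups like this, but that is an assumption here. An exclusion would then look like:

```xml
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
  <exclusions>
    <!-- servlet-api 2.x lacks isAsyncSupported(); drop it so the 3.x API wins -->
    <exclusion>
      <groupId>javax.servlet</groupId>
      <artifactId>servlet-api</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```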