Integrating Spark Streaming with Kafka and MySQL: Saving Data to MySQL in Real Time (Direct Stream Approach)
The cluster is laid out as follows:
192.168.58.11 spark01
192.168.58.12 spark02
192.168.58.13 spark03
Spark version: spark-2.1.0-bin-hadoop2.7
Kafka version: kafka_2.11-2.0.0
The Spark Streaming program
package com.kk.sparkstreaming.kafka

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import java.sql.{Connection, DriverManager, PreparedStatement}

object KafkaDirect {
  def main(args: Array[String]): Unit = {
    // Reduce log output
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val sparkConf = new SparkConf().setAppName("KafkaDirect").setMaster("local[2]")
    val sparkStreaming = new StreamingContext(sparkConf, Seconds(1))

    // Topic to subscribe to
    val topic = Set("kevin")
    // Kafka broker address
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "192.168.58.11:9092")

    // Create a direct DStream that reads from Kafka
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      sparkStreaming, kafkaParams, topic)

    // Extract the message value (the key is ignored)
    val logRDD = kafkaStream.map(_._2)
    // Print the raw records to the console
    logRDD.print()

    // Split each record and extract the IP field, e.g.
    // 1,201.105.101.108,http://mystore.jsp/?productid=1,2017020029,2,1
    val datas = logRDD.map(line => {
      val fields: Array[String] = line.split(",")
      val ip = fields(1)
      (ip, 1)
    })
    // Print the (ip, 1) pairs to the console
    datas.print()

    // Save the data to MySQL: open one connection per partition and
    // close it in the same partition, so resources are managed on the
    // executor that actually uses them
    datas.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        var conn: Connection = null
        var ps: PreparedStatement = null
        try {
          Class.forName("com.mysql.jdbc.Driver")
          conn = DriverManager.getConnection(
            "jdbc:mysql://192.168.58.14:3306/storm?useUnicode=true&characterEncoding=utf8",
            "root", "kevin")
          ps = conn.prepareStatement("insert into result values(?,?)")
          partition.foreach(s => {
            ps.setString(1, s._1)
            ps.setInt(2, s._2)
            ps.executeUpdate()
          })
        } catch {
          case t: Throwable => t.printStackTrace()
        } finally {
          if (ps != null) ps.close()
          if (conn != null) conn.close()
        }
      })
    })

    sparkStreaming.start()
    sparkStreaming.awaitTermination()
  }
}
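The job inserts into a result table in the storm database, but the post never shows its schema. Below is a minimal, hypothetical DDL sketch that matches the two-column insert statement; the column names ip and cnt are assumptions, only the types follow from the code above.

import java.sql.DriverManager

// Hypothetical helper: creates the `result` table the streaming job writes to.
// Column names (ip, cnt) are assumed; the types follow from setString/setInt.
object CreateResultTable {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection(
      "jdbc:mysql://192.168.58.14:3306/storm?useUnicode=true&characterEncoding=utf8",
      "root", "kevin")
    try {
      val stmt = conn.createStatement()
      stmt.executeUpdate("CREATE TABLE IF NOT EXISTS result (ip VARCHAR(64), cnt INT)")
      stmt.close()
    } finally {
      conn.close()
    }
  }
}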
The pom.xml file
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <spark.version>2.2.1</spark.version>
    <scala.version>2.11.1</scala.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-reflect</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.8</version>
    </dependency>
</dependencies>
Testing
1. Start the Kafka cluster and create a message producer (see the producer sketch after this list).
2. Submit the program to the cluster and run it.
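For step 1, here is a minimal test-producer sketch in Scala, assuming the kafka-clients library is on the classpath; the original post likely used the kafka-console-producer script instead. The sample record mirrors the comma-separated format the streaming job parses.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Minimal test producer: sends one record to the `kevin` topic in the
// format the streaming job expects (field 1 is the IP address).
object TestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.58.11:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    producer.send(new ProducerRecord[String, String](
      "kevin", "1,201.105.101.108,http://mystore.jsp/?productid=1,2017020029,2,1"))
    producer.close()
  }
}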
Running on the cluster throws an exception caused by a jar conflict:
Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
	at scala.Option.map(Option.scala:145)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
	at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
	at scala.util.Either$RightProjection.flatMap(Either.scala:523)
Solution
Replace the Kafka jar with kafka_2.11-0.8.2.1.jar and copy it into the jars directory on every Spark node. The spark-streaming-kafka-0-8 connector is compiled against the Kafka 0.8.x client API, so the kafka_2.11-2.0.0 jar on the cluster classpath exposes an incompatible kafka.cluster.Broker class at runtime; swapping in the 0.8.2.1 client jar resolves the ClassCastException.
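Once the job runs cleanly, you can confirm that rows are landing in MySQL. A small verification sketch (it reads the two columns positionally, so it works regardless of what the columns are actually named):

import java.sql.DriverManager

// Quick check that the streaming job is writing rows into `result`.
object VerifyResult {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection(
      "jdbc:mysql://192.168.58.14:3306/storm?useUnicode=true&characterEncoding=utf8",
      "root", "kevin")
    try {
      val rs = conn.createStatement().executeQuery("SELECT * FROM result")
      while (rs.next()) {
        println(rs.getString(1) + " -> " + rs.getInt(2))
      }
    } finally {
      conn.close()
    }
  }
}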