
Spark Streaming Integration with Kafka and MySQL: Saving Data to MySQL in Real Time (Direct Approach)

The cluster layout is as follows:

192.168.58.11	spark01
192.168.58.12	spark02
192.168.58.13	spark03
Spark version: spark-2.1.0-bin-hadoop2.7
Kafka version: kafka_2.11-2.0.0

Spark Streaming program

package com.kk.sparkstreaming.kafka

import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.sql.Connection

object KafkaDirect {

  def main(args: Array[String]): Unit = {

    // Reduce log output
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val sparkConf = new SparkConf().setAppName("KafkaDirect").setMaster("local[2]")

    val sparkStreaming = new StreamingContext(sparkConf, Seconds(1))

    // The Kafka topic to consume
    val topic = Set("kevin")

    // Specify the Kafka broker list
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "192.168.58.11:9092")

    // Create a direct-approach DStream that reads from Kafka
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](sparkStreaming, kafkaParams, topic)

    // Extract the message value from each (key, value) pair
    val logRDD = kafkaStream.map(_._2)
    // Print the raw records to the console
    logRDD.print()

    // Split each incoming record and extract the client IP
    val datas = logRDD.map(line => {
      // Sample record: 1,201.105.101.108,http://mystore.jsp/?productid=1,2017020029,2,1
      val fields: Array[String] = line.split(",")
      val ip = fields(1)
      (ip, 1)
    })

    // Print the (ip, 1) pairs to the console
    datas.print()

    // Save the results to MySQL: open one connection per partition on the executors,
    // and close it in the same scope where it was created
    datas.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        var conn: Connection = null
        var ps: PreparedStatement = null
        try {
          Class.forName("com.mysql.jdbc.Driver")
          conn = DriverManager.getConnection(
            "jdbc:mysql://192.168.58.14:3306/storm?useUnicode=true&characterEncoding=utf8",
            "root", "kevin")
          ps = conn.prepareStatement("insert into result values(?,?)")
          partition.foreach(record => {
            ps.setString(1, record._1)
            ps.setInt(2, record._2)
            ps.executeUpdate()
          })
        } catch {
          case t: Throwable => t.printStackTrace()
        } finally {
          if (ps != null) ps.close()
          if (conn != null) conn.close()
        }
      })
    })

    sparkStreaming.start()
    sparkStreaming.awaitTermination()
  }
}
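
The job writes into a two-column result table in the storm database, but the original post does not show its DDL. Below is a minimal sketch that creates the table over JDBC; the column names ip and cnt are assumptions, since the insert statement only relies on the column order (VARCHAR, INT).

import java.sql.DriverManager

object CreateResultTable {
  def main(args: Array[String]): Unit = {
    // Same connection settings as the streaming job
    val conn = DriverManager.getConnection(
      "jdbc:mysql://192.168.58.14:3306/storm?useUnicode=true&characterEncoding=utf8", "root", "kevin")
    val stmt = conn.createStatement()
    // Column names are illustrative; the streaming job only depends on the column order
    stmt.executeUpdate("CREATE TABLE IF NOT EXISTS result (ip VARCHAR(64), cnt INT)")
    stmt.close()
    conn.close()
  }
}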

pom.xml file

<properties>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<spark.version>2.2.1</spark.version>
	<scala.version>2.11.1</scala.version>
</properties>

<dependencies>

	<dependency>
		<groupId>org.scala-lang</groupId>
		<artifactId>scala-library</artifactId>
		<version>${scala.version}</version>
	</dependency>
	
	<dependency>
		<groupId>org.scala-lang</groupId>
		<artifactId>scala-compiler</artifactId>
		<version>${scala.version}</version>
	</dependency>
	
	<dependency>
		<groupId>org.scala-lang</groupId>
		<artifactId>scala-reflect</artifactId>
		<version>${scala.version}</version>
	</dependency>
	
	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-core_2.11</artifactId>
		<version>${spark.version}</version>
	</dependency>
	
	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-streaming_2.11</artifactId>
		<version>${spark.version}</version>
	</dependency>
	
	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-sql_2.11</artifactId>
		<version>${spark.version}</version>
	</dependency>
	
	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
		<version>2.1.1</version>
	</dependency>

	<dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<version>5.1.8</version>
	</dependency>

</dependencies>

Testing
1. Start the Kafka cluster and create a message producer for the kevin topic (a minimal producer sketch follows below).
2. Submit the program to the cluster for execution.
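
The original post does not show the producer code; the kafka-console-producer.sh script works just as well. The following is a minimal Scala sketch, assuming the kafka-clients library is on the classpath, that sends one record in the comma-separated format the streaming job parses.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object TestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Same broker address as the streaming job
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.58.11:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // One record in the same comma-separated format the streaming job expects
    producer.send(new ProducerRecord[String, String]("kevin",
      "1,201.105.101.108,http://mystore.jsp/?productid=1,2017020029,2,1"))
    producer.close()
  }
}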
Running on the cluster fails with the following exception, caused by a jar conflict:

Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
	at scala.Option.map(Option.scala:145)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:87)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:86)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
	at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:86)
	at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:85)
	at scala.util.Either$RightProjection.flatMap(Either.scala:523)

Solution
Replace the Kafka jar with kafka_2.11-0.8.2.1.jar and copy it into the jars directory on every Spark node.