Kafka (6): Integrating with Spark Streaming, receiving Kafka data in Spark Streaming
阿新 · Published: 2018-11-08
1. Overview
Spark Streaming can consume data from Kafka in two ways: the receiver-based approach, where a long-running receiver fetches data via ZooKeeper and stores it in Spark's block manager, and the direct approach, where each batch reads a computed offset range straight from the Kafka brokers.
2. POM dependencies
These target Kafka 0.8.2.1. (The broker installation is kafka_2.10-0.8.2.1, but the client dependency below uses the Scala 2.11 build, kafka_2.11; the broker's Scala version does not have to match the client's, while the client's must match your project's Scala version.)
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
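The POM above lists only the Kafka client and the Kafka connector; the examples below also compile against Spark Streaming itself. A hedged addition, assuming the same Spark 2.1.0 that the connector above targets:

<!-- Spark Streaming is also needed on the compile classpath;
     version assumed to match the connector's Spark 2.1.0 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
</dependency>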
3. Scala code
(1) Receiver approach
package stream

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Receiver-based consumption: a long-running receiver pulls data from Kafka
 * through ZooKeeper and stores it in Spark's block manager.
 */
object UseReceiveKafkaStreaming08 extends App {
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("UseReceiveKafkaStreaming")
    .set("spark.streaming.blockInterval", "1s")
  val sc = SparkContext.getOrCreate(conf)
  val ssc = new StreamingContext(sc, Seconds(10))

  // Data source: ZooKeeper quorum (with the Kafka chroot),
  // topic -> receiver thread count, and the consumer group id
  val zkQuorum = "bigdata.ibeifeng.com:2181/kafka08"
  val topics = Map[String, Int]("beifeng1" -> 5)
  val groupId = "sparkstreaming"

  // API 1: the simple createStream overload; each record is a (key, message) pair
  val kafkaDStream = KafkaUtils
    .createStream(ssc, zkQuorum, groupId, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
    .flatMap(_._2.split(" ")) // split each message value into words
    .map((_, 1))
    .reduceByKey(_ + _)
  // =============== API 1 above has been verified ===============

  kafkaDStream.print()
  ssc.start()
  ssc.awaitTermination()
}
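With the receiver approach, data that has been received but not yet processed can be lost if the driver dies. A minimal sketch of the usual mitigation, the receiver write-ahead log (the checkpoint path below is only an example):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Enable the receiver write-ahead log so received blocks are also persisted
// to the checkpoint directory and survive a driver restart (sketch only).
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("UseReceiveKafkaStreamingWAL")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs:///tmp/kafka08-checkpoint") // the WAL requires a checkpoint directory; example path

With the WAL on, the Spark docs recommend dropping the replicated _2 storage level (e.g. use StorageLevel.MEMORY_AND_DISK_SER instead), since the log already provides durability.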
(2) Receiver approach with explicit Kafka parameters
Note that this is still the receiver-based API: it calls the typed createStream overload with explicit decoders and a kafkaParams map. The actual direct approach (createDirectStream) is sketched in (3) below.
package stream

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

// Renamed so it can coexist with the example above in the same package
object UseReceiveKafkaStreaming08V2 extends App {
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("UseReceiveKafkaStreaming")
    .set("spark.streaming.blockInterval", "1s")
  val sc = SparkContext.getOrCreate(conf)
  val ssc = new StreamingContext(sc, Seconds(10))

  val zkQuorum = "bigdata.ibeifeng.com:2181/kafka08"
  val topics = Map[String, Int]("beifeng1" -> 5)
  val groupId = "sparkstreaming"

  // =============== API 2 below ===============
  /**
   * Signature of the overload used here:
   *
   * def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
   *     ssc: StreamingContext,
   *     kafkaParams: Map[String, String],
   *     topics: Map[String, Int],
   *     storageLevel: StorageLevel
   *   ): ReceiverInputDStream[(K, V)] = {
   *   val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
   *   new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
   * }
   */
  val kafkaParams: Map[String, String] = Map[String, String](
    "zookeeper.connect" -> zkQuorum,
    "group.id" -> groupId,
    "zookeeper.connection.timeout.ms" -> "10000",
    // "smallest" starts from the earliest available offset; use "largest" for the latest
    "auto.offset.reset" -> "smallest")

  // API 2: typed createStream with explicit decoders and Kafka parameters
  val kafkaDStream = KafkaUtils
    .createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK)
    .flatMap(line => line._2.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)
  // =============== API 2 above has been verified ===============

  kafkaDStream.print()
  ssc.start()
  ssc.awaitTermination()
}
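(3) Direct approach
Section 1 promises a direct approach as well, so here is a minimal sketch using createDirectStream from the same spark-streaming-kafka-0-8 artifact. The broker address (bigdata.ibeifeng.com:9092) is an assumption; point metadata.broker.list at your actual Kafka 0.8 broker list.

package stream

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UseDirectKafkaStreaming08 extends App {
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("UseDirectKafkaStreaming")
  val ssc = new StreamingContext(conf, Seconds(10))

  // The direct API talks to the brokers, not ZooKeeper, so it needs
  // metadata.broker.list. The host:port below is an assumption; adjust it
  // to where your Kafka 0.8 broker actually listens.
  val kafkaParams = Map[String, String](
    "metadata.broker.list" -> "bigdata.ibeifeng.com:9092",
    "group.id" -> "sparkstreaming",
    "auto.offset.reset" -> "smallest")

  val kafkaDStream = KafkaUtils
    .createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("beifeng1"))
    .flatMap(_._2.split(" "))
    .map((_, 1))
    .reduceByKey(_ + _)

  kafkaDStream.print()
  ssc.start()
  ssc.awaitTermination()
}

Unlike the receiver API, the direct API starts no long-running receiver and takes no storage level: each batch reads its own offset range straight from the brokers, and the number of Kafka partitions determines the read parallelism.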