大資料學習之路97-kafka直連方式(spark streaming 整合kafka 0.10版本)
阿新 • • 發佈:2018-11-09
我們之前SparkStreaming整合Kafka的時候用的是傻瓜式的方式-----createStream,但是這種方式的效率很低。而且在kafka 0.10版本之後就不再提供了。
接下來我們使用Kafka直連的方式,這種方式其實是呼叫Kafka底層的消費資料的API,我們知道,越底層的東西效率也就越高。
使用之前的方式是要連線到zookeeper的,而現在的方式則不需要。
程式碼如下:
package com.test.sparkStreaming import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.{HashPartitioner, SparkConf} import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.{Seconds, StreamingContext} object KafkaDirectStream { val updateFunc = (it : Iterator[(String, Seq[Int], Option[Int])]) => { it.map{case (w,s,o) => (w,s.sum + o.getOrElse(0))} } def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("KafkaDirectStream").setMaster("local[*]") val ssc: StreamingContext = new StreamingContext(conf,Seconds(5)) ssc.checkpoint("./ck") //跟Kafka整合(直連方式,呼叫Kafka底層的消費資料的API) val brokerList = "marshal:9092,marshal01:9092,marshal02:9092,marshal03:9092,marshal04:9092,marshal05:9092" val kafkaParams = Map[String,Object]( "bootstrap.servers" -> brokerList, "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "g100", //這個代表,任務啟動之前產生的資料也要讀 "auto.offset.reset" -> "earliest", "enable.auto.commit" -> (false:java.lang.Boolean) ) val topics = Array("wordcount") /** * 指定kafka資料來源 * ssc:StreamingContext的例項 * LocationStrategies:位置策略,如果kafka的broker節點跟Executor在同一臺機器上給一種策略,不在一臺機器上給另外一種策略 * 設定策略後會以最優的策略進行獲取資料 * 一般在企業中kafka節點跟Executor不會放到一臺機器的,原因是kakfa是訊息儲存的,Executor用來做訊息的計算, * 因此計算與儲存分開,儲存對磁碟要求高,計算對記憶體、CPU要求高 * 如果Executor節點跟Broker節點在一起的話使用PreferBrokers策略,如果不在一起的話使用PreferConsistent策略 * 使用PreferConsistent策略的話,將來在kafka中拉取了資料以後儘量將資料分散到所有的Executor上 * ConsumerStrategies:消費者策略(指定如何消費) * */ val directStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String,String](topics,kafkaParams) ) val result: DStream[(String, Int)] = directStream.map(_.value()).flatMap(_.split(" ")) .map((_, 1)) .updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true) result.print() directStream.foreachRDD(rdd => { val offsetRange: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges val maped: RDD[(String, String)] = rdd.map(record =>(record.key,record.value)) //計算邏輯 maped.foreach(println) //迴圈輸出 for(o<-offsetRange){ println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") } }) ssc.start() ssc.awaitTermination() } }
執行結果如下: