1. 程式人生 > >大資料學習之路97-kafka直連方式(spark streaming 整合kafka 0.10版本)

大資料學習之路97-kafka直連方式(spark streaming 整合kafka 0.10版本)

我們之前SparkStreaming整合Kafka的時候用的是傻瓜式的方式-----createStream,但是這種方式的效率很低。而且在kafka 0.10版本之後就不再提供了。

接下來我們使用Kafka直連的方式,這種方式其實是呼叫Kafka底層的消費資料的API,我們知道,越底層的東西效率也就越高。

使用之前的方式是要連線到zookeeper的,而現在的方式則不需要。

程式碼如下:

package com.test.sparkStreaming



import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}


object KafkaDirectStream {
  val updateFunc = (it : Iterator[(String, Seq[Int], Option[Int])]) => {
    it.map{case (w,s,o) => (w,s.sum + o.getOrElse(0))}
  }
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("KafkaDirectStream").setMaster("local[*]")
    val ssc: StreamingContext = new StreamingContext(conf,Seconds(5))
    ssc.checkpoint("./ck")
    //跟Kafka整合(直連方式,呼叫Kafka底層的消費資料的API)
    val brokerList = "marshal:9092,marshal01:9092,marshal02:9092,marshal03:9092,marshal04:9092,marshal05:9092"
    val kafkaParams = Map[String,Object](
        "bootstrap.servers" -> brokerList,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
                "group.id" -> "g100",
      //這個代表,任務啟動之前產生的資料也要讀
                "auto.offset.reset" -> "earliest",
            "enable.auto.commit" -> (false:java.lang.Boolean)
    )
    val topics = Array("wordcount")
    /**
      *   指定kafka資料來源
      *   ssc:StreamingContext的例項
      *   LocationStrategies:位置策略,如果kafka的broker節點跟Executor在同一臺機器上給一種策略,不在一臺機器上給另外一種策略
      *       設定策略後會以最優的策略進行獲取資料
      *       一般在企業中kafka節點跟Executor不會放到一臺機器的,原因是kakfa是訊息儲存的,Executor用來做訊息的計算,
      *       因此計算與儲存分開,儲存對磁碟要求高,計算對記憶體、CPU要求高
      *       如果Executor節點跟Broker節點在一起的話使用PreferBrokers策略,如果不在一起的話使用PreferConsistent策略
      *       使用PreferConsistent策略的話,將來在kafka中拉取了資料以後儘量將資料分散到所有的Executor上
      *   ConsumerStrategies:消費者策略(指定如何消費)
      *
      */
    val directStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String,String](topics,kafkaParams)
    )
    val result: DStream[(String, Int)] = directStream.map(_.value()).flatMap(_.split(" "))
      .map((_, 1))
      .updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    result.print()
    directStream.foreachRDD(rdd => {
      val offsetRange: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val maped: RDD[(String, String)] = rdd.map(record =>(record.key,record.value))
      //計算邏輯
      maped.foreach(println)
      //迴圈輸出
      for(o<-offsetRange){
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

}

執行結果如下: