1. 程式人生 > >SparkSteaming中直連與receiver兩種方式的區別

SparkSteaming中直連與receiver兩種方式的區別

SparkStreaming的Receiver方式和直連方式有什麼區別?

Receiver接收固定時間間隔的資料(放在記憶體中的),使用高階API,自動維護偏移量,達到固定的時間才去進行處理,效率低並且容易丟失資料,靈活性特別差,不好,而且它處理資料的時候,如果某一刻的資料量過大,那麼就會造成磁碟溢寫的情況,他通過WALS進行磁碟寫入。

Receiver實現方式:

程式碼如下:

object KafkaWC02 {


  def main(args: Array[String]): Unit = {


    val conf = new SparkConf().setAppName("kafkaWC").setMaster("local[2]") //設定執行緒數
    val ssc = new StreamingContext(conf, Seconds(5))

    //設定檢查點
    ssc.checkpoint("D:\\data\\checpoint\\checpoint1")
    //接下來編寫kafka的配置資訊
    val zks = "spark01:2181"
    //然後是kafka的消費組
    val groupId = "gp1"
    //Topic的名字  Map的key是Topic名字,第二個引數是執行緒數
    val topics = Map[String, Int]("test02" -> 1)
    //建立kafka的輸入資料流,來獲取kafka中的資料
    val data = KafkaUtils.createStream(ssc, zks, groupId, topics)
    //獲取到的資料是鍵值對的格式(key,value)
    //獲取到的資料是 key是偏移量  value是資料
    //接下來開始處理資料


    val lines = data.flatMap(_._2.split(" "))
    val words = lines.map((_, 1))
    val res = words.updateStateByKey(updateFunc,new HashPartitioner(ssc.sparkContext.defaultParallelism),true)
    res.print()
    //val result = words.reduceByKey(_ + _)
    //val res = result.updateStateByKey[Int](updateFunc)
    //res.print()
    //列印輸出
    //result.print()
    //啟動程式
    ssc.start()
    //等待停止
    ssc.awaitTermination()


  }
  //(iterator:Iteratot[(K,Seq[V]),Option[S]]))
  //傳過來的值是Key   Value型別
  //第一個引數,是我們從kafka獲取到的元素,key  ,String型別
  //第二個引數,是我們進行單詞統計的value值,Int型別
  //第三個引數,是我們每次批次提交的中間結果集
  val updateFunc=(iter:Iterator[(String,Seq[Int],Option[Int])])=>{
    iter.map(t=>{
      (t._1,t._2.sum+t._3.getOrElse(0))
    })
  }
}

  

 

Direct直連方式,

它使用的是底層API實現Offest我們開發人員管理,這樣的話,它的靈活性特別好。並且可以保證資料的安全性,而且不用擔心資料量過大,因為它有預處理機制,進行提前處理,然後再批次提交任務。

Direct實現方式:

程式碼如下:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}

/**
  * 重要!!!  Direct直連方式
  */
object KafkaDirectWC {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Direct").setMaster("local[2]")
    val ssc = new StreamingContext(conf,Duration(5000))
    //指定組名
    val groupId = "gp01"
    //指定消費的topic名字
    val topic = "tt"
    //指定kafka的Broker地址(SparkStreaming的Task直接連線到Kafka分割槽上,用的是底層API消費)
    val brokerList ="spark:9092"
    //接下來我們要自己維護offset了,將offset儲存到ZK中
    val zkQuorum = "spark:2181"
    //建立stream時使用的topic名字集合,SparkStreaming可以同時消費多個topic
    val topics:Set[String] = Set(topic)
    //建立一個ZkGroupTopicDirs物件,其實是指定往Zk中寫入資料的目錄
    // 用於儲存偏移量
    val TopicDirs = new ZKGroupTopicDirs(groupId,topic)
    //獲取zookeeper中的路徑“/gp01/offset/tt/”
    val zkTopicPath = s"${TopicDirs.consumerOffsetDir}"
    //準備kafka引數
    val kafkas = Map(
      "metadata.broker.list"->brokerList,
      "group.id"->groupId,
      //從頭開始讀取資料
      "auto.offset.reset"->kafka.api.OffsetRequest.SmallestTimeString
    )
    // zookeeper 的host和ip,建立一個client,用於更新偏移量
    // 是zookeeper客戶端,可以從zk中讀取偏移量資料,並更新偏移量
    val zkClient = new ZkClient(zkQuorum)
    //"/gp01/offset/tt/0/10001"
    //"/gp01/offset/tt/1/20001"
    //"/gp01/offset/tt/2/30001"
    val clientOffset = zkClient.countChildren(zkTopicPath)
    // 建立KafkaStream
    var kafkaStream :InputDStream[(String,String)]= null
    //如果zookeeper中有儲存offset 我們會利用這個offset作為KafkaStream的起始位置
    //TopicAndPartition  [/gp01/offset/tt/0/ , 8888]
    var fromOffsets:Map[TopicAndPartition,Long] = Map()
    //如果儲存過offset
    if(clientOffset > 0){
      //clientOffset 的數量其實就是 /gp01/offset/tt的分割槽數目
      for(i<-0 until clientOffset){
        // /gp01/offset/tt/  0/10001
        val partitionOffset = zkClient.readData[String](s"$zkTopicPath/${i}")
        // tt/0
        val tp = TopicAndPartition(topic,i)
        //將不同partition 對應得offset增加到fromoffset中
        // tt/0 -> 10001
        fromOffsets += (tp->partitionOffset.toLong)
      }
      // key 是kafka的key value 就是kafka資料
      // 這個會將kafka的訊息進行transform 最終kafka的資料都會變成(kafka的key,message)這樣的Tuple
      val messageHandler = (mmd:MessageAndMetadata[String,String])=>
        (mmd.key(),mmd.message())
      // 通過kafkaUtils建立直連的DStream
      //[String,String,StringDecoder, StringDecoder,(String,String)]
      // key    value  key解碼方式     value的解碼方式   接收資料的格式
      kafkaStream = KafkaUtils.createDirectStream
        [String,String,StringDecoder,
          StringDecoder,(String,String)](ssc,kafkas,fromOffsets,messageHandler)
    }else{
      //如果未儲存,根據kafkas的配置使用最新的或者最舊的offset
      kafkaStream = KafkaUtils.createDirectStream
        [String,String,StringDecoder,StringDecoder](ssc,kafkas,topics)
    }
    //偏移量範圍
    var offsetRanges = Array[OffsetRange]()
    //從kafka讀取的資料,是批次提交的,那麼這塊注意下,
    // 我們每次進行讀取資料後,需要更新維護一下偏移量
    //那麼我們開始進行取值
    //    val transform = kafkaStream.transform{
    //      rdd=>
    //        //得到該RDD對應得kafka訊息的offset
    //        // 然後獲取偏移量
    //        offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    //        rdd
    //    }
    //    val mes = transform.map(_._2)
    // 依次迭代DStream中的RDD
    kafkaStream.foreachRDD{
      //對RDD進行操作 觸發Action
      kafkardd=>

        offsetRanges = kafkardd.asInstanceOf[HasOffsetRanges].offsetRanges

        //下面 你就可以怎麼寫都行了,為所欲為
        val maps = kafkardd.map(_._2)

        maps.foreach(println)

        for(o<-offsetRanges){
          // /gp01/offset/tt/  0
          val zkpath = s"${TopicDirs.consumerOffsetDir}/${o.partition}"
          //將該partition的offset儲存到zookeeper中
          // /gp01/offset/tt/  0/88889
          ZkUtils.updatePersistentPath(zkClient,zkpath,o.untilOffset.toString)
        }
    }
    // 啟動
    ssc.start()
    ssc.awaitTermination()
  }
}