1. 程式人生 > >Kafka結合Spark-streaming 的兩種連線方式(AWL與直連)

Kafka結合Spark-streaming 的兩種連線方式(AWL與直連)

kafka結合spark-streaming的用法及說明之前部落格有些,這裡就不贅述了。

這篇文章說下他們結合使用的兩種連線方式。(AWL與直連)

先看一張圖:

這是kafka與streaming結合的基本方式,如圖spark叢集中的 worker節點中 exeutor執行緒裡的 receiver介面會一直消費kafka中的資料,那麼問題來了,假如我們定義5秒消費一次,如果spark叢集定義了每個worker使用的cpu資源不足以消費完了這5秒的資料,那麼就會出現資料的丟失,消費不了的那些資料就沒了,並且streaming一經啟動會一直迴圈消費拉取資源,如果出現上述問題,分配的cpu不足以消費5秒拉取的資料,那麼丟失的資料便會越積越多,這在程式裡是嚴重的bug。

現在處理這種情況有兩種方式,write ahead logs (AWL) 方式與直連方式。

上面那種圖說的就是awl方式,這種方式是以前常用的方式,我們在程式裡對streamingContext初始化後,得到她的物件進行checkpoint("hdfs://xxxx/xx") 即可,這樣程式會預設把日誌偏移量存到hdfs上面做備份,防止資料丟失,但是這樣會影響效能。

另外一種方式也是公司常用的方式就是 直連方式,如圖:

直連方式就是使用executor直接連線kakfa節點,我們自定義偏移量的使用大小及儲存備份方法。

1.直連方式從Kafka叢集中讀取資料,並且在Spark Streaming系統裡面維護偏移量相關的資訊,實現零資料丟失,保證不重複消費,比createStream更高效;

2.建立的DStream的rdd的partition做到了和Kafka中topic的partition一一對應。

1.3版本可以直接通過低階API從kafka的topic消費訊息,並且不再向zookeeper中更新consumer offsets,使得基於zookeeper的consumer offsets的監控工具都會失效。所以更新zookeeper中的consumer offsets還需要自己去實現,並且官方提供的兩個createDirectStream過載並不能很好的滿足我的需求,需要進一步封裝。

因此,在採用直連的方式消費kafka中的資料的時候,大體思路是首先獲取儲存在zookeeper中的偏移量資訊,根據偏移量資訊去建立stream,消費資料後再把當前的偏移量寫入zk中。

下面是在網上搜的並親測可用的直連方式的工具程式碼,大家有需要可以參考下。(wordcount求和程式)


package cn.itcast.spark.day5

import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.{KafkaManager, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object DirectKafkaWordCount {

  /*  def dealLine(line: String): String = {
      val list = line.split(',').toList
  //    val list = AnalysisUtil.dealString(line, ',', '"')// 把dealString函式當做split即可
      list.get(0).substring(0, 10) + "-" + list.get(26)
    }*/

  def processRdd(rdd: RDD[(String, String)]): Unit = {
    val lines = rdd.map(_._2)
    val words = lines.map(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.foreach(println)
  }

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println(
        s"""
           |Usage: DirectKafkaWordCount <brokers> <topics> <groupid>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more kafka topics to consume from
           |  <groupid> is a consume group
           |
        """.stripMargin)
      System.exit(1)
    }

    Logger.getLogger("org").setLevel(Level.WARN)

    val Array(brokers, topics, groupId) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    sparkConf.setMaster("local[*]")//最大cpu核數
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "5") //每秒拉取5條
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val ssc = new StreamingContext(sparkConf, Seconds(2)) //兩秒一次

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,  //直接連線 brokers,不用通過zk連線管理元資料了
      "group.id" -> groupId,
      "auto.offset.reset" -> "smallest"
    )

    val km = new KafkaManager(kafkaParams)

    val messages = km.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    messages.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // 先處理訊息
        processRdd(rdd)
        // 再更新offsets
        km.updateZKOffsets(rdd)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset

import scala.reflect.ClassTag

/**
  * 自己管理offset
  */
class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {

  private val kc = new KafkaCluster(kafkaParams)

  /**
    * 建立資料流
    */
  def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
                                                                                                            ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String]): InputDStream[(K, V)] =  {
    val groupId = kafkaParams.get("group.id").get
    // 在zookeeper上讀取offsets前先根據實際情況更新offsets
    setOrUpdateOffsets(topics, groupId)

    //從zookeeper上讀取offset開始消費message
    val messages = {
      val partitionsE = kc.getPartitions(topics)
      if (partitionsE.isLeft)
        throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft)
        throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
      val consumerOffsets = consumerOffsetsE.right.get
      KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
        ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
    }
    messages
  }

  /**
    * 建立資料流前,根據實際消費情況更新消費offsets
    * @param topics
    * @param groupId
    */
  private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
    topics.foreach(topic => {
      var hasConsumed = true
      val partitionsE = kc.getPartitions(Set(topic))
      if (partitionsE.isLeft)
        throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
      val partitions = partitionsE.right.get
      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
      if (consumerOffsetsE.isLeft) hasConsumed = false
      if (hasConsumed) {// 消費過
        /**
          * 如果streaming程式執行的時候出現kafka.common.OffsetOutOfRangeException,
          * 說明zk上儲存的offsets已經過時了,即kafka的定時清理策略已經將包含該offsets的檔案刪除。
          * 針對這種情況,只要判斷一下zk上的consumerOffsets和earliestLeaderOffsets的大小,
          * 如果consumerOffsets比earliestLeaderOffsets還小的話,說明consumerOffsets已過時,
          * 這時把consumerOffsets更新為earliestLeaderOffsets
          */
        val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
        if (earliestLeaderOffsetsE.isLeft)
          throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
        val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
        val consumerOffsets = consumerOffsetsE.right.get

        // 可能只是存在部分分割槽consumerOffsets過時,所以只更新過時分割槽的consumerOffsets為earliestLeaderOffsets
        var offsets: Map[TopicAndPartition, Long] = Map()
        consumerOffsets.foreach({ case(tp, n) =>
          val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
          if (n < earliestLeaderOffset) {
            println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
              " offsets已經過時,更新為" + earliestLeaderOffset)
            offsets += (tp -> earliestLeaderOffset)
          }
        })
        if (!offsets.isEmpty) {
          kc.setConsumerOffsets(groupId, offsets)
        }
      } else {// 沒有消費過
      val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
        var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
        if (reset == Some("smallest")) {
          val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft)
            throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsets = leaderOffsetsE.right.get
        } else {
          val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
          if (leaderOffsetsE.isLeft)
            throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
          leaderOffsets = leaderOffsetsE.right.get
        }
        val offsets = leaderOffsets.map {
          case (tp, offset) => (tp, offset.offset)
        }
        kc.setConsumerOffsets(groupId, offsets)
      }
    })
  }

  /**
    * 更新zookeeper上的消費offsets
    * @param rdd
    */
  def updateZKOffsets(rdd: RDD[(String, String)]) : Unit = {
    val groupId = kafkaParams.get("group.id").get
    val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    for (offsets <- offsetsList) {
      val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
      val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
      if (o.isLeft) {
        println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
      }
    }
  }
}

Direct 讀取問題
在實際的應用中,Direct Approach方式很好地滿足了我們的需要,與Receiver-based方式相比,有以下幾方面的優勢:
1,降低資源。Direct不需要Receivers,其申請的Executors全部參與到計算任務中;而Receiver-based則需要專門的Receivers來讀取Kafka資料且不參與計算。因此相同的資源申請,Direct 能夠支援更大的業務。
2,降低記憶體。Receiver-based的Receiver與其他Exectuor是非同步的,並持續不斷接收資料,對於小業務量的場景還好,如果遇到大業務量時,需要提高Receiver的記憶體,但是參與計算的Executor並無需那麼多的記憶體。而Direct 因為沒有Receiver,而是在計算時讀取資料,然後直接計算,所以對記憶體的要求很低。實際應用中我們可以把原先的10G降至現在的2-4G左右。
3,魯棒性更好。Receiver-based方法需要Receivers來非同步持續不斷的讀取資料,因此遇到網路、儲存負載等因素,導致實時任務出現堆積,但Receivers卻還在持續讀取資料,此種情況很容易導致計算崩潰。Direct 則沒有這種顧慮,其Driver在觸發batch 計算任務時,才會讀取資料並計算。隊列出現堆積並不會引起程式的失敗。

但是也存在一些不足,具體如下:
1,提高成本。Direct需要使用者採用checkpoint或者第三方儲存來維護offsets,而不像Receiver-based那樣,通過ZooKeeper來維護Offsets,此提高了使用者的開發成本。
2,監控視覺化。Receiver-based方式指定topic指定consumer的消費情況均能通過ZooKeeper來監控,而Direct則沒有這種便利,如果做到監控並可視化,則需要投入人力開發。