Kafka結合Spark-streaming 的兩種連線方式(AWL與直連)
kafka結合spark-streaming的用法及說明之前部落格有些,這裡就不贅述了。
這篇文章說下他們結合使用的兩種連線方式。(AWL與直連)
先看一張圖:
這是kafka與streaming結合的基本方式,如圖spark叢集中的 worker節點中 exeutor執行緒裡的 receiver介面會一直消費kafka中的資料,那麼問題來了,假如我們定義5秒消費一次,如果spark叢集定義了每個worker使用的cpu資源不足以消費完了這5秒的資料,那麼就會出現資料的丟失,消費不了的那些資料就沒了,並且streaming一經啟動會一直迴圈消費拉取資源,如果出現上述問題,分配的cpu不足以消費5秒拉取的資料,那麼丟失的資料便會越積越多,這在程式裡是嚴重的bug。
現在處理這種情況有兩種方式,write ahead logs (AWL) 方式與直連方式。
上面那種圖說的就是awl方式,這種方式是以前常用的方式,我們在程式裡對streamingContext初始化後,得到她的物件進行checkpoint("hdfs://xxxx/xx") 即可,這樣程式會預設把日誌偏移量存到hdfs上面做備份,防止資料丟失,但是這樣會影響效能。
另外一種方式也是公司常用的方式就是 直連方式,如圖:
直連方式就是使用executor直接連線kakfa節點,我們自定義偏移量的使用大小及儲存備份方法。
1.直連方式從Kafka叢集中讀取資料,並且在Spark Streaming系統裡面維護偏移量相關的資訊,實現零資料丟失,保證不重複消費,比createStream更高效;
2.建立的DStream的rdd的partition做到了和Kafka中topic的partition一一對應。
1.3版本可以直接通過低階API從kafka的topic消費訊息,並且不再向zookeeper中更新consumer offsets,使得基於zookeeper的consumer offsets的監控工具都會失效。所以更新zookeeper中的consumer offsets還需要自己去實現,並且官方提供的兩個createDirectStream過載並不能很好的滿足我的需求,需要進一步封裝。
因此,在採用直連的方式消費kafka中的資料的時候,大體思路是首先獲取儲存在zookeeper中的偏移量資訊,根據偏移量資訊去建立stream,消費資料後再把當前的偏移量寫入zk中。
下面是在網上搜的並親測可用的直連方式的工具程式碼,大家有需要可以參考下。(wordcount求和程式)
package cn.itcast.spark.day5
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.{KafkaManager, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object DirectKafkaWordCount {
/* def dealLine(line: String): String = {
val list = line.split(',').toList
// val list = AnalysisUtil.dealString(line, ',', '"')// 把dealString函式當做split即可
list.get(0).substring(0, 10) + "-" + list.get(26)
}*/
def processRdd(rdd: RDD[(String, String)]): Unit = {
val lines = rdd.map(_._2)
val words = lines.map(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.foreach(println)
}
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println(
s"""
|Usage: DirectKafkaWordCount <brokers> <topics> <groupid>
| <brokers> is a list of one or more Kafka brokers
| <topics> is a list of one or more kafka topics to consume from
| <groupid> is a consume group
|
""".stripMargin)
System.exit(1)
}
Logger.getLogger("org").setLevel(Level.WARN)
val Array(brokers, topics, groupId) = args
// Create context with 2 second batch interval
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
sparkConf.setMaster("local[*]")//最大cpu核數
sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "5") //每秒拉取5條
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val ssc = new StreamingContext(sparkConf, Seconds(2)) //兩秒一次
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String](
"metadata.broker.list" -> brokers, //直接連線 brokers,不用通過zk連線管理元資料了
"group.id" -> groupId,
"auto.offset.reset" -> "smallest"
)
val km = new KafkaManager(kafkaParams)
val messages = km.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
messages.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
// 先處理訊息
processRdd(rdd)
// 再更新offsets
km.updateZKOffsets(rdd)
}
})
ssc.start()
ssc.awaitTermination()
}
}
package org.apache.spark.streaming.kafka
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import scala.reflect.ClassTag
/**
* 自己管理offset
*/
class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {
private val kc = new KafkaCluster(kafkaParams)
/**
* 建立資料流
*/
def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](
ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String]): InputDStream[(K, V)] = {
val groupId = kafkaParams.get("group.id").get
// 在zookeeper上讀取offsets前先根據實際情況更新offsets
setOrUpdateOffsets(topics, groupId)
//從zookeeper上讀取offset開始消費message
val messages = {
val partitionsE = kc.getPartitions(topics)
if (partitionsE.isLeft)
throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
val partitions = partitionsE.right.get
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
if (consumerOffsetsE.isLeft)
throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
val consumerOffsets = consumerOffsetsE.right.get
KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
}
messages
}
/**
* 建立資料流前,根據實際消費情況更新消費offsets
* @param topics
* @param groupId
*/
private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
topics.foreach(topic => {
var hasConsumed = true
val partitionsE = kc.getPartitions(Set(topic))
if (partitionsE.isLeft)
throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
val partitions = partitionsE.right.get
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
if (consumerOffsetsE.isLeft) hasConsumed = false
if (hasConsumed) {// 消費過
/**
* 如果streaming程式執行的時候出現kafka.common.OffsetOutOfRangeException,
* 說明zk上儲存的offsets已經過時了,即kafka的定時清理策略已經將包含該offsets的檔案刪除。
* 針對這種情況,只要判斷一下zk上的consumerOffsets和earliestLeaderOffsets的大小,
* 如果consumerOffsets比earliestLeaderOffsets還小的話,說明consumerOffsets已過時,
* 這時把consumerOffsets更新為earliestLeaderOffsets
*/
val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
if (earliestLeaderOffsetsE.isLeft)
throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
val consumerOffsets = consumerOffsetsE.right.get
// 可能只是存在部分分割槽consumerOffsets過時,所以只更新過時分割槽的consumerOffsets為earliestLeaderOffsets
var offsets: Map[TopicAndPartition, Long] = Map()
consumerOffsets.foreach({ case(tp, n) =>
val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
if (n < earliestLeaderOffset) {
println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
" offsets已經過時,更新為" + earliestLeaderOffset)
offsets += (tp -> earliestLeaderOffset)
}
})
if (!offsets.isEmpty) {
kc.setConsumerOffsets(groupId, offsets)
}
} else {// 沒有消費過
val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
if (reset == Some("smallest")) {
val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
if (leaderOffsetsE.isLeft)
throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
leaderOffsets = leaderOffsetsE.right.get
} else {
val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
if (leaderOffsetsE.isLeft)
throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
leaderOffsets = leaderOffsetsE.right.get
}
val offsets = leaderOffsets.map {
case (tp, offset) => (tp, offset.offset)
}
kc.setConsumerOffsets(groupId, offsets)
}
})
}
/**
* 更新zookeeper上的消費offsets
* @param rdd
*/
def updateZKOffsets(rdd: RDD[(String, String)]) : Unit = {
val groupId = kafkaParams.get("group.id").get
val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (offsets <- offsetsList) {
val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
if (o.isLeft) {
println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
}
}
}
}
Direct 讀取問題
在實際的應用中,Direct Approach方式很好地滿足了我們的需要,與Receiver-based方式相比,有以下幾方面的優勢:
1,降低資源。Direct不需要Receivers,其申請的Executors全部參與到計算任務中;而Receiver-based則需要專門的Receivers來讀取Kafka資料且不參與計算。因此相同的資源申請,Direct 能夠支援更大的業務。
2,降低記憶體。Receiver-based的Receiver與其他Exectuor是非同步的,並持續不斷接收資料,對於小業務量的場景還好,如果遇到大業務量時,需要提高Receiver的記憶體,但是參與計算的Executor並無需那麼多的記憶體。而Direct 因為沒有Receiver,而是在計算時讀取資料,然後直接計算,所以對記憶體的要求很低。實際應用中我們可以把原先的10G降至現在的2-4G左右。
3,魯棒性更好。Receiver-based方法需要Receivers來非同步持續不斷的讀取資料,因此遇到網路、儲存負載等因素,導致實時任務出現堆積,但Receivers卻還在持續讀取資料,此種情況很容易導致計算崩潰。Direct 則沒有這種顧慮,其Driver在觸發batch 計算任務時,才會讀取資料並計算。隊列出現堆積並不會引起程式的失敗。
但是也存在一些不足,具體如下:
1,提高成本。Direct需要使用者採用checkpoint或者第三方儲存來維護offsets,而不像Receiver-based那樣,通過ZooKeeper來維護Offsets,此提高了使用者的開發成本。
2,監控視覺化。Receiver-based方式指定topic指定consumer的消費情況均能通過ZooKeeper來監控,而Direct則沒有這種便利,如果做到監控並可視化,則需要投入人力開發。