Consuming Kafka Data in Spark Streaming with the Direct Approach
阿新 · Published: 2019-01-22
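Why use the direct approach (createDirectStream)? There are two main reasons:
1. With createDirectStream, Spark Streaming reads data from the Kafka cluster itself and tracks the consumed offsets within Spark Streaming (in its checkpoints) instead of relying on a receiver and ZooKeeper. This avoids data loss without needing a write-ahead log, each record is received effectively exactly once (end-to-end exactly-once output still requires an idempotent or transactional write), and it is more efficient than createStream;
2. The partitions of each RDD in the resulting DStream correspond one-to-one to the partitions of the Kafka topic.
The drawback of the direct approach is that it no longer updates the offsets in ZooKeeper, so ZooKeeper-based Kafka monitoring tools cannot see the consumer's progress.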
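To make the one-to-one partition mapping concrete, here is a minimal, Spark-free sketch of how one batch of a direct stream can be described as per-partition offset ranges. `OffsetRangeSketch` and `planBatch` are hypothetical names; the fields mirror Spark's `OffsetRange` (topic, partition, fromOffset, untilOffset, with `untilOffset` exclusive). The sketch assumes the latest offsets have already been fetched from the brokers and ignores rate limiting such as `spark.streaming.kafka.maxRatePerPartition`.

```scala
// Hypothetical stand-in for org.apache.spark.streaming.kafka.OffsetRange:
// the half-open range [fromOffset, untilOffset) of one Kafka partition.
final case class OffsetRangeSketch(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  def count: Long = untilOffset - fromOffset
}

// One range per Kafka partition. RDD partition i of the batch reads exactly ranges(i),
// which is the 1:1 mapping between RDD partitions and topic partitions.
// Ranges are ordered by partition id here only to keep the example readable.
// Assumes `latest` has an entry for every partition in `current`.
def planBatch(topic: String,
              current: Map[Int, Long], // next offset to read, per partition
              latest: Map[Int, Long]   // latest offset reported by the brokers
             ): Vector[OffsetRangeSketch] =
  current.keys.toVector.sorted.map { p =>
    OffsetRangeSketch(topic, p, current(p), latest(p))
  }
```

For example, with `current = Map(0 -> 100L, 1 -> 40L)` and `latest = Map(0 -> 150L, 1 -> 40L)`, the batch has two RDD partitions: one reading offsets 100 to 149 of partition 0, and one empty partition for partition 1.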
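Therefore, when consuming Kafka data with the direct approach, the general plan is: first read the offsets stored in ZooKeeper, create the stream from those offsets, and after consuming the data, write the current offsets back to ZooKeeper. When creating the stream, two cases need to be handled: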
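1. ZooKeeper holds no offsets: create the stream according to the configured Kafka parameters (auto.offset.reset decides where consumption starts);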
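2. ZooKeeper holds offsets, but Kafka has already deleted the data at those offsets (typically because of log retention). The offsets must then be corrected, or the job will fail at runtime with an offset-out-of-range exception. The fix is to use the methods of the KafkaCluster class in the spark-streaming-kafka API to get the actual earliest and latest offsets on the brokers and compare them with the offsets in ZooKeeper. Before Spark 2.0, KafkaCluster was private, so it had to be copied into your own project; from 2.0 on it is public and can be called directly.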
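Both cases reduce to a small decision per partition. The sketch below mirrors that rule: keep the ZooKeeper offset if it lies within the broker's [earliest, latest] range, otherwise fall back to the earliest offset still available. It also models the "nothing stored" case as `None`. The function name `resolveStartOffset` is hypothetical, and the broker offsets are passed in as plain numbers rather than fetched through `KafkaCluster`.

```scala
// Decide where one partition should start reading.
//   None    -> nothing stored in ZooKeeper: defer to auto.offset.reset (case 1).
//   Some(o) -> start from o; a stored offset outside the broker's range is
//              replaced by the earliest offset still on the broker (case 2).
def resolveStartOffset(zkOffset: Option[Long], earliest: Long, latest: Long): Option[Long] =
  zkOffset.map { o =>
    if (o >= earliest && o <= latest) o // still readable: resume exactly there
    else earliest                       // deleted by retention or otherwise invalid
  }
```

A stored offset equal to `latest` is kept: it simply means there is no new data yet for that partition.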
For convenience, I wrote a KafkaHelper class that wraps creating the stream and updating the offsets in ZooKeeper. The code is as follows:
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkException
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{KafkaCluster, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.kafka.KafkaCluster.{Err, LeaderOffset}

/**
 * KafkaHelper provides two public methods: one creates a direct DStream,
 * the other updates the consumer offsets stored in ZooKeeper.
 * @param kafkaParams Kafka configuration parameters
 * @param zkQuorum    ZooKeeper connection string
 * @param group       consumer group
 * @param topic       topic to consume
 */
class KafkaHelper(kafkaParams: Map[String, String], zkQuorum: String, group: String, topic: String)
  extends Serializable {

  private val kc = new KafkaCluster(kafkaParams)
  // ZKStringSerializer stores offsets as plain strings, the format Kafka's own tools read
  private val zkClient = new ZkClient(zkQuorum, 30000, 30000, ZKStringSerializer)
  private val topics = Set(topic)

  /** ZooKeeper path under which the offsets of `topic` for consumer group `group` are stored. */
  private def getZkPath(): String = new ZKGroupTopicDirs(group, topic).consumerOffsetDir

  /**
   * Build the starting offsets from ZooKeeper, corrected against the brokers.
   * @param children              number of partitions recorded in ZooKeeper
   * @param zkPath                ZooKeeper path of the topic's offsets
   * @param earliestLeaderOffsets actual earliest offsets on the brokers
   * @param latestLeaderOffsets   actual latest offsets on the brokers
   */
  private def getOffsets(children: Int,
                         zkPath: String,
                         earliestLeaderOffsets: Map[TopicAndPartition, LeaderOffset],
                         latestLeaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, Long] =
    (0 until children).map { i =>
      // offset recorded in ZooKeeper for partition i
      val zkOffset = zkClient.readData[String](s"$zkPath/$i").toLong
      val tp = TopicAndPartition(topic, i)
      // actual earliest and latest offsets on the brokers
      val earliestOffset = earliestLeaderOffsets(tp).offset
      val latestOffset = latestLeaderOffsets(tp).offset
      // Use the ZooKeeper offset if it is still within the broker's range;
      // otherwise fall back to the earliest offset available on the broker.
      if (zkOffset >= earliestOffset && zkOffset <= latestOffset) tp -> zkOffset
      else tp -> earliestOffset
    }.toMap

  /**
   * Create the direct DStream.
   * @param ssc the StreamingContext
   */
  def createDirectStream(ssc: StreamingContext): InputDStream[(String, String)] = {
    // ---------------- fetch the actual offsets from the brokers ----------------
    val partitionsE: Either[Err, Set[TopicAndPartition]] = kc.getPartitions(topics)
    if (partitionsE.isLeft)
      throw new SparkException(s"get kafka partitions failed: ${partitionsE.left.get}")
    val partitions = partitionsE.right.get

    val earliestLeaderOffsetsE: Either[Err, Map[TopicAndPartition, LeaderOffset]] =
      kc.getEarliestLeaderOffsets(partitions)
    if (earliestLeaderOffsetsE.isLeft)
      throw new SparkException(s"get kafka earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
    val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get

    val latestLeaderOffsetsE: Either[Err, Map[TopicAndPartition, LeaderOffset]] =
      kc.getLatestLeaderOffsets(partitions)
    if (latestLeaderOffsetsE.isLeft)
      throw new SparkException(s"get kafka latest leader offsets failed: ${latestLeaderOffsetsE.left.get}")
    val latestLeaderOffsets = latestLeaderOffsetsE.right.get

    // ---------------- create the Kafka stream ----------------
    val zkPath = getZkPath()
    val children = zkClient.countChildren(zkPath)
    // Offsets in ZooKeeper mean this group has consumed the topic before
    if (children > 0) {
      // Consumed before: start from the (corrected) stored offsets
      val fromOffsets = getOffsets(children, zkPath, earliestLeaderOffsets, latestLeaderOffsets)
      // Note: this branch yields (topic, message); the branch below yields (key, message)
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams, fromOffsets, messageHandler)
    } else {
      // Never consumed: the starting point comes from kafkaParams (auto.offset.reset)
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    }
  }

  /**
   * Update the offsets in ZooKeeper.
   * @param offsetRanges offset ranges of the batch that has just been processed
   */
  def updateZkOffsets(offsetRanges: Array[OffsetRange]): Unit = {
    val zkPath = getZkPath()
    for (o <- offsetRanges) {
      val newZkPath = s"$zkPath/${o.partition}"
      // Save untilOffset (exclusive end of the batch, i.e. the next offset to read);
      // saving fromOffset would make a restarted job re-read the last batch.
      ZkUtils.updatePersistentPath(zkClient, newZkPath, o.untilOffset.toString)
    }
  }
}
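The offsets end up in ZooKeeper under one child node per partition. The sketch below reproduces that layout with plain strings, assuming the Kafka 0.8 convention that `ZKGroupTopicDirs(group, topic).consumerOffsetDir` is `/consumers/<group>/offsets/<topic>`. `RangeSketch`, `consumerOffsetDir`, and `zkNodesToWrite` are hypothetical helpers. Storing `untilOffset` (the exclusive end of the batch, i.e. the next offset to read) is what lets a restarted job resume right after the last processed record.

```scala
// Hypothetical mirror of the OffsetRange fields used here.
final case class RangeSketch(partition: Int, fromOffset: Long, untilOffset: Long)

// Assumed Kafka 0.8 layout: /consumers/<group>/offsets/<topic>
def consumerOffsetDir(group: String, topic: String): String =
  s"/consumers/$group/offsets/$topic"

// The znodes an offset update writes: one child per partition,
// holding the next offset to read (the exclusive untilOffset) as a string.
def zkNodesToWrite(group: String, topic: String, ranges: Seq[RangeSketch]): Map[String, String] =
  ranges.map { r =>
    s"${consumerOffsetDir(group, topic)}/${r.partition}" -> r.untilOffset.toString
  }.toMap
```

For a batch covering offsets 100 to 149 of partition 0, the node `/consumers/g1/offsets/events/0` would hold `150`.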
The test code is as follows:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TestKafkaHelper {
  def main(args: Array[String]): Unit = {
    if (args.length != 5) {
      println("Usage: <timeInterval> <brokerList> <zkQuorum> <topic> <group>")
      System.exit(1)
    }
    val Array(timeInterval, brokerList, zkQuorum, topic, group) = args
    val conf = new SparkConf().setAppName("KafkaDirectStream").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(timeInterval.toInt))
    // Kafka configuration parameters
    val kafkaParams = Map(
      "metadata.broker.list" -> brokerList,
      "group.id" -> group,
      "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
    )
    // Constructor order is (kafkaParams, zkQuorum, group, topic)
    val kafkaHelper = new KafkaHelper(kafkaParams, zkQuorum, group, topic)
    val kafkaStream: InputDStream[(String, String)] = kafkaHelper.createDirectStream(ssc)
    var offsetRanges = Array[OffsetRange]()
    kafkaStream.transform { rdd =>
      // Runs on the driver. The cast to HasOffsetRanges only succeeds on RDDs produced
      // directly by the Kafka stream, so it must be the first operation on it.
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.map(msg => msg._2)
      .foreachRDD { rdd =>
        rdd.foreachPartition { partition =>
          partition.foreach { record =>
            // process the data here
            println(record)
          }
        }
        // commit the offsets only after the batch has been processed
        kafkaHelper.updateZkOffsets(offsetRanges)
      }
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
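Because the test program commits offsets only after a batch has been processed, a failure between the processing step and `updateZkOffsets` makes the restarted job process that batch again, i.e. the output is at-least-once. A common way to keep results correct anyway is to make the write idempotent, for example keyed by (partition, offset). The sketch below simulates this with plain collections; the `Record` type, the `crashBeforeCommit` flag, and the upsert-by-key sink are illustrative assumptions, not part of the original code.

```scala
import scala.collection.mutable

// Hypothetical record: its (partition, offset) pair uniquely identifies it.
final case class Record(partition: Int, offset: Long, value: String)

// Idempotent sink: writing the same record twice leaves a single entry.
val sink = mutable.Map.empty[(Int, Long), String]
// Stand-in for the ZooKeeper offset of partition 0 (next offset to read).
var committed = 0L

val partitionLog = Vector("a", "b", "c").zipWithIndex.map { case (v, i) => Record(0, i.toLong, v) }

// Process everything from the committed offset, then commit,
// unless we simulate a crash just before the commit.
def runBatch(crashBeforeCommit: Boolean): Int = {
  val batch = partitionLog.filter(_.offset >= committed)
  batch.foreach(r => sink((r.partition, r.offset)) = r.value) // upsert, not append
  if (!crashBeforeCommit) committed = partitionLog.length.toLong
  batch.size
}
```

Running `runBatch(crashBeforeCommit = true)` and then `runBatch(crashBeforeCommit = false)` processes the same three records twice, yet the sink still holds exactly three entries.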