【轉】Spark Streaming和Kafka整合開發指南
基於Receivers的方法
這個方法使用了Receivers來接收數據。Receivers的實現使用到Kafka高層次的消費者API。對於所有的Receivers,接收到的數據將會保存在Spark executors中,然後由Spark Streaming啟動的Job來處理這些數據。
然而,在默認的配置下,這種方法在失敗的情況下會丟失數據,為了保證零數據丟失,你可以在Spark Streaming中使用WAL日誌,這是在Spark 1.2.0才引入的功能,這使得我們可以將接收到的數據保存到WAL中(WAL日誌可以存儲在HDFS上),所以在失敗的時候,我們可以從WAL中恢復,而不至於丟失數據。
下面,我將介紹如何使用這種方法來接收數據。
1、引入依賴。
對於Scala和Java項目,你可以在你的pom.xml文件引入以下依賴:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2. 10 </artifactId>
<version> 1.3 . 0 </version>
</dependency>
|
如果你是使用SBT,可以這麽引入:
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.3.0"
|
2、編程
在Streaming程序中,引入KafkaUtils,並創建一個輸入DStream:
import org.apache.spark.streaming.kafka. _
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]) |
在創建DStream的時候,你也可以指定數據的Key和Value類型,並指定相應的解碼類。
需要註意的是:1、Kafka中Topic的分區和Spark Streaming生成的RDD中分區不是一個概念。所以,在
KafkaUtils.createStream()
增加特定主題分區數僅僅是增加一個receiver中消費Topic的線程數。並不增加Spark並行處理數據的數量;
2、對於不同的Group和tpoic我們可以使用多個receivers創建不同的DStreams來並行接收數據;
3、如果你啟用了WAL,這些接收到的數據將會被持久化到日誌中,因此,我們需要將storage level 設置為StorageLevel.MEMORY_AND_DISK_SER
,也就是:
KafkaUtils.createStream(..., StorageLevel.MEMORY _ AND _ DISK _ SER)
|
3、部署
對應任何的Spark 應用,我們都是用spark-submit
來啟動你的應用程序,對於Scala和Java用戶,如果你使用的是SBT或者是Maven,你可以將spark-streaming-kafka_2.10及其依賴打包進應用程序的Jar文件中,並確保spark-core_2.10和 spark-streaming_2.10標記為provided,因為它們在Spark 安裝包中已經存在:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming _ 2.10 </artifactId>
<version> 1.3 . 0 </version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core _ 2.10 </artifactId>
<version> 1.3 . 0 </version>
<scope>provided</scope>
</dependency>
|
然後使用spark-submit來啟動你的應用程序。
當然,你也可以不在應用程序Jar文件中打包spark-streaming-kafka_2.10及其依賴,我們可以在spark-submit後面加上--jars參數也可以運行你的程序:
[[email protected] spark]$ spark-1.3.0-bin-2.6.0 /bin/spark-submit --master yarn-cluster
--class iteblog.KafkaTest
--jars lib /spark-streaming-kafka_2 .10-1.3.0.jar,
lib /spark-streaming_2 .10-1.3.0.jar,
lib /kafka_2 .10-0.8.1.1.jar,lib /zkclient-0 .3.jar,
lib /metrics-core-2 .2.0.jar . /iteblog-1 .0-SNAPSHOT.jar
|
下面是一個完整的例子:
object KafkaWordCount {
def main(args : Array[String]) {
if (args.length < 4 ) {
System.err.println( "Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>" )
System.exit( 1 )
}
StreamingExamples.setStreamingLogLevels()
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName( "KafkaWordCount" )
val ssc = new StreamingContext(sparkConf, Seconds( 2 ))
ssc.checkpoint( "checkpoint" )
val topicMap = topics.split( "," ).map(( _ ,numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map( _ . _ 2 )
val words = lines.flatMap( _ .split( " " ))
val wordCounts = words.map(x = > (x, 1 L))
.reduceByKeyAndWindow( _ + _ , _ - _ , Minutes( 10 ), Seconds( 2 ), 2 )
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
|
Direct的方法
和基於Receiver接收數據不一樣,這種方式定期地從Kafka的topic+partition中查詢最新的偏移量,再根據定義的偏移量範圍在每個batch裏面處理數據。當作業需要處理的數據來臨時,spark通過調用Kafka的簡單消費者API讀取一定範圍的數據。這個特性目前還處於試驗階段,而且僅僅在Scala和Java語言中提供相應的API。
和基於Receiver方式相比,這種方式主要有一些幾個優點:
(1)、簡化並行。我們不需要創建多個Kafka 輸入流,然後union他們。而使用directStream,Spark Streaming將會創建和Kafka分區一樣的RDD分區個數,而且會從Kafka並行地讀取數據,也就是說Spark分區將會和Kafka分區有一一對應的關系,這對我們來說很容易理解和使用;
(2)、高效。第一種實現零數據丟失是通過將數據預先保存在WAL中,這將會復制一遍數據,這種方式實際上很不高效,因為這導致了數據被拷貝兩次:一次是被Kafka復制;另一次是寫到WAL中。但是本文介紹的方法因為沒有Receiver,從而消除了這個問題,所以不需要WAL日誌;
(3)、恰好一次語義(Exactly-once semantics)。《Spark Streaming和Kafka整合開發指南(一)》文章中通過使用Kafka高層次的API把偏移量寫入Zookeeper中,這是讀取Kafka中數據的傳統方法。雖然這種方法可以保證零數據丟失,但是還是存在一些情況導致數據會丟失,因為在失敗情況下通過Spark Streaming讀取偏移量和Zookeeper中存儲的偏移量可能不一致。而本文提到的方法是通過Kafka低層次的API,並沒有使用到Zookeeper,偏移量僅僅被Spark Streaming保存在Checkpoint中。這就消除了Spark Streaming和Zookeeper中偏移量的不一致,而且可以保證每個記錄僅僅被Spark Streaming讀取一次,即使是出現故障。
但是本方法唯一的壞處就是沒有更新Zookeeper中的偏移量,所以基於Zookeeper的Kafka監控工具將會無法顯示消費的狀況。然而你可以通過Spark提供的API手動地將偏移量寫入到Zookeeper中。如何使用呢?其實和方法一差不多
1、引入依賴。
對於Scala和Java項目,你可以在你的pom.xml文件引入以下依賴:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2. 10 </artifactId>
<version> 1.3 . 0 </version>
</dependency>
|
如果你是使用SBT,可以這麽引入:
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.3.0"
|
2、編程
在Streaming應用程序代碼中,引入KafkaUtils ,並創建DStream輸入流:
import org.apache.spark.streaming.kafka. _
val directKafkaStream = KafkaUtils.createDirectStream[
[key class ], [value class ], [key decoder class ], [value decoder class ] ](
streamingContext, [map of Kafka parameters], [set of topics to consume])
|
在 Kafka parameters參數中,你必須指定 metadata.broker.list或者bootstrap.servers參數。在默認情況下,Spark Streaming將會使用最大的偏移量來讀取Kafka每個分區的數據。如果你配置了auto.offset.reset為smallest,那麽它將會從最小的偏移量開始消費。
當然,你也可以使用KafkaUtils.createDirectStream的另一個版本從任意的位置消費數據。如果你想回去每個batch中Kafka的偏移量,你可以如下操作:
directKafkaStream.foreachRDD { rdd = >
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges]
// offsetRanges.length = # of Kafka partitions being consumed
...
}
|
你可以通過這種方式來手動地更新Zookeeper裏面的偏移量,使得基於Zookeeper偏移量的Kafka監控工具可以使用。
還有一點需要註意,因為這裏介紹的方法沒有使用到Receiver,所以Spark中關於spark.streaming.receiver.*相關的配置參數將不會對創建DStreams 有影響。我們可以使用spark.streaming.kafka.*參數進行配置。
3、部署
對應任何的Spark 應用,我們都是用spark-submit
來啟動你的應用程序,對於Scala和Java用戶,如果你使用的是SBT或者是Maven,你可以將spark-streaming-kafka_2.10及其依賴打包進應用程序的Jar文件中,並確保spark-core_2.10和 spark-streaming_2.10標記為provided,因為它們在Spark 安裝包中已經存在:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming _ 2.10 </artifactId>
<version> 1.3 . 0 </version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core _ 2.10 </artifactId>
<version> 1.3 . 0 </version>
<scope>provided</scope>
</dependency>
|
然後使用spark-submit來啟動你的應用程序。
【轉】Spark Streaming和Kafka整合開發指南