1. 程式人生 > >Spark Streaming 和kafka 整合指導(kafka 0.8.2.1 或以上版本)

Spark Streaming 和kafka 整合指導(kafka 0.8.2.1 或以上版本)

本節介紹一下如何配置Spark Streaming 來接收kafka的資料。有兩個方法:
1、老的方法 -使用Receivers 和kafka的高階API
2、新的方法( Spark 1.3 開始引入)-不適用Receivers。這兩個方式擁有不同的程式設計模型,效能特徵和語義保證,為了獲得更多細節,繼續往下讀。對於目前的版本的spark。這兩個方式都是穩定的。

方法1 基於Receiver的 方式

這個方法使用了一個Receiver 接收資料。這個Receiver 是用kafka 高階的 consumer的api實現的。對於所有的receiver,通過Receiver 接收的kafka的資料會被儲存到Spark的executors,然後 Spark Streaming 啟動jobs處理資料。
然而 預設配置下,這個方式在失敗的情況下回丟失資料(參考 

receiver reliability.
)。為了保證零資料丟失,你必須在Spark Streaming (introduced in Spark 1.2)額外的開啟Write Ahead Logs。這會同步的把接受的到kafka的資料寫入到分散式系統(比如 HDFS) ahead logs 中,因此 所有的資料都可以在失敗的時候進行恢復。 參考 Deploying section
以獲取更多的關於 Write Ahead Logs.的資訊。

下面, 我們討論下如何在你的streaming的應用中 使用這個方法。
1、Linking: 對於Scala/Java 應用使用SBT/MAven的專案定義,連線到你的streaming的應用使用如下的artifact。

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.2.0

2、程式設計: 如下所示

 import org.apache.spark.streaming.kafka._
 val kafkaStream = KafkaUtils.createStream(streamingContext,
 [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

你也可以通過使用 createStream 的引數指定key和value的類和以及它們響應的解碼的類。參考See the 

API docsand the example.

需要記住的點

  • kafka中topic的分割槽和streaming中的RDD的分割槽是不一致的。因此增加在KafkaUtils.createStream()中增加topic的分割槽數只能增加使用消費這些topic的單臺機器上的執行緒數。
  • 為了接受並行的資料可以使用多個receiver 。不同的分組和topics可以建立多個kafka的DStreams.
  • 如果你在一個可複製的檔案系統內(比如hdfs)開啟了Write Ahead Logs ,接收到的資料,接收到的資料已經在log中被複制了。因而,輸入流的儲存的等級是 StorageLevel.MEMORY_AND_DISK_SER。也就是說KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)).

3、 部署
和其他的Spark的應用一樣,spark-submit 用來提交你的應用。然而 Scala/java 與python 在細節方面有稍微的不同。
對於Scala和Java的應用,如果你正在使用SBT或者maven管理專案,會把spark-streaming-kafka-0-8_2.11 和他的依賴打包到應用的JAR。需要確保spark-core_2.11 和spark-streaming_2.11 被標記為provided,因為這些在Spark的安裝中就已經有了。然後使用spark-submit 提交你的應用(參見主程式設計指南中的部署部分)。
Python應用沒有SBT和maven專案管理,spark-streaming-kafka-0-8_2.11 和它的依賴可以直接新增到Spark 要提交的包中(見應用提交指南)。

./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 ...

或者,你也可以從maven的倉庫下載 Maven 的 spark-streaming-kafka-0-8-assembly ,把它加到你要提交 spark-submit 通過--jars的形式。

方法2 :Direct Approcach(沒有接收器)

在Spark1.3中引入了這種新的接收器較少的“直接”方法,以確保更強的端到端保證。該方法不使用接收者接收資料,而是週期性地查詢Kafka在每個主題+分割槽中的最新偏移量,並據此定義每批(batch)處理的偏移範圍。當處理資料的作業被啟動時,Kafka的簡單消費者API用於讀取Kafka定義的偏移範圍(類似於檔案系統中的讀取檔案)。注意這個功能被引入是Spark1.3 在Scala和java API,Spark 1.4 才提供Python API。

這種方法比基於接收者的方法(即方法1)具有以下優點。

  • 簡化的並行性:無需建立多個輸入Kfaka流並將它們合併。使用DirectStream, Spark Streaming 將建立和Kafka分割槽一樣多的RDD分割槽進行消費,將所有kafka的資料並行讀。所以Kafka和RDD的分割槽之間是一對一的對映,這樣比較容易理解和調整。
  • 效率:在第一種方法中實現零資料丟失要求將資料儲存在Write Ahead Log,中,從而進一步複製資料。
    這實際上是低效的,因為資料複製了兩次,一次是Kafka,第二次是 Write Ahead Log。第二種方法消除了問題,因為沒有接收器,因此不需要Write Ahead Log。只要Kafka有儲存,訊息就可以從Kafka中恢復。
  • Exactly-once 語義:第一種方法使用Kafka的高層API在ZooKeeper儲存消費的偏移量。從傳統上來說,這是從Kafka獲取資料的方式。雖然這種方法(與write ahead logs相結合)可以確保零資料丟失(即至少一次語義),但有些記錄在某些故障下可能會被消費 兩次。這是因為Spark Streaming 接收的資料之間的不一致性以及Zookeeper中的偏移量。因而,在第二個方法中,我們使用簡單的不使用Zookeeper 的Kafka的API。Spark Streaming用checkpoints跟蹤偏移量 。這樣消除了Spark Streaming 和Zookeeper/kafka的不一致性,所以無論是否失敗,Spark Streaming 每個記錄都只會被恰好接收一次。為了實現輸出結果的exactly-once 的語義,你儲存資料的操作必須要麼是冪等的,要麼是原子的(儲存結果和偏移量)。(請參閱主程式設計指南中的輸出操作的語義資訊)。
    注意,這種方法的一個缺點是它不更新ZK中的偏移量, 因此Zookeeper-based Kafka 監控器不會顯示進度。然而,你可以通過這個方式在每個batch中訪問偏移量和更新Zookeeper (見下文)。
    接下來,我們將討論如何在Streaming 的應用程式中使用此方法。

- 連結:這種方法只支援在scala/ java應用程式。你的SBT/Maven專案使用如下的artifact(見連結,在進一步資訊的主要程式設計指南部分)。

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.2.0

- 程式設計: 在streaming的應用的程式碼中,引入kafkautils,並建立如下輸入dstream。

import org.apache.spark.streaming.kafka._

 val directKafkaStream = KafkaUtils.createDirectStream[
 [key class], [value class], [key decoder class], [value decoder class] ](
 streamingContext, [map of Kafka parameters], [set of topics to consume])

你也可以傳入messagehandler到createdirectstream訪問messageandmetadata包含有關當前訊息元資料,並將它轉換為任意型別。請參見API文件和示例。

在Kafka的引數中,您必須指定metadata.broker.list或bootstrap.servers。預設情況下,它將從每個Kafka分割槽的最新偏移量開始消費。如果你設定的配置auto.offset.reset 引數smallest,它將從最小的偏移處開始消費。
你也可以使用的其他變化kafkautils.createdirectstream,從任意的偏移開始消費。此外,如果您希望訪問每個batch中消耗的Kafka偏移量,您可以執行以下操作。

// Hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array.empty[OffsetRange]

directKafkaStream.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map {
       ...
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
   println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
 }
 ...
 }