1. 程式人生 > >sparkstreaming和kafka整合的兩種方式(最全)

sparkstreaming和kafka整合的兩種方式(最全)

-1,基於接收者的方法

運算元:KafkaUtils.createStream
方法:PUSH,從topic中去推送資料,將資料推送過來
API:呼叫的Kafka高階API
效果:SparkStreaming中的Receivers,恰好Kafka有釋出/訂閱 ,然而:此種方式企業不常用,說明有BUG,不符合企業需求。因為:接收到的資料儲存在Executor的記憶體,會出現資料漏處理或者多處理狀況
解釋:這種方法使用Receiver來接收資料。Receiver是使用Kafka高階消費者API實現的。與所有的接收者一樣,通過Receiver從Kafka接收的資料儲存在Spark執行程式exector中,然後由Spark Streaming啟動的作業處理資料。但是,在預設配置下,這種方法可能會在失敗時丟失資料。為了確保零資料丟失,您必須在Spark Streaming(在Spark 1.2中引入)中額外啟用寫入日誌,同時儲存所有接收到的Kafka資料寫入分散式檔案系統(例如HDFS)的預先寫入日誌,以便所有資料都可以在失敗時恢復。
缺點


①、Kafka中的主題分割槽與Spark Streaming中生成的RDD的分割槽不相關。因此,增加主題特定分割槽KafkaUtils.createStream()的數量只會增加在單個接收器中使用哪些主題消耗的執行緒的數量。在處理資料時不會增加Spark的並行性
②、多個kafka輸入到DStream會建立多個group和topic,用於使用多個接收器並行接收資料
③、如果已經使用HDFS等複製檔案系統啟用了寫入日誌,則接收到的資料已經在日誌中複製。因此,輸入流的儲存級別為儲存級別StorageLevel.MEMORY_AND_DISK_SER

-2,直接方法(無接收者)

運算元

:KafkaUtils.createDirectStream
方式:PULL,到topic中去拉取資料。
API:kafka低階API
效果:每次到Topic的每個分割槽依據偏移量進行獲取資料,拉取資料以後進行處理,可以實現高可用
解釋:在Spark 1.3中引入了這種新的無接收器“直接”方法,以確保更強大的端到端保證。這種方法不是使用接收器來接收資料,而是定期查詢Kafka在每個topic+分partition中的最新偏移量,並相應地定義要在每個批次中處理的偏移量範圍。當處理資料的作業啟動時,Kafka簡單的客戶API用於讀取Kafka中定義的偏移範圍(類似於從檔案系統讀取檔案)。請注意,此功能在Spark 1.3中為Scala和Java API引入,在Spark 1.4中針對Python API引入。
優勢

①、簡化的並行性:不需要建立多個輸入Kafka流並將其合併。與此同時directStream,Spark Streaming將建立與使用Kafka分割槽一樣多的RDD分割槽,這些分割槽將全部從Kafka並行讀取資料。所以在Kafka和RDD分割槽之間有一對一的對映關係,這更容易理解和調整

②、效率:在第一種方法中實現零資料丟失需要將資料儲存在預寫日誌中,這會進一步複製資料。這實際上是效率低下的,因為資料被有效地複製了兩次,一次是由Kafka,另一次是由預先寫入日誌(Write Ahead Log)複製。此方法消除了這個問題,因為沒有接收器,因此不需要預先寫入日誌。只要你有足夠的kafka保留,訊息可以從kafka恢復

③、精確語義:第一種方法是使用Kafka的高階API在Zookeeper中儲存消耗的偏移量。傳統上這是從Kafka消費資料的方式。雖然這種方法(合併日誌)可以確保零資料丟失,但在某些失敗情況下,很小的機率兩次資料都同時丟失,發生這種情況是因為Spark Streaming可靠接收到的資料與Zookeeper跟蹤的偏移之間的不一致。因此,在第二種方法中,我們使用不使用Zookeeper的簡單Kafka API。在其檢查點內,Spark Streaming跟蹤偏移量。這消除了Spark Streaming和Zookeeper / Kafka之間的不一致性,因此Spark Streaming每次記錄都會在發生故障時有效地接收一次。

請注意,這種方法的一個缺點是它不會更新Zookeeper中的偏移量,因此基於Zookeeper的Kafka監控工具將不會顯示進度。但是,您可以在每個批次中訪問由此方法處理的偏移量,並自己更新Zookeeper