工具篇-Spark-Streaming獲取kafka數據的兩種方式(轉載)
轉載自:https://blog.csdn.net/wisgood/article/details/51815845
一、基於Receiver的方式
原理
Receiver從Kafka中獲取的數據存儲在Spark Executor的內存中,然後Spark Streaming啟動的job會去處理那些數據,如果突然數據暴增,大量batch堆積,很容易出現內存溢出的問題。
在默認的配置下,這種方式可能會因為底層失敗而丟失數據。如果要讓數據零丟失,就必須啟用Spark Streaming的預寫日誌機制(Write Ahead Log,WAL),該機制會同步地將接收到的Kafka數據寫入分布式文件系統(比如HDFS)上的預寫日誌中,即使底層節點出現了失敗,也可以使用預寫日誌中的數據進行恢復。
要點
1. Kafka中Topic的Partition與Spark中RDD的Partition沒有關系。所以,在KafkaUtils.createStream()中,提高partition的數量只會增加一個Receiver中讀取Partition線程的數量,不會增加Spark處理數據的並行度。可以創建多個Kafka輸入DStream,使用不同的Consumer Group和Topic,來通過多個Receiver並行接收數據。
2. 如果基於容錯的文件系統,比如HDFS,啟用了預寫日誌機制,接收到的數據都會被復制一份到預寫日誌中。此時在KafkaUtils.createStream()中,設置的持久化級別是StorageLevel.MEMORY_AND_DISK_SER。
二、基於Direct的方式
原理
該方式在Spark 1.3中引入,來確保更加健壯的機制。這種方式會周期性地查詢Kafka,獲取每個Topic+Partition的最新的Offset,從而定義每個batch的offset的範圍。當處理數據的job啟動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset範圍的數據。
優點
1. 簡化並行讀取:在Kafka Partition和RDD Partition之間,有一個一對一的映射關系,所以如果要讀取多個partition,不需要創建多個輸入DStream然後對它們進行Union操作,Spark會創建跟Kafka Partition一樣多的RDD Partition,並且會並行從Kafka中讀取數據。
對比(在實際生產環境中大都用Direct方式):
基於Receiver的方式,使用Kafka的高階API在ZooKeeper中保存消費過的offset的,這是消費Kafka數據的傳統方式。這種方式配合WAL機制可以保證數據零丟失的高可靠性,但是卻無法保證數據被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。
基於Direct的方式,使用Kafka的簡單API,Spark Streaming自己就負責追蹤消費的Offset,並保存在Checkpoint中,Spark自己一定是同步的,因此可以保證數據是消費一次且僅消費一次。
工具篇-Spark-Streaming獲取kafka數據的兩種方式(轉載)