
Spark Streaming + Kafka 0.8 Integration: Key Points of the Official Documentation (Translated)

Spark Streaming + Kafka Integration Guide (Kafka broker version 0.8.2.1 or higher)
Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.

Here we explain how to configure Spark Streaming to receive data from Kafka. There are two approaches to this - the old approach using Receivers and Kafka’s high-level API, and a new approach (introduced in Spark 1.3) without using Receivers.

They have different programming models, performance characteristics, and semantics guarantees, so read on for more details. Both approaches are considered stable APIs as of the current version of Spark.

There are two ways to integrate Spark Streaming with Kafka:

Approach 1: using Receivers and Kafka's high-level API

Approach 2: without using Receivers (the "direct" approach)

Approach 1: Receiver-based Approach


This approach uses a Receiver to receive the data. The Receiver is implemented using the Kafka high-level consumer API. As with all receivers, the data received from Kafka through a Receiver is stored in Spark executors, and then jobs launched by Spark Streaming process the data.

In this first approach, the data is received by a Receiver built on Kafka's high-level consumer API, and the Spark Streaming job then processes the received data.

However, under default configuration, this approach can lose data under failures (see receiver reliability). To ensure zero-data loss, you have to additionally enable Write Ahead Logs in Spark Streaming (introduced in Spark 1.2). This synchronously saves all the received Kafka data into write ahead logs on a distributed file system (e.g. HDFS), so that all the data can be recovered on failure. See Deploying section in the streaming programming guide for more details on Write Ahead Logs.

Under the default configuration, this approach can lose data on failure. To guarantee zero data loss, you can additionally enable Spark Streaming's Write Ahead Log (WAL) mechanism, which synchronously persists all received Kafka data to a distributed file system such as HDFS; if the application restarts, the Spark Streaming application can then recover from that data.
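
As a rough sketch of what enabling the WAL looks like in practice (the application name and checkpoint path below are only illustrative), the relevant switch is the spark.streaming.receiver.writeAheadLog.enable configuration, together with a fault-tolerant checkpoint directory:

 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}

 // Enable the receiver Write Ahead Log and give Spark Streaming a fault-tolerant
 // checkpoint directory; the HDFS path here is just a placeholder.
 val conf = new SparkConf()
   .setAppName("KafkaReceiverWithWAL")
   .set("spark.streaming.receiver.writeAheadLog.enable", "true")
 val streamingContext = new StreamingContext(conf, Seconds(5))
 streamingContext.checkpoint("hdfs:///tmp/streaming-checkpoint")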

Next, we discuss how to use this approach in your streaming application.

Linking: For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact (see Linking section in the main programming guide for further information).

 groupId = org.apache.spark
 artifactId = spark-streaming-kafka-0-8_2.11
 version = 2.3.1

This is the dependency you need to add when writing Scala or Java code that uses the Spark Streaming + Kafka 0.8 integration.
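
For reference, the sbt equivalent would look roughly like the following (a sketch; the %% operator appends the Scala binary version, and the versions should match your build):

 // build.sbt (sketch): spark-core / spark-streaming are "provided" by the cluster
 libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.3.1" % "provided"
 libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.3.1"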


For Python applications, you will have to add this above library and its dependencies when deploying your application. See the Deploying subsection below.

Programming: In the streaming application code, import KafkaUtils and create an input DStream as follows.

Scala

import org.apache.spark.streaming.kafka._

 val kafkaStream = KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])


You can also specify the key and value classes and their corresponding decoder classes using variations of createStream. See the API docs.

Variations of createStream let you specify the key and value types of the data read from Kafka, along with their corresponding decoders.
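
For example, a minimal sketch of the typed variant (the ZooKeeper quorum, group id and topic name are placeholders, and streamingContext is assumed to exist):

 import kafka.serializer.StringDecoder
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.kafka._

 // High-level consumer configuration: ZooKeeper quorum and consumer group id.
 val kafkaParams = Map(
   "zookeeper.connect" -> "zk1:2181,zk2:2181",  // placeholder ZK quorum
   "group.id"          -> "my-consumer-group")  // placeholder group id

 // Consume topic "my-topic" with 2 receiver threads; keys and values are decoded
 // as Strings. MEMORY_AND_DISK_SER is the level recommended when the WAL is on.
 val typedStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
   streamingContext,
   kafkaParams,
   Map("my-topic" -> 2),
   StorageLevel.MEMORY_AND_DISK_SER)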

Points to remember:


Topic partitions in Kafka do not correlate to partitions of RDDs generated in Spark Streaming. So increasing the number of topic-specific partitions in KafkaUtils.createStream() only increases the number of threads used to consume topics within a single receiver. It does not increase the parallelism of Spark in processing the data. Refer to the main document for more information on that.

The number of Kafka topic partitions does not map to the number of partitions of the RDDs generated by Spark Streaming. Increasing the partition count passed to KafkaUtils.createStream() only adds consumer threads to the single Receiver; it does not increase the parallelism with which Spark Streaming processes the data.

Multiple Kafka input DStreams can be created with different groups and topics for parallel receiving of data using multiple receivers.

If you have enabled Write Ahead Logs with a replicated file system like HDFS, the received data is already being replicated in the log. Hence, set the storage level for the input stream to StorageLevel.MEMORY_AND_DISK_SER (that is, use KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)).

Deploying: As with any Spark applications, spark-submit is used to launch your application. However, the details are slightly different for Scala/Java applications and Python applications.

For Scala and Java applications, if you are using SBT or Maven for project management, then package spark-streaming-kafka-0-8_2.11 and its dependencies into the application JAR. Make sure spark-core_2.11 and spark-streaming_2.11 are marked as provided dependencies as those are already present in a Spark installation. Then use spark-submit to launch your application (see Deploying section in the main programming guide).

For Python applications which lack SBT/Maven project management, spark-streaming-kafka-0-8_2.11 and its dependencies can be directly added to spark-submit using --packages (see Application Submission Guide). That is,

 ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.1 ...
Alternatively, you can also download the JAR of the Maven artifact spark-streaming-kafka-0-8-assembly from the Maven repository and add it to spark-submit with --jars.

Approach 2: Direct Approach (No Receivers)


This new receiver-less “direct” approach has been introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka’s simple consumer API is used to read the defined ranges of offsets from Kafka (similar to read files from a file system). Note that this feature was introduced in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.

This new receiver-less "Direct" approach, introduced in Spark 1.3, provides stronger end-to-end guarantees. Instead of receiving data through a Receiver, it periodically queries Kafka for the latest offset of each partition and uses that offset range to define the batch of data to process for that interval. When the tasks that process the batch are launched, Kafka's simple consumer API is used by default to read the data in that offset range.

This approach has the following advantages over the receiver-based approach (i.e. Approach 1).

Simplified Parallelism: No need to create multiple input Kafka streams and union them. With directStream, Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, which will all read data from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.

Compared with the old way of creating multiple parallel Kafka streams and unioning them, with DirectStream the Spark Streaming application creates as many RDD partitions as the topic has Kafka partitions, and these consumers read all the data in parallel. The result is an easy-to-understand one-to-one mapping: each RDD partition corresponds to one topic partition.

Efficiency: Achieving zero-data loss in the first approach required the data to be stored in a Write Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice - once by Kafka, and a second time by the Write Ahead Log. This second approach eliminates the problem as there is no receiver, and hence no need for Write Ahead Logs. As long as you have sufficient Kafka retention, messages can be recovered from Kafka.

To guarantee zero data loss, the first approach has to persist the data into a Write Ahead Log, adding yet another copy of data that Kafka already replicates. The second approach has no Receiver, so there is no need for a write-ahead log at all, which gives a considerable performance improvement.

Exactly-once semantics: The first approach uses Kafka’s high level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with write ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures. In order to achieve exactly-once semantics for output of your results, your output operation that saves the data to an external data store must be either idempotent, or an atomic transaction that saves results and offsets (see Semantics of output operations in the main programming guide for further information).

Exactly-once semantics: the first (Receiver-based) approach consumes from Kafka the traditional way, using Kafka's high-level API to store offsets in ZooKeeper. Combined with the WAL, it can guarantee zero data loss (i.e. at-least-once semantics), but under certain failures some records may be consumed twice, because the data reliably received by Spark Streaming and the offsets tracked in ZooKeeper can get out of sync - between the Receiver receiving the data and the acknowledgement being sent back to ZooKeeper. The second approach therefore uses the simple Kafka API, which does not involve ZooKeeper; offsets are tracked by Spark Streaming in its checkpoint directory. This removes the inconsistency between Spark Streaming and ZooKeeper/Kafka, so each record is effectively received exactly once even across failures. To get exactly-once semantics for the output of your results, however, the output operation that saves data to the external store must be idempotent, or must save the results and the offsets in one atomic transaction.
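
A hedged sketch of that last point, assuming directKafkaStream is the DStream created with KafkaUtils.createDirectStream as shown in the Programming section below; saveResultsAndOffsetsAtomically is a hypothetical placeholder for your own transactional (or idempotent) write to an external store, not a Spark or Kafka API:

 import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

 // Hypothetical helper: write the batch results and the offsets in a single
 // transaction, or make the write idempotent (keyed by topic/partition/offset).
 def saveResultsAndOffsetsAtomically(results: Array[String], offsets: Array[OffsetRange]): Unit = {
   // e.g. BEGIN; INSERT results; UPSERT offsets; COMMIT
 }

 directKafkaStream.foreachRDD { rdd =>
   // The cast must be done on the RDD coming directly from Kafka, before any shuffle.
   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   // Collecting to the driver keeps this sketch short; a real job would usually
   // write per partition instead.
   val results = rdd.map { case (_, value) => value }.collect()
   saveResultsAndOffsetsAtomically(results, offsetRanges)
 }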

Note that one disadvantage of this approach is that it does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself (see below).

One drawback of this approach is that ZooKeeper-based Kafka management tools will not show the topic's consumption progress. However, you can read the offsets of each partition processed in each batch and save them to ZooKeeper yourself.

Next, we discuss how to use this approach in your streaming application.

Linking: This approach is supported only in Scala/Java application. Link your SBT/Maven project with the following artifact (see Linking section in the main programming guide for further information).

 groupId = org.apache.spark
 artifactId = spark-streaming-kafka-0-8_2.11
 version = 2.3.1


Programming: In the streaming application code, import KafkaUtils and create an input DStream as follows.

 import org.apache.spark.streaming.kafka._

 val directKafkaStream = KafkaUtils.createDirectStream[
     [key class], [value class], [key decoder class], [value decoder class] ](
     streamingContext, [map of Kafka parameters], [set of topics to consume])

You can also pass a messageHandler to createDirectStream to access MessageAndMetadata that contains metadata about the current message and transform it to any desired type. See the API docs.

In the Kafka parameters, you must specify either metadata.broker.list or bootstrap.servers. By default, it will start consuming from the latest offset of each Kafka partition. If you set configuration auto.offset.reset in Kafka parameters to smallest, then it will start consuming from the smallest offset.

When using KafkaUtils.createDirectStream, you must pass Kafka parameters that include the broker address list, under either the "metadata.broker.list" key or the "bootstrap.servers" key. By default, consumption starts from each partition's latest offset; if you set "auto.offset.reset" to "smallest", the Spark Streaming application starts consuming from each partition's smallest (earliest) offset.
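
A minimal concrete sketch of this (the broker addresses and the topic name are placeholders, and streamingContext is assumed to exist):

 import kafka.serializer.StringDecoder
 import org.apache.spark.streaming.kafka._

 // Direct approach: only the broker list is needed, no ZooKeeper quorum.
 val kafkaParams = Map(
   "metadata.broker.list" -> "broker1:9092,broker2:9092",  // placeholder brokers
   "auto.offset.reset"    -> "smallest")                   // start from the earliest offset

 val directKafkaStream = KafkaUtils.createDirectStream[
   String, String, StringDecoder, StringDecoder](
   streamingContext, kafkaParams, Set("my-topic"))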

You can also start consuming from any arbitrary offset using other variations of KafkaUtils.createDirectStream. Furthermore, if you want to access the Kafka offsets consumed in each batch, you can do the following.

You can also use other overloads of KafkaUtils.createDirectStream to start consuming from an arbitrary offset. If you want to access the Kafka offsets consumed in each batch, you can do it as follows.
Scala implementation:

// Hold a reference to the current offset ranges, so it can be used downstream
 var offsetRanges = Array.empty[OffsetRange]

 directKafkaStream.transform { rdd =>
   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd
 }.map {
           ...
 }.foreachRDD { rdd =>
   for (o <- offsetRanges) {
     println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
   }
   ...
 }

You can use this to update Zookeeper yourself if you want Zookeeper-based Kafka monitoring tools to show progress of the streaming application.

Once you can obtain the offsets, you can store them in ZooKeeper yourself, so that ZooKeeper-based Kafka management tools can show the consumption progress of the streaming application.
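
As an illustration of the "arbitrary offset" overload mentioned above: the variant that takes explicit starting offsets also takes a messageHandler that maps each MessageAndMetadata record to the type you want in the stream. The starting offsets, broker address and topic below are placeholders; in practice the offsets would come from wherever you persisted them (e.g. ZooKeeper or a database):

 import kafka.common.TopicAndPartition
 import kafka.message.MessageAndMetadata
 import kafka.serializer.StringDecoder
 import org.apache.spark.streaming.kafka._

 val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")  // placeholder broker

 // Explicit starting offsets per topic/partition (placeholder values).
 val fromOffsets = Map(
   TopicAndPartition("my-topic", 0) -> 100L,
   TopicAndPartition("my-topic", 1) -> 250L)

 // messageHandler decides what each record becomes; here, a (key, value) pair.
 val messageHandler =
   (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

 val streamFromOffsets = KafkaUtils.createDirectStream[
   String, String, StringDecoder, StringDecoder, (String, String)](
   streamingContext, kafkaParams, fromOffsets, messageHandler)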

1. Note that the typecast to HasOffsetRanges will only succeed if it is done in the first method called on the directKafkaStream, not later down a chain of methods. You can use transform() instead of foreachRDD() as your first method call in order to access offsets, then call further Spark methods. However, be aware that the one-to-one mapping between RDD partition and Kafka partition does not remain after any methods that shuffle or repartition, e.g. reduceByKey() or window().

Note that the typecast to HasOffsetRanges only succeeds if it is done in the first method called on directKafkaStream, not further down a chain of methods. Also note that the one-to-one mapping between RDD partitions and Kafka partitions does not survive any method that shuffles or repartitions the data, such as reduceByKey() or window().

2. Another thing to note is that since this approach does not use Receivers, the standard receiver-related configurations (that is, configurations of the form spark.streaming.receiver.*) will not apply to the input DStreams created by this approach (they will apply to other input DStreams though). Instead, use the spark.streaming.kafka.* configurations. An important one is spark.streaming.kafka.maxRatePerPartition which is the maximum rate (in messages per second) at which each Kafka partition will be read by this direct API.

Parameters of the form spark.streaming.receiver.* apply only to the Receiver-based approach; they have no effect on DStreams created by this receiver-less approach. To control the direct approach, use the spark.streaming.kafka.* parameters instead. The most important of these is the rate-control parameter spark.streaming.kafka.maxRatePerPartition: when consuming through the direct API, it is the maximum number of messages per second that can be read from each partition.
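
For example, a short sketch of setting this limit on the SparkConf (the application name and the value 1000 are illustrative; tune the rate to your batch interval and throughput):

 import org.apache.spark.SparkConf

 // Cap each Kafka partition at 1000 records per second for the direct stream.
 val conf = new SparkConf()
   .setAppName("DirectKafkaApp")
   .set("spark.streaming.kafka.maxRatePerPartition", "1000")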

Deploying: This is same as the first approach.

A summary borrowed from another author:

1. The Receiver-based approach

Operator: KafkaUtils.createStream
Mode: PUSH - data is pushed from the topic to the receiver
API: Kafka's high-level consumer API
In practice: Spark Streaming's Receivers pair naturally with Kafka's publish/subscribe model; even so, this approach is rarely used in production, because the received data sits in executor memory and records can end up missed or processed more than once, which does not meet enterprise requirements.
Explanation: this method uses a Receiver to receive the data. The Receiver is implemented with Kafka's high-level consumer API. As with all receivers, the data received from Kafka through the Receiver is stored in the Spark executors, and jobs launched by Spark Streaming then process it. Under the default configuration, however, this method can lose data on failure. To ensure zero data loss, you must additionally enable the Write Ahead Log in Spark Streaming (introduced in Spark 1.2), which writes all received Kafka data to a write-ahead log on a distributed file system (e.g. HDFS) so that everything can be recovered after a failure.
Points to note:
① Topic partitions in Kafka do not correspond to the partitions of the RDDs generated in Spark Streaming. Increasing the number of topic partitions passed to KafkaUtils.createStream() only increases the number of consumer threads inside a single receiver; it does not increase Spark's parallelism in processing the data.
② Multiple Kafka input DStreams can be created with different groups and topics, so that data is received in parallel through multiple receivers.
③ If the Write Ahead Log has been enabled on a replicated file system such as HDFS, the received data is already replicated in the log, so set the storage level of the input stream to StorageLevel.MEMORY_AND_DISK_SER.

2. The direct approach (no receivers)

Operator: KafkaUtils.createDirectStream
Mode: PULL - data is pulled from the topic
API: Kafka's low-level (simple) consumer API
In practice: each batch fetches data from every partition of the topic according to an offset range and then processes it, which makes high availability achievable.
Explanation: this new receiver-less "direct" approach was introduced in Spark 1.3 to provide stronger end-to-end guarantees. Instead of using receivers to receive data, it periodically queries Kafka for the latest offset of each topic+partition and accordingly defines the offset range to process in each batch. When the jobs that process the data are launched, Kafka's simple consumer API is used to read the defined offset ranges from Kafka (similar to reading files from a file system). Note that this feature was introduced for the Scala and Java APIs in Spark 1.3, and for the Python API in Spark 1.4.
Advantages:
① Simplified parallelism: there is no need to create multiple input Kafka streams and union them. With directStream, Spark Streaming creates as many RDD partitions as there are Kafka partitions to consume, and they all read from Kafka in parallel, so there is a one-to-one mapping between Kafka partitions and RDD partitions, which is easier to understand and tune.

② Efficiency: achieving zero data loss in the first approach requires storing the data in a write-ahead log, which replicates the data yet again. That is inefficient, because the data is effectively copied twice - once by Kafka and once by the Write Ahead Log. This approach eliminates the problem: there is no receiver and therefore no need for a write-ahead log, and as long as Kafka retains the messages, they can be recovered from Kafka.

③ Exactly-once semantics: the first approach uses Kafka's high-level API to store consumed offsets in ZooKeeper, which is the traditional way to consume from Kafka. Although that approach (combined with the write-ahead log) can ensure zero data loss, under certain failures there is a small chance that some records are consumed twice, because the data reliably received by Spark Streaming and the offsets tracked by ZooKeeper can become inconsistent. In this second approach we therefore use the simple Kafka API, which does not use ZooKeeper; Spark Streaming tracks the offsets in its checkpoints. This eliminates the inconsistency between Spark Streaming and ZooKeeper/Kafka, so each record is effectively received by Spark Streaming exactly once despite failures.

Note that one disadvantage of this approach is that it does not update offsets in ZooKeeper, so ZooKeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed in each batch and update ZooKeeper yourself.