1. 程式人生 > >Structured Streaming + Kafka Integration Guide 結構化流+Kafka集成指南 (Kafka broker version 0.10.0 or higher)

Structured Streaming + Kafka Integration Guide 結構化流+Kafka集成指南 (Kafka broker version 0.10.0 or higher)

bsp 次數 集成 重復項 park artifact sse 語義 timestamp

用於Kafka 0.10的結構化流集成從Kafka讀取數據並將數據寫入到Kafka。

1. Linking

對於使用SBT/Maven項目定義的Scala/Java應用程序,用以下工件artifact連接你的應用程序:

技術分享圖片

對於Python應用程序,你需要在部署應用程序時添加上面的庫及其依賴關系。查看Deploying子節點。

2. Reading Data from Kafka 從Kafka讀取數據

2.1 Creating a Kafka Source for Streaming Queries 為流式查詢創建一個Kafka來源

技術分享圖片

2.2 Creating a Kafka Source for Batch Queries 為批處理查詢創建一個Kafka來源

如果你有一個更適合用於批處理的用例,你可以為定義的偏移量範圍創建一個Dataset/DataFrame。

技術分享圖片

來源的每一行有以下格式:

ColumnType
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int

對於批處理和流式查詢,必須為Kafka來源設置以下選項。

Optionvaluemeaning
assign json string {"topicA":[0,1],"topicB":[2,4]} 要使用特定TopicPartition。對於Kafka來源,只能指定“assign”,"subscribe"或者“subscribePattern”選項中的一個。
subscribe A comma-separated list of topics 要訂閱的主題列表.對於Kafka來源,只能指定“assign”,"subscribe"或者“subscribePattern”選項中的一個。
subscribePattern Java regex string 用於訂閱主題的格式。對於Kafka來源,只能指定“assign”,"subscribe"或者“subscribePattern”選項中的一個。
kafka.bootstrap.servers A comma-separated list of host:port Kafka "bootstrap.servers" 配置。

以下配置是可選的:

Optionvaluedefaultquery typemeaning
startingOffsets "earliest", "latest" (streaming only), or json string """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """ "latest"用於流, "earliest" 用於批量

streaming and batch

流和批量

查詢開始時的起始點,可以是最早偏移量的“earliest”,也可以是最近偏移量的“latest”,也可以是指定每個TopicPartition起始偏移量的json字符串。在Json中,-2作為偏移量可以用來指最早的,-1指最新的。註意:對於批量查詢,不允許使用最新(隱式或在json中使用-1)。對於流式查詢,這僅適用於新查詢開始時的情況,並且恢復將始終從查詢停止的地方開始。在查詢期間新發現的分區將從earliest開始。
endingOffsets latest or json string {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} latest

batch query

批量查詢

批量查詢結束時的結束點,可以是最新引用的“latest”,也可以是指定每個TopicPartition結束偏移量的json字符串。在json中,-1作為偏移量可以用於引用最新的,-2(最早的)作為偏移量是不允許的。
failOnDataLoss true or false true

streaming query

流式查詢

當可能丟失數據時是否讓查詢失敗(例如,主題被刪除或偏移超出範圍)。這可能是一個虛驚。當它不像你期望的那樣工作時,你可以禁用它。如果由於丟失數據而無法從提供的偏移量中讀取任何數據,批量查詢將始終失敗。
kafkaConsumer.pollTimeoutMs long 512 streaming and batch 在executors中輪詢來自Kafka的數據的超時時間(以毫秒為單位)
fetchOffset.numRetries int 3 streaming and batch 在放棄提取Kafka偏移量之前重試的次數。
fetchOffset.retryIntervalMs long 10 streaming and batch 重試去提取Kafka偏移量之前等待的毫秒數。
maxOffsetsPerTrigger long none streaming and batch 每次觸發間隔處理的最大偏移量的速率限制。指定的偏移量總數將按不同卷的topicPartition成比例地分割。

3. Writing Data to Kafka 將數據寫入到Kafka

這裏,我們描述了向Apache Kafka寫入流式查詢和批量查詢的支持。註意Apache Kafka只支持至少一次寫入語義。因此,在向Kafka寫入流式查詢或批量查詢時,可能會復制一些記錄;例如,如果Kafka需要重試未被Broker確認的消息,即使該Broker接收並編寫了消息記錄,也會發生這種情況。由於這些Kafka寫入語義,結構化流不能阻止這種復制發生。然而,如果編寫查詢是成功的,那麽你可以假定查詢輸出至少寫入一次。當讀取寫入的數據刪除重復項的可能解決方法可能是引入主要(唯一)key,當讀取時這key可以用於執行重復數據刪除。

寫入到Kafka的Dataframe應該在模式上有以下列:

ColumnType
key (optional) string or binary
value (required) string or binary
topic (*optional) string

* 如果“topic”配置選項不指定,則topic列是必須的。

value列是唯一要求的選項。如果key列沒有指定,那麽將會自動添加值為null的key列(查看Kafka語義中關於如何處理空值key)。如果topic列存在,那麽在將給予的行寫入到Kafka時,它的值用作topic,除非設置好“topic”配置選項。例如,“topic”配置選項覆蓋topic列。

對於批量和流式查詢,必須為Kafka sink設置以下選項:

Optionvaluemeaning
kafka.bootstrap.servers A comma-separated list of host:port Kafka "bootstrap.servers"配置。

以下選項是可選的:

Optionvaluedefaultquery typemeaning
topic string none streaming and batch 設置所有行將在Kafka中寫入的topic。該選項將覆蓋數據中可能存在的任何topic列。

3.1 Creating a Kafka Sink for Streaming Queries 為流式查詢創建Kafka Sink

技術分享圖片

3.2 Writing the output of Batch Queries to Kafka 將批量查詢的輸出寫入到Kafka中

技術分享圖片

4. Kafka Specific Configurations Kafka特定的配置

Kafka自己的配置可以通過帶有Kafka.prefix的DataStreamReader.option進行設置。例如,stream.option("kafka.bootstrap.servers","host":"port")。有關可能的Kafka參數,參閱Kafka消費者配置文檔以獲取與讀取數據相關的參數,以及Kafka生產者配置文件以獲取與寫入數據相關的參數。

註意以下Kafka參數不能設置,Kafka source或者sink將會拋出錯誤。

  • group.id: Kafka source將會自動為每次查詢創建唯一的分組id。
  • auto.offset.reset: 將source選項startingOffsets設置為指定從哪裏開始。結構化流管理內部消費的偏移量,而不是依賴Kafka消費者來完成。這將確保在topic/partitons動態訂閱時不會遺漏任何數據。註意,只有在啟動新的流式查詢時才會應用startingOffsets,並且恢復操作始終會從查詢停止的位置啟動。.
  • key.deserializer:Keys總是被反序列化為ByteArrayDeserializer的字節數組。使用DataFrame操作顯式反序列化keys。
  • value.deserializer:Values總是被反序列化為ByteArrayDeserializer的字節數組。使用DataFrame操作顯式反序列化values。
  • key.serializer: keys總是使用ByteArraySerializer或StringSerializer進行序列化。使用DataFrame操作將keys顯示序列化為字符串或字節數組。
  • value.serializer: values總是使用ByteArraySerializer或StringSerializer進行序列化使用DataFrame操作將values顯示序列化為字符串或字節數組。
  • enable.auto.commit: Kafka source不提交任何offset。
  • interceptor.classes: Kafka source總是以字節數組的形式讀取key和value。使用ConsumerInterceptor是不安全的,因為它可能會打斷查詢。

5. Deploying 部署

與任何Spark應用程序一樣,spark-submit用於啟動你的應用程序。spark-sql-kafka-0-10_2.11及其依賴關系可以直接添加到使用--packages的spark-submit中,例如,

技術分享圖片

更多關於提交帶有外部依賴項的應用程序的詳細信息參閱Application Submission Guide。

Structured Streaming + Kafka Integration Guide 結構化流+Kafka集成指南 (Kafka broker version 0.10.0 or higher)