1. 程式人生 > >Spark Structured Streaming入門程式設計指南

Spark Structured Streaming入門程式設計指南

Spark結構式流程式設計指南

概覽

Structured Streaming 是一個可拓展,容錯的,基於Spark SQL執行引擎的流處理引擎。使用小量的靜態資料模擬流處理。伴隨流資料的到來,Spark SQL引擎會逐漸連續處理資料並且更新結果到最終的Table中。你可以在Spark SQL上引擎上使用DataSet/DataFrame API處理流資料的聚集,事件視窗,和流與批次的連線操作等。最後Structured Streaming 系統快速,穩定,端到端的恰好一次保證,支援容錯的處理。

小樣例

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder .appName("StructuredNetworkWordCount") .getOrCreate() import spark.implicits._ val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() // Split the lines into words val words = lines.as[String].flatMap(_.split(" ")) // Generate running word count
val wordCounts = words.groupBy("value").count() val query = wordCounts.writeStream .outputMode("complete") .format("console") .start() query.awaitTermination()

程式設計模型

結構化流的關鍵思想是將實時資料流視為一個連續附加的表

基本概念

將輸入的資料當成一個輸入的表格,每一個數據當成輸入表的一個新行。

"Output"是寫入到外部儲存的寫方式,寫入方式有不同的模式:

  • Complete模式: 將整個更新表寫入到外部儲存,寫入整個表的方式由儲存聯結器決定。
  • Append模式:只有自上次觸發後在結果表中附加的新行將被寫入外部儲存器。這僅適用於結果表中的現有行不會更改的查詢。
  • Update模式:只有自上次觸發後在結果表中更新的行將被寫入外部儲存器(在Spark 2.0中尚不可用)。注意,這與完全模式不同,因為此模式不輸出未更改的行。

處理事件時間和延遲資料

事件時間是嵌入在資料本身中的時間。對於許多應用程式,您可能希望在此事件時間操作。例如,如果要獲取IoT裝置每分鐘生成的事件數,則可能需要使用生成資料的時間(即資料中的事件時間),而不是Spark接收的時間他們。此事件時間在此模型中非常自然地表示 - 來自裝置的每個事件都是表中的一行,事件時間是該行中的一個列值。這允許基於視窗的聚合(例如每分鐘的事件數)僅僅是偶數時間列上的特殊型別的分組和聚合 - 每個時間視窗是一個組,並且每一行可以屬於多個視窗/組。因此,可以在靜態資料集(例如,來自收集的裝置事件日誌)以及資料流上一致地定義這種基於事件時間窗的聚合查詢,使得使用者的生活更容易。

此外,該模型自然地處理基於其事件時間比預期到達的資料。由於Spark正在更新結果表,因此當存在延遲資料時,它可以完全控制更新舊聚合,以及清除舊聚合以限制中間狀態資料的大小。由於Spark 2.1,我們支援水印,允許使用者指定後期資料的閾值,並允許引擎相應地清除舊的狀態。稍後將在“視窗操作”部分中對此進行詳細說明。

容錯語義

提供端到端的一次性語義是結構化流的設計背後的關鍵目標之一。為了實現這一點,我們設計了結構化流源,接收器和執行引擎,以可靠地跟蹤處理的確切進展,以便它可以通過重新啟動和/或重新處理來處理任何型別的故障。假定每個流源具有偏移量(類似於Kafka偏移量或Kinesis序列號)以跟蹤流中的讀取位置。引擎使用檢查點和預寫日誌來記錄每個觸發器中正在處理的資料的偏移範圍。流接收器被設計為用於處理再處理的冪等。結合使用可重放源和冪等宿,結構化流可以確保在任何故障下的端到端的一次性語義。

使用DataFrame和DataSet API

從Spark 2.0開始,DataFrames和Datasets可以表示靜態,有界資料,以及流式,無界資料。與靜態DataSets/ DataFrames類似,您可以使用公共入口點SparkSession(Scala / Java / Python文件)從流源建立流DataFrames /DataSets,並對它們應用與靜態DataFrames / Datasets相同的操作。如果您不熟悉Datasets / DataFrames,強烈建議您使用DataFrame / Dataset程式設計指南熟悉它們。

建立資料框流和資料集流

Streaming DataFrames可以通過SparkSession.readStream()返回的DataStreamReader介面(Scala / Java / Python docs)建立。類似於用於建立靜態DataFrame的讀取介面,您可以指定源 - 資料格式,模式,選項等的詳細資訊。

資料來源

在Spark 2.0,有幾個內建的資料來源:

  • 檔案源:將寫入目錄中的檔案讀取為資料流。支援的檔案格式有text,csv,json,parquet。請參閱DataStreamReader介面的文件以獲取更新的列表,以及每種檔案格式支援的選項。注意,檔案必須原子地放置在給定目錄中,在大多數檔案系統中,可以通過檔案移動操作來實現。
  • Kafka源:從kafka拉取資料,支援kafka broker versions 0.10.0 or higher.從kafka整合指南獲取更多資訊。
  • Socket源(測試用):從套接字連線讀取UTF8文字資料。監聽伺服器套接字在驅動程式。注意,這應該僅用於測試,因為這不提供端到端容錯保證

這些示例生成無型別的流式DataFrames,這意味著在編譯時不檢查DataFrame的模式,僅在提交查詢時在執行時檢查。一些操作,如map,flatMap等,需要在編譯時知道型別。要做到這些,你可以使用與靜態DataFrame相同的方法將這些無型別的流DataFrames轉換為型別化流資料集。有關更多詳細資訊,請參閱SQL程式設計指南。此外,有關支援的流媒體源的更多詳細資訊將在文件中稍後討論。

資料框/資料集流的模式推理和分割槽

預設情況下,基於檔案的源的結構化流要求您指定模式,而不是依靠Spark自動推斷。此限制確保即使在發生故障的情況下,一致的模式也將用於流式查詢。對於臨時用例,可以通過將spark.sql.streaming.schemaInference設定為true來重新啟用模式推斷。
當名為/ key = value /的子目錄存在時,發生分割槽發現,並且列表將自動遞迴到這些目錄中。如果這些列出現在使用者提供的模式中,它們將由Spark根據正在讀取的檔案的路徑填充。當查詢開始時,組成分割槽方案的目錄必須存在,並且必須保持靜態。例如,可以新增/ data / year = 2016 / when / data / year = 2015 /存在,但是更改分割槽列是無效的(即通過建立目錄/ data / date = 2016-04-17 /)。

流式DataFrames/Datasets上的操作

您可以對流式DataFrames /資料集應用各種操作 - 從無型別,類似SQL的操作(例如select,where,groupBy)到型別化的RDD類操作(例如map,filter,flatMap)。有關更多詳細資訊,請參閱SQL程式設計指南。讓我們來看看一些你可以使用的示例操作。

基本操作 - 選擇,投影,聚合

case class DeviceData(device: String, type: String, signal: Double, time: DateTime)

val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10")      // using untyped APIs   
ds.filter(_.signal > 10).map(_.device)         // using typed APIs

// Running count of the number of updates for each device type
df.groupBy("type").count()                          // using untyped API

// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed._
ds.groupByKey(_.type).agg(typed.avg(_.signal))    // using typed API

事件時間上的視窗操作

滑動事件時間視窗上的聚合通過結構化流直接進行。理解基於視窗的聚合的關鍵思想與分組聚合非常相似。在分組聚合中,為使用者指定的分組列中的每個唯一值維護聚合值(例如計數)。在基於視窗的聚合的情況下,對於行的事件時間落入的每個視窗維持聚合值。讓我們用插圖來理解這一點。

想象一下,我們的快速示例被修改,流現在包含行以及生成行的時間。我們不想執行字數,而是計算10分鐘內的字數,每5分鐘更新一次。也就是說,在10分鐘視窗12:00-12:10,12:05-12:15,12:10-12:20等之間接收的詞中的字數。注意,12:00 -12:10意味著資料在12:00之後但在12:10之前到達。現在,考慮在12:07收到的一個字。這個單詞應該增加對應於兩個視窗12:00 - 12:10和12:05 - 12:15的計數。因此,計數將通過分組鍵(即字)和視窗(可以從事件時間計算)來索引。
結果表將如下所示:

由於此視窗類似於分組,因此在程式碼中,可以使用groupBy()和window()操作來表示視窗化聚合。您可以在Scala / Java / Python中檢視以下示例的完整程式碼。

處理延遲資料和水位線

現在考慮如果事件中的一個到達應用程式的遲到會發生什麼。例如,例如,在12:04(即事件時間)生成的詞可以由應用在12:11接收到。應用程式應使用時間12:04而不是12:11來更新視窗12:00 - 12:10的舊計數。這在我們的基於視窗的分組中自然地發生 - 結構化流可以長時間地保持部分聚合的中間狀態,使得晚期資料可以正確地更新舊視窗的聚集,如下所示。

但是,要執行此查詢的天數,系統必須繫結其累積的中間記憶體中狀態的數量。這意味著系統需要知道何時可以從記憶體中狀態刪除舊聚合,因為應用程式將不再接收該聚合的延遲資料。為了實現這一點,在Spark 2.1中,我們引入了水印,讓我們的引擎自動跟蹤資料中的當前事件時間,並嘗試相應地清理舊的狀態。您可以通過指定事件時間列和根據事件時間預計資料延遲的閾值來定義查詢的水印。對於在時間T開始的特定視窗,引擎將保持狀態並允許後期資料更新狀態,直到(由引擎看到的最大事件時間 - 後期閾值> T)。換句話說,閾值內的晚資料將被聚合,但晚於閾值的資料將被丟棄。讓我們用一個例子來理解這個。我們可以使用withWatermark()在上面的例子中輕鬆定義水印,如下所示。

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()

在這個例子中,我們定義查詢的水印對列“timestamp”的值,並且還定義“10分鐘”作為允許資料超時的閾值。如果此查詢在Append輸出模式(稍後在“輸出模式”部分中討論)中執行,則引擎將從列“timestamp”跟蹤當前事件時間,並在最終確定視窗計數和新增之前等待事件時間的額外“10分鐘”他們到結果表。這是一個例證。

如圖所示,由引擎跟蹤的最大事件時間是藍色虛線,並且在每個觸發的開始處設定為(最大事件時間 - '10分鐘')的水印是紅色線。例如,當引擎觀察資料(12:14,狗),它將下一個觸發器的水印設定為12:04。對於視窗12:00 - 12:10,部分計數保持為內部狀態,而系統正在等待延遲資料。在系統發現數據(即(12:21,owl))使得水印超過12:10之後,部分計數被最終確定並附加到表。此計數將不會進一步更改,因為所有超過12:10的“太晚”資料將被忽略。

請注意,在追加輸出模式下,系統必須等待“延遲閾值”時間才能輸出視窗的聚合。如果資料可能很晚,(例如1天),並且您希望部分計數而不等待一天,這可能不是理想的。將來,我們將新增更新輸出模式,這將允許每次更新聚合寫入到每個觸發器。

用於清除聚合狀態的水印的條件重要的是要注意,水印應當滿足以下條件以清除聚合查詢中的狀態(從Spark 2.1開始,將來會改變)。

  • 輸出模式必須為追加。完成模式要求保留所有聚合資料,因此不能使用水印來刪除中間狀態。有關每種輸出模式的語義的詳細說明,請參見“輸出模式”部分。
  • 聚合必須具有事件時列,或事件時列上的視窗。
  • withWatermark必須在與聚合中使用的時間戳列相同的列上呼叫。例如,df.withWatermark(“time”,“1 min”)。groupBy(“time2”)。count()在Append輸出模式下無效,因為水印是在與聚合列不同的列上定義的。
  • 其中在要使用水印細節的聚合之前必須呼叫withWatermark。例如,df.groupBy(“time”).count().withWatermark(“time”,“1 min”)在Append輸出模式中無效。

Join操作

流DataFrames可以與靜態DataFrames連線以建立新的流DataFrames。這裡有幾個例子。

val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...

streamingDf.join(staticDf, "type")          // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "right_join")  // right outer join with a static DF 

不支援的操作

但是,請注意,所有適用於靜態DataFrames /資料集的操作在流式DataFrames /資料集中不受支援。雖然這些不支援的操作中的一些將在未來的Spark版本中得到支援,但還有一些基本上難以有效地在流資料上實現。例如,輸入流資料集不支援排序,因為它需要跟蹤流中接收的所有資料。因此,這在根本上難以有效地執行。從Spark 2.0開始,一些不受支援的操作如下:

  • 在流資料集上還不支援多個流聚集(即,流DF上的聚合鏈)。
  • 在流資料集上不支援限制和獲取前N行。
  • 不支援對流資料集進行不同操作。
  • 排序操作僅在聚合後在完整輸出模式下支援流資料集。
  • 條件支援流式傳輸和靜態資料集之間的外連線。
  • 不支援帶有流資料集的完全外連線
  • 不支援左外部連線與右側的流資料集
  • 不支援左側的流資料集的右外部聯接
  • 尚不支援兩個流資料集之間的任何型別的連線。

此外,還有一些Dataset方法不能用於流資料集。它們是將立即執行查詢並返回結果的操作,這對流資料集沒有意義。相反,這些功能可以通過顯式地啟動流查詢來完成(參見下一部分)。

  • count() - 無法從流資料集返回單個計數。
    相反,使用ds.groupBy.count()返回包含執行計數的流資料集。
  • foreach() - 而是使用ds.writeStream.foreach(...)(參見下一節)。
  • show() - 而是使用控制檯接收器(請參閱下一節)。

如果您嘗試任何這些操作,您將看到一個AnalysisException如“操作XYZ不支援與流DataFrames /資料集”。

啟動流式查詢

一旦定義了最終結果DataFrame / Dataset,剩下的就是啟動流計算。為此,您必須使用通過Dataset.writeStream()返回的DataStreamWriter(Scala / Java / Python文件)。您必須在此介面中指定以下一個或多個。

  • 輸出接收器的詳細資訊:資料格式,位置等
  • 輸出模式:指定寫入輸出接收器的內容。
  • 查詢名稱:(可選)指定查詢的唯一名稱以進行標識。
  • 觸發間隔:可選擇指定觸發間隔。如果未指定,系統將在上一個處理完成後立即檢查新資料的可用性。如果由於先前處理尚未完成而錯過觸發時間,則系統將嘗試在下一觸發點處觸發,而不是在處理完成之後立即觸發。
  • 檢查點位置:對於可以保證端到端容錯的某些輸出接收器,請指定系統將寫入所有檢查點資訊的位置。這應該是HDFS相容的容錯檔案系統中的目錄。檢查點的語義將在下一節中更詳細地討論。

輸出模式

有幾種型別的輸出模式:

  • 附加模式(預設) - 這是預設模式,其中只有自上次觸發後新增到結果表中的新行將輸出到接收器。這僅支援那些新增到結果表中的行從不會更改的查詢。因此,該模式保證每行只輸出一次(假設容錯宿)。例如,只有select,where,map,flatMap,filter,join等的查詢將支援Append模式。
  • 完成模式 - 每次觸發後,整個結果表將輸出到接收器。聚合查詢支援此選項。
  • 更新模式 - (在Spark 2.1中不可用)只有結果表中自上次觸發後更新的行才會輸出到接收器。更多資訊將在未來版本中新增。

不同型別的流查詢支援不同的輸出模式。這裡是相容性矩陣:

查詢型別支援的輸出模式
無聚合的查詢支援完整模式因為不可能保留結果表中的所有資料。
帶有聚合的聚合聚合在帶水印的事件時間聚合附加,完全附加模式使用水印來刪除舊的聚合狀態。但是視窗化聚合的輸出被延遲了在withWatermark()中指定的晚期閾值,如模式語義,在結束表之後,只有在結束表(在水印被交叉之後)才能將行新增一次。有關詳細資訊,請參閱延遲資料部分。完成模式不刪除舊的聚合狀態,因為從定義該模式保留結果表中的所有資料。
其他聚合完全不支援完全附加模式,因為聚合可以更新,因此違反了此​​模式的語義。完成模式不刪除舊的聚合狀態,因為從定義該模式保留結果表中的所有資料。

輸出接收器

有幾種型別的內建輸出接收器:

  • 檔案接收器 - 將輸出儲存到目錄。
  • Foreach sink - 對輸出中的記錄執行任意計算。有關詳細資訊,請參閱後面的部分。
  • 控制檯接收器(用於除錯) - 每次有觸發器時將輸出列印到控制檯/ stdout。這應該用於低資料量上的除錯目的,因為每次觸發後,整個輸出被收集並存儲在驅動程式的記憶體中。
  • 記憶體接收器(用於除錯) - 輸出作為記憶體表儲存在記憶體中。支援附加和完成輸出模式。這應該用於低資料量上的除錯目的,因為每次觸發後,整個輸出被收集並存儲在驅動程式的記憶體中。

下面是所有接收器的表格和相應的設定:

接收器支援的輸出模式用法容錯備註
檔案接收器AppendwriteStream.format("parquet").start()Yes支援對分割槽表的寫入。按時間分割槽可能有用。
Foreach 接收器所有模式writeStream.foreach(...).start()取決於ForeachWriter實現更多細節在下一節
控制檯接收器Append, CompletewriteStream.format("console").start()No
記憶體接收器Append, CompletewriteStream.format("memory").queryName("table").start()No將輸出資料儲存為表,用於互動式查詢。表名是查詢名稱。

最後,你必須呼叫start()才能真正開始執行查詢。這返回一個StreamingQuery物件,它是連續執行的執行的控制代碼。您可以使用此物件來管理查詢,我們將在下一小節中討論。現在,讓我們通過幾個例子來理解這一切。

// ========== DF with no aggregations ==========
Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10");

// Print new data to console
noAggDF
  .writeStream()
  .format("console")
  .start();

// Write new data to Parquet files
noAggDF
  .writeStream()
  .parquet("path/to/destination/directory")
  .start();

// ========== DF with aggregation ==========
Dataset<Row> aggDF = df.groupBy("device").count();

// Print updated aggregations to console
aggDF
  .writeStream()
  .outputMode("complete")
  .format("console")
  .start();

// Have all the aggregates in an in-memory table
aggDF
  .writeStream()
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start();

spark.sql("select * from aggregates").show();   // interactively query in-memory table

使用foreach

foreach操作允許對輸出資料計算任意操作。從Spark 2.1開始,這隻適用於Scala和Java。要使用這個,你必須實現介面ForeachWriter(Scala / Java docs),它有一個方法,當觸發後產生一系列行作為輸出時被呼叫。請注意以下要點。
編寫器必須是可序列化的,因- 為它將被序列化併發送到執行器以供執行。

  • 所有三個方法,開啟,處理和關閉將被呼叫的執行者。
  • 只有當呼叫open方法時,寫程式必須執行所有的初始化(例如開啟連線,啟動事務等)。請注意,如果在建立物件時在類中有任何初始化,那麼該初始化將在驅動程式中進行(因為這是建立例項的地方),這可能不是您想要的。
  • 版本和分割槽是open中的兩個引數,它們唯一地表示需要被推出的一組行。版本是一個單調增加的id,隨著每個觸發器增加。partition是表示輸出的分割槽的id,因為輸出是分散式的,並且將在多個執行器上處理。
  • open可以使用版本和分割槽來選擇是否需要寫行序列。因此,它可以返回true(繼續寫入)或false(不需要寫入)。如果返回false,那麼將不會在任何行上呼叫程序。例如,在部分故障之後,失敗觸發器的一些輸出分割槽可能已經被提交到資料庫。基於儲存在資料庫中的元資料,寫者可以識別已經提交的分割槽,因此返回false以跳過再次提交它們。
  • 每當呼叫open時,也將呼叫close(除非JVM由於某些錯誤而退出)。即使open返回false,也是如此。如果在處理和寫入資料時出現任何錯誤,將使用錯誤呼叫close。您有責任清除在開放中建立的狀態(例如連線,事務等),以便沒有資源洩漏。

管理流式查詢

啟動查詢時建立的StreamingQuery物件可用於監視和管理查詢。

StreamingQuery query = df.writeStream().format("console").start();   // get the query object

query.id();          // get the unique identifier of the running query

query.name();        // get the name of the auto-generated or user-specified name

query.explain();   // print detailed explanations of the query

query.stop();      // stop the query

query.awaitTermination();   // block until query is terminated, with stop() or with error

query.exception();    // the exception if the query has been terminated with error

query.sourceStatus();  // progress information about data has been read from the input sources

query.sinkStatus();   // progress information about data written to the output sink

您可以在單個SparkSession中啟動任意數量的查詢。他們將同時執行共享叢集資源。您可以使用sparkSession.streams()獲取可用於管理當前活動查詢的StreamingQueryManager(Scala / Java / Python文件)。

SparkSession spark = ...

spark.streams().active();    // get the list of currently active streaming queries

spark.streams().get(id);   // get a query object by its unique id

spark.streams().awaitAnyTermination();   // block until any one of them terminates

監視流查詢

有兩個API用於以互動式和非同步方式監視和除錯活動的查詢。

互動式API

您可以使用streamingQuery.lastProgress()和streamingQuery.status()直接獲取活動查詢的當前狀態和指標。 lastProgress()在Scala和Java中返回一個StreamingQueryProgress物件,在Python中返回一個具有相同欄位的字典。它具有關於在流的最後觸發中所進行的進展的所有資訊 - 什麼資料被處理,什麼是處理速率,等待時間等。還有streamingQuery.recentProgress,它返回最後幾個進度的陣列。

此外,streamingQuery.status()在Scala和Java中返回StreamingQueryStatus物件,在Python中返回具有相同欄位的字典。它提供有關查詢立即執行的操作的資訊 - 是觸發器活動,正在處理資料等。這裡有幾個例子。

StreamingQuery query = ...

System.out.println(query.lastProgress());
/* Will print something like the following.

{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/


System.out.println(query.status());
/*  Will print something like the following.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/

非同步API

您還可以通過附加StreamingQueryListener(Scala / Java docs)非同步監視與SparkSession相關聯的所有查詢。使用sparkSession.streams.attachListener()附加自定義StreamingQueryListener物件後,當查詢啟動和停止以及活動查詢中有進度時,您將獲得回撥。這裡是一個例子

SparkSession spark = ...

spark.streams.addListener(new StreamingQueryListener() {
    @Overrides void onQueryStarted(QueryStartedEvent queryStarted) {
        System.out.println("Query started: " + queryStarted.id());
    }
    @Overrides void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
        System.out.println("Query terminated: " + queryTerminated.id());
    }
    @Overrides void onQueryProgress(QueryProgressEvent queryProgress) {
        System.out.println("Query made progress: " + queryProgress.progress());
    }
});

使用檢查點從故障中恢復

在故障或故意關閉的情況下,您可以恢復先前查詢的先前進度和狀態,並繼續在其停止的地方。這是通過使用檢查點和預寫日誌來完成的。您可以配置具有檢查點位置的查詢,並且查詢將儲存所有進度資訊(即每個觸發器中處理的偏移範圍)和正在執行的聚合(例如快速示例中的字計數)到檢查點位置。此檢查點位置必須是HDFS相容檔案系統中的路徑,並且可以在啟動查詢時在DataStreamWriter中設定為選項。

aggDF
  .writeStream()
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start();