1. 摘要

在之前的一篇部落格中,我們介紹了Clustering(聚簇)的表服務來重新組織資料來提供更好的查詢效能,而不用降低攝取速度,並且我們已經知道如何部署同步Clustering,本篇部落格中,我們將討論近期社群做的一些改進以及如何通過HoodieClusteringJobDeltaStreamer工具來部署非同步Clustering

2. 介紹

通常講,Clustering根據可配置的策略建立一個計劃,根據特定規則對符合條件的檔案進行分組,然後執行該計劃。Hudi支援併發寫入,並在多個表服務之間提供快照隔離,從而允許寫入程式在後臺執行Clustering時繼續攝取。有關Clustering的體系結構的更詳細概述請檢視上一篇博文。

3. Clustering策略

如前所述Clustering計劃和執行取決於可插拔的配置策略。這些策略大致可分為三類:計劃策略執行策略更新策略

3.1 計劃策略

該策略在建立Clustering計劃時發揮作用。它有助於決定應該對哪些檔案組進行Clustering。讓我們看一下Hudi提供的不同計劃策略。請注意,使用此配置可以輕鬆地插拔這些策略。

  • SparkSizeBasedClusteringPlanStrategy:根據基本檔案的小檔案限制選擇檔案切片並建立Clustering組,最大大小為每個組允許的最大檔案大小。可以使用此配置指定最大大小。此策略對於將中等大小的檔案合併成大檔案非常有用,以減少跨冷分割槽分佈的大量檔案。

  • SparkRecentDaysClusteringPlanStrategy:根據以前的N天分割槽建立一個計劃,將這些分割槽中的小檔案片進行Clustering,這是預設策略,當工作負載是可預測的並且資料是按時間劃分時,它可能很有用。

  • SparkSelectedPartitionsClusteringPlanStrategy:如果只想對某個範圍內的特定分割槽進行Clustering,那麼無論這些分割槽是新分割槽還是舊分割槽,此策略都很有用,要使用此策略,還需要在下面設定兩個配置(包括開始和結束分割槽):

    hoodie.clustering.plan.strategy.cluster.begin.partition
    hoodie.clustering.plan.strategy.cluster.end.partition

注意:所有策略都是分割槽感知的,後兩種策略仍然受到第一種策略的大小限制的約束。

3.2 執行策略

在計劃階段構建Clustering組後,Hudi主要根據排序列和大小為每個組應用執行策略,可以使用此配置指定策略。

SparkSortAndSizeExecutionStrategy是預設策略。使用此配置進行Clustering時,使用者可以指定資料排序列。除此之外我們還可以為Clustering產生的Parquet檔案設定最大檔案大小。該策略使用bulk_insert將資料寫入新檔案,在這種情況下,Hudi隱式使用一個分割槽器,該分割槽器根據指定列進行排序。通過這種策略改變資料佈局,不僅提高了查詢效能,而且自動平衡了重寫開銷。

現在該策略可以作為單個Spark作業或多個作業執行,具體取決於在計劃階段建立的Clustering組的數量。預設情況下Hudi將提交多個Spark作業併合並結果。如果要強制Hudi使用單Spark作業,請將執行策略類配置設定為SingleSparkJobExecutionStrategy

3.3 更新策略

目前只能為未接收任何併發更新的表/分割槽排程Clustering。預設情況下更新策略的配置設定為SparkRejectUpdateStrategy。如果某個檔案組在Clustering期間有更新,則它將拒絕更新並引發異常。然而在某些用例中,更新是非常稀疏的,並且不涉及大多數檔案組。簡單拒絕更新的預設策略似乎不公平。在這種用例中使用者可以將配置設定為SparkAllowUpdateStregy

我們討論了關鍵策略配置,下面列出了與Clustering相關的所有其他配置。在此列表中一些非常有用的配置包括:

配置項 解釋 預設值
hoodie.clustering.async.enabled 啟用在表上的非同步執行Clustering服務。 false
hoodie.clustering.async.max.commits 通過指定應觸發多少次提交來控制非同步Clustering的頻率。 4
hoodie.clustering.preserve.commit.metadata 重寫資料時保留現有的_hoodie_commit_time。這意味著使用者可以在Clustering資料上執行增量查詢,而不會產生任何副作用。 false

4. 非同步Clustering

之前我們已經瞭解了使用者如何設定同步Clustering。此外使用者可以利用HoodiecClusteringJob設定兩步非同步Clustering

4.1 HoodieClusteringJob

隨著Hudi版本0.9.0的釋出,我們可以在同一步驟中排程和執行Clustering。我們只需要指定-mode-m選項。有如下三種模式:

  • schedule(排程):制定一個Clustering計劃。這提供了一個可以在執行模式下傳遞的instant

  • execute(執行):在給定的instant執行Clustering計劃,這意味著這裡需要instant

  • scheduleAndExecute(排程並執行):首先制定Clustering計劃並立即執行該計劃。

請注意要在原始寫入程式仍在執行時執行作業請啟用多寫入:

hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider

使用spark submit命令提交HoodieClusteringJob示例如下:

spark-submit \
--class org.apache.hudi.utilities.HoodieClusteringJob \
/path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar \
--props /path/to/config/clusteringjob.properties \
--mode scheduleAndExecute \
--base-path /path/to/hudi_table/basePath \
--table-name hudi_table_schedule_clustering \
--spark-memory 1g

clusteringjob.properties配置檔案示例如下

hoodie.clustering.async.enabled=true
hoodie.clustering.async.max.commits=4
hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
hoodie.clustering.plan.strategy.small.file.limit=629145600
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
hoodie.clustering.plan.strategy.sort.columns=column1,column2

4.2 HoodieDeltaStreamer

接著看下如何使用HudiDeltaStreamer。現在我們可以使用DeltaStreamer觸發非同步Clustering。只需將hoodie.clustering.async.enabledtrue,並在屬性檔案中指定其他Clustering配置,在啟動Deltastreamer時可以將其位置設為-props(與HoodieClusteringJob配置類似)。

使用spark submit命令提交HoodieDeltaStreamer示例如下:

spark-submit \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
/path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.9.0-SNAPSHOT.jar \
--props /path/to/config/clustering_kafka.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
--source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
--source-ordering-field impresssiontime \
--table-type COPY_ON_WRITE \
--target-base-path /path/to/hudi_table/basePath \
--target-table impressions_cow_cluster \
--op INSERT \
--hoodie-conf hoodie.clustering.async.enabled=true \
--continuous

4.3 Spark Structured Streaming

我們還可以使用Spark結構化流啟用非同步Clustering,如下所示。

val commonOpts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
) def getAsyncClusteringOpts(isAsyncClustering: String,
clusteringNumCommit: String,
executionStrategy: String):Map[String, String] = {
commonOpts + (DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering,
HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit,
HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME.key -> executionStrategy
)
} def initStreamingWriteFuture(hudiOptions: Map[String, String]): Future[Unit] = {
val streamingInput = // define the source of streaming
Future {
println("streaming starting")
streamingInput
.writeStream
.format("org.apache.hudi")
.options(hudiOptions)
.option("checkpointLocation", basePath + "/checkpoint")
.mode(Append)
.start()
.awaitTermination(10000)
println("streaming ends")
}
} def structuredStreamingWithClustering(): Unit = {
val df = //generate data frame
val hudiOptions = getClusteringOpts("true", "1", "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy")
val f1 = initStreamingWriteFuture(hudiOptions)
Await.result(f1, Duration.Inf)
}

5. 總結和未來工作

在這篇文章中,我們討論了不同的Clustering策略以及如何設定非同步Clustering。未來的工作包括:

  • Clustering支援更新。

  • 支援Clustering的CLI工具。

另外Flink支援Clustering已經有相應Pull Request,有興趣的小夥伴可以關注該PR。

可以檢視JIRA瞭解更多關於此問題的開發,我們期待社會各界的貢獻,希望你喜歡這個部落格!