1. 程式人生 > >使用Apache Spark和Apache Hudi構建分析資料湖

使用Apache Spark和Apache Hudi構建分析資料湖

## 1. 引入 大多數現代資料湖都是基於某種分散式檔案系統(DFS),如HDFS或基於雲的儲存,如AWS S3構建的。遵循的基本原則之一是檔案的“一次寫入多次讀取”訪問模型。這對於處理海量資料非常有用,如數百GB到TB的資料。 但是在構建分析資料湖時,更新資料並不罕見。根據不同場景,這些更新頻率可能是每小時一次,甚至可能是每天或每週一次。另外可能還需要在最新檢視、包含所有更新的歷史檢視甚至僅是最新增量檢視上執行分析。 通常這會導致使用用於流和批處理的多個系統,前者處理增量資料,而後者處理歷史資料。 ![](https://img2020.cnblogs.com/blog/616953/202006/616953-20200615092545386-1978958420.png) 處理儲存在HDFS上的資料時,維護增量更新的常見工作流程是[這裡](https://community.cloudera.com/t5/Community-Articles/FOUR-STEP-STRATEGY-FOR-INCREMENTAL-UPDATES-IN-APACHE-HIVE-ON/ta-p/246015)所述的Ingest-Reconcile-Compact-Purge策略。 ![](https://img2020.cnblogs.com/blog/616953/202006/616953-20200615092601098-224575856.png) Apache Hudi之類的框架在這裡便可發揮作用。它在後臺為我們管理此工作流程,從而使我們的核心應用程式程式碼更加簡潔,Hudi支援對最新資料檢視的查詢以及查詢在某個時間點的增量更改。 這篇文章將介紹Hudi的核心概念以及如何在Copy-On-Write模式下進行操作。 本篇文章專案原始碼放在[github](https://github.com/oliversavio/hudi-data-lake-example)。 ## 2. 大綱 * 先決條件和框架版本 * Hudi核心概念 * 初始設定和依賴項 * 使用CoW表 ### 2.1 先決條件和框架版本 如果你事先了解如何使用scala編寫spark作業以及讀取和寫入parquet檔案,那麼本篇文章理解起來將非常容易。 框架版本如下 * **JDK:** openjdk 1.8.0_242 * **Scala:** 2.12.8 * **Spark:** 2.4.4 * **Hudi Spark bundle:** 0.5.2-incubating 注意:在撰寫本文時,AWS EMR與Hudi v0.5.0-incubating整合在一起,該軟體包具有一個bug會導致upsert操作卡死或花費很長時間才能完成,可檢視相關[issue](https://github.com/apache/incubator-hudi/issues/1328)瞭解更多,該問題已在當前版本的Hudi(0.5.2-incubating及之後版本)中修復。如果計劃在AWS EMR上執行程式碼,則可能要考慮用最新版本覆蓋預設的整合版本。 ### 2.2 Hudi核心概念 先從一些需要理解的核心概念開始。 **1. 表型別** Hudi支援兩種表型別 * 寫時複製(CoW):寫入CoW表時,將執行Ingest-Reconcile-Compact-Purge週期。每次寫操作後,CoW表中的資料始終是最新記錄,對於需要儘快讀取最新資料的場景,可首選此模式。資料僅以列檔案格式(parquet)儲存在CoW表中,由於每個寫操作都涉及壓縮和覆蓋,因此此模式產生的檔案最少。 * 讀時合併(MoR):MoR表專注於快速寫操作。寫入這些表將建立增量檔案,隨後將其壓縮以生成讀取時的最新資料,壓縮操作可以同步或非同步完成,資料以列檔案格式(parquet)和基於行的檔案格式(avro)組合儲存。 這是Hudi文件中提到的兩種表格格式之間的權衡取捨。 | Trade-off | CoW | MoR | | :------------------ | :--------------------------- | :--------------------------- | | 資料延遲 | Higher | Lower | | 更新開銷 (I/O) | Higher (重寫整個parquet檔案) | Lower (追加到delta log檔案) | | Parquet檔案大小 | Smaller (高update(I/0) 開銷) | Larger (低更新開銷) | | Write Amplification | Higher | Lower (由compaction策略決定) | **2. 查詢型別** Hudi支援兩種主要型別的查詢:“快照查詢”和“增量查詢”。除兩種主要查詢型別外,MoR表還支援“讀優化查詢”。 * 快照查詢:對於CoW表,快照查詢返回資料的最新檢視,而對於MoR表,則返回接近實時的檢視。 對於MoR表,快照查詢將即時合併基本檔案和增量檔案,因此可能會有一些讀取延遲。使用CoW,由於寫入負責合併,因此讀取很快,只需要讀取基本檔案。 * 增量查詢:增量查詢使您可以通過指定“開始”時間或在特定時間點通過指定“開始”和“結束”時間來檢視特定提交時間之後的資料。 * 讀優化查詢:對於MoR表,讀取優化查詢返回一個檢視,該檢視僅包含基本檔案中的資料,而不合並增量檔案。 **3. 以Hudi格式寫入時的關鍵屬性** * `hoodie.datasource.write.table.type`,定義表的型別-預設值為COPY_ON_WRITE。對於MoR表,將此值設定為MERGE_ON_READ。 * `hoodie.table.name`,這是必填欄位,每個表都應具有唯一的名稱。 * `hoodie.datasource.write.recordkey.field`,將此視為表的主鍵。此屬性的值是DataFrame中列的名稱,該列是主鍵。 * `hoodie.datasource.write.precombine.field`,更新資料時,如果存在兩個具有相同主鍵的記錄,則此列中的值將決定更新哪個記錄。選擇諸如時間戳記的列將確保選擇具有最新時間戳記的記錄。 * `hoodie.datasource.write.operation`,定義寫操作的型別。值可以為upsert,insert,bulk_insert和delete,預設值為upsert。 ### 2.3 初始設定和依賴項 **1. 依賴說明** 為了在Spark作業中使用Hudi,需要使用spark-sql,hudi-spark-bundle和spark-avro依賴項,此外還需要將Spark配置為使用KryoSerializer。 pom.xml大致內容如下 ```xml 1.8 1.8 UTF-8 2.12.8 2.12 4.2.0 org.scala-lang scala-library ${scala.version} org.apache.spark spark-sql_${scala.compat.version} 2.4.4 org.apache.hudi hudi-spark-bundle_${scala.compat.version} 0.5.2-incubating
org.apache.spark spark-avro_${scala.compat.version} 2.4.4
``` **2. 設定Schema** 我們使用下面的Album類來表示表的schema。 ```scala case class Album(albumId: Long, title: String, tracks: Array[String], updateDate: Long) ``` **3. 生成測試資料** 建立一些用於upsert操作的資料。 * INITIAL_ALBUM_DATA有兩個記錄,鍵為801。 * UPSERT_ALBUM_DATA包含一個更新的記錄和兩個新的記錄。 ```scala def dateToLong(dateString: String): Long = LocalDate.parse(dateString, formatter).toEpochDay private val INITIAL_ALBUM_DATA = Seq( Album(800, "6 String Theory", Array("Lay it down", "Am I Wrong", "68"), dateToLong("2019-12-01")), Album(801, "Hail to the Thief", Array("2+2=5", "Backdrifts"), dateToLong("2019-12-01")), Album(801, "Hail to the Thief", Array("2+2=5", "Backdrifts", "Go to sleep"), dateToLong("2019-12-03")) ) private val UPSERT_ALBUM_DATA = Seq( Album(800, "6 String Theory - Special", Array("Jumpin' the blues", "Bluesnote", "Birth of blues"), dateToLong("2020-01-03")), Album(802, "Best Of Jazz Blues", Array("Jumpin' the blues", "Bluesnote", "Birth of blues"), dateToLong("2020-01-04")), Album(803, "Birth of Cool", Array("Move", "Jeru", "Moon Dreams"), dateToLong("2020-02-03")) ) ``` **4. 初始化SparkContext** 最後初始化Spark上下文。這裡要注意的重要一點是KryoSerializer的使用。 ```scala val spark: SparkSession = SparkSession.builder() .appName("hudi-datalake") .master("local[*]") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.sql.hive.convertMetastoreParquet", "false") // Uses Hive SerDe, this is mandatory for MoR tables .getOrCreate() ``` ## 2.4 使用CoW表 本節將處理CoW表的記錄,如讀取和刪除記錄。 **1. basePath(基本路徑)和Upsert方法** 定義一個basePath,upsert方法會將表資料寫入該路徑,該方法將以org.apache.hudi格式寫入Dataframe,請確保上面討論的所有Hudi屬性均已設定。 ```scala val basePath = "/tmp/store" private def upsert(albumDf: DataFrame, tableName: String, key: String, combineKey: String) = { albumDf.write .format("hudi") .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL) .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, key) .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, combineKey) .option(HoodieWriteConfig.TABLE_NAME, tableName) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) // Ignore this property for now, the default is too high when experimenting on your local machine // Set this to a lower value to improve performance. // I'll probably cover Hudi tuning in a separate post. .option("hoodie.upsert.shuffle.parallelism", "2") .mode(SaveMode.Append) .save(s"$basePath/$tableName/") } ``` **2. 初始化upsert** 插入INITIAL_ALBUM_DATA,我們應該建立2條記錄,對於801,該記錄的日期為2019-12-03。 ```scala val tableName = "Album" upsert(INITIAL_ALBUM_DATA.toDF(), tableName, "albumId", "updateDate") spark.read.format("hudi").load(s"$basePath/$tableName/*").show() ``` 讀取CoW表就像使用格式(“hudl”)的常規spark.read一樣簡單。 ```scala // Output +-------------------+--------------------+------------------+----------------------+--------------------+-------+-----------------+--------------------+----------+ |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name|albumId| title| tracks|updateDate| +-------------------+--------------------+------------------+----------------------+--------------------+-------+-----------------+--------------------+----------+ | 20200412182343| 20200412182343_0_1| 801| default|65841d0a-0083-447...| 801|Hail to the Thief|[2+2=5, Backdrift...| 18233| | 20200412182343| 20200412182343_0_2| 800| default|65841d0a-0083-447...| 800| 6 String Theory|[Lay it down, Am ...| 18231| +-------------------+--------------------+------------------+----------------------+--------------------+-------+-----------------+--------------------+----------+ ``` 另一種確定的方法是檢視Workload profile的日誌輸出,內容大致如下 ```properties Workload profile :WorkloadProfile {globalStat=WorkloadStat {numInserts=2, numUpdates=0}, partitionStat={default=WorkloadStat {numInserts=2, numUpdates=0}}} ``` **3. 更新記錄** ```scala upsert(UPSERT_ALBUM_DATA.toDF(), tableName, "albumId", "updateDate") ``` 檢視Workload profile的日誌輸出,並驗證它是否符合預期 ```pro Workload profile :WorkloadProfile {globalStat=WorkloadStat {numInserts=2, numUpdates=1}, partitionStat={default=WorkloadStat {numInserts=2, numUpdates=1}}} ``` 查詢輸出如下 ```scala spark.read.format("hudi").load(s"$basePath/$tableName/*").show() //Output +-------------------+--------------------+------------------+----------------------+--------------------+-------+--------------------+--------------------+----------+ |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name|albumId| title| tracks|updateDate| +-------------------+--------------------+------------------+----------------------+--------------------+-------+--------------------+--------------------+----------+ | 20200412183510| 20200412183510_0_1| 801| default|65841d0a-0083-447...| 801| Hail to the Thief|[2+2=5, Backdrift...| 18233| | 20200412184040| 20200412184040_0_1| 800| default|65841d0a-0083-447...| 800|6 String Theory -...|[Jumpin' the blue...| 18264| | 20200412184040| 20200412184040_0_2| 802| default|65841d0a-0083-447...| 802| Best Of Jazz Blues|[Jumpin' the blue...| 18265| | 20200412184040| 20200412184040_0_3| 803| default|65841d0a-0083-447...| 803| Birth of Cool|[Move, Jeru, Moon...| 18295| +-------------------+--------------------+------------------+----------------------+--------------------+-------+--------------------+--------------------+----------+ ``` **4. 查詢記錄** 我們在上面檢視資料的方式稱為“快照查詢”,這是預設設定,另外還支援“增量查詢”。 **4.1 增量查詢** 要執行增量查詢,我們需要在讀取時將`hoodie.datasource.query.type`屬性設定為`incremental`,並指定`hoodie.datasource.read.begin.instanttime`屬性。 這將在指定的即時時間之後讀取所有記錄,對於本示例,我們將`instantTime`指定為`20200412183510`。 ```scala spark.read .format("hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20200412183510") .load(s"$basePath/$tableName") .show() ``` 這將在提交時間20200412183510之後返回所有記錄。 ```scala +-------------------+--------------------+------------------+----------------------+--------------------+-------+--------------------+--------------------+----------+ |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path| _hoodie_file_name|albumId| title| tracks|updateDate| +-------------------+--------------------+------------------+----------------------+--------------------+-------+--------------------+--------------------+----------+ | 20200412184040| 20200412184040_0_1| 800| default|65841d0a-0083-447...| 800|6 String Theory -...|[Jumpin' the blue...| 18264| | 20200412184040| 20200412184040_0_2| 802| default|65841d0a-0083-447...| 802| Best Of Jazz Blues|[Jumpin' the blue...| 18265| | 20200412184040| 20200412184040_0_3| 803| default|65841d0a-0083-447...| 803| Birth of Cool|[Move, Jeru, Moon...| 18295| +-------------------+--------------------+------------------+----------------------+--------------------+-------+--------------------+--------------------+----------+ ``` **5. 刪除記錄** 我們要檢視的最後一個操作是刪除,刪除類似於upsert,需要一個待刪除記錄的DataFrame,如下面的示例程式碼所示,不需要整行,只需要主鍵即可。 ```scala val deleteKeys = Seq( Album(803, "", null, 0l), Album(802, "", null, 0l) ) import spark.implicits._ val df = deleteKeys.toDF() df.write.format("hudi") .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL) .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "albumId") .option(HoodieWriteConfig.TABLE_NAME, tableName) // Set the option "hoodie.datasource.write.operation" to "delete" .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL) .mode(SaveMode.Append) // Only Append Mode is supported for Delete. .save(s"$basePath/$tableName/") spark.read.format("hudi").load(s"$basePath/$tableName/*").show() ``` 這是本部分介紹的全部內容。後面我們將探討在MERGE-ON-READ表進行