摘要:CarbonData 在 Apache Spark 和儲存系統之間起到中介服務的作用,為 Spark 提供的4個重要功能。
本文分享自華為雲社群《Make Apache Spark better with CarbonData》,原文作者:大資料修行者 。
Spark 無疑是一個強大的處理引擎和一個用於更快處理的分散式叢集計算框架。不幸的是,Spark在一些方面也存在不足。如果我們將 Apache Spark 與 Apache CarbonData 結合使用,它可以克服這些不足:
1. 不支援 ACID transaction
2. 沒有quality enforcement
3. 小檔案問題
4. 低效的data skipping
什麼是ACID?
Spark和ACID
ATOMICITY
ACID 中的 A 代表原子性。基本上,這意味著要麼全部成功要麼全部失敗。因此,當您使用 spark data frame writer API時,它應該寫入完整資料或不寫入任何資料。讓我們快速檢視 Spark 文件。根據 Spark 文件:“It is important to realize that these save mode (overwrite) do not utilize any locking and are not atomic. Additionally, when performing an Overwrite, the data will be deleted before writing out the new data.”
雖然整個情況看起來有點可怕,但實際上並沒有那麼糟糕。 Spark dataframe API在內部執行作業級提交,這有助於實現一定程度的原子性,這與使用 Hadoop 的 FileOutputCommitter 的“append”模式一起工作。但是,預設實現會帶來效能開銷,尤其是在使用雲端儲存 [S3/OBS] 而不是 HDFS 時。
我們現在可以執行以下程式碼來證明 Spark overwrite 不是原子的,它可能導致資料損壞或資料丟失。程式碼的第一部分模仿作業 1,它建立 100 條記錄並將其儲存到 ACIDpath 目錄中。程式碼的第二部分模仿作業 2,它試圖覆蓋現有資料,但在操作過程中引發異常。這兩項工作的結果是資料丟失。最後,我們丟失了第一個作業建立的資料。
由於異常,作業級提交不會發生,因此不會儲存新檔案。由於 Spark 刪除了舊檔案,我們丟失了現有資料。 Spark data frame writer API 不是原子的,但它的行為類似於追加操作的原子操作。
CONSISTENCY
分散式系統通常建立在可用性較低的機器之上。一致性是高可用性系統中的一個關鍵問題。如果所有節點同時看到並返回相同的資料,則系統是一致的。有幾種一致性模型,分散式系統中最常用的一種是強一致性、弱一致性和最終一致性。我們瞭解到,Spark writer API 的覆蓋模式會先刪除舊檔案,然後再放置新檔案。因此,在這兩種狀態之間,會有一段時間沒有資料可用。如果我們的工作失敗,那麼我們將丟失資料。這意味著這兩個操作之間沒有平滑的事務。這是 Spark 覆蓋操作的典型原子性問題。而這個問題也破壞了資料的一致性。 Spark API 缺乏一致性。因此,Spark 寫模式不支援一致性。
Isolation and Durability in Spark
隔離意味著分離。與任何其他併發操作分離。假設我們正在寫入尚未提交的資料集,並且有另一個併發程序正在讀取/寫入同一資料集。根據隔離特性,在這種情況下,不應影響他人。典型的資料庫會提供不同的隔離級別,例如已提交讀和可序列化。雖然 Spark 有任務級提交和作業級提交,但由於寫操作缺乏原子性,Spark 無法提供適當的隔離。
最後,Durability 是系統儲存的已提交狀態/資料,這樣即使在出現故障和系統重啟的情況下,資料也能以正確的狀態使用。永續性由儲存層提供,在 Spark 應用程式的情況下,它是 HDFS 和 S3/OBS 的作用。然而,當 Spark 由於缺乏原子性而沒有提供適當的提交時,如果沒有適當的提交,我們就不能指望永續性。
如果我們仔細觀察,所有這些 ACID 屬性都是相互關聯的。由於缺乏原子性,我們失去了一致性和隔離性,由於缺乏隔離性,我們失去了永續性。
Lack of Schema Enforcement
我們知道 Spark 在讀取時意味著 Schema。因此,當我們寫入任何資料時,如果有任何模式不匹配,它不會丟擲異常。讓我們試著用一個例子來理解這一點。讓我們有一個包含以下記錄的輸入陣列。下面的程式將讀取 csv 並轉換為 DF
該程式從 CSV 檔案中讀取,以鑲木地板格式寫回並顯示資料。輸出如下
讓我們讀取另一個輸入 CSV 檔案,其中“Cost”列具有十進位制值而不是整數(如下所示),並對上述檔案執行追加操作
在這種情況下,我們的程式將讀取 CSV,毫無例外地寫入 Parquet 格式。當我們想要顯示/顯示資料幀時,我們的程式將丟擲錯誤
這是因為 Spark 在寫操作期間從不驗證模式。 “Cost”列的模式在第一次載入期間被推斷為整數,在第二次寫入期間,它會毫無問題地附加雙精度型資料。當我們讀取附加資料並呼叫操作時,由於模式不相容,它會引發錯誤。
How to overcome the above drawbacks of Spark
如果我們使用 Apache Spark 將 CarbonData 作為儲存解決方案的附加層插入,則可以管理上述問題。
What is CarbonData
由於 Hadoop 分散式檔案系統 (HDFS) 和物件儲存類似於檔案系統,因此它們不是為提供事務支援而設計的。在分散式處理環境中實現事務是一個具有挑戰性的問題。例如,實施通常必須考慮鎖定對儲存系統的訪問,這是以整體吞吐量效能為代價的。 Apache CarbonData 等儲存解決方案通過將這些事務語義和規則推送到檔案格式本身或元資料和檔案格式組合中,有效地解決了資料湖的這些 ACID 要求。 CarbonData 在 Apache Spark 和儲存系統之間起到中介服務的作用。現在,遵守 ACID 的責任由 CarbonData 負責。底層儲存系統可以是 HDFS、華為 OBS、Amazon S3 或 Azure Blob Storage 之類的任何東西。 CarbonData 為 Spark 提供的幾個重要功能是:
1. ACID transactions.
2. Schema enforcement/Schema validation.
3. Enables Updates, Deletes and Merge.
4. Automatic data indexing.
CarbonData in Apache Spark: ACID
在上面的程式碼片段中,程式碼的第一部分模仿了 job-1,建立了 100 條記錄並將其儲存到 ACIDpath 目錄中。程式碼的第二部分模仿 job-2,它試圖覆蓋現有資料但在操作過程中丟擲異常。
這兩項工作的結果是資料丟失。最後,我們丟失了第一個作業建立的資料。現在讓我們更改如下所示的程式碼以使用 CarbonData。
執行第一個作業並計算行數。正如預期的那樣,您將獲得 100 行。
如果您檢查資料目錄,您將看到一個snappy compressed CarbonData 檔案。該資料檔案以列式編碼格式儲存 100 行。您還將看到一個包含 tablestatus 檔案的元資料目錄。現在執行第二個作業。你對第二份工作有什麼期望?如前所述,這項工作應該嘗試做以下事情。
1. 刪除之前的檔案。
2. 建立一個新檔案並開始寫入記錄。
3. 在作業中間丟擲執行時異常。
由於異常,作業級別提交不會發生,我們丟失了上述觀察到的現有資料在沒有使用 CarbonData 的情況下。
但是現在如果你執行第二個作業,你仍然會得到一個異常。然後,計算行數。您得到的輸出為 100,並且不會丟失舊記錄。看起來 CarbonData 已經使 Overwrite 原子化了。我們看一下資料目錄,你會發現兩個 CarbonData 檔案。
一個檔案由第一個作業建立,另一個檔案由作業 2 建立。作業 2 沒有刪除舊檔案,而是直接建立了一個新檔案並開始向新檔案寫入資料。這種方法使舊資料狀態保持不變。這就是為什麼我們沒有丟失舊資料的原因,因為舊檔案保持不變。新的不完整檔案也在那裡,但不讀取新的不完整檔案中的資料。該邏輯隱藏在元資料目錄中,並使用 tablestatus 檔案進行管理。第二個作業無法在 tablestatus 檔案中建立成功的條目,因為它在中間失敗了。讀取 API 不會讀取 tablestatus 檔案中的條目被標記為刪除的檔案。
這一次,讓我們無一例外地編寫程式碼邏輯,用50條記錄覆蓋舊的100條記錄。
Now the record count shows 50. As expected. So, you have overwritten the older data set of 100 rows with a new data set of 50 rows.
CarbonData 將元資料管理引入 Apache Spark 並使 Spark 資料編寫器 API 具有原子性,從而解決了資料一致性問題。一旦一致性問題得到解決,CarbonData 將能夠提供更新和刪除功能。
Spark With CarbonData: Schema Enforcement
讓我們考慮一個簡單的使用者場景,其中資料分多批到達以進行轉換。這裡為了簡單起見,讓我們假設只有 2 批資料,第二批資料攜帶一些與第一批資料不同型別的列資料。
為了開始實驗,讓我們從表 1 中讀取資料,並使用和不使用 CarbonData 寫入資料。我們能夠使用“Overwrite”模式在有和沒有 CarbonData 的情況下寫入資料。
現在讓我們讀取具有成本列型別的雙型別資料的第二個表,然後將資料幀寫入 Parquet 和 CarbonTables(注意:_c2 是整數型別,我們正在嘗試附加雙型別資料)。使用 parquet 附加模式不匹配的資料沒有問題,但是當程式嘗試將相同的資料附加到 CarbonData 表時,它會丟擲錯誤:
因此,基於上述實驗,我們可以看到 CarbonData 在寫入底層儲存之前驗證模式,這意味著 CarbonData 在寫入時使用模式驗證。如果型別不相容,則 CarbonData 將取消交易。這將有助於在開始時跟蹤問題,而不是與好的資料混淆,然後嘗試找出根本原因。
英文連結:https://brijoobopanna.medium.com/making-apache-spark-better-with-carbondata-d37f98d235de
作者: Brijoobopanna