1. 程式人生 > >MaxCompute讀取分析OSS非結構化數據的實踐經驗總結

MaxCompute讀取分析OSS非結構化數據的實踐經驗總結

某個文件 普通表 radi 行數 eas 技術 支持 mono 數據導入

摘要: 本文背景 很多行業的信息系統中,例如金融行業的信息系統,相當多的數據交互工作是通過傳統的文本文件進行交互的。此外,很多系統的業務日誌和系統日誌由於各種原因並沒有進入ELK之類的日誌分析系統,也是以文本文件的形式存在的。

1. 本文背景

很多行業的信息系統中,例如金融行業的信息系統,相當多的數據交互工作是通過傳統的文本文件進行交互的。此外,很多系統的業務日誌和系統日誌由於各種原因並沒有進入ELK之類的日誌分析系統,也是以文本文件的形式存在的。隨著數據量的指數級增長,對超大文本文件的分析越來越成為挑戰。好在阿裏雲的MaxCompute產品從2.0版本開始正式支持了直接讀取並分析存儲在OSS上的文本文件,可以用結構化查詢的方式去分析非結構化的數據。

本文對使用MaxCompute分析OSS文本數據的實踐過程中遇到的一些問題和優化經驗進行了總結。作為前提,讀者需要詳細了解MaxCompute讀取OSS文本數據的一些基礎知識,對這篇官方文檔 《訪問 OSS 非結構化數據》最好有過實踐經驗。本文所描述的內容主要是針對這個文檔中提到的自定義Extractor做出的一些適配和優化。

2. 場景實踐

2.1 場景一:分析zip壓縮後的文本文件

場景說明

很多時候我們會對歷史的文本數據進行壓縮,然後上傳到OSS上進行歸檔,那麽如果要對這部分數據導入MaxCompute進行離線分析,我們可以自定義Extractor讓MaxCompute直接讀取OSS上的歸檔文件,避免了把歸檔文件下載到本地、解壓縮、再上傳回OSS這樣冗長的鏈路。

實現思路

如 《訪問 OSS 非結構化數據》文檔中所述,MaxCompute讀取OSS上的文本數據本質上是讀取一個InputStream流,那麽我們只要構造出適當的歸檔字節流,就可以直接獲取這個InputStream中的數據了。

以Zip格式的歸檔文件為例,我們可以參考 DataX 中關於讀取OSS上Zip文件的源碼,構造一個Zip格式的InputStream,代碼見 ZipCycleInputStream.java 。構造出這個Zip格式的InputStream後,在自定義Extractor中獲取文件流的部分就可以直接使用了,例如:

技術分享圖片

優化經驗

大家可能知道,MaxCompute中進行批量計算的時候,可以通過設置 odps.stage.mapper.split.size這個參數來調整數據分片的大小,從而影響到執行計算任務的Mapper的個數,在一定程度上提高Mapper的個數可以增加計算的並行度,進而提高計算效率 (但也不是說Mapper個數越多越好,因為這樣可能會造成較長時間的資源等待,或者可能會造成長尾的後續Reducer任務,反而降低整體的計算效率) 。

同樣道理,對OSS上的文本文件進行解析的時候,也可以通過設置 odps.sql.unstructured.data.split.size 這個參數來達到調整Mapper個數的目的 (註意這個參數可能需要提工單開通使用權限):

set odps.sql.unstructured.data.split.size=16;

上述設定的含義是,將OSS上的文件拆分為若幹個16M左右大小的分片,讓MaxCompute盡力做到每個分片啟動一個Mapper任務進行計算——之所以說是“盡力做到”,是因為MaxCompute默認不會對單個文件進行拆分及分片處理(除非設定了其他參數,我們後面會講到),也就是說,如果把單個分片按照上面的設定為16M,而OSS上某個文件大小假設為32M,則MaxCompute仍然會把這個文件整體(即32M)的數據量作為一個分片進行Mapper任務計算。

註意點

我們在這個場景中處理的是壓縮後的文件,而InputStream處理的字節量大小是不會因壓縮而變小的。舉個例子,假設壓縮比為1:10,則上述這個32M的壓縮文件實際代表了320M的數據量,即MaxCompute會把1個Mapper任務分配給這320M的數據量進行處理;同理假設壓縮比為1:20,則MaxCompute會把1個Mapper任務分配給640M的數據量進行處理,這樣就會較大的影響計算效率。因此,我們需要根據實際情況調整分片參數的大小,並盡量把OSS上的壓縮文件大小控制在一個比較小的範圍內,從而可以靈活配置分片參數,否則分片參數的值會因為文件太大並且文件不會被拆分而失效。

2.2 場景二:過濾文本文件中的特定行

場景說明

對於一些業務數據文件,特別是金融行業的數據交換文件,通常會有文件頭或文件尾的設定要求,即文件頭部的若幹行數據是一些元數據信息,真正要分析的業務數據需要把這些元信息的行過濾掉,只分析業務數據部分的行,否則執行結構化查詢的SQL語句的時候必然會造成任務失敗。

實現思路

在 《訪問 OSS 非結構化數據》文檔中提到的 代碼示例 中,對 readNextLine() 方法進行一些改造,對讀取的每一個文件,即每個 currentReader 讀取下一行的時候,記錄下來當前處理的行數,用這個行數判斷是否到達了業務數據行,如果未到業務數據行,則繼續讀取下一條記錄,如果已經到達數據行,則將該行內容返回處理;而當跳轉到下一個文件的時候,將 該行數值重置。

代碼示例:

技術分享圖片

此處 dataLineStart 表示業務數據的起始行,可以通過 DataAttributes 在建立外部表的時候從外部作為參數傳入。當然也可以隨便定義其他邏輯來過濾掉特定行,比如本例中的對文件尾的“EOF”行進行了簡單的丟棄處理。

2.3 場景三:忽略文本中的空行

場景說明

在 《訪問 OSS 非結構化數據》文檔中提到的 代碼示例 中,已可以應對大多數場景下的文本數據處理,但有時候在業務數據文本中會存在一些空行,這些空行可能會造成程序的誤判,因此我們需要忽略掉這些空行,讓程序繼續分析處理後面有內容的行。

實現思路

類似於上述 場景二 ,只需要判斷為空行後,讓程序繼續讀取下一行文本即可。
代碼示例:

技術分享圖片

2.4 場景四:選擇OSS上文件夾下的部分文件進行處理

場景說明

閱讀 《訪問 OSS 非結構化數據》文檔可知,一張MaxCompute的外部表連接的是OSS上的一個文件夾(嚴格來說OSS沒有“文件夾”這個概念,所有對象都是以Object來存儲的,所謂的文件夾其實就是在OSS創建的一個字節數為0且名稱以“/”結尾的對象。MaxCompute建立外部表時連接的是OSS上這樣的以“/”結尾的對象,即連接一個“文件夾”),在處理外部表時,默認會對該文件夾下 所有的文件進行解析處理。該文件夾下所有的文件集合即被封裝為 InputStreamSet ,然後通過其 next() 方法來依次獲得每一個InputStream流、即每個文件流。

但有時我們可能會希望只處理OSS上文件夾下的 部分 文件,而不是全部文件,例如只分析那些文件名中含有“2018_”字樣的文件,表示只分析2018年以來的業務數據文件。

實現思路

在獲取到每一個InputStream的時候,通過 SourceInputStream 類的 getFileName() 方法獲取正在處理的文件流所代表的文件名,然後可以通過正則表達式等方式判斷該文件流是否為所需要處理的文件,如果不是則繼續調用 next() 方法來獲取下一個文件流。

代碼示例:

技術分享圖片

本例中的 patternModel 為通過 DataAttributes 在建立外部表的時候從外部作為參數傳入的正則規則。

寫到這裏可能有讀者會問,如果一個文件夾下有很多文件,比如上萬個文件,整個遍歷一遍後只選擇一小部分文件進行處理這樣的方式會不會效率太低了?其實大可不必擔心,因為相對於MaxCompute對外部表執行批量計算的過程,循環遍歷文件流的時間消耗是非常小的,通常情況下是不會影響批量計算任務的。

2.5 場景五:針對單個大文件進行拆分

場景說明

在 場景一 中提到,要想提高計算效率,我們需要調整 odps.sql.unstructured.data.split.size 參數值來增加Mapper的並行度,但是對於單個大文件來講,MaxCompute默認是不進行拆分的,也就是說OSS上的單個大文件只會被分配給一個Mapper任務進行處理,如果這個文件非常大的話,處理效率將會及其低下,我們需要一種方式來實現對單個文件進行拆分,使其可以被多個Mapper任務進行並行處理。

實現思路

仍然是要依靠調整 odps.sql.unstructured.data.split.size 參數來增加Mapper的並行度,並且設定 odps.sql.unstructured.data.single.file.split.enabled 參數來允許拆分單個文件 (同odps.sql.unstructured.data.split.size,該參數也可能需要提工單申請使用權限) ,例如:

set odps.sql.unstructured.data.split.size=128;

set odps.sql.unstructured.data.single.file.split.enabled=true;

設置好這些參數後,就需要編寫特定的Reader類來進行單個大文件的拆分了。

核心的思路是,根據 odps.sql.unstructured.data.split.size 所設定的值,大概將文件按照這個大小拆分開,但是拆分點極大可能會切在一條記錄的中間,這時就需要調整字節數,向前或向後尋找換行符,來保證最終的切分點落在一整條記錄的尾部。具體的實現細節相對來講比較復雜,可以參考在 《訪問 OSS 非結構化數據》文檔中提到的 代碼示例 來進行分析。

註意點

在計算字節數的過程中,可能會遇到非英文字符造成計算切分點的位置計算不準確,進而出現讀取的字節流仍然沒有把一整行覆蓋到的情況。這需要針對含有非英文字符的文本數據做一些特殊處理。

代碼示例:

技術分享圖片

3. 其他建議

  1. 在編寫自定義Extractor的程序中,適當加入System.out作為日誌信息輸出,這些日誌信息會在MaxCompute執行時輸出在LogView的視圖中,對於調試過程和線上問題排查過程非常有幫助。

  2. 上文中提到通過調整 odps.sql.unstructured.data.split.size 參數值來適當提高Mapper任務的並行度,但是並行度並不是越高越好,具體什麽值最合適是與OSS上的文件大小、總數據量、MaxCompute產品自身的集群狀態緊密聯系在一起的,需要多次調試,並且可能需要與 odps.stage.reducer.num、odps.sql.reshuffle.dynamicpt、odps.merge.smallfile.filesize.threshold 等參數配合使用才能找到最優值。並且由於MaxCompute產品自身的集群狀態也是很重要的因素,可能今天申請500個Mapper資源是很容易的事情,過幾個月就變成經常需要等待很長時間才能申請到,這就需要持續關註任務的執行時間並及時調整參數設定。

  3. 外部表的讀取和解析是依靠Extractor對文本的解析來實現的,因此在執行效率上是遠不能和MaxCompute的普通表相比的,所以在需要頻繁讀取和分析OSS上的文本文件的情況下,建議將OSS文件先 INSERT OVERWRITE 到MaxCompute中字段完全對等的一張普通表中,然後針對普通表進行分析計算,這樣通常會獲得更好的計算效率。

原文鏈接


MaxCompute讀取分析OSS非結構化數據的實踐經驗總結