1. 程式人生 > >PySpark關於HDFS檔案(目錄)輸入、資料格式的探討 ####3

PySpark關於HDFS檔案(目錄)輸入、資料格式的探討 ####3

背景 平臺HDFS資料儲存規則是按照“資料集/天目錄/小時目錄/若干檔案”進行的,其中資料集是依據產品線或業務劃分的。 使用者分析資料時,可能需要處理以下五個場景: (一)分析指定資料集、指定日期、指定小時、指定檔案的資料;(二)分析指定資料集、指定日期、指定小時的資料;(三)分析指定資料集、指定日期的資料(24個小時目錄的資料);(四)分析多個數據集、多個日期或多個小時的資料;(五)多種儲存格式(textfile、sequencefile、rcfile等)。 目前我們平臺提供給使用者的分析工具為PySpark(Spark、Spark SQL、Python),本文討論的就是使用PySpark如果應對上述場景。 示例

 假設HDFS存在一個wordcount目錄,包含三個子目錄:data1、data2、data3,  這三個子目錄下均含有一個文字檔案words,各自的內容如下:  解決方案 這裡我們暫時只考慮文字檔案。 1. 分析指定資料集、指定日期、指定小時、指定檔案的資料; 2. 分析指定資料集、指定日期、指定小時的資料; SparkContext textFile預設情況下可以接收一個文字檔案路徑或者僅僅包含文字檔案的目錄,使用該方法可以應對場景(一)、(二)。 (1)分析指定文字檔案的資料,這裡我們假設待分析的資料為“/user/hdfs/yurun/wordcount/data1/words”:  可以得到如下輸出: 
 (2)分析指定目錄的資料,且該目錄僅包含文字檔案,這裡我們假設待分析的資料為“/user/hdfs/yurun/wordcount/data1/”:  執行上述程式碼會得到與(1)相同的結果。 3. 分析指定資料集、指定日期的資料(24個小時目錄的資料); 針對我們的示例,也就是需要“遞迴”分析wordcount目錄下三個子目錄data1、data2、data3中的資料。我們嘗試將上面示例中的輸入路徑修改為“/user/hdfs/yurun/wordcount/”,  執行上述程式碼會出現異常:  根據1、2、3中的示例,如果我們指定的輸入路徑是一個目錄,而這個目錄中存在子目錄就會出現上述異常。究其原因,實際上與FileInputFormat(org.apache.hadoop.mapred.FileInputFormat)的“某個”配置屬性值有關。 注意:原始碼中有兩個模組中都包含這個類, 
 這裡使用的是模組hadoop-mapreduce-client中的類。 根據丟擲的異常資訊,我們可以在FileInputFormat的原始碼中找到如下程式碼塊:  可以看出,如果files中的任何一個FileStatus例項(由file表示)為目錄,便會引發“Not a file”的異常,而files來源於方法listStatus,  我們需要注意兩個很重要的變數: (1)dirs dirs為我們指定的輸入檔案或目錄,它是由getInputPaths計算而來的,  可以看出,我們通過SparkContext textfile指定的輸入路徑實際是儲存在Configuration中的,以屬性FileInputFormat.INPUT_DIR(mapreduce.input.fileinputformat.inputdir)表示,從程式碼邏輯可知,它的值可以為多個以“,”分隔的字串。 與就是說,理論上我們是可以指定多個輸入檔案或目錄的(SparkContext textfile僅支援單個檔案路徑或目錄路徑)。 (2)recursive 如果我們指定的是一個目錄路徑,recursive表示著是否允許在後續的切片計算過程中“遞迴”處理該路徑中的子目錄,它的值由屬性INPUT_DIR_RECURSIVE(mapreduce.input.fileinputformat.input.dir.recursive)決定,預設值為false。 也就是說,預設情況下,FileInputFormat是不會以“遞迴”的形式處理指定目錄中的子目錄的,這也是引發上述異常的根本原因。 如果我們需要處理“遞迴”目錄的場景,可以採用下述兩個方法: (1)在Hadoop的配置檔案mapred-site.xml中新增屬性mapreduce.input.fileinputformat.input.dir.recursive,並指定值為true;  僅僅需要在Spark Client(提交Spark Application的機器)相關機器上操作即可,不需要修改Hadoop叢集配置。 修改配置檔案後,再次提交上述程式,即可正常執行。 (2)使用hadoopRDD;  這種方式不需要修改配置檔案,而是在程式碼中通過Hadoop Configuration(hadoopConf)直接指定相關屬性: mapreduce.input.fileinputformat.inputdir:hdfs://dip.cdh5.dev:8020/user/hdfs/yurun/wordcount/;mapreduce.input.fileinputformat.input.dir.recursive:true; 此外還需要注意hadoopRDD的幾個引數: inputFormatClass:org.apache.hadoop.mapred.TextInputFormatkeyClass:org.apache.hadoop.io.LongWritablevalueClass:org.apache.hadoop.io.Text 這僅僅是針對textfile設定的引數值,對於其它的資料格式會有所不同,後面會討論。 使用hadoopRDD的執行結果會有所不同:  這是因為SparkContext textfile省略了TextInputFormat中的“key”,它表示每一行文字在各自檔案中的起始偏移量。 4. 分析多個數據集、多個日期或多個小時的資料; 這種場景要求我們能夠指定多個目錄或檔案,其中還可能需要“遞迴”處理子目錄,SparkContext textfile只能接收一個目錄或檔案,此時我們只能使用hadoopRDD。 前面提到過,“mapreduce.input.fileinputformat.inputdir”可以以“,”分隔的形式接收多個目錄或檔案路徑。假設我們需要分析的資料為“hdfs://dip.cdh5.dev:8020/user/hdfs/yurun/wordcount/data1”、“hdfs://dip.cdh5.dev:8020/user/hdfs/yurun/wordcount/data2”、“hdfs://dip.cdh5.dev:8020/user/hdfs/yurun/wordcount/data3/words”,程式碼示例如下:  執行結果同上。 5. 多種儲存格式; 1-4的討論僅僅侷限於textfile,textfile易於人的閱讀和分析,但儲存開銷很大,即使採用相應的壓縮,效果也並是很理想。在實踐中我們發現rcfile採用列式壓縮效果顯著,因此也需要考慮如何使得PySpark支援rcfile。 為什麼這個地方要有專門的“考慮”? 簡單來講,Hadoop是使用Java構建的,Spark是使用Scala構建的,而我們現在使用的開發語言為Python,這就帶來一個問題:Java/Scala中的資料型別如何轉換為相應的Python資料型別? 如TextInputFormat返回的鍵值對型別為LongWritable、Text,可以被“自動”轉換為Python中的int、str(基本資料型別均可以被“自動”轉換),RCFileInputFormat返回的鍵值對型別為LongWritable、BytesRefArrayWritable,BytesRefArrayWritable不是基本資料型別,它應該如何被轉換呢? 我們檢視SparkContext hadoopRDD的文件可知,  keyConverter、valueConverter就是用來負責完成鍵值型別的轉換的。 假設我們有一個RCFile格式的檔案:  RCFileInputFormat的鍵型別為LongWritable,可以自動被轉換;RCFileInputFormat的值型別為BytesRefArrayWritable,無法被自動轉換,需要一個Converter,這裡我們把每一個BytesRefArrayWritable例項轉換為一個Text例項,其中三列資料以空格分隔。 我們將Converter定義為BytesRefArrayWritableToStringConverter(com.sina.dip.spark.converter.BytesRefArrayWritableToStringConverter),程式碼如下: 

!!!!!!關鍵點outputformatclass也可以用inputformatclass的方式實現~!!!!!!!!

其實Converter的邏輯非常簡單,就是將BytesRefArrayWritable中的資料提取、轉換為基本資料型別Text。 將上述程式碼編譯打包為converter.jar。 PySpark程式碼如下:  重點注意幾個引數值: mapreduce.input.fileinputformat.inputdir:hdfs://dip.cdh5.dev:8020/user/hdfs/yurun/rcfile/datainputFormatClass:org.apache.hadoop.hive.ql.io.RCFileInputFormatkeyClass:org.apache.hadoop.io.LongWritablevalueClass:org.apache.hadoop.io.TextvalueConverter:com.sina.dip.spark.converter.BytesRefArrayWritableToStringConverter 執行命令:  結果輸出:  三行資料,每行資料均為字串輸出,且以空格分隔,可見資料得到正常轉換。 通過上述方式,我們可以通過SparkContext hadoopRDD支援多種資料格式資料的分析。 總結 本文通過五種常見應用場景的討論,可以得出使用PySpark可以支援靈活的資料輸入路徑,還可以根據需求擴充套件支援多種資料格式。