1. 程式人生 > >Hadoop的I/O操作(壓縮、序列化、基於檔案的資料結構)

Hadoop的I/O操作(壓縮、序列化、基於檔案的資料結構)

一、序列化

序列化就是把結構化物件,轉換成位元組流序列或者其他資料傳輸協議以便於永久化儲存和網路傳輸。反序列化是序列化的逆過程,即將收到的位元組流序列、資料傳輸協議等,轉換成結構化的物件。Hadoop中用於多節點間程序通訊的是RPC(遠端過程呼叫)。RPC序列化格式如下:

1、格式緊湊,節省資源,由於頻寬和儲存是資料中心中的最稀缺的資源,我們必須盡一切可能縮小傳遞資訊的大小和儲存量,提高網路頻寬利用率

2、快速,分散式系統的骨架要求程序間通訊儘可能地減少序列化與反序列化的效能開銷。

3、可擴充套件,當前hadoop的序列化有多種選擇可以滿足新老客戶單格式上的不同。

4、支援互操作,來滿足不同語言所寫的伺服器與客戶端之間的資訊互動。

1、writrable介面

Hadoop使用的是自己的序列化格式Writable,它有節約資源、可重用物件、可擴充套件等特點。在叢集中資訊的傳遞主要就是靠這些序列化的位元組序列來傳遞。

Hadoop中,Writable介面定義了兩個方法:

void write(DataOutput out) throws IOException;使用者將其狀態寫入二進位制格式的DataOutput流。

void readFields(DataInput in) throws IOException;用於從二進位制格式的DataInput流讀取其狀態。

2、writrable

3.1Hadoop自帶的I/O包中的Writable

類的層次結構。


3.1Writable類的層次結構

Java基本資料型別的Writable類如下面列表所示:

Java 基本資料型別 

Writable實現

序列化大小(位元組)

boolean

BooleanWritable

1

byte

ByteWritable

1

int

IntWritable

VIntWritable

4

1~5

float

FloatWritable

4

long

LongWritable

VLongWritable

8

1~9

double

DoubleWritable

8

String

Text

*

Array

ArrayWritable

*

二、SequenceFile順序檔案

HDFS和MapReduce是專為大檔案設計,為了在處理海量小檔案時也能體現Hadoop的優勢,我們可以使用SequenceFile作為小檔案的容器,即將小檔案包裝成SequenceFile型別的檔案,來提高儲存和處理的效率。

1、SequenceFile格式


3.2SequenceFile結構

在儲存結構上,SequenceFile主要由一個Header、多條Record以及位界定識別符號組成。記錄與記錄之間穿插位界定識別符號,可實現從檔案任意位置讀取資料。Header主要包括鍵型別、值型別、壓縮演算法以及使用者自定義元資料等。而每條record以鍵/值對方式進行儲存,包括記錄長度、鍵長度、鍵、值。SequenceFile有三種模式,分別為:1、無壓縮模式;2、記錄壓縮模式;3、塊壓縮模式。記錄壓縮模式與無壓縮模式差別不大,主要去區別在於value將被壓縮,而key不做任何處理,圖解如圖3.2所示。而塊壓縮則是將一系列記錄合併到一起,統一壓縮成一個數據塊,如下圖所示。資料壓縮有利於資料儲存與網路傳輸,但是不利於直接讀取。具體如圖3.3所示。

圖3.3SequenceFile塊壓縮格式

2、SequenceFile的讀寫操作

SequenceFile是Hadoop平臺下的一種二進位制檔案,用來將key/value序列化為二進位制檔案,通常用於小檔案合併的容器。Sequence.writer和SequenceFile.reader是它的兩個主要內部類。

對於SequenceFile的寫操作,可通過createWriter()方法建立SequenceFile物件。該方法有多個過載版本,其中,fs、conf、keyClass、valueClass為必選引數,fs指配置檔案系統,conf指相應配置,keyClass為鍵型別,value為值型別。可選引數包括compressionType、codec、metadata,compressionType指壓縮型別,codec指壓縮程式碼,metadata指檔案元資料。SequenceFile的kay、value並不一定是Writable型別,也可以是其他型別,只要能被序列化與反序列化。Progressable()用來向控制檯顯示目前進度,append()用來向檔案末尾新增鍵值對,close()用來關閉資料流。

對於SequenceFile的讀操作,可以通過reader()例項化用呼叫next()反覆讀取。若序列化框架為Writable型別,next()方法可以讀取下一條鍵值對,若成功,則返回true,失敗則返回false。若序列化框架為其他型別,可以採用以下兩種方法:

 public Object next( Object key ) throwsIOException

 public Object getCurrentValue(Obiect val)  throws IOException

若返回值非null,則可以繼續讀取資料。否則,表示檔案已讀取到末尾。

三、FileInputFormat類

FileInputFormat繼承InputFormat,主要用於:一、指定任務的輸入檔案路徑;二、實現輸入檔案生成分片。把Split分割成record的任務由其子類實現。下圖所示為InputFormat類的層次結構。


圖3.4InputFormat類的層次結構

四、OutputFormat

1、OutputFormat方法

OutputFormat主要用來描述MapReduce作業的輸出規範,如檢查輸出目錄是否已經存在,也可通過RecordWriter來寫出作業的輸出檔案,該輸出檔案儲存在FileSystem中。

OutputFormat主要有三個類,分別為:

public RecordWriter<K,V>getRecordWriter(TaskAttemptContext context)

public void checkOutputSpecs(JobContext context)

public OutputCommittergetOutputCommitter(TaskAttemptContext context)

getRecordWriter()用於獲取給定的RecordWriter,而RecordWriter用於將輸出鍵值對寫入輸出檔案,再將作業輸出寫入檔案系統。

checkOutputSpecs()用於檢查作業輸出規範是否有效,通常再作業提交時驗證,若輸出路徑已經存在將報錯。由於Hadoop平臺處理的都是大規模資料,若將輸出路徑覆蓋,則原先已處理的結果將丟失,這個損失不可估量。所以提交作業時,輸出路徑必須是不存在的。

 getOutputCommitter()用於獲取輸出格式的輸出提交者,來負責確保輸出提交正確。關於OutputCommitter類詳見下一節的解釋。

其中,JobConf是一個重要的引數,主要用來向Hadoop框架描述MapReduce作業執行的主要介面。Hadoop將按JobConference來執行工作,但是一些已標記為final的配置引數將無法更改。

JobConf通常指定要使用的Mapper,組合器(如果有),Partitioner,Reducer,InputFormat和OutputFormat實現等。JobConf也可用於指定作業的其他高階構面,如要使用的比較器,要放入DistributedCache的檔案,是否要壓縮中間和/或作業輸出(以及如何),通過使用者提供的可除錯性指令碼(setMapDebugScript(String)/ setReduceDebugScript(String)),用於在stdout,stderr,syslog上進行後期處理。等等。

2、OutputCommitter

Hadoop使用一個提交協議來確認任務成功完成或完成失敗。這個確認機制通過對任務使用OutputCommitter來實現,通過OutputFormat的getOutputCommitter()來設定。

OutputCommitter有如下幾個常用API:

public abstract voidsetupJob(JobContext jobContext)  throwsIOException

public voidcleanupJob(JobContext jobContext)throws IOException

public voidcommitJob(JobContext jobContext)   throwsIOException

public voidabortJob(JobContext jobContext, int status)throws IOException

MapReduce依賴於OutputCommitter工作:

1、在初始化過程中,通過setupJob()設定作業來執行初始化操作。例如,在作業初始化期間建立作業的臨時輸出目錄,並且為task輸出建立一個臨時工作空間。

2、工作成功處理後,呼叫commitJob()方法完成清理工作。系統預設中,它在作業完成後刪除臨時輸出目錄以告知作業成功完成,同時刪除臨時空間。

3、設定任務臨時輸出。

4、在作業提交時呼叫,用於檢查任務是否需要提交。

5、將任務的臨時輸出提升至最終輸出位置。如果needsTaskCommit(TaskAttemptContext)返回true,則將任務輸出標記為完成。

6、從任務的程序呼叫,來清理尚未提交的單個任務的輸出,對於同一個任務可能會被多次呼叫。

這幾個方法可以從多個不同的程序和幾個不同的上下文中呼叫。每一種方法都應在其檔案中作相應的標記,但並不是所有的方法都保證只調用一次和一次。如果一個方法不能保證有這個屬性,那麼輸出提交者需要適當地處理這個問題。只在極少數情況下,它們可能會被多次呼叫同一個任務。

3、多檔案自定義輸出

hadoop,reduce支援多個檔案輸出,其檔名通過MultipleOutputFormat,重寫generateFileNameForKey方法實現控制。如果只是想做到輸出結果的檔名可控,實現自定義輸出型別,通過設定OutputFormat,使用LogNameMultipleTextOutputFormat.class就可以了,但是這種方式只限於使用舊版本的hadoop api。本論文中,我們希望處理影象檔案,以檔名為key,輸出結果也是圖片,需要重寫MultipleOutputFormat。

首先構造多檔案輸出類MyMultipleOutputFormat類,然後自定義應該LineRecordWriter類,實現記錄寫入器LineRecordWriter,自定義輸出格式。