1. 程式人生 > >Hadoop實踐(三)---Hadoop資料型別

Hadoop實踐(三)---Hadoop資料型別

適合Hadoop的資料型別

Hadoop使用派生於Writable介面的類作為MapReduce計算的資料型別,這些資料型別用於整個MapReduce計算流的資料吞吐過程,這個過程從讀取輸入資料開始,到傳輸map和reduce任務之間的中間資料,一直到最後寫入輸出資料為止;為輸入資料、中間資料和輸出資料選擇合適的Writable資料型別面對MapReduce程式的可程式設計性和效能有很大的影響

  1. 為了用作MapReduce計算的value資料型別,資料型別必須實現org.apache.hadoop.io.Writable介面;Writable介面定義了當需要資料傳輸和資料儲存時,Hadoop應該如何序列化和反序列化

  2. 為了用作MapReduce計算的key資料型別,資料型別必須實現org.apache.hadoop.io.WritableComparable<T>介面,除了Writable介面的功能之外,有一種WritableComparable介面更進一步定義瞭如何將這種型別的鍵相互比較,以達到排序的目的

Hadoop帶有一些預定於的類用於實現WritableComparable,包括基本資料型別的封裝類:

描述
BooleanWritable 標準布林變數的封裝
ByteWritable 單位元組數的封裝
DoubleWritable 雙位元組數的封裝
FloatWritable 浮點數的封裝
IntWritable 整數的封裝
LongWritable 長整型的封裝
Text 使用UTF-8格式的文字封裝
NullWritable 無鍵值時的佔位符

1.使用泛型型別變數為mapper的鍵值對指定輸入資料型別(鍵:LongWritable,值:Text)和輸出資料型別(鍵:Text,值:IntWritable)

public class SampleMapper extends Mapper<LongWritable,Text,Text,IntWritable
>{
public void map(LongWritable key, Text value,Context context). . .{ . . . } }

2.使用通用型變數為reducer的鍵值對指定輸入資料型別(鍵:Text,值:IntWritable)和輸出資料型別(鍵:Text,值:IntWritable)【reducer的輸入鍵值對資料型別應該和mapper的輸出鍵值對資料型別相匹配】

public class Reduce extends Reducer<Text,IntWritable,Text,IntWritable>{
    public void reduce(Text key, IntWritable value,Context context). . .{
        . . .
    }
}

3.使用Job物件指定MapReduce計算輸出資料型別,當mapper和reducer的輸出型別相同時:

Job job = new Job(. . .);
(Job job = Job.getInstance();)
. . .
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

4.當mapper和reducer的輸出型別相同時:

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

Hadoop提供一些基本資料型別,IntWritable,LongWritable,BooleanWritable,FloatWritable,ByteWritable,這是它們各自的Java基本資料型別的Writable版本,可以使用這些型別作為key型別和value型別

下面是幾種Hadoop內建的資料型別,可以作為作為key型別和value型別:

Text:儲存UTF8文字
BytesWritable:儲存一個位元組序列
VIntWritable和VLongWritable:儲存變長整型和長整型值
NullWritable:這是零長度的Writable型別(可以在不希望使用key或value型別的時候使用)

下面是Hadoop內建的集合資料型別【只能作為value型別】:

ArrayWritable:儲存屬於Writable型別的值陣列(要使用ArrayWritable型別作為reduce輸入的value型別,需要建立ArrayWritable的子類來指定儲存在其中的Writable值的型別):

 public class LongArrayWritable extends ArrayWritable{ 
     public LongArrayWritable(){
         super(LongWritable.class);
    }
}

TwoDArrayWritable:儲存屬於同一個Writable型別的值矩陣(要使用TwoDArrayWritable型別作為reduce輸入的value型別,需要建立與ArrayWritable型別相似的TwoDArrayWritable型別的子類來指定儲存的值的型別)
MapWritable:儲存鍵值對的對映(鍵和值應該是Writable資料型別)
SortedMapWritable:儲存鍵值對的有序對映(鍵應該事先WritableComparable介面)

實現自定義的Hadoop Writable資料型別

通過org.apache.hadoop.io.Writable介面編寫一個定製Writable資料型別用於定義資料型別的序列化格式(基於Writable的介面型別可以用來作為Hadooop MapReduce計算的value型別)

假設日誌包含5個部分:請求的主機,時間戳,請求的URL,相應大小,HTTP狀態碼,如下:

192.168.0.2 -- [01/Jul.1995:00:00:01-0400] "GET/history/appollo/HTTP/1.0" 200 6245

實現日誌條目的自定義Hadoop Writable資料型別的步驟:
1.血一個新的LogWritable類實現org.apache.hadoop.io.Writable介面

public class LogWritable implements Writable{
    private Text userIP;
    private Text timestamp;
    private Text request;
    private IntWritable responseSize;
    private IntWritable status;

    public LogWritable(){
        this.userIP = new Text();
        this.timestamp = new Text();
        this.request = new Text();
        this.responseSize = new IntWritable();
        this.status = new IntWritable();
    }
    public void readFields(DataInput in)throws IOException{
        userIP.readFields(in);
        timestamp.readFields(in);
        request.readFields(in);
        responseSize.readFields(in);
        status.readFields(in);
    }
    public void write(DataOuput out)throws IOException{
        userIP.write(out);
        timestamp.write(out);
        request.write(out);
        responseSize.write(out);
        status.write(out);
    }
    . . .//getters and setters for the fields
}

2.使用新的LogWritable型別作為MapReduce計算的value型別
例如,使用LogWritable型別作為Map輸出值的型別

public class LogProcessMap extends Mapper<LongWritable,Text,Text,LogWritable>{
    ...
}
public class LogProcessReduce extends Reducer<Text,LogWritable,Text,IntWritable>{
    public void reduce(Text key,Iterable<LogWritable> values,Context context){
        ...
    }
}

3.配置相應的作用的輸出型別

Job job = new Job(...);
(Job job = Job.getInstance();)
...
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LogWritable.class);

工作原理
Writable介面包含2個方法:readFields()he write(),在readFields()方法中我們反序列化輸入資料,並填充Writable物件的欄位:

public void readFields(DataInput in)throws IOException{
        userIP.readFields(in);
        timestamp.readFields(in);
        request.readFields(in);
        responseSize.readFields(in);
        status.readFields(in);
    }

在上面的示例中,使用Writable型別作為自定義的Writable型別的欄位,並且使用readFields()方法從DataInput物件的資料欄位中反序列化資料,【當然也可以使用Java的基本資料型別作為Writabel型別的欄位,並使用DataInput物件的想要讀取方法從基礎流中讀取值】,如下面的程式碼所示:

int responseSize = in.readInt();
String userIP = in.readUTF();

在Write()方法中,在底層流中寫入Writable物件的欄位

public void write(DataOuput out)throws IOException{
        userIP.write(out);
        timestamp.write(out);
        request.write(out);
        responseSize.write(out);
        status.write(out);
    }

如果使用的是Java基本資料型別作為Writable物件的欄位,可以使用DataOutPut物件中的對應寫入方法寫入底層流的值:

out.writeInt(responseSize);
out.writeUTF(userIP);

在實現自定義的Writable資料型別時,需要注意一下問題:

1.如果要新增一個自定義的建構函式用於自定義的Writable類,一定要保持預設的空建構函式
2.TextOutputFormat使用toString()方法來序列化key和value型別,如果使用的是TextOutputFormat序列化自定義的Writable型別的例項,那麼要確保用於自定義Writable資料型別的是一個有意義的toString()實現
3.在讀取輸入資料時,Hadoop可多次重複使用Writable類的一個例項,在ReadFields()方法裡面填充欄位時,不應該依賴於該物件的現有狀態

實現自定義Hadoop key型別

Hadoop MapReduce的key型別的例項應該可以進行相互比較來滿足排序的目的,為了在一個MapReduce計算中用作鍵型別,Hadoop的Writable資料型別應該實現org.apache.hadoop.io.WritableComparable<T>介面,WritableComparable介面繼承於org.apache.hadoop.io.Writable介面,並增加了compareTo()方法來執行比較
使用日誌資料來實現Hadoop WritableComparable資料型別的步驟:
1.修改LogWritable類來實現org.apache.hadoop.io.WritableComparable介面

public class LogWritable implements WritableComparable<LogWritable>{
    private Text userIP;
    private Text timestamp;
    private Text request;
    private IntWritable responseSize;
    private IntWritable status;

    public LogWritable(){
        this.userIP = new Text();
        this.timestamp = new Text();
        this.request = new Text();
        this.responseSize = new IntWritable();
        this.status = new IntWritable();
    }
    public void readFields(DataInput in)throws IOException{
        userIP.readFields(in);
        timestamp.readFields(in);
        request.readFields(in);
        responseSize.readFields(in);
        status.readFields(in);
    }
    public void write(DataOuput out)throws IOException{
        userIP.write(out);
        timestamp.write(out);
        request.write(out);
        responseSize.write(out);
        status.write(out);
    }
    public int compareTo(LogWritable o){
        if(userIP.compareTo(o.userIP) == 0){
            return (timestamp.compareTo(o.timestamp));
        }else return (userIP.compareTo(o.userIP));
    }
    public boolean equals(Object o){
        if(o instanceof LogWritable){
            LogWritable other = (LogWritable) o;
            return userIP.equals(other.userIP)&&timestamp.equals(other.timestamp);
            }
        return false;
    }
    public int hashCode(){
        return userIP.hashCode();
    }       
    . . .//getters and setters for the fields
}

2.使用LogWritable型別作為MapReduce計算中key型別或value型別
例如,使用LogWritable型別作為Map輸出key型別

public class LogProcessMap extends Mapper<LongWritable,Text,LogWritableIntWritable>{
    ...
}
public class LogProcessReduce extends Reducer<LogWritable,IntWritableText,IntWritable>{
    public void reduce(LogWritable key,Iterable<IntWritable> values,Context context){
        ...
    }
}

3.配置相應的作業輸出型別

Job job = new Job(...);
(Job job = Job.getInstance();)
...
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapOutputKeyClass(LogWritable.class);
job.setMapOutputValueClass(IntWritable.class);

工作原理:
除了Writable介面的readFields()he write()方法之外,WritableComparable介面還引入了compareTo()方法,compareTo()方法的返回值有3種類型:負整數,0或正整數,分別表示當前物件小於、等於或大於被比較物件,在LogWritable實現中,如果2個使用者的IP地址和時間戳是相同的,那麼就認為這2個物件是相等的,如果物件不相等,就決定排序,首先根據IP地址排序,在根據時間戳排序:

public int compareTo(LogWritable o){
        if(userIP.compareTo(o.userIP) == 0){
            return (timestamp.compareTo(o.timestamp));
        }else return (userIP.compareTo(o.userIP));
    }

Hadoop使用HashPartitioner作為預設Partitioner實現,來計算中間資料在reducer中的分佈,HashPartitioner需要鍵物件的hashcode()方法來滿足一下2個屬性:

1.在不同JVM例項提供相同雜湊值
2.提供雜湊值的均勻分佈

因此,必須實現一個穩定的hashcode()方法,以使自定義的Hadoop key型別滿足上述2個要求。在logWritable實現中,使用請求的主機名/IP地址的雜湊嗎作為LogWritable例項的雜湊程式碼,保證了中間LogWritable資料能基於該請求的主機名/IP地址被正確地區分:

public int hashCode(){
        return userIP.hashCode();
    }   

從mapper中輸出不同值型別的資料

在執行reducer端join操作時,或者在多個MapReduce計算中將不同屬性型別的資料聚合成一個數據集合時需要避免複製性時,從mapper中輸出屬於多個值型別的資料集合,是非常有用的,然而,Hadoop reducer 不允許有徐對各輸入值型別,在這種情況下,可以使用GenericWritable類來包裝屬於不同資料型別的多個value例項

還是基於日誌資料,將彙總從web伺服器到特定直接的總位元組數,同時輸出一個由特定主機請求、由製表符分發的URL列表,使用IntWritable從mapper輸出總位元組數,同時,使用Text輸出請求URL

下面步驟顯示如何實現Hadoop GenericWritable資料型別,該資料型別可以封裝資料型別為IntWritable或Text例項:
1.寫一個類擴充套件org.apache.hadoop.io.GenericWritable類,實現getType()方法,返回將要使用的Witable類的陣列,如果要新增一個自定義的建構函式,那麼要確保新增的是一個無引數的預設建構函式

public class MultiValueWritable extends GenericWritable{
    private static Class[] CLASSES = new class[]{IntWritable.class,Text.class};
    public  MultiValueWritable(){
    }
    public  MultiValueWritable(Writable value){
        set(value);
    }
    protected Class[] getType(){
        return CLASSES;
    }
}

2.設定MultiValueWritable作為mapper的輸值型別,使用MultiValueWritable類的例項,封裝mapper的輸出Writable值

public class LogProcessMap extends Mapper<Object,Text,Text,MultiValueWritable>{
    private Text userHostText = new Text();
    private Text requestText = new Text();
    private IntWritable responseSize = new IntWritable();
}
public void map(Object key, Text value, Context context)...{
    ...//parse the value (log entry)using a regex.
    userHostText.set(userHost);
    requestText.set(request);
    byteWritable.set(responseSize);
    context.write(userHostText,new MultiValueWritable(requestText));
    context.write(userHostText,new MultiValueWritable(responseSize));
    }
}

3.將reducer的輸入值型別設定為MultiValueWritable,實現reduce()方法來處理多個值型別

public class LogProcessReduce extends Reducer<TextMultiValueWritableTextText>{
    private Text result = new Text();
    public void reduce(Text key, Iterable<MultiValueWritable> values, Context context)...{
        int sum = 0;
        StringBuilder requests = new StringBuilder();
        for(MultiValueWritable multiValueWritable : values){
            Writable writable = multiValueWritable.get();
            if(writable instanceof IntWritable){
                sum += ((IntWritable)writable).get();
            }else{
                    requests.append((Text)writable).toString();
                    requests.append("\t");
                }
            }
        result.set(sum + "\t" + requests);
        context.write(key,result);
    }
}

4.設定MultiValueWritable作為本次計算的Map輸出值

Configuration conf = new Configuration();
Job job = new Job(conf, "log-analysis");
...
job.setMapOutputValueClass(MultiValueWritable.class);

工作原理
GenericWritable實現應該繼承org.apache.hadoop.io.GenericWritable,並應該指定一組Writable值型別來進行封裝,實現從getType()方法返回CLASSES陣列,GenericWritable實現使用類陣列之歌的索引類序列化和反序列化資料:

private static Class[] CLASSES = new class[]{IntWritable.class,Text.class};
protected Class[] getType(){
    return CLASSES;
}

在這個mapper中,使用GenericWritable的實現來封裝每個值的例項:

private Text requestText = new Text();
context.write(userHostText,new MultiValueWritable(requestText));

reducer實現必須手動處理不同值型別

if(writable instanceof IntWritable){
        sum += ((IntWritable)writable).get();
}else{
        requests.append((Text)writable).toString();
        requests.append("\t");
    }
}

org.apache.hadoop.io.ObjectWritable是另一個類,可以用來實現和GenericWritable一樣的目標,ObjectWritable類可以處理Java基本型別,字串和陣列等除了Writable封裝需要的型別,Hadoop通過將例項類名寫入每個序列化條目的方式來序列化ObjectWritable例項,這使得它與GenericWritable這種基於類的方式比起來不夠高效。

為輸入資料個數選擇合適的Hadoop InputFormat

Hadoop通過InputFormat來支援許多不同的格式和型別的資料處理,Hadoop MapReduce計算的InputFormat通過解析輸入資料類生成用於mapper的鍵值對輸入InputFormat還執行將輸入資料分割成邏輯分割槽,基本上決定了MapReduce計算的Map任務數,並間接的決定了Map任務的執行位置,Hadoop為每個邏輯分割槽生成map任務,並使用鍵值對作為邏輯切分,呼叫相應的mapper

基於FileInputFormat的KeyValueTextInputFormat作為Hadoop Mapreduce計算的InputFormat
1.指定KeyValueTextInputFormat作為InputFormat,Hadoop MapReduce計算使用Job物件如下:

Configuration conf = new Configuration();
Job job = new Job(conf,"log-analysis");
...
SetInputFormat(KeyValueTextInputFormat.class);

2.設定作業的輸入路徑

FileInputFormat。setInputPath(job,new Path(inputpath));

工作原理:
KeyValueTextInputFormat是一種純文字的輸入格式,它為輸入文字的每一行生成一個鍵值記錄,輸入資料的每一行使用分隔符生成了鍵(Text)、值(Text)對;預設的分隔符是製表符;如果某行不包含分隔符,整行文字將被視為鍵和值為空,可以通過設定作業的配置物件的屬性指定自定義分隔符,如下所示:

conf.set("key.value.separator.in.input.line",",");

KeyValueTextInputFormat基於FileInputFormat,FileInputFormat則是一種基於檔案的InputFormat的基類;使用FileInputFormat類的SetInputPaths()方法指定MapReduce計算的輸入路徑;使用任何基於FileInputFormat類的InputFormat時,必須執行如下步驟:

FileInputFormat.setInputPaths(Job,new Path(inputpath));

可以通過提供一個逗號分隔的路徑列表來為MapReduce計算提供多個HDFS中的輸入路徑,也可以使用FileInputFormat類的addInputPath靜態方法新增額外的計算輸入路徑:

public static void setInputPaths(JobConf conf, Path. . . inputpaths);
public static void addInputPath(JobConf conf, Path path);

確保mapper輸入的資料型別與MapReduce計算所使用的InputFormat產生的資料型別相匹配
Hadoop提供的InputFormat實現,以支援多個公共資料格式:

TextInputFormat:用於純文字檔案;TextInputFormat為輸入文字檔案的每一行生成一個鍵值對記錄;對於每一行,鍵(LongWritable)是行在檔案中的位元組偏移量,值(Text)是行的文字內容;TextInputFormat是Hadoop預設的InputFormat
NLineInputFormat:用於純文字檔案; NLineInputFormat將輸入檔案轉換為固定數目行的邏輯切分;當map任務需要輸入固定數目的行的時候,可以使用 NLineInputFormat;鍵(LongWritable)和值(Text)以類似於TextInputFormat風分割產生用於每個行(Text)的記錄;預設情況下,NLineInputFormat為每行建立一個邏輯切分(對應一個Map任務),可以按如下方式制定每個切分行(或每個Map任務的鍵值對)的數目,NLineInputFormat為輸入文字檔案的每行生成一個鍵值對記錄NLineInputFormat.setNumLinesPerSplit(job,50);
SequenceFileInputFormat:用於Hadoop順序檔案輸入資料,Hadoop順序檔案將資料儲存為二進位制鍵值對,並支援資料的壓縮;當一個MapReduce計算的結果是順序檔案格式時,使用順序檔案格式作為MapReduce計算的輸入,SequenceFileInputFormat顯得非常有用

SequenceFileAsBinaryInputFormat:這是一種以原始二進位制格式程式鍵(ByteWritable)值(ByteWritable)對的格式,是SequenceInputFormat的子類
SequenceFileAsTextInputFormat:這是一個以字串形式呈現鍵(Text)和值(Text)的輸入格式,是SequenceInputFormat的子類

DBInputFormat:這是支援從SQL表中讀取輸入資料,用於MapReduce計算的輸入格式,DBInputFormat使用記錄號作為鍵(LongWritable)。使用查詢結果的記錄作為值(DBWrItable)

在一個MapReduce應用程式中,使用多個輸入資料型別和多個mapper實現

可以使用Hadoop的MultipleInputs功能類執行具有多個輸入路徑的MapReduce作業,同時制定用於每個路徑的不同InputFormat和(可選的)mapper;Hadoop將輸出路由到不同的mapper例項上,使用單一型別reducer例項執行MapReduce計算輸出;當需要處理多個具有相同含義的資料集時,如果這些資料集具有不同的輸入格式(逗號分隔的資料集合製表符分隔的資料集),那麼對於多個輸入具有不同的InputFormat實現是非常有用的

可以使用MultipleInputs類的addInputPath靜態方法,來將輸入路徑和各自路徑相應的InputFormat新增到MapReduce計算:

public static void addInputPath(Job,Path path,class<?extendsInputFormat>inputFormatClass)

具體例項如下:

MultipleInputs.addInputPath(job,path1,CSVInputFormat.class);
MultipleInputs.addInputPath(job,path1,TabInputFormat.class);

當為2個或者更多個數據集執行一個reduce端join時,不同mapper和InputFormat擁有多個同輸入路徑的特性,顯得非常有用:

public static void addInputPath(JobConf conf,Path path,class<?extendsInputFormat>inputFormatClassclass<?extends Mapper>mapperClass)

具體例項如下:

MultipleInputs.addInputPath(job,accessLogPath,TextInputFormat.class,AccessLogMapper.class);
MultipleInputs.addInputPath(job,userDarPath,TextInputFormat.class,UserDataMapper.class);

實現自定義的InputFormat

InputFormat實現應該擴充套件org.apache.hadoop.mapreduce.InputFormat<K,V>抽象類,並重寫createRecordReader()和getSplits()方法

實現InputFormat和RecordReader,用於處理日誌檔案,該InputFormat將產生LongWritable例項的鍵和LogWritable例項的值
基於FileInputFormat的自定義InputFormat,用於處理日誌檔案
1.LogFileInputFormat要直接操作儲存在HDFS檔案中的資料,因此實現一個擴充套件自FileInputFormat的LogFileInputFormat

public class LogFileInputFormat extends FileInputFormat<LongWritable,LogWritable>{
        public RecordReader<LongWritable,LogWritable> createRecorder(InputSplit arg0,TaskAttemptContext arg1)throws . . .{
            return new LogFileRecorderReader();
        }
}

2.實現LogFileRecordReader類

Public class LogFileRecordReader extends RecordReader<LongWritable,LogWritable>{
    LineRecordReader lineReader;
    LogWritable value;

    public void initialize(InputSplit inputSplit,TaskAttempContext attempt)...{
        lineReader = new LineRecordReader();
        lineReader.initialize(inputSplit,attempt);
    }

    public boolean nextKeyValue()throws IOException, ... {
        if(!lineReader.nextKeyValue())
            return false;

        String line = lineReader.getCurrentValue().toString();
        ...//Extract the fields from 'line'using a regex

        value = new LogWritable(userIP,timestamp,request,status,bytes);
        return true;
    }

    public LongWritable getCurrentKey() throws ...{
        return lineReader.getCurrentKey();
    }

    public LogWritable getCurrentValue() throws ...{
        return value;
    }

    public float getProgress() throws IOException, ... {
        return lineReader.getProgress();
    }

    public void close() throws IOException{
        lineReader.close();
    }
} 

3.指定LogFileInputFormat作為InputFormat,用於使用Job物件的MapReduce計算,程式碼如下,指定使用底層FileInputFormat用於計算的輸入路徑:

Configuration conf = new Configuration();
Job job = new Job(conf, "log-analysis")l
...
job.setInputFormatClass(LogFileInputFormat.class);
FileInputFormat.setInputPath(job,new Path(inputpath));

4.確保計算的mapper使用LongWritable作為輸入key型別,LogWritable作為輸入的value型別:

public class LogProcessMap extends Mapper<LongWritable, LogWritbale,Text,IntWritable>{
    public void map(LongWritable key, LogWritable value, Context context) throws ...{
        ...
    }
}

工作原理
LogFileInputFormat繼承FileInputFormat,提供了一個通用的基於InputFormat分割HDFS檔案的機制,在
LogFileInputFormat中覆蓋createRecordReader()方法,提供自定義的RecordReader實現,一個LogFileRecordReader例項,或者可以重寫FileInputFormat的isSplitable()方法來控制輸入檔案是否被分割為幾個邏輯分割槽,或作為整個檔案:

public RecordReader<LongWritable,LogWritable> 
    createRecorder(InputSplit arg0,TaskAttemptContext arg1)throws . . .{
            return new LogFileRecorderReader();
    }

LogFileRecordReader類繼承了org.apache.hadoop.mapreduce.InputFormat<K,V>抽象類,並在內部使用LineRecordReader來執行輸入資料的基本分析,LineRecordReader以行的形式讀取輸入資料:

lineReader = new LineRecordReader();
lineReader.initialize(inputSplit,attempt);

通過執行nextKeyValue()方法來進行輸入資料的日誌條目定義解析,使用正則表示式提取日誌條目的各個欄位,併產生LogWritable類一個例項類封裝這個欄位:

public boolean nextKeyValue()throws IOException, ... {
        if(!lineReader.nextKeyValue())
            return false;

        String line = lineReader.getCurrentValue().toString();
        ...//Extract the fields from 'line'using a regex

        value = new LogWritable(userIP,timestamp,request,status,bytes);
        return true;
    }

可以重寫InputFormat類的getSplit()方法執行輸入資料的自定義分割,getSplit()方法返回InputSplit物件的列表;一個InputSplit物件表示輸入資料的一個邏輯分割槽並且將被分配到一個單獨的Map任務上

InputSplit類擴充套件自InputSplit抽象類,應該覆蓋getLocations()和getLength()方法;getLength()方法應該提供分割的長度,而getLocations()方法應該提供列表節點,其中有這種分割所表示的資料的物理儲存

格式化MapReduce計算的結果–使用Hadoop的OutputFormat

使用Hadoop OutputFormat介面來為MapReduce計算的輸出定義資料儲存格式、資料儲存位置和資料組織形式,OutputFormat準備輸出位置,並提供一個RecordWriter的實現來執行實際的資料序列化和儲存

Hadoop使用org.apache.hadoop.mapreduce.lib.output.TextOutputFormat<K,V>作為MapReduce計算的預設OutputFormat;TextOutputFormat將資料記錄輸出到HDFS的文字檔案中,每個單獨的行儲存一條記錄,TextOutputFormat會使用製表符來分割鍵和值;TextOutputFormat擴充套件FileOutputFormat,這是可用於所有的基於檔案的輸出格式的基類

使用基於FileOutputFormat的SequenceFileOutputFormat作為OutputFormat,用於Hadoop MapReduce計算
1.將指定org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat<K,V>作為OutputFormat用於一個Hadoop MapReduce計算使用Job物件:

Configuration conf = new Configuration();
Job job = new Job(conf,"log-analysis");
...
job.setOutputFormat(SequenceFileOutputFormat.class);

2.設定作業的輸出路徑

FileOutputFormat.setOutputPath(job,new Path(outputpath));

工作原理

SequenceFileOutputFormat將資料序列化到Hadoop序列檔案,Hadoop順序檔案將資料儲存為二進位制鍵值對,支援資料壓縮,序列檔案是高效儲存的非文字檔案格式;如果MapReduce計算的輸出要作為另一個MapReduce計算的輸入,可以使用序列化檔案來儲存前一個MpaReduce的計算結果

FileOutputFormat是SequenceFileOutputFormat的基類,它是所有基於檔案的OutputFormat的基類,使用FileOutputFormat的setOutputPath()方法指定輸出路徑,任何基於FileOutputFormat的OutputFormat必須執行如下操作:

FileOutputFormat.setOutputPath(job,new Path(outputpath));

通過繼承org.apache.hadoop.mapreduce.OutputFormat<K,V>抽象類,可以實現自定有點OutputFormat類,把MapReduce計算的輸出寫出專用的或自定義的資料格式,並且/或者儲存在HDFS之外的儲存系統上

Hadoop的中間資料分割槽

Hadoop在整個reduce任務的計算過程中,對map任務生成的中間資料進行分割槽(一個適當的分割槽函式能確保每個reduce任務負載平衡)【分割槽也可以將相關的記錄集分組,傳送給特定的人reduce任務(如果希望某些輸出被加工或組合在一起的話)】

Hadoop基於中間資料鍵空間劃分中間資料,來決定哪個reduce任務將接收哪個中間結果,有序集合分割槽的鍵和它們的值將作為一個reduce任務的輸入(在Hadoop中,分割槽的總數等於Reduce任務的數目)

Hadoop Partitioner應擴充套件 org.apache.hadoop.mapreduce.Partitioner<KEY,VALUE>抽象類【Hadoop使用org.apache.hadoop.mapreduce.lib.partitioner.HashPartitioner作為預設的Partitioner】

HashPartitioner分割槽基於其hashcode()劃分鍵,使用公司key.hashcode() mod r,其中r是reduce的任務數

實現自定義的用於分析日誌的Partitioner,基於地理區域劃分鍵(IP地址)
1.實現IPBasedPartitioner擴充套件Partitioner抽象類

public class IPBasedPartitioner extends Partitioner<Text,IntWritable>{
    public int getPartition(Text ipAdress, IntWritable value, int numPartitions)
    {
        String region = getGeoLocation(ipAdress);
        if(region != null){
            return ((region.hashcode() & Integer.MAX_VALUE)%numPatririons);
        }
        return 0;
    }
}

2.在Job物件總設定Partitioner類的引數

Job job = new Job(conf,"log-analysis");
...
job.setPartitionerClass(IPBasedPartitioner.class);

工作原理
在中間資料上執行分割槽邏輯,使得來自同一個IP的請求被髮送到相同的reduce例項,getGeoLocation()方法返回給定IP地址的地理位置,例子中省略了getGeoLocation()方法的具體實現,然後得到地理位置的hashcode(),執行模運算來䋀請求的reducer

public int getPartition(Text ipAdress, IntWritable value, int numPartitions)
    {
        String region = getGeoLocation(ipAdress);
        if(region != null){
            return ((region.hashcode() & Integer.MAX_VALUE) % numPartitions);
        }
        return 0;
    }

Hadoop的預設分割槽(HashPartitioner)在對中間資料進行分割槽的時候不強制進行排序
TotalOrderPartitioner和KeyFieldPartitioner是有Hadoop提供的2個內建的Partitioner實現:

TotalOrderPartitioner擴充套件org.apache.hadoop.mapreduce.lib.Partitioner.TotalOrderPartitioner<K,V>,reducer的輸入記錄集是有序的,以確保輸入分割槽中有正確排序(如果要確保全域性有序,可以使用TotalOrderPartitioner強制獲得一個全域性順序以減少每個reducer任務的輸入記錄數)【TotalOrderPartitioner需要一個分割槽檔案作為輸入,來定義分割槽的範圍,org.apache.hadoop.mapreduce.lib.partitionerInputSampleer工具執行我們通過取樣輸入資料,來生成用於TotalOrderPartitioner的分割槽檔案】TotalOrderPartitioner主要用於Hadoop的TeraSort基準測試

Job job = new Job(conf,"Sort");
...
job.setPartitionerClass(TotalOrderPartitioner.class);
TotalOrderPartitioner.setPartitionFile(jonconf,partitionFile);

KeyFieldPartitioner擴充套件org.apache.hadoop.mapreduce.lib.Partitioner.KeyFieldBasedPartitioner<K,V>可以用來劃分基於部分鍵的中間資料,可以使用分隔字串將每個鍵分割成一組欄位,在進行分割槽的時候,可以指定需要考慮欄位的索引,還可以在欄位中指定字元的索引

將共享資源傳播和分發到MapReduce作業的任務中–Hadoop DistributedCache

利用Hadoop DistributedCache來基於資源(這些資源可以是資料檔案,歸檔檔案,Jar檔案)給Map和Reuduce任務分發只讀檔案,它們都是通過mapper或reducer執行計算時需要的

如何將檔案新增到Hadoop DistributedCache以及如何從Map和Reuduce任務中獲取:

1.將資源複製到HDFS,也可以使用已經存在於HDFS中的檔案
bin/hadoop fs -copyFromLocal ip2loc.dat ip2loc.dat
2.從主程式新增資源到DistributedCache
Job job = new Job(conf,"log-analysis");
. . .
DistributedCache.addCacheFile(new URI("ip2loc.dat # ip2location"),job.getConfiguration());

3.從mapper或reducer的setup()方法中獲取資源,並在map()或reduce()函式哄使用這些資料

public class LogProcessMap extends Mapper<Object ,LogWritable, Text, IntWritable>{
    private IPLookup lookupTable;
    public void setup(Context context) throws IOException{
        File lookipDb = new File("ip2location");//load the ip lookup table to memory
        lookupTable = IPLookup.LoadData(lookupDb);
    }
    public void map(...){
        String country = lookupTable.getCountry(value.ipAddress);
        . . .
    }
}   

工作原理
在任意的作業任務執行前,Hadoop會將已經新增到DistributedCache的檔案複製到顒的工作節點,每個作業,DistributedCache複製這些檔案一次;Hadoop還支援通過增加一個具有需要的URI連結名的片段,在當前計算工作目錄建立連結到DistributedCache中檔案的符號連結;在下面的例子中,使用ip2Location作為DistributedCache中ip2loc.dat檔案的符號連結:

DistributedCache.addCacheFile(new URI("/data/ip2loc.dat # ip2location"),job.getConfiguration());

在mapper或reducer的setup()方法中,從DistributedCache中解析和載入資料(帶符號連結的檔案可以使用提供的連結名從工作目錄訪問到):

private IPLookup lookupTable;
    public void setup(Context context) throws IOException{
        File lookupDb = new File("ip2location");//load the ip lookup table to memory
        lookupTable = IPLookup.LoadData(lookupDb);
    }
    public void map(...){
        String country = lookupTable.getCountry(value.ipAddress);
        . . .
    }

也可以使用getLocalCacheFiles()方法,在不適用符號連結的情況下,直接訪問DistributedCache中的資料:

Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);

DistributedCache不能再Hadoop本地模式下工作

使用DistributedCache分發壓縮檔案
Hadoop在工作節點自動提取壓縮檔案中的檔案,也可以提供符號連結來使用URI片段的檔案:

Job job = new Job(conf,"log-analysis");
. . .
DistributedCache.addCacheArchive(new URI("/data/ip2locationdb.tar.gz#ip2locationdb"),job.getConfiguration());`

歸檔檔案的解壓縮目錄可以使用上面提供的符號連結從mapper或reducer所在工作目錄訪問:

public void setup(Context context) throws IOException{
        File lookupDir = new File("ip2locationdb");
        String[] children = lookupDir.list();
        . . .
    }

也可以直接在mapper或reducer中實現,用下面的方法直接訪問未提前的DistributedCache存檔檔案:

Path[] cachePath;
public void setup(Context context)throws IOException{
    Configuration conf = context.getConfiguration();
    cachePath = DistributedCache.getLocationCacheArchives(conf);
    . . .
}

在命令列中將資源新增到DistributedCache
Hadoop支援使用命令列將檔案或壓縮歸檔新增到DistributedCache,前提是MR主程式實現了org.apache.hadoop.util.Tool介面或利用了org.apache.hadoop.GenericOptionsParser類;檔案可以使用-files命令列選項新增到DistributedCache,歸檔檔案可以使用-archives命令列選項新增,JAR檔案使用-libjars選項新增(檔案或存檔可以在Hadoop可以訪問的任意檔案系統中,包括本地檔案系統)【這些選項支援逗號分隔的路徑列表以及使用URI片段符號連結的建立】

bin/hadoop jar C4LogProcessor.jar LogProcessor - files ip2location.dat#ip2location indir outdir
bin/hadoop jar  C4LogProcessor.jar LogProcessor -archives ip2locationdb.tar.gz#ip2locationdb indir outdir
bin/hadoop jar  C4LogProcessor.jar LogProcessor -libjars ip2locationResolvers.jar indir outdir

使用DistributedCache將資源新增到類路徑
可以使用DistributedCache來分發Jar檔案和其他mapper或reducer的依賴庫,可以在客戶端主程式中使用下列方法,將JAR檔案新增到執行mapper或reducer的JVM類路徑中:

public static void addFileToClassPath(Path file, Configuration conf, FileSystem fs)
public static void addArchiveToClassPath(Path archive, Configuration conf, FileSystem fs)

新增MapReduce作業之間的依賴關係

Hadoop的ControlledJob和JobControl來提供了一種機制,通過指定2個MR作業之間的依賴關係來執行MR作業的簡單工作流圖

例:log-anaylsis計算依賴於log-grep計算:
1.為第一個MR作業建立Configuration和Job物件,並填充其他必要配置項:

Job job1 = new Job(getConf(),"log-grep");
job1.setJarByClass(RegexMapper.class);
job1.setMapperClass(RegexMapper.class);
FileInputFormat.setInputPaths(job1,new Path(inputpath));
FileOutputFormat.setOutputPaths(job1,new Path(intermedpath));
. . . 

2.為第二個MR作業建立Configuration和Job物件,並填充其他必要配置項:

Job job2 = new Job(getConf(),"log-analysis");
job2.setJarByClass(LogProcessorMap.class);
job2.setMapperClass(LogProcessorMap.class);
job2.setReducerClass(LogProcessorReduce.class);
FileOutputFormat.setOutputPaths(job2,new Path(outputpath));
. . . 

3.設定第一個作業的輸出目錄,並將該目錄作為第二個作業的輸入目錄

FileInputFormat.setInputPaths(job2,new Path(intermedpath+"part*"));

4.使用上述建立的Job物件來建立ControlledJob物件

ControlledJob controlledJob1 = new ControlJob(job1.getConfiguration());
ControlledJob controlledJob2 = new ControlJob(job2.getConfiguration());

5.將第一個作業新增成第二個作業的依賴項

controlledJob2.addDependingJob(controlledJob1);

6.為作業建立JobControl物件,並將步驟4建立的ControlledJob物件新增到新建立的JobControl物件中

JobControl jobControl = new JobControl("JobControlDemoGroup");
jobControl.addJob(controlledJob1);
jobControl.addJob(controlledJob2);

7.建立一個新的執行緒來執行新增到JobControl物件的作業組,啟動執