
Hadoop: The Definitive Guide --- MapReduce Types and Formats

First, the decompiled source of the default Mapper class (org.apache.hadoop.mapreduce.Mapper):

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;

@Public
@Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Mapper() {
    }

    protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        // identity map: pass the key/value pair through unchanged
        context.write(key, value);
    }

    protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        this.setup(context);

        try {
            while(context.nextKeyValue()) {
                this.map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
        } finally {
            this.cleanup(context);
        }

    }

    public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public Context() {
        }
    }
}

The default implementation of the map function simply writes the incoming key and value straight back out, passing them on to the reduce function:

protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    context.write(key, value);
}
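The default Reducer follows the same pattern; here is its decompiled source (org.apache.hadoop.mapreduce.Reducer):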

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.mapreduce.ReduceContext.ValueIterator;
import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;

@Checkpointable
@Public
@Stable
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Reducer() {
    }

    protected void setup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        // identity reduce: write every value back out under the same key
        for (VALUEIN value : values) {
            context.write(key, value);
        }
    }

    protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    public void run(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        this.setup(context);

        try {
            while(context.nextKey()) {
                this.reduce(context.getCurrentKey(), context.getValues(), context);
                Iterator<VALUEIN> iter = context.getValues().iterator();
                if (iter instanceof ValueIterator) {
                    ((ValueIterator)iter).resetBackupStore();
                }
            }
        } finally {
            this.cleanup(context);
        }

    }

    public abstract class Context implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public Context() {
        }
    }
}

The default implementation of the reduce function: it loops over the values received from the map phase and writes each one out under the same key.

protected void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    // identity reduce: write every value back out under the same key
    for (VALUEIN value : values) {
        context.write(key, value);
    }
}

In short: with the default Mapper and Reducer, a job writes its input records back out unchanged (with the default TextInputFormat, each output line is prefixed by its byte-offset key).
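A minimal driver that relies entirely on these defaults, in the spirit of the chapter's MinimalMapReduce example (class name and argument layout are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MinimalMapReduce {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "minimal");
        job.setJarByClass(MinimalMapReduce.class);
        // only the input and output paths are set; mapper, reducer,
        // formats and types all fall back to the defaults shown above
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Run over text input, each output line is the input line prefixed by its byte offset, because with TextInputFormat the offsets are the keys that flow through the identity map and reduce.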

1. Overview of MapReduce Types

What does the * in the book's notation stand for???
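For reference, the chapter summarizes the types in the general form:

map:     (K1, V1)       -> list(K2, V2)
combine: (K2, list(V2)) -> list(K2, V2)
reduce:  (K2, list(V2)) -> list(K3, V3)

The combine function, if used, must not change the key and value types, while map and reduce are free to.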

2. Input Formats

2.1 Input Splits and Records

// input file path
FileInputFormat.addInputPath(job, new Path(args[1]));

The client running the job first calls InputFormat's List<InputSplit> getSplits(JobContext job) to compute the input splits; the application master then uses the metadata stored in each InputSplit to schedule processing of that data across the cluster. The number of map tasks is determined by the number of input splits.
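For reference, the two methods every InputFormat must provide (abridged from the org.apache.hadoop.mapreduce.InputFormat source):

public abstract class InputFormat<K, V> {
    // compute the list of splits for the job; one map task runs per split
    public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;

    // build the reader that turns a split into the key/value records fed to map()
    public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}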

1) The FileInputFormat class


2) FileInputFormat input paths

3) FileInputFormat input splits

4) Small files and CombineFileInputFormat

With the default FileInputFormat, each small file gets its own map task, no matter how small the file is, which is wasteful; CombineFileInputFormat packs many files into each split so that each mapper has more to process.
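A sketch of enabling it, assuming Hadoop 2's CombineTextInputFormat; the 128 MB cap is an arbitrary illustration:

// pack many small files into each split instead of one split per file
job.setInputFormatClass(CombineTextInputFormat.class);
// upper bound on the packed split size, read by CombineFileInputFormat
job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", 128 * 1024 * 1024);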

5) Preventing splitting
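The chapter gives two ways to stop a file from being split: raise the minimum split size above the size of the largest file, or subclass the concrete input format and override isSplitable() to return false. A minimal sketch of the second approach:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// every file is handed whole to a single map task
public class NonSplittableTextInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
}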

6) File information in the mapper
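A mapper can find out which file and offset it is processing by casting the result of getInputSplit() to the FileSplit shown below. A sketch (the class and key/value types are illustrative):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class SplitInfoMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void setup(Context context) {
        FileSplit split = (FileSplit) context.getInputSplit();
        Path path = split.getPath();   // the file this task is reading
        long start = split.getStart(); // byte offset of the split within that file
        System.err.println("processing " + path + " from offset " + start);
    }
}

The sources of InputSplit and FileSplit follow: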



package org.apache.hadoop.mapreduce;

import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.mapred.SplitLocationInfo;

@Public
@Stable
public abstract class InputSplit {
    public InputSplit() {
    }

    public abstract long getLength() throws IOException, InterruptedException;

    public abstract String[] getLocations() throws IOException, InterruptedException;

    @Evolving
    public SplitLocationInfo[] getLocationInfo() throws IOException {
        return null;
    }
}
package org.apache.hadoop.mapreduce.lib.input;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputSplit;

@Public
@Stable
public class FileSplit extends InputSplit implements Writable {
    private Path file;
    private long start;
    private long length;
    private String[] hosts;
    private SplitLocationInfo[] hostInfos;

    public FileSplit() {
    }

    public FileSplit(Path file, long start, long length, String[] hosts) {
        this.file = file;
        this.start = start;
        this.length = length;
        this.hosts = hosts;
    }

    public FileSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) {
        this(file, start, length, hosts);
        this.hostInfos = new SplitLocationInfo[hosts.length];

        for (int i = 0; i < hosts.length; ++i) {
            // a host is flagged as in-memory if it also appears in inMemoryHosts
            boolean inMemory = false;
            for (String inMemoryHost : inMemoryHosts) {
                if (inMemoryHost.equals(hosts[i])) {
                    inMemory = true;
                    break;
                }
            }
            this.hostInfos[i] = new SplitLocationInfo(hosts[i], inMemory);
        }
    }

    public Path getPath() {
        return this.file;
    }

    public long getStart() {
        return this.start;
    }

    public long getLength() {
        return this.length;
    }

    public String toString() {
        return this.file + ":" + this.start + "+" + this.length;
    }

    public void write(DataOutput out) throws IOException {
        Text.writeString(out, this.file.toString());
        out.writeLong(this.start);
        out.writeLong(this.length);
    }

    public void readFields(DataInput in) throws IOException {
        this.file = new Path(Text.readString(in));
        this.start = in.readLong();
        this.length = in.readLong();
        this.hosts = null;
    }

    public String[] getLocations() throws IOException {
        return this.hosts == null ? new String[0] : this.hosts;
    }

    @Evolving
    public SplitLocationInfo[] getLocationInfo() throws IOException {
        return this.hostInfos;
    }
}

7) Processing a whole file as a single record
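The chapter's approach combines an input format that never splits with a record reader that reads the whole file into a single value. A condensed sketch along the lines of the book's WholeFileInputFormat example:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false; // one split per file
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) {
        return new RecordReader<NullWritable, BytesWritable>() {
            private FileSplit fileSplit;
            private Configuration conf;
            private final BytesWritable value = new BytesWritable();
            private boolean processed = false;

            @Override
            public void initialize(InputSplit split, TaskAttemptContext context) {
                this.fileSplit = (FileSplit) split;
                this.conf = context.getConfiguration();
            }

            @Override
            public boolean nextKeyValue() throws IOException {
                if (processed) {
                    return false;
                }
                // read the entire file into the single value
                byte[] contents = new byte[(int) fileSplit.getLength()];
                Path file = fileSplit.getPath();
                FileSystem fs = file.getFileSystem(conf);
                try (FSDataInputStream in = fs.open(file)) {
                    IOUtils.readFully(in, contents, 0, contents.length);
                }
                value.set(contents, 0, contents.length);
                processed = true;
                return true;
            }

            @Override
            public NullWritable getCurrentKey() { return NullWritable.get(); }

            @Override
            public BytesWritable getCurrentValue() { return value; }

            @Override
            public float getProgress() { return processed ? 1.0f : 0.0f; }

            @Override
            public void close() { }
        };
    }
}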


2.2 Text Input (TextInputFormat, KeyValueTextInputFormat, NLineInputFormat)


2.3 Binary Input (SequenceFileInputFormat and its variants)

2.4 Multiple Inputs (MultipleInputs specifies an InputFormat and Mapper for each input path)
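Typical usage, after the pattern of the chapter's weather example (the Path variables and mapper classes are placeholders):

// each path gets its own format and mapper; both mappers must emit the same output types
MultipleInputs.addInputPath(job, ncdcInputPath, TextInputFormat.class, MaxTemperatureMapper.class);
MultipleInputs.addInputPath(job, metOfficeInputPath, TextInputFormat.class, MetOfficeMaxTemperatureMapper.class);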

2.5 Database Input and Output (DBInputFormat and DBOutputFormat for JDBC; TableInputFormat and TableOutputFormat for HBase)

3. Output Formats

3.1 Text Output

3.2 Binary Output

3.3 Multiple Outputs
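A sketch of MultipleOutputs in a reducer, using each key as the base name of its own output file (the Text/NullWritable types are illustrative):

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class PartitionByKeyReducer extends Reducer<Text, Text, NullWritable, Text> {
    private MultipleOutputs<NullWritable, Text> multipleOutputs;

    @Override
    protected void setup(Context context) {
        multipleOutputs = new MultipleOutputs<>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            // the key becomes the file's base name instead of the default part-r-nnnnn
            multipleOutputs.write(NullWritable.get(), value, key.toString());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}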

3.4 Lazy Output
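With LazyOutputFormat the output file for a partition is created only when its first record is written, so empty part files never appear. Usage is a single call in the driver:

import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// wrap the real output format; here TextOutputFormat
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);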

Reference:

Hadoop: The Definitive Guide (Storage and Analysis at Internet Scale), 4th Edition --- Chapter 8, MapReduce Types and Formats