Hadoop權威指南---MapReduce的型別與格式
目錄
package org.apache.hadoop.mapreduce; import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; @Public @Stable public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { public Mapper() { } protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { } protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { context.write(key, value); } protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { } public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { this.setup(context); try { while(context.nextKeyValue()) { this.map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { this.cleanup(context); } } public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { public Context() { } } }
map函式的預設實現:直接把使用者傳進來的key和value直接寫出去傳遞給reduce函式
protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { context.write(key, value); }
package org.apache.hadoop.mapreduce; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.mapreduce.ReduceContext.ValueIterator; import org.apache.hadoop.mapreduce.task.annotation.Checkpointable; @Checkpointable @Public @Stable public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { public Reducer() { } protected void setup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { } protected void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { Iterator var4 = values.iterator(); while(var4.hasNext()) { VALUEIN value = var4.next(); context.write(key, value); } } protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { } public void run(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { this.setup(context); try { while(context.nextKey()) { this.reduce(context.getCurrentKey(), context.getValues(), context); Iterator<VALUEIN> iter = context.getValues().iterator(); if (iter instanceof ValueIterator) { ((ValueIterator)iter).resetBackupStore(); } } } finally { this.cleanup(context); } } public abstract class Context implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { public Context() { } } }
reduce函式的預設實現:把從map中獲取的key和value迴圈寫出去
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { Iterator var4 = values.iterator();
while(var4.hasNext()) {
VALUEIN value = var4.next();
context.write(key, value);
}
簡單總結:如果使用者使用預設的mapper和reducer,其結果是原樣輸出輸入的資料。
1、MapReduce的型別簡介
* 代表的含義???
2、 輸入格式
2.1、輸入分片與記錄
// 輸入檔案路徑 FileInputFormat.addInputPath(job, new Path(args[1]));
執行作業的客戶端首先呼叫InputFormat的List<InputSplit> getSplits(JobContext job)計算分片,然後由application master來根據InputSplit儲存的分片資訊來排程叢集處理這些資料。map的個數由輸入分片的個數決定。
1)、FileInputFormat類
2)、FileInputFormat類的輸入路徑
3)、FileInputFormat類的輸入分片
4)、小檔案和combineFileInputFormat
每個小檔案都需要一個map任務進行處理
5)、避免切分
6)、mapper中的檔案資訊
package org.apache.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.mapred.SplitLocationInfo;
@Public
@Stable
public abstract class InputSplit {
public InputSplit() {
}
public abstract long getLength() throws IOException, InterruptedException;
public abstract String[] getLocations() throws IOException, InterruptedException;
@Evolving
public SplitLocationInfo[] getLocationInfo() throws IOException {
return null;
}
}
package org.apache.hadoop.mapreduce.lib.input;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputSplit;
@Public
@Stable
public class FileSplit extends InputSplit implements Writable {
private Path file;
private long start;
private long length;
private String[] hosts;
private SplitLocationInfo[] hostInfos;
public FileSplit() {
}
public FileSplit(Path file, long start, long length, String[] hosts) {
this.file = file;
this.start = start;
this.length = length;
this.hosts = hosts;
}
public FileSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) {
this(file, start, length, hosts);
this.hostInfos = new SplitLocationInfo[hosts.length];
for(int i = 0; i < hosts.length; ++i) {
boolean inMemory = false;
String[] var10 = inMemoryHosts;
int var11 = inMemoryHosts.length;
for(int var12 = 0; var12 < var11; ++var12) {
String inMemoryHost = var10[var12];
if (inMemoryHost.equals(hosts[i])) {
inMemory = true;
break;
}
}
this.hostInfos[i] = new SplitLocationInfo(hosts[i], inMemory);
}
}
public Path getPath() {
return this.file;
}
public long getStart() {
return this.start;
}
public long getLength() {
return this.length;
}
public String toString() {
return this.file + ":" + this.start + "+" + this.length;
}
public void write(DataOutput out) throws IOException {
Text.writeString(out, this.file.toString());
out.writeLong(this.start);
out.writeLong(this.length);
}
public void readFields(DataInput in) throws IOException {
this.file = new Path(Text.readString(in));
this.start = in.readLong();
this.length = in.readLong();
this.hosts = null;
}
public String[] getLocations() throws IOException {
return this.hosts == null ? new String[0] : this.hosts;
}
@Evolving
public SplitLocationInfo[] getLocationInfo() throws IOException {
return this.hostInfos;
}
}
7)、把整個檔案當成一條記錄處理
2.2、文字輸入
2.3、二進位制輸入
2.4、多個輸入(MultipleInputs 為每條輸入路徑指定InputFormat和mapper)
2.5、資料庫輸入和輸出(DBInputFormat和DBOutputFormat,TableInputFormat和TableOutputFormat)
3、輸出格式
3.1、文字輸出
3.2、二進位制輸出
3.3、多個輸出
3.4、 延遲輸出
參考:
《Hadoop權威指南.大資料的儲存與分析.第4版》---第8章 MapReduce的型別與格式