Typical MapReduce Programming Scenarios, Part 3
1. Custom InputFormat: merging small files
Requirement: merge many small files into larger ones.
Analysis: there are three common ways to handle large numbers of small files:
- At data-collection time, combine small files (or small batches of records) into large files before uploading them to HDFS.
- Before the business processing starts, run a MapReduce job on HDFS to merge the small files (the approach implemented below).
- At MapReduce processing time, use CombineFileInputFormat to improve efficiency by packing many small files into each split (see the sketch after this list).
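For the third option, here is a minimal driver fragment, assuming Hadoop's stock CombineTextInputFormat (the concrete text subclass of CombineFileInputFormat); the input path and the 128 MB cap are illustrative values, not from the original post:

// Sketch: packing many small text files into large splits.
// Uses org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat.
job.setInputFormatClass(CombineTextInputFormat.class);
// Upper bound on one combined split, in bytes; small files are packed until the cap is reached.
CombineTextInputFormat.setMaxInputSplitSize(job, 128 * 1024 * 1024L);
CombineTextInputFormat.addInputPath(job, new Path("/hadoop/input/small_files"));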
Implementation outline:
- Write a custom InputFormat.
- Override RecordReader so that each map task reads the complete content of one small file and wraps it in a single key/value pair.
- In the driver class, be sure to register the custom InputFormat: job.setInputFormatClass(WholeFileInputFormat.class).
Code:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MergeDriver {

    // Driver: wires the job together.
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        try {
            Job job = Job.getInstance(conf, "combine small files to big file");
            job.setJarByClass(MergeDriver.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);

            // Register the custom InputFormat.
            job.setInputFormatClass(WholeFileInputFormat.class);

            Path input = new Path("/hadoop/input/num_add");
            Path output = new Path("/hadoop/output/merge_output1");
            // Register the input path through the custom FileInputFormat.
            WholeFileInputFormat.addInputPath(job, input);

            // Remove the output directory if it already exists.
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);

            int status = job.waitForCompletion(true) ? 0 : 1;
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Mapper: map() is called once per small file (one record per file).
    private static class MyMapper extends Mapper<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void map(NullWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    // Reducer: every file's content shares the same NullWritable key, so a single
    // reducer writes all of them into one output file.
    private static class MyReducer extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text v : values) {
                context.write(key, v);
            }
        }
    }

    // RecordReader: the two type parameters are the key and value types fed to map().
    private static class MyRecordReader extends RecordReader<NullWritable, Text> {
        // Value object handed to the Mapper.
        private final Text mapValue = new Text();
        // File system handle used to open the file behind the split.
        private FileSystem fs;
        // Whether the single record of this split has been read yet.
        private boolean read = false;
        // Split metadata: one whole small file per split.
        private FileSplit fileSplit;

        // Called once before any records are read, similar to Mapper.setup().
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            fs = FileSystem.get(context.getConfiguration());
            fileSplit = (FileSplit) split;
        }

        // Produces the key/value pair for each map() call. Returns true exactly
        // once per split, so map() runs exactly once per file.
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!read) {
                byte[] content = new byte[(int) fileSplit.getLength()];
                FSDataInputStream in = fs.open(fileSplit.getPath());
                try {
                    // Read the whole small file in one go:
                    // (buffer, offset into the buffer, number of bytes to read).
                    in.readFully(content, 0, content.length);
                } finally {
                    in.close();
                }
                mapValue.set(content, 0, content.length);
                read = true;
                return true;
            }
            return false;
        }

        @Override
        public NullWritable getCurrentKey() throws IOException, InterruptedException {
            return NullWritable.get();
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return mapValue;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return read ? 1.0f : 0.0f;
        }

        @Override
        public void close() throws IOException {
            // Nothing to release: the input stream is closed in nextKeyValue(),
            // and the shared FileSystem instance must not be closed here.
        }
    }

    // Custom FileInputFormat that emits one whole file per record.
    private static class WholeFileInputFormat extends FileInputFormat<NullWritable, Text> {
        @Override
        public RecordReader<NullWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            // The framework calls initialize() on the reader itself.
            return new MyRecordReader();
        }

        // A file must not be split further: one file, one record.
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            return false;
        }
    }
}
2. Custom OutputFormat
Requirement: some raw logs need enhancement processing. The flow:
- Read records from the raw log files.
- Fetch the matching business data from the business database.
- Join the two on some join key and emit the joined result.
Analysis:
- The job must access an external resource (the business database) from inside MapReduce; a common pattern is to load small reference data once per task in setup() (see the sketch after this list).
- Define a custom OutputFormat and override its RecordWriter, in particular the write() method that emits each record.
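The sample job below sidesteps the database for simplicity, so here is a separate minimal sketch of the external-resource pattern, assuming the reference table has been exported to a small HDFS file and shipped to every task through the distributed cache. The class name EnhanceMapper, the file user_info.txt, and the tab-separated layouts are all hypothetical:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch: loading a small reference table once per task via the distributed cache.
// The driver is assumed to have registered the (hypothetical) file with:
//   job.addCacheFile(new URI("/hadoop/cache/user_info.txt"));
public class EnhanceMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private final Map<String, String> userInfo = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Load the reference data once per task, not once per record.
        URI[] cacheFiles = context.getCacheFiles();
        Path cached = new Path(cacheFiles[0]);
        FileSystem fs = FileSystem.get(context.getConfiguration());
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(cached)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Hypothetical layout: userId <TAB> profile
                String[] parts = line.split("\t");
                if (parts.length == 2) {
                    userInfo.put(parts[0], parts[1]);
                }
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Hypothetical log layout: the first tab-separated field is the userId.
        String[] fields = value.toString().split("\t");
        String profile = userInfo.getOrDefault(fields[0], "unknown");
        // Emit the original line enriched with the looked-up profile.
        context.write(new Text(value.toString() + "\t" + profile), NullWritable.get());
    }
}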
Code:
// A simplified stand-in for the log-enhancement case: records are written to
// different output files depending on their average score.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Score_DiffDic {

    // Driver: wires the job together.
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        try {
            Job job = Job.getInstance(conf, "Score_DiffDic");
            job.setJarByClass(Score_DiffDic.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            // Register the custom OutputFormat.
            job.setOutputFormatClass(MyOutputFormat.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DoubleWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);

            Path input = new Path("/hadoop/input/num_add");
            FileInputFormat.addInputPath(job, input);

            Path output = new Path("/hadoop/output/merge_output1");
            // Remove the output directory if it already exists.
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            // setOutputPath is inherited from FileOutputFormat; one call is enough.
            FileOutputFormat.setOutputPath(job, output);

            int status = job.waitForCompletion(true) ? 0 : 1;
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Mapper: emits (student, score) pairs.
    private static class MyMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        private final Text mk = new Text();
        private final DoubleWritable mv = new DoubleWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Expected whitespace-separated layout, e.g.: computer huangxiaoming 85
            String[] fields = value.toString().split("\\s+");
            if (fields.length == 3) {
                mk.set(fields[1]);
                mv.set(Double.parseDouble(fields[2]));
                context.write(mk, mv);
            }
        }
    }

    // Reducer: computes the average score per student.
    private static class MyReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        private final DoubleWritable mv = new DoubleWritable();

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            int count = 0;
            for (DoubleWritable value : values) {
                sum += value.get();
                count++;
            }
            mv.set(sum / count);
            context.write(key, mv);
        }
    }

    // Custom FileOutputFormat: hands out the RecordWriter below.
    private static class MyOutputFormat extends FileOutputFormat<Text, DoubleWritable> {
        @Override
        public RecordWriter<Text, DoubleWritable> getRecordWriter(TaskAttemptContext job)
                throws IOException, InterruptedException {
            FileSystem fs = FileSystem.get(job.getConfiguration());
            return new MyRecordWriter(fs);
        }
    }

    // RecordWriter: the two type parameters are the reducer's output key/value types.
    private static class MyRecordWriter extends RecordWriter<Text, DoubleWritable> {
        // Target paths for the two score levels.
        private final Path path1 = new Path("/hadoop/output/score_out1");
        private final Path path2 = new Path("/hadoop/output/score_out2");
        private FSDataOutputStream output1;
        private FSDataOutputStream output2;

        public MyRecordWriter(FileSystem fs) {
            try {
                output1 = fs.create(path1);
                output2 = fs.create(path2);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void write(Text key, DoubleWritable value) throws IOException, InterruptedException {
            // Business rule: averages above 80 go to path1, everything else to path2.
            if (value.get() > 80) {
                output1.write((key.toString() + ":" + value.get() + "\n").getBytes());
            } else {
                output2.write((key.toString() + ":" + value.get() + "\n").getBytes());
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // Close the streams; do not close the shared FileSystem instance,
            // and certainly not before the streams that depend on it.
            output1.close();
            output2.close();
        }
    }
}
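One caveat about MyRecordWriter above: path1 and path2 are hardcoded, so the job is only correct with a single reduce task; with several reducers, every task would try to create the same two files. Hadoop's built-in MultipleOutputs handles per-task file naming for exactly this kind of routing. A hedged alternative sketch follows; the class name AvgScoreReducer and the named outputs "high"/"low" are illustrative, and the driver is assumed to register them first:

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

// Alternative reducer using Hadoop's stock MultipleOutputs.
// Assumed driver setup:
//   MultipleOutputs.addNamedOutput(job, "high", TextOutputFormat.class, Text.class, DoubleWritable.class);
//   MultipleOutputs.addNamedOutput(job, "low",  TextOutputFormat.class, Text.class, DoubleWritable.class);
public class AvgScoreReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    private MultipleOutputs<Text, DoubleWritable> mos;
    private final DoubleWritable mv = new DoubleWritable();

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        double sum = 0;
        int count = 0;
        for (DoubleWritable value : values) {
            sum += value.get();
            count++;
        }
        mv.set(sum / count);
        // Route by score level; files appear per task as high-r-00000 / low-r-00000.
        mos.write(mv.get() > 80 ? "high" : "low", key, mv);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}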