Typical MapReduce Programming Scenarios 3

1. Custom InputFormat - Classified Data Output

  Requirement: merge small files

  Analysis:

     - At data-collection time, merge the small files (or small batches of data) into larger files before uploading them to HDFS.
     - Before business processing, run a MapReduce program on HDFS to merge the small files.
     - During MapReduce processing, CombineFileInputFormat can be used to improve efficiency (see the driver sketch after this list).
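
For the third option, a minimal driver sketch is shown below. It uses the built-in CombineTextInputFormat (a concrete subclass of CombineFileInputFormat) in a map-only pass-through job, so each combined split of small files becomes one output file. The class names, the paths taken from args, and the 128 MB split cap are illustrative assumptions, not part of the original post.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CombineSmallFilesDriver {

    // drops the byte-offset key so only the original lines are written out
    public static class PassThroughMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "combine small files with CombineTextInputFormat");
        job.setJarByClass(CombineSmallFilesDriver.class);

        job.setMapperClass(PassThroughMapper.class);
        job.setNumReduceTasks(0);              // map-only job
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // pack many small files into fewer splits, capped at 128 MB per split
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 128 * 1024 * 1024);

        CombineTextInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}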

  Implementation approach:

     - Write a custom InputFormat.
     - Override the RecordReader so that one map task reads the complete content of a single small file and wraps it into one key-value pair.
     - In the Driver class, be sure to set the custom InputFormat: job.setInputFormatClass(WholeFileInputFormat.class)


Code implementation

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MergeDriver {
    // Driver: configure and submit the job
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        Job job = null;
        try {
            job = Job.getInstance(conf, "combine small files to bigfile");
            job.setJarByClass(MergeDriver.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);

            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);

            // set the custom input format class
            job.setInputFormatClass(MyFileInputFormat.class);

            Path input = new Path("/hadoop/input/num_add");
            Path output = new Path("/hadoop/output/merge_output1");

            // use the custom FileInputFormat to read the input path
            MyFileInputFormat.addInputPath(job, input);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);

            int status = job.waitForCompletion(true) ? 0 : 1;
            System.exit(status);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //Mapper
    private static class MyMapper extends Mapper<NullWritable, Text, NullWritable, Text> {
        /*
            This map method is called once for each small file that is read.
         */
        @Override
        protected void map(NullWritable key, Text value, Mapper<NullWritable, Text, NullWritable, Text>.Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }
    //Reducer
    private static class MyReducer extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values,
                              Reducer<NullWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
            for (Text v : values) {
                context.write(key, v);
            }
        }
    }
    // RecordReader: these two generic types are the key and value types of the map-side input
    private static class MyRecordReader extends RecordReader<NullWritable, Text> {

        // the value object handed to map()
        Text map_value = new Text();

        // FileSystem object, used to open the file's input stream
        FileSystem fs;

        // whether the current file has already been read
        boolean isReader = false;

        // split information for the file
        FileSplit fileSplit;

        // initialization method, similar to setup() in Mapper; it runs before any record is read
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            // initialize the FileSystem object
            fs = FileSystem.get(context.getConfiguration());
            // keep the split so the file path and length can be read later
            fileSplit = (FileSplit) split;
        }

        // this method produces the K-V pair that is handed to each map() call
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            // read the file only once
            if (!isReader) {
                FSDataInputStream input = fs.open(fileSplit.getPath());
                // read the entire small file in one go
                byte[] flush = new byte[(int) fileSplit.getLength()];
                // read the file content into the byte array
                /**
                 * arg 1: the byte array to fill
                 * arg 2: the offset in the array to start writing at
                 * arg 3: the number of bytes to read
                 */
                input.readFully(flush, 0, (int) fileSplit.getLength());
                input.close();  // the whole file is in memory now, so close the stream
                isReader = true;
                map_value.set(flush);  // put the file content into the map output value
                // guarantees exactly one read: the first call to nextKeyValue() returns true
                // (so map() runs once for this file), every later call returns false
                return isReader;
            }
            return false;
        }
        @Override
        public NullWritable getCurrentKey() throws IOException, InterruptedException {
            return NullWritable.get();
        }
        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return map_value;
        }
        @Override
        public float getProgress() throws IOException, InterruptedException {
            // the whole file is read in a single step, so progress is either 0 or 1
            return isReader ? 1.0f : 0.0f;
        }
        @Override
        public void close() throws IOException {
            // nothing to close here: the input stream is closed in nextKeyValue(),
            // and FileSystem.get() returns a cached instance shared with the framework
        }
    }
    // FileInputFormat
    private static class MyFileInputFormat extends FileInputFormat<NullWritable, Text> {
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            // each small file must be read as a whole, so never split it
            return false;
        }
        @Override
        public RecordReader<NullWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            MyRecordReader mr = new MyRecordReader();
            // call the initialization method up front (the framework will also call initialize())
            mr.initialize(split, context);
            return mr;
        }
    }
}
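
A note on the driver above: because the map output key is NullWritable (whose hash is constant), every record lands in the same reduce partition, so the job naturally produces one merged file. To make that explicit, and to keep the behaviour even if a different default reducer count is configured, a one-line addition to the driver (not in the original code, placed anywhere before job submission) is enough:

            // force a single reduce task so all small-file contents end up in one output file
            job.setNumReduceTasks(1);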

2. Custom OutputFormat

  Requirement: some raw logs need enhanced parsing; the workflow is:

     - Read the data from the raw log files.
     - Fetch the relevant data from the business database.
     - Join the two according to some join condition to obtain the enriched result.

  Analysis:

     - Access external resources from within MapReduce (a hedged mapper sketch follows this list).
     - Define a custom OutputFormat, override its RecordWriter, and in particular the write() method that actually writes the data out.
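
The sample code below switches to a simpler score example, so here is a minimal, hedged sketch of the first analysis point: a mapper that loads a small lookup table from HDFS in setup() and joins it against each raw log line in map(). The class name, the dictionary path /hadoop/dict/url_info.txt, and the tab-separated field layout are illustrative assumptions, not part of the original post; in practice the lookup data could be exported from the business database or shipped via the distributed cache.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogEnhanceMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    private final Map<String, String> urlInfo = new HashMap<>();
    private final Text outKey = new Text();

    @Override
    protected void setup(Context context) throws IOException {
        // load the lookup data once per map task, before the first map() call
        FileSystem fs = FileSystem.get(context.getConfiguration());
        Path dict = new Path("/hadoop/dict/url_info.txt");   // hypothetical dictionary file
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(dict)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split("\t");          // assumed "url<TAB>content" lines
                if (fields.length == 2) {
                    urlInfo.put(fields[0], fields[1]);
                }
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // join condition: the URL is assumed to be the first tab-separated field of the log line
        String[] fields = value.toString().split("\t");
        String extra = urlInfo.getOrDefault(fields[0], "unknown");
        outKey.set(value.toString() + "\t" + extra);         // enhanced record
        context.write(outKey, NullWritable.get());
    }
}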


Code implementation
// Instead of the full log-enhancement flow, a simpler case is shown here: write each record to a different file according to its score level.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Score_DiffDic {
    // Driver: configure and submit the job
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        Job job = null;
        try {
            job = Job.getInstance(conf, "Score_DiffDic");
            job.setJarByClass(Score_DiffDic.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            // set the custom output format class
            job.setOutputFormatClass(MyOutputFormat.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DoubleWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);

            Path input = new Path("/hadoop/input/num_add");
            FileInputFormat.addInputPath(job, input);
            Path output = new Path("/hadoop/output/merge_output1");

            // delete the output directory if it already exists, then set it on the job
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);

            int status = job.waitForCompletion(true) ? 0 : 1;
            System.exit(status);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //Mapper
    private static class MyMapper extends Mapper<LongWritable,Text,Text,DoubleWritable>{
        Text mk=new Text();
        DoubleWritable mv=new DoubleWritable();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\\s+");
            // expected record format (whitespace-separated): computer huangxiaoming 85
            if (fields.length == 3) {
                mk.set(fields[1]);
                mv.set(Double.parseDouble(fields[2]));
                context.write(mk, mv);
            }
        }
    }
    //Reducer
    private static class MyReducer extends Reducer<Text,DoubleWritable,Text,DoubleWritable>{
        DoubleWritable mv=new DoubleWritable();
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double  sum=0;
            int count=0;
            for(DoubleWritable value:values){
                sum+=value.get();
                count++;
            }
            mv.set(sum/count);
            context.write(key,mv);
        }
    }

    //FileOutputFormat
    private static class MyOutputFormat extends FileOutputFormat<Text, DoubleWritable> {
        @Override
        public RecordWriter<Text, DoubleWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
            FileSystem fs = FileSystem.get(job.getConfiguration());
            return new MyRecordWrite(fs);
        }
    }

    // RecordWriter: the two generic types here are the key and value types of the Reducer output
    private static class MyRecordWrite extends RecordWriter<Text, DoubleWritable> {
        FileSystem fs;
        // paths of the two output files
        Path path1 = new Path("/hadoop/output/score_out1");
        Path path2 = new Path("/hadoop/output/score_out2");
        FSDataOutputStream output1;
        FSDataOutputStream output2;

        public MyRecordWrite() {

        }
        // initialize the output streams
        public MyRecordWrite(FileSystem fs) {
            this.fs = fs;
            try {
                output1=fs.create(path1);
                output2=fs.create(path2);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void write(Text key, DoubleWritable value) throws IOException, InterruptedException {
            // business logic: averages greater than 80 go to path1, the rest go to path2
            if (value.get() > 80) {
                output1.write((key.toString() + ":" + value.get() + "\n").getBytes());
            } else {
                output2.write((key.toString() + ":" + value.get() + "\n").getBytes());
            }
        }
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // close the output streams; the cached, shared FileSystem instance is not closed here
            output1.close();
            output2.close();
        }
    }
}
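
As a design note: for this particular pattern (splitting reducer output across a small, fixed set of files) Hadoop also ships the built-in MultipleOutputs helper, which avoids writing a custom OutputFormat and RecordWriter. The sketch below shows only the reducer side of that alternative; it is not part of the original code, and the "high"/"low" path prefixes are illustrative. The driver would keep a normal FileOutputFormat.setOutputPath() call.

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class ScoreSplitReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    private MultipleOutputs<Text, DoubleWritable> outputs;
    private final DoubleWritable avg = new DoubleWritable();

    @Override
    protected void setup(Context context) {
        outputs = new MultipleOutputs<>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        double sum = 0;
        int count = 0;
        for (DoubleWritable value : values) {
            sum += value.get();
            count++;
        }
        avg.set(sum / count);
        // the third argument becomes a file-name prefix under the job's output directory
        if (avg.get() > 80) {
            outputs.write(key, avg, "high/part");
        } else {
            outputs.write(key, avg, "low/part");
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        outputs.close();
    }
}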