1. 程式人生 > >MapReduce實戰:自定義輸入格式實現成績管理

MapReduce實戰:自定義輸入格式實現成績管理

stat app 註意 false exce 考試成績 fileinput collect 劃分

1. 項目需求

  我們取有一份學生五門課程的期末考試成績數據,現在我們希望統計每個學生的總成績和平均成績。 樣本數據如下所示,每行數據的數據格式為:學號、姓名、語文成績、數學成績、英語成績、物理成績、化學成績。

19020090040 秦心芯 123 131 100 95 100
19020090006 李磊    99  92  100 90 100
19020090017 唐一建 90  99  100  89 95
19020090031 曾麗麗 100 99   97  79 96
19020090013 羅開俊 105 115  94  45 100
19020090039 周世海 114 116  93  31 97
19020090020 王正偉 109  98  88  47 99
19020090025 謝瑞彬 94  120  100 50 73
19020090007 於微     89   78  100 66 99
19020090012 劉小利  87  82   89 71 99

下面我們需要編寫程序,實現自定義輸入並求出每個學生的總成績和平均成績。

2. 項目實現

  第一步:為了便於每個學生學習成績的計算,這裏我們需要自定義一個 ScoreWritable 類實現 WritableComparable 接口,將學生各門成績封裝起來。

/**
 * 
 */
package com.hadoop.InputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/** * @author Zimo * 編寫學習成績讀寫自定義類ScoreWritable,實現WritableComparable接口中的方法 * 數據格式參考:19020090017 name 90 99 100 89 95 * */ public class ScoreWritable implements WritableComparable<Object>{ private float Chinese; private float Math; private float English; private float Physics;
private float Chemistry; //無參構造器 public ScoreWritable() {} //重載構造函數 public ScoreWritable(float Chinese,float Math,float English,float Physics,float Chemistry) { this.Chinese = Chinese; this.Math = Math; this.English = English; this.Physics = Physics; this.Chemistry = Chemistry; } // set/get方法 public float getChinese() { return Chinese; } public void setChinese(float chinese) { Chinese = chinese; } public float getMath() { return Math; } public void setMath(float math) { Math = math; } public float getEnglish() { return English; } public void setEnglish(float english) { English = english; } public float getPhysics() { return Physics; } public void setPhysics(float physics) { Physics = physics; } public float getChemistry() { return Chemistry; } public void setChemistry(float chemistry) { Chemistry = chemistry; } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub Chinese = in.readFloat(); Math = in.readFloat(); English = in.readFloat(); Physics = in.readFloat(); Chemistry = in.readFloat(); } @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeFloat(Chinese); out.writeFloat(Math); out.writeFloat(English); out.writeFloat(Physics); out.writeFloat(Chemistry); } @Override public int compareTo(Object arg0) { // TODO Auto-generated method stub return 0; } }

  第二步:自定義輸入格式 ScoreInputFormat 類,首先繼承 FileInputFormat,然後分別重寫 isSplitable() 方法和 createRecordReader() 方法。 需要註意的是,重寫createRecord Reader()方法,其實也就是重寫其返回的對象ScoreRecordReader。ScoreRecordReader 類繼承 RecordReader,實現數據的讀取。

/**
 * 
 */
package com.hadoop.InputFormat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.LineReader;

/**
 * @author Zimo
 * 自定義學生成績讀寫類InputFormat,繼承自FileInputFormat接口,並實現其中的方法
 *
 */
public class ScoreInputFormat extends FileInputFormat<Text, ScoreWritable> {

    @Override
    public RecordReader<Text, ScoreWritable> createRecordReader(InputSplit arg0, TaskAttemptContext arg1)
            throws IOException, InterruptedException {
        // TODO Auto-generated method stub
        return new ScoreRecordreader();//RecordReader 中的兩個參數分別填寫我們期望返回的key/value類型,我們期望key為Text類型,
                                     //value為ScoreWritable類型封裝學生所有成績
    }

    public static class ScoreRecordreader extends RecordReader<Text, ScoreWritable> {

        public LineReader in;//行讀取器
        public Text key;//自定義key類型
        public ScoreWritable  value;//自定義value類型
        public Text line;//每行數據類型
        
        @Override
        public void close() throws IOException {
            // TODO Auto-generated method stub
            if (in != null) {
                in.close();
            }
            
        }

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            return key;
        }

        @Override
        public ScoreWritable getCurrentValue() throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            return 0;
        }

        @Override
        public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            FileSplit split = (FileSplit)input;
            Configuration job = context.getConfiguration();
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(job);
            
            FSDataInputStream filein = fs.open(file);
            in = new LineReader(filein, job);
            line = new Text();
            key = new Text();
            value = new ScoreWritable();
        }
        
        //此方法讀取每行數據,完成自定義的key和value
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            int linesize = in.readLine(line);
            if (linesize == 0) {
                return false;
            }
            String[] pieces = line.toString().split("\\s+");//解析每行數據,根據空格劃分
            if (pieces.length != 7) {
                throw new IOException("Invalid record received");
            }
            
            //將學生的每門成績轉換為 float 類型
            float a, b, c, d, e;
            try{
                a = Float.parseFloat(pieces[2].trim());
                b = Float.parseFloat(pieces[3].trim());
                c = Float.parseFloat(pieces[4].trim());
                d = Float.parseFloat(pieces[5].trim());
                e = Float.parseFloat(pieces[6].trim());
            } catch(NumberFormatException exception) {
                throw new IOException("Error parsing floating poing value in record");
            }
            key.set(pieces[0] + "\t" + pieces[1]);//完成自定義key數據
            //封裝自定義value數據
            value.setChinese(a);
            value.setMath(b);
            value.setEnglish(c);
            value.setPhysics(d);
            value.setChemistry(e);
            return true;
        }
        
    }
    
}

  在上述類中,我們只需根據自己的需求,重點編寫nextKeyValue()方法即可,其它的方法比較固定,仿造著編碼就可以了。

  第三步:編寫 MapReduce 程序,統計學生總成績和平均成績。這裏 MapReduce 程序仿造前面模板編寫就可以了,很簡單。

/**
 * 
 */
package com.hadoop.InputFormat;

import java.io.IOException;

import org.apache.commons.collections.map.StaticBucketMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author Zimo
 * 學生成績統計Hadoop程序
 *
 */
public class ScoreCount extends Configured implements Tool {

    public static class ScoreMapper extends Mapper<Text, ScoreWritable, Text, ScoreWritable> {
    
        @Override
        protected void map(Text key, ScoreWritable value, Context context)throws IOException, InterruptedException 
        {
            context.write(key, value);
        }

    }
    
    public static class ScoreReduce extends Reducer<Text, ScoreWritable, Text, Text> {
        
        private Text text = new Text();
        
        protected void reduce(Text key, Iterable<ScoreWritable> Values, Context context) throws IOException, InterruptedException {
            float totalScore = 0.0f;
            float averageScore = 0.0f;
            for(ScoreWritable ss:Values) {
                totalScore = ss.getChinese() + ss.getMath() + ss.getEnglish() + ss.getPhysics() + ss.getChemistry();
                averageScore += totalScore/5;
            }
            text.set(totalScore + "\t" + averageScore);
            context.write(key, text);
        }
    }
    
    /**
     * @param args
     * @throws Exception 
     */
    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
        String[] args0 = 
            { 
            "hdfs://Centpy:9000/score/score.txt",
            "hdfs://Centpy:9000/score/out/" 
            };
    int ec = ToolRunner.run(new Configuration(), new ScoreCount(), args0);
    System.exit(ec);

    }

    @Override
    public int run(String[] args) throws Exception {
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();//讀取配置文件
        
        //創建輸出路徑
        Path myPath = new Path(args[1]);
        FileSystem hdfs = myPath.getFileSystem(conf);
        
        if (hdfs.isDirectory(myPath)) {
            hdfs.delete(myPath, true);
        }
        
        Job job = new Job(conf, "ScoreCount");//新建任務
        job.setJarByClass(ScoreCount.class);//設置主類
        FileInputFormat.addInputPath(job, new Path(args[0]));// 輸入路徑
        FileOutputFormat.setOutputPath(job, new Path(args[1]));// 輸出路徑
        job.setMapperClass(ScoreMapper.class);// Mapper
        job.setReducerClass(ScoreReduce.class);// Reducer
        
        job.setMapOutputKeyClass(Text.class);// Mapper key輸出類型
        job.setMapOutputValueClass(ScoreWritable.class);// Mapper value輸出類型
                
        job.setInputFormatClass(ScoreInputFormat.class);//設置自定義輸入格式
        
        job.waitForCompletion(true);

        return 0;
    }

}

3. 項目測試

  輸入文件如下:

技術分享圖片

  運行後結果如圖:

技術分享圖片

  結果中出現了亂碼!為什麽會有這種情況呢?因為MapReduce采用的默認編碼方式是UTF-8,而我上傳的輸入文件中有中文且不是采用UTF-8編碼格式,所以會出現亂碼。解決方法如下。

技術分享圖片

  並且,為了保證編碼格式一致,先檢查eclipse編碼格式是否為UTF-8,不是則修改eclipse編碼格式為UTF-8。Windows -> Preferences。

技術分享圖片

  最後,重新運行一次,結果如下。

技術分享圖片

  到此,項目就完美地結束了。

以上就是博主為大家介紹的這一板塊的主要內容,這都是博主自己的學習過程,希望能給大家帶來一定的指導作用,有用的還望大家點個支持,如果對你沒用也望包涵,有錯誤煩請指出。如有期待可關註博主以第一時間獲取更新哦,謝謝!

版權聲明:本文為博主原創文章,未經博主允許不得轉載。

MapReduce實戰:自定義輸入格式實現成績管理