MapReduce實戰:自定義輸入格式實現成績管理
1. 項目需求
我們現有一份學生五門課程的期末考試成績數據,現在我們希望統計每個學生的總成績和平均成績。 樣本數據如下所示,每行數據的格式為:學號、姓名、語文成績、數學成績、英語成績、物理成績、化學成績。
19020090040 秦心芯 123 131 100 95 100
19020090006 李磊 99 92 100 90 100
19020090017 唐一建 90 99 100 89 95
19020090031 曾麗麗 100 99 97 79 96
19020090013 羅開俊 105 115 94 45 100
19020090039 周世海 114 116 93 31 97
19020090020 王正偉 109 98 88 47 99
19020090025 謝瑞彬 94 120 100 50 73
19020090007 於微 89 78 100 66 99
19020090012 劉小利 87 82 89 71 99
下面我們需要編寫程序,實現自定義輸入並求出每個學生的總成績和平均成績。
2. 項目實現
第一步:為了便於每個學生學習成績的計算,這裏我們需要自定義一個 ScoreWritable 類實現 WritableComparable 接口,將學生各門成績封裝起來。
/**
*
*/
package com.hadoop.InputFormat;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
* @author Zimo
* 編寫學習成績讀寫自定義類ScoreWritable,實現WritableComparable接口中的方法
* 數據格式參考:19020090017 name 90 99 100 89 95
*
*/
/**
 * Custom Hadoop value type wrapping a student's five subject scores
 * (Chinese, Math, English, Physics, Chemistry).
 *
 * <p>Serialization order in {@link #write(DataOutput)} and
 * {@link #readFields(DataInput)} must stay identical: Chinese, Math,
 * English, Physics, Chemistry.</p>
 */
public class ScoreWritable implements WritableComparable<Object> {
    // One float per subject; lowerCamelCase per Java convention
    // (the original UpperCamelCase field "Math" collided visually with java.lang.Math).
    private float chinese;
    private float math;
    private float english;
    private float physics;
    private float chemistry;

    /** No-arg constructor required by Hadoop's reflection-based deserialization. */
    public ScoreWritable() {}

    /**
     * Initializes all five subject scores.
     *
     * @param chinese   Chinese score
     * @param math      Math score
     * @param english   English score
     * @param physics   Physics score
     * @param chemistry Chemistry score
     */
    public ScoreWritable(float chinese, float math, float english, float physics, float chemistry) {
        this.chinese = chinese;
        this.math = math;
        this.english = english;
        this.physics = physics;
        this.chemistry = chemistry;
    }

    public float getChinese() {
        return chinese;
    }

    public void setChinese(float chinese) {
        this.chinese = chinese;
    }

    public float getMath() {
        return math;
    }

    public void setMath(float math) {
        this.math = math;
    }

    public float getEnglish() {
        return english;
    }

    public void setEnglish(float english) {
        this.english = english;
    }

    public float getPhysics() {
        return physics;
    }

    public void setPhysics(float physics) {
        this.physics = physics;
    }

    public float getChemistry() {
        return chemistry;
    }

    public void setChemistry(float chemistry) {
        this.chemistry = chemistry;
    }

    /** Deserializes the five scores in the same order {@link #write} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        chinese = in.readFloat();
        math = in.readFloat();
        english = in.readFloat();
        physics = in.readFloat();
        chemistry = in.readFloat();
    }

    /** Serializes the five scores; order must match {@link #readFields}. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeFloat(chinese);
        out.writeFloat(math);
        out.writeFloat(english);
        out.writeFloat(physics);
        out.writeFloat(chemistry);
    }

    /**
     * Orders instances by total score, ascending. The original stub returned 0
     * for every pair, which violates the comparator contract and would break
     * sorting if this type were ever used as a map-output key.
     *
     * @throws ClassCastException if {@code other} is not a ScoreWritable
     */
    @Override
    public int compareTo(Object other) {
        ScoreWritable that = (ScoreWritable) other;
        return Float.compare(this.total(), that.total());
    }

    /** Sum of all five subject scores. */
    private float total() {
        return chinese + math + english + physics + chemistry;
    }
}
第二步:自定義輸入格式 ScoreInputFormat 類,首先繼承 FileInputFormat,然後分別重寫 isSplitable() 方法和 createRecordReader() 方法。 需要註意的是,重寫 createRecordReader() 方法,其實也就是重寫其返回的對象 ScoreRecordReader。ScoreRecordReader 類繼承 RecordReader,實現數據的讀取。
/**
*
*/
package com.hadoop.InputFormat;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.LineReader;
/**
* @author Zimo
* 自定義學生成績讀寫類InputFormat,繼承自FileInputFormat接口,並實現其中的方法
*
*/
public class ScoreInputFormat extends FileInputFormat<Text, ScoreWritable> {
@Override
public RecordReader<Text, ScoreWritable> createRecordReader(InputSplit arg0, TaskAttemptContext arg1)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
return new ScoreRecordreader();//RecordReader 中的兩個參數分別填寫我們期望返回的key/value類型,我們期望key為Text類型,
//value為ScoreWritable類型封裝學生所有成績
}
public static class ScoreRecordreader extends RecordReader<Text, ScoreWritable> {
public LineReader in;//行讀取器
public Text key;//自定義key類型
public ScoreWritable value;//自定義value類型
public Text line;//每行數據類型
@Override
public void close() throws IOException {
// TODO Auto-generated method stub
if (in != null) {
in.close();
}
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return key;
}
@Override
public ScoreWritable getCurrentValue() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return 0;
}
@Override
public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
FileSplit split = (FileSplit)input;
Configuration job = context.getConfiguration();
Path file = split.getPath();
FileSystem fs = file.getFileSystem(job);
FSDataInputStream filein = fs.open(file);
in = new LineReader(filein, job);
line = new Text();
key = new Text();
value = new ScoreWritable();
}
//此方法讀取每行數據,完成自定義的key和value
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
// TODO Auto-generated method stub
int linesize = in.readLine(line);
if (linesize == 0) {
return false;
}
String[] pieces = line.toString().split("\\s+");//解析每行數據,根據空格劃分
if (pieces.length != 7) {
throw new IOException("Invalid record received");
}
//將學生的每門成績轉換為 float 類型
float a, b, c, d, e;
try{
a = Float.parseFloat(pieces[2].trim());
b = Float.parseFloat(pieces[3].trim());
c = Float.parseFloat(pieces[4].trim());
d = Float.parseFloat(pieces[5].trim());
e = Float.parseFloat(pieces[6].trim());
} catch(NumberFormatException exception) {
throw new IOException("Error parsing floating poing value in record");
}
key.set(pieces[0] + "\t" + pieces[1]);//完成自定義key數據
//封裝自定義value數據
value.setChinese(a);
value.setMath(b);
value.setEnglish(c);
value.setPhysics(d);
value.setChemistry(e);
return true;
}
}
}
在上述類中,我們只需根據自己的需求,重點編寫nextKeyValue()方法即可,其它的方法比較固定,仿造著編碼就可以了。
第三步:編寫 MapReduce 程序,統計學生總成績和平均成績。這裏 MapReduce 程序仿造前面模板編寫就可以了,很簡單。
/**
*
*/
package com.hadoop.InputFormat;
import java.io.IOException;
import org.apache.commons.collections.map.StaticBucketMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* @author Zimo
* 學生成績統計Hadoop程序
*
*/
/**
 * MapReduce driver that computes each student's total and average score.
 * Reads records via the custom {@link ScoreInputFormat}.
 */
public class ScoreCount extends Configured implements Tool {

    /**
     * Identity mapper: the custom input format already produces
     * (studentId+name, ScoreWritable) pairs, so the mapper just forwards them.
     */
    public static class ScoreMapper extends Mapper<Text, ScoreWritable, Text, ScoreWritable> {
        @Override
        protected void map(Text key, ScoreWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    /** Sums the five subject scores per student and derives the per-subject average. */
    public static class ScoreReduce extends Reducer<Text, ScoreWritable, Text, Text> {
        private final Text text = new Text();

        @Override
        protected void reduce(Text key, Iterable<ScoreWritable> values, Context context)
                throws IOException, InterruptedException {
            // Accumulate across all records for this student. The original code
            // assigned (=) instead of accumulating (+=), so with more than one
            // record per key only the last record's total survived while the
            // average kept accumulating.
            float totalScore = 0.0f;
            for (ScoreWritable ss : values) {
                totalScore += ss.getChinese() + ss.getMath() + ss.getEnglish()
                        + ss.getPhysics() + ss.getChemistry();
            }
            float averageScore = totalScore / 5; // five subjects per student
            text.set(totalScore + "\t" + averageScore);
            context.write(key, text);
        }
    }

    /**
     * Entry point. Input/output paths are hard-coded HDFS locations; any
     * command-line arguments are ignored, matching the original behavior.
     *
     * @throws Exception if the job fails to submit or run
     */
    public static void main(String[] args) throws Exception {
        String[] args0 = {
            "hdfs://Centpy:9000/score/score.txt",
            "hdfs://Centpy:9000/score/out/"
        };
        int ec = ToolRunner.run(new Configuration(), new ScoreCount(), args0);
        System.exit(ec);
    }

    /**
     * Configures and runs the job.
     *
     * @param args args[0] = input path, args[1] = output path
     * @return 0 on success, 1 on failure
     */
    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration injected by ToolRunner (via Configured) instead
        // of creating a fresh one, so -D options on the command line take effect.
        Configuration conf = getConf();
        // Remove a pre-existing output path (file OR directory — the original
        // only handled directories) so the job can be rerun.
        Path outPath = new Path(args[1]);
        FileSystem hdfs = outPath.getFileSystem(conf);
        if (hdfs.exists(outPath)) {
            hdfs.delete(outPath, true);
        }
        Job job = new Job(conf, "ScoreCount"); // deprecated in Hadoop 2; Job.getInstance preferred there
        job.setJarByClass(ScoreCount.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // input path
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output path
        job.setMapperClass(ScoreMapper.class);
        job.setReducerClass(ScoreReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(ScoreWritable.class);
        job.setInputFormatClass(ScoreInputFormat.class); // custom input format
        // Propagate success/failure; the original discarded the result and
        // always returned 0 even when the job failed.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
3. 項目測試
輸入文件如下:
運行後結果如圖:
結果中出現了亂碼!為什麽會有這種情況呢?因為MapReduce采用的默認編碼方式是UTF-8,而我上傳的輸入文件中有中文且不是采用UTF-8編碼格式,所以會出現亂碼。解決方法如下。
並且,為了保證編碼格式一致,先檢查eclipse編碼格式是否為UTF-8,不是則修改eclipse編碼格式為UTF-8。Windows -> Preferences。
最後,重新運行一次,結果如下。
到此,項目就完美地結束了。
以上就是博主為大家介紹的這一板塊的主要內容,這都是博主自己的學習過程,希望能給大家帶來一定的指導作用,有用的還望大家點個支持,如果對你沒用也望包涵,有錯誤煩請指出。如有期待可關註博主以第一時間獲取更新哦,謝謝!
版權聲明:本文為博主原創文章,未經博主允許不得轉載。
MapReduce實戰:自定義輸入格式實現成績管理