程式人生 > Mapreduce自定義資料型別

Mapreduce自定義資料型別

Hadoop自帶的資料型別

IntWritable, LongWritable, Text, xxWritable.

某些情況下:使用自定義的資料型別方便一些(類似java中的pojo)。

實現:

實現WritableComparable介面即可。

場景例如:

成績表:由語文,數學,英文組成。

想讓按照總成績進行排名。如果成績相同,則按照語文,數學,英文來排序。

一、自定義ScoreWritable實現WritableComparable介面:

package com.day07; import org.apache.hadoop.io.WritableComparable; import java.io.*; public class ScoreWritable implements WritableComparable<ScoreWritable> {     int chinese;     int math;     int english;     int sum;

    public ScoreWritable() {     }     public ScoreWritable(int chinese, int math, int english) {         this.chinese = chinese;         this.math = math;         this.english = english;         this.sum=chinese+english+math;     }     @Override     public String toString() {         return "ScoreWritable{" +                 "chinese=" + chinese +                 ", math=" + math +                 ", english=" + english +                 ", sum=" + sum +                 '}';     }     public int getChinese() {         return chinese;     }     public void setChinese(int chinese) {         this.chinese = chinese;     }     public int getMath() {         return math;     }     public void setMath(int math) {         this.math = math;     }     public int getEnglish() {         return english;     }     public void setEnglish(int english) {         this.english = english;     }     public int getSum() {         return sum;     }     public void setSum(int sum) {         this.sum = sum;     }     //比較     public int compareTo(ScoreWritable that) {         //先比較總成績         if (this.sum>that.getSum()){             return -1;         }else if(this.sum<that.getSum()){             return 1;         }else{             if (this.chinese>that.getChinese()){                 return -1;             }else if (that.chinese<that.getChinese()){                 return 1;             }else {                 return -(this.math-that.getMath());             }         }     }     //序列化--dataOutput(data流):可以自定義序列化物件,節省空間,hadoop用的就是這個流     public void write(DataOutput out) throws IOException {         out.writeInt(chinese);         out.writeInt(math);         out.writeInt(english);         out.writeInt(sum);     }     //反序列化     public void readFields(DataInput in) throws IOException {         this.chinese = in.readInt();         this.math = in.readInt();         this.english = in.readInt();         this.sum = in.readInt();     } }

注意:

最好實現toString方法。

二、編寫ScoreJob類用於測試自定義的ScoreWritable

package com.day05; import com.day03.MaxSaleJob; import com.google.common.io.Resources; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class ScoreJob {     public static class ScoreMapper extends Mapper<LongWritable,Text,ScoreWritable,NullWritable>{         @Override         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {             //super.map(key, value, context);             String[] grades = value.toString().split(",");             ScoreWritable score = new ScoreWritable(Integer.parseInt(grades[0]), Integer.parseInt(grades[1]), Integer.parseInt(grades[2]));             context.write(score,NullWritable.get());         }     }     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {         Configuration coreSiteConf = new Configuration();         coreSiteConf.addResource(Resources.getResource("core-site-local.xml"));         //設定一個任務         Job job = Job.getInstance(coreSiteConf, "score");         //設定job的執行類         job.setJarByClass(ScoreJob.class);         //mrdemo/target/mrdemo-1.0-SNAPSHOT.jar         //job.setJar("mrdemo/target/mrdemo-1.0-SNAPSHOT.jar");         //設定Map和Reduce處理類         job.setMapperClass(ScoreMapper.class);         //map輸出型別         job.setMapOutputKeyClass(ScoreWritable.class);         job.setMapOutputValueClass(NullWritable.class);         //設定job/reduce輸出型別         /*job.setOutputKeyClass(Text.class);         
job.setOutputValueClass(NullWritable.class);*/         //設定任務的輸入路徑         FileInputFormat.addInputPath(job, new Path("/score/"));         FileSystem fileSystem = FileSystem.get(coreSiteConf);         if(fileSystem.exists(new Path("/out/"))){

//刪除存在檔案,並遍歷刪除             fileSystem.delete(new Path("/out/"),true);         };         FileOutputFormat.setOutputPath(job, new Path("/out/"));         //執行任務         boolean flag = job.waitForCompletion(true);         if(flag){             FSDataInputStream open = fileSystem.open(new Path("/out/part-r-00000"));             byte[] buffer = new byte[1024];             IOUtils.readFully(open,buffer,0,open.available());             System.out.println(new String(buffer));         }     } }

三、測試結果,類似於以下內容