Explaining Key MapReduce Concepts Through a Worked Example ------- Implementing Secondary Sort with a Custom MapReduce Data Type
The custom data type SSData

SSData is a composite key implementing WritableComparable: it packs both columns of an input record into one key, so the shuffle's sort phase orders records by the first column ascending and, within ties on the first column, by the second column descending.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class SSData implements WritableComparable<SSData> {

    private int first;
    private int second;

    // A no-argument constructor is required so Hadoop can instantiate
    // the key via reflection during deserialization.
    public SSData() {
    }

    public SSData(int first, int second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(first);
        out.writeInt(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readInt();
        this.second = in.readInt();
    }

    @Override
    public int compareTo(SSData o) {
        // First column ascending. Integer.compare avoids the overflow
        // risk of subtracting two ints.
        int tmp = Integer.compare(this.first, o.first);
        if (tmp != 0) {
            return tmp;
        }
        // Second column descending.
        return Integer.compare(o.second, this.second);
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + first;
        result = prime * result + second;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null || getClass() != obj.getClass())
            return false;
        SSData other = (SSData) obj;
        return first == other.first && second == other.second;
    }

    /**
     * @return the first
     */
    public int getFirst() {
        return first;
    }

    /**
     * @param first the first to set
     */
    public void setFirst(int first) {
        this.first = first;
    }

    /**
     * @return the second
     */
    public int getSecond() {
        return second;
    }

    /**
     * @param second the second to set
     */
    public void setSecond(int second) {
        this.second = second;
    }

    @Override
    public String toString() {
        return "[ " + first + " " + second + " ]";
    }
}
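To sanity-check the ordering that compareTo defines, a minimal local sketch (the values are made up purely for illustration, and the class name SSDataOrderDemo is mine) can sort a few SSData instances outside Hadoop:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SSDataOrderDemo {
    public static void main(String[] args) {
        List<SSData> data = Arrays.asList(
                new SSData(3, 1), new SSData(1, 5),
                new SSData(1, 9), new SSData(3, 7));
        // Sorts via SSData.compareTo: first ascending, second descending.
        Collections.sort(data);
        // Prints: [ 1 9 ] [ 1 5 ] [ 3 7 ] [ 3 1 ]
        data.forEach(d -> System.out.print(d + " "));
    }
}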
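As an optional optimization that is not part of the original post: Hadoop can sort keys without deserializing them if a raw comparator is supplied. Here is a minimal sketch, assuming SSData's serialized layout of two consecutive 4-byte ints exactly as write() produces them (the class name SSDataComparator is mine):

import org.apache.hadoop.io.WritableComparator;

public class SSDataComparator extends WritableComparator {

    public SSDataComparator() {
        super(SSData.class, true);
    }

    // Compare the serialized bytes directly: first and second sit at
    // fixed offsets, so no SSData objects need to be created.
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int cmp = Integer.compare(readInt(b1, s1), readInt(b2, s2));      // first ascending
        if (cmp != 0) {
            return cmp;
        }
        return Integer.compare(readInt(b2, s2 + 4), readInt(b1, s1 + 4)); // second descending
    }
}

It could be registered in the driver's run() method with job.setSortComparatorClass(SSDataComparator.class).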
The MapReduce class SecondarySort

The mapper parses each input line into an SSData key; the reducer simply writes out each key, which already arrives from the shuffle in sorted order.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SecondarySort extends Configured implements Tool {

    /**
     * Custom MyMapper: parses each line of two space-separated integers
     * into an SSData composite key. The second column is also emitted as
     * the value, although this example's reducer never reads it.
     * @author lyd
     */
    static class MyMapper extends Mapper<LongWritable, Text, SSData, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] lines = value.toString().split(" ");
            SSData ss = new SSData(Integer.parseInt(lines[0]), Integer.parseInt(lines[1]));
            context.write(ss, new IntWritable(Integer.parseInt(lines[1])));
        }
    }

    /**
     * Custom MyReducer: keys arrive already sorted by SSData.compareTo
     * (first column ascending, second descending), so it only needs to
     * emit each key with an empty value.
     * @author lyd
     */
    static class MyReducer extends Reducer<SSData, IntWritable, SSData, Text> {
        @Override
        protected void reduce(SSData key, Iterable<IntWritable> value, Context context)
                throws IOException, InterruptedException {
            context.write(key, new Text(""));
        }
    }

    /**
     * Driver method.
     */
    @Override
    public int run(String[] args) throws Exception {
        // 1. Get the Configuration object. Extending Configured (instead of
        //    re-creating a Configuration on every call, which would discard
        //    any settings) lets ToolRunner inject the parsed configuration.
        Configuration conf = getConf();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        // 2. Create the job.
        Job job = Job.getInstance(conf, "model01");
        // 3. Set the class that carries this job's jar.
        job.setJarByClass(SecondarySort.class);
        // 4. Map-side settings.
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(SSData.class);
        job.setMapOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // 5. Reduce-side settings.
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(SSData.class);
        job.setOutputValueClass(Text.class);
        // Delete the output directory if it already exists.
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]), true);
        }
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 6. Submit the job and wait for it to finish.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Main entry point for the job.
     * @param args
     */
    public static void main(String[] args) {
        try {
            // Strip generic Hadoop options off the command line first.
            String[] argss = new GenericOptionsParser(new Configuration(), args).getRemainingArgs();
            System.exit(ToolRunner.run(new SecondarySort(), argss));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
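Because both columns live in the key, every (first, second) pair here forms its own reduce group, which is enough for plain sorted output. The classic secondary-sort pattern goes one step further: partition and group by the first column only, so that a single reduce call sees all second-column values for a given first column, already in descending order, and can iterate over them. A hedged sketch of those two extra pieces, under the same SSData layout (the class names are mine, not from the original post; put each class in its own source file):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;

// Route every record with the same first column to the same reducer.
class FirstPartitioner extends Partitioner<SSData, IntWritable> {
    @Override
    public int getPartition(SSData key, IntWritable value, int numPartitions) {
        // Mask off the sign bit so the result is never negative.
        return (key.getFirst() & Integer.MAX_VALUE) % numPartitions;
    }
}

// Treat keys with the same first column as one reduce group; within the
// group, values still arrive in SSData.compareTo order (second descending).
class FirstGroupingComparator extends WritableComparator {
    public FirstGroupingComparator() {
        super(SSData.class, true); // true: deserialize into SSData instances
    }

    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        return Integer.compare(((SSData) a).getFirst(), ((SSData) b).getFirst());
    }
}

They would be wired up in run() with job.setPartitionerClass(FirstPartitioner.class) and job.setGroupingComparatorClass(FirstGroupingComparator.class). The job itself is submitted in the usual way with the hadoop jar command, passing the HDFS input and output paths as the two program arguments.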