Explaining key MapReduce concepts through a worked example -------- in-memory sorting with a custom data type
The custom data type WCData
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Custom data type: a (word, counter) pair that Hadoop can
 * serialize and sort by its counter.
 * @author lyd
 */
public class WCData implements WritableComparable<WCData> {

    public String word;
    public int counter;

    public WCData() {
    }

    public WCData(String word, int counter) {
        this.word = word;
        this.counter = counter;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeInt(counter);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Must read the fields back in the same order write() emitted them.
        this.word = in.readUTF();
        this.counter = in.readInt();
    }

    @Override
    public int compareTo(WCData o) {
        int tmp = o.counter - this.counter;   // descending by counter
        //int tmp = this.counter - o.counter; // ascending by counter
        if (tmp != 0) {
            return tmp;
        }
        // Tie-break on the word itself. Returning 0 here (as a naive version
        // would) makes a TreeSet treat different words with the same count
        // as duplicates and silently drop them.
        return this.word.compareTo(o.word);
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + counter;
        result = prime * result + ((word == null) ? 0 : word.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null || getClass() != obj.getClass())
            return false;
        WCData other = (WCData) obj;
        if (counter != other.counter)
            return false;
        if (word == null) {
            return other.word == null;
        }
        return word.equals(other.word);
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public int getCounter() {
        return counter;
    }

    public void setCounter(int counter) {
        this.counter = counter;
    }

    @Override
    public String toString() {
        return word + ":" + counter;
    }
}
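Before wiring WCData into a job, it helps to see the two contracts it implements in isolation: write()/readFields() must round-trip the fields, and compareTo() decides how a TreeSet orders instances. The following standalone sketch (not part of the original job; the demo class name is made up for illustration) exercises both:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.TreeSet;

public class WCDataDemo {
    public static void main(String[] args) throws Exception {
        // 1) Serialization round-trip: write(), then readFields().
        WCData original = new WCData("qianfeng", 4);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        WCData restored = new WCData();
        restored.readFields(new DataInputStream(
                new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(restored); // qianfeng:4

        // 2) compareTo() drives TreeSet ordering: descending by counter.
        TreeSet<WCData> ts = new TreeSet<>();
        ts.add(new WCData("qianfeng", 4));
        ts.add(new WCData("is", 3));
        ts.add(new WCData("hello", 2));
        System.out.println(ts.first()); // qianfeng:4 (largest counter first)
        System.out.println(ts.last());  // hello:2 (smallest counter last)
    }
}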
The MapReduce class
import java.io.IOException;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * TOP-N: find the N most frequent words.
 * @author lyd
 *
 * Sample input:
 *   hello qianfeng hello qianfeng
 *   qianfeng is best qianfeng better
 *   hadoop is good
 *   spark is nice
 *
 * The top three after counting:
 *   qianfeng 4
 *   is 3
 *   hello 2
 */
public class TopN extends Configured implements Tool {

    /**
     * Custom mapper: emit (word, "1") for every word on the line.
     */
    static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(" ");
            for (String s : words) {
                context.write(new Text(s), new Text("1"));
            }
        }
    }

    /**
     * Custom reducer: sum the counts per word, keep only the top K
     * in a TreeSet, and emit the survivors in cleanup().
     */
    static class MyReducer extends Reducer<Text, Text, WCData, NullWritable> {

        // TreeSet keeps its elements sorted by WCData.compareTo
        // (descending counter).
        TreeSet<WCData> ts = new TreeSet<WCData>();
        // The N of TOP-N; the sample above expects the top 3.
        public static final int K = 3;

        @Override
        protected void reduce(Text key, Iterable<Text> value, Context context)
                throws IOException, InterruptedException {
            int counter = 0;
            for (Text t : value) {
                counter += Integer.parseInt(t.toString());
            }
            // Wrap the result in the custom data type and add it to the set.
            ts.add(new WCData(key.toString(), counter));
            if (ts.size() > K) {
                // Evict the smallest element so the set never exceeds K entries.
                ts.remove(ts.last());
            }
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            // All keys have been reduced; emit the surviving top K.
            for (WCData wcData : ts) {
                context.write(wcData, NullWritable.get());
            }
        }
    }

    /**
     * Driver method.
     */
    @Override
    public int run(String[] args) throws Exception {
        // 1. Get the Configuration object.
        Configuration conf = getConf();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        // 2. Create the job.
        Job job = Job.getInstance(conf, "model01");
        // 3. Set the class that carries the job.
        job.setJarByClass(TopN.class);
        // 4. Set the map-side properties.
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // 5. Set the reduce-side properties.
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(WCData.class);
        job.setOutputValueClass(NullWritable.class);
        // Delete the output directory if it already exists.
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]), true);
        }
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 6. Submit the job and wait for completion.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Main entry point of the job.
     * @param args
     */
    public static void main(String[] args) {
        try {
            // Parse generic Hadoop options off the command line first.
            String[] argss = new GenericOptionsParser(new Configuration(), args)
                    .getRemainingArgs();
            System.exit(ToolRunner.run(new TopN(), argss));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
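The heart of the reducer is the bounded-TreeSet pattern: add each new (word, count) pair, and whenever the set grows past K, evict ts.last(), which under the descending comparator is the element with the smallest count. Here is a minimal sketch of the same pattern outside MapReduce (the class name is made up; the hard-coded data mirrors the sample input above):

import java.util.TreeSet;

public class BoundedTopK {
    public static void main(String[] args) {
        final int K = 3;
        // Descending by counter, with a word tie-break so equal counts
        // are not collapsed into one entry.
        TreeSet<WCData> ts = new TreeSet<>();
        String[] words = {"qianfeng", "is", "hello", "best", "better",
                          "hadoop", "good", "spark", "nice"};
        int[] counts = {4, 3, 2, 1, 1, 1, 1, 1, 1};
        for (int i = 0; i < words.length; i++) {
            ts.add(new WCData(words[i], counts[i]));
            if (ts.size() > K) {
                // last() is the smallest under the descending order.
                ts.remove(ts.last());
            }
        }
        System.out.println(ts); // [qianfeng:4, is:3, hello:2]
    }
}

Run against the sample input with K = 3, the job's output file should accordingly contain qianfeng:4, is:3 and hello:2, one per line, since toString() formats each record as word:counter.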