MapReduce 實戰
阿新 • • 發佈:2018-12-07
從一堆單詞中找出由相同字母組成的單詞（即變位詞，anagram），並把它們歸為一組。例如：
輸入（每行一個單詞）：
aap
paa
acle
basdf
sabdf
apa

輸出（排序後的字母 + Tab + 同組單詞；只有一個單詞的組，如 acle，會被過濾掉）：
aap	aap,apa,paa
abdfs	basdf,sabdf
程式碼 Anagram.java：
package com.linewell.mapreduce;

import java.io.IOException;
import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.commons.collections.IterableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * MapReduce job that groups words made of the same letters (anagrams).
 *
 * <p>Each input line is treated as one word. The mapper keys every word by its letters in sorted
 * order, so words that are anagrams of each other arrive in the same reduce call. The reducer
 * writes {@code sortedLetters <TAB> word1,word2,...} for every group containing at least two
 * distinct words.
 */
public class Anagram implements Tool {

  /** Input and output paths used when the program is started without path arguments. */
  private static final String[] DEFAULT_ARGS = {
    "hdfs://192.168.72.129:9000/local/in", "hdfs://192.168.72.129:9000/local/out3"
  };

  /** Configuration handed in by ToolRunner through {@link #setConf}; null until then. */
  private Configuration conf;

  /**
   * Runs the job through {@link ToolRunner}.
   *
   * @param args optional {@code <input path> <output path>} (plus any Hadoop generic options);
   *     when fewer than two arguments are given, the historical default HDFS paths are used
   */
  public static void main(String[] args) throws Exception {
    // Keep the old hard-coded paths as a fallback so existing launch setups still work.
    String[] jobArgs = args.length >= 2 ? args : DEFAULT_ARGS.clone();
    int status = ToolRunner.run(new Configuration(), new Anagram(), jobArgs);
    System.exit(status);
  }

  /** Emits (letters of the word in sorted order, word) for every non-blank input line. */
  public static class AnagramMapper extends Mapper<LongWritable, Text, Text, Text> {
    // Reused across calls: context.write serializes the pair immediately.
    private final Text outKey = new Text();
    private final Text outValue = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // Trim so surrounding whitespace (e.g. '\r' from Windows line endings) does not end up in
      // the grouping key, and skip blank lines instead of grouping them under an empty key.
      String word = value.toString().trim();
      if (word.isEmpty()) {
        return;
      }
      char[] letters = word.toCharArray();
      Arrays.sort(letters); // anagrams share the same sorted letter sequence
      outKey.set(String.valueOf(letters));
      outValue.set(word);
      context.write(outKey, outValue);
    }
  }

  /** Joins the distinct words of each letter group with commas; drops single-word groups. */
  public static class AnagramReducer extends Reducer<Text, Text, Text, Text> {
    private final Text outValue = new Text();

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      // Hadoop gives no ordering guarantee for values, and the input may repeat a word.
      // A sorted set makes the output deterministic and stops a duplicated word from being
      // reported as its own anagram. toString() copies the text, which matters because Hadoop
      // reuses the same Text instance on each iteration.
      SortedSet<String> words = new TreeSet<String>();
      for (Text text : values) {
        words.add(text.toString());
      }
      // A lone word has no anagram partner, so only groups of two or more are written.
      if (words.size() > 1) {
        StringBuilder joined = new StringBuilder();
        for (String word : words) {
          if (joined.length() > 0) {
            joined.append(',');
          }
          joined.append(word);
        }
        outValue.set(joined.toString());
        context.write(key, outValue);
      }
    }
  }

  /**
   * Configures and runs the anagram job, deleting any existing output directory first.
   *
   * @param args {@code args[0]} is the input path, {@code args[1]} the output path
   * @return 0 if the job succeeded, 1 if it failed, 2 if the arguments were invalid
   */
  @Override
  public int run(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println("Usage: Anagram <input path> <output path>");
      return 2;
    }
    // Use the configuration ToolRunner populated (including generic -D options) when present.
    Configuration jobConf = getConf() != null ? getConf() : new Configuration();
    Path inputPath = new Path(args[0]);
    Path outputPath = new Path(args[1]);

    // The job fails if its output path already exists, so remove a previous run's output.
    FileSystem fs = outputPath.getFileSystem(jobConf);
    if (fs.exists(outputPath)) {
      fs.delete(outputPath, true);
    }

    Job job = Job.getInstance(jobConf, "anagram");
    job.setJarByClass(Anagram.class);
    FileInputFormat.setInputPaths(job, inputPath);
    FileOutputFormat.setOutputPath(job, outputPath);
    job.setMapperClass(AnagramMapper.class);
    // No combiner: the reducer filters and concatenates, which is not safe to apply partially.
    job.setReducerClass(AnagramReducer.class);
    // The map output types equal the final output types, so these two calls cover both.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // Tool/exit-code convention: 0 means success.
    return job.waitForCompletion(true) ? 0 : 1;
  }

  /** Returns the configuration supplied via {@link #setConf}, or null if none was set. */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /** Stores the configuration (ToolRunner calls this before {@link #run}). */
  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }
}