在hadoop上進行編寫mapreduce程式,統計關鍵詞在text出現次數
阿新 • • 發佈:2019-02-12
mapreduce的處理過程分為2個階段,map階段,和reduce階段。在要求統計指定檔案中的所有單詞的出現次數時,
map階段把每個關鍵詞寫到一行上以逗號進行分隔,並初始化數量為1(相同的單詞hadoop中的map會自動放到一行中)
reduce階段是把每個單詞出現的頻率統計出來重新寫回去。
map階段把每個關鍵詞寫到一行上以逗號進行分隔,並初始化數量為1(相同的單詞hadoop中的map會自動放到一行中)
reduce階段是把每個單詞出現的頻率統計出來重新寫回去。
如程式碼:
package com.clq.hadoop2; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> { final Text key2 = new Text(); // value2 表示單詞在該行中的出現次數 final IntWritable value2 = new IntWritable(1); // key 表示文字行的起始位置 // value 表示文字行 protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException { final String[] splited = value.toString().split(","); for (String word : splited) { key2.set(word); // 把key2、value2寫入到context中 context.write(key2, value2); } } }
package com.clq.hadoop2; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> { // value3表示單詞出現的總次數 final IntWritable value3 = new IntWritable(0); /** * key 表示單詞 values 表示map方法輸出的1的集合 context 上下文物件 */ protected void reduce(Text key, java.lang.Iterable<IntWritable> values, Context context) throws java.io.IOException, InterruptedException { int sum = 0; for (IntWritable count : values) { sum += count.get(); } // 執行到這裡,sum表示該單詞出現的總次數 // key3表示單詞,是最後輸出的key final Text key3 = key; // value3表示單詞出現的總次數,是最後輸出的value value3.set(sum); context.write(key3, value3); } }
package com.clq.hadoop2; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.lib.HashPartitioner; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class MapperReducer { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { //指定輸入和輸出路徑 final String INPUT_PATH = "hdfs://ubuntu:9000/Input"; final String OUTPUT_PATH = "hdfs://ubuntu:9000/output"; //建立一個job物件封裝執行時所需要的資訊 final Job job = new Job(new Configuration(),"MapperReducer"); //打成jar執行 job.setJarByClass(MapperReducer.class); FileInputFormat.setInputPaths(job, INPUT_PATH); FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); //指定自己自定義的mapper類 job.setMapperClass(MyMapper.class); //指定執行mapper型別 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //指定自己定義的reducer類 job.setReducerClass(MyReducer.class); //指定reducer的key和value型別 job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.waitForCompletion(true); } }