
Count how many times each word appears in each file, and list the occurrence counts in descending order.
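For concreteness, here is a small worked example of what the job produces. The reducer below checks specifically for the file name mapreduce-4-1.txt, so that name is kept here; the second file name and the word contents are made up for illustration (the sample words come from the comment in the mapper).

Input:
    mapreduce-4-1.txt:  liangchaowei love liujialing love
    mapreduce-4-2.txt:  liujialing love liangchaowei liangchaowei

Expected output (one line per word, the file with the larger count listed first; on a tie the else branch puts the second file first):
    liangchaowei    mapreduce-4-2.txt:2    mapreduce-4-1.txt:1
    liujialing      mapreduce-4-2.txt:1    mapreduce-4-1.txt:1
    love            mapreduce-4-1.txt:2    mapreduce-4-2.txt:1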

package kaoshi3;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Count how many times each word appears in each file, and list the counts in descending order.
public class wordcount {

    static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        Text mk = new Text();
        Text mv = new Text();
        String filename = "";

        // setup() runs once per map task, so the name of the file being processed can be read here.
        @Override
        protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Get the split from the context and cast it to FileSplit,
            // because InputSplit itself does not expose the file name.
            InputSplit insplit = context.getInputSplit();
            FileSplit fs = (FileSplit) insplit;
            filename = fs.getPath().getName();   // file name
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // e.g. "liangchaowei love liujialing"
            String[] sp = value.toString().split(" ");
            for (String word : sp) {
                mk.set(word);
                mv.set(filename);
                context.write(mk, mv);   // emit the word as key and the file name as value
            }
        }
    }

    static class MyReducer extends Reducer<Text, Text, Text, Text> {
        Text t = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Keep the counters local so they are reset for every word;
            // as instance fields they would keep accumulating across keys.
            int sum1 = 0;
            int sum2 = 0;
            String file1 = "";
            String file2 = "";
            for (Text v : values) {
                String sv = v.toString();
                if (sv.startsWith("mapreduce-4-1.txt")) {
                    file1 = sv;   // file name
                    sum1++;       // occurrence count in this file
                } else {
                    file2 = sv;   // file name
                    sum2++;       // occurrence count in this file
                }
            }
            // Put the file with the larger count first (descending order).
            // The output is small, so a plain String is good enough.
            String out;
            if (sum1 > sum2) {
                out = file1 + ":" + sum1 + "\t" + file2 + ":" + sum2;
            } else {
                out = file2 + ":" + sum2 + "\t" + file1 + ":" + sum1;
            }
            t.set(out);
            context.write(key, t);
        }
    }

    public static void main(String[] args) throws IOException, URISyntaxException,
            ClassNotFoundException, InterruptedException {
        // Needed when running locally against the HDFS cluster.
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        // Load the configuration and create a job.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(kaoshi3.wordcount.class);   // driver class
        job.setMapperClass(MyMapper.class);           // mapper class
        job.setReducerClass(MyReducer.class);         // reducer class

        job.setOutputKeyClass(Text.class);            // output key type
        job.setOutputValueClass(Text.class);          // output value type

        // Input path.
        FileInputFormat.addInputPath(job, new Path("hdfs://hadoop01:9000/ksin02"));

        // Delete the output path first if it already exists.
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf);
        Path path = new Path("/ksout02");   // output path
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        job.waitForCompletion(true);
    }
}
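The reducer above hardcodes the file name mapreduce-4-1.txt, so it only handles exactly two input files. As a minimal sketch of how the same idea generalizes, the per-word tallies can be kept in a HashMap and sorted by count before writing. The class name GenericFileCountReducer and everything inside it are illustrative, not from the original post; it only assumes the same mapper output (word as key, file name as value).

package kaoshi3;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative sketch: tallies how many times the word occurred in each file
// and emits "file:count" pairs in descending order of count, for any number of files.
public class GenericFileCountReducer extends Reducer<Text, Text, Text, Text> {
    private final Text out = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Count occurrences per file name.
        Map<String, Integer> counts = new HashMap<>();
        for (Text v : values) {
            counts.merge(v.toString(), 1, Integer::sum);
        }
        // Sort the (file, count) pairs by count, largest first.
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> Integer.compare(b.getValue(), a.getValue()));
        // Join them as "file:count" separated by tabs.
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> e : entries) {
            if (sb.length() > 0) {
                sb.append('\t');
            }
            sb.append(e.getKey()).append(':').append(e.getValue());
        }
        out.set(sb.toString());
        context.write(key, out);
    }
}

To try it, the only change in the driver would be pointing job.setReducerClass at this class instead of MyReducer.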