
Hadoop: counting word occurrences and their positions across files

1. Problem Description

Given several input files, produce, for every word, the names of the files it appears in, the number of times it appears in each file, and the exact position (line and column) of every occurrence.

For example, with the following input files:

1.txt:

it is what it is

what is it

it is a banana

2.txt:

i is what he is

haoop is it

it he a banana

3.txt:

hadoop is what hello is

what is it

hello is a he

The expected output is as follows:

a       {1:1;(3,3) };{3:1;(3,3) };{2:1;(3,3) };
banana  {2:1;(3,4) };{1:1;(3,4) };
hadoop  {3:1;(1,1) };
haoop   {2:1;(2,1) };
he      {2:2;(1,4) (3,2) };{3:1;(3,4) };
hello   {3:2;(1,4) (3,1) };
i       {2:1;(1,1) };
is      {2:3;(1,2) (1,5) (2,2) };{3:4;(1,2) (1,5) (2,2) (3,2) };{1:4;(1,2) (1,5) (2,2) (3,2) };
it      {1:4;(1,1) (1,4) (2,3) (3,1) };{2:2;(2,3) (3,1) };{3:1;(2,3) };
what    {3:2;(1,3) (2,1) };{2:1;(1,3) };{1:2;(1,3) (2,1) };

For example, the output line "he  {2:2;(1,4) (3,2) };{3:1;(3,4) };" means that the word "he" appears twice in 2.txt, at line 1 column 4 and line 3 column 2, and once in 3.txt, at line 3 column 4.

2. Approach

First, the name of the file being processed is obtained from the InputSplit that the framework hands to the task, and it is recorded once in the mapper's setup(). The mapper, combiner, and reducer are all configured with Text input and output types.

The mapper's main job is to obtain the file name and to assign every word it reads a count of 1, which makes it easy for the later stages to aggregate identical words. The map function receives each file line by line; variable a records the line number of the current word and b records its column within that line. The key is then built as a string containing the word, the file name, line a, and column b, while the value is the word's count.

The (key, value) pair is then packed into Hadoop Text objects and passed on to the combine() function.
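To make the intermediate records concrete, here is a minimal, self-contained sketch (plain Java, no Hadoop; the class name MapSketch is only illustrative and the file name is hard-coded to "1" instead of being read from the InputSplit) that prints the (key, value) pairs the mapper would emit for 1.txt:

import java.util.StringTokenizer;

public class MapSketch {
    public static void main(String[] args) {
        String fileName = "1";                       // stands in for "1.txt" minus its extension
        String[] lines = {"it is what it is", "what is it", "it is a banana"};
        int line = 1;                                // plays the role of the line counter a
        for (String text : lines) {
            int col = 1;                             // plays the role of the column counter b
            StringTokenizer stk = new StringTokenizer(text);
            while (stk.hasMoreTokens()) {
                // key = word:file:line:column, value = "1"
                System.out.println(stk.nextToken() + ":" + fileName + ":" + line + ":" + col + "\t1");
                col++;
            }
            line++;
        }
    }
}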

The combine() function processes the incoming (key, value) pairs. It splits the string key with split(), sums the counts for identical keys into sum, and then rewrites the pair into the format required by the output: the word itself is kept as the key, and the remaining information is stored in the value in the form file:count(line,column). The resulting (key, value) pair is passed on to reduce() as Text.
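The key handling in the combiner can be sketched in isolation; this illustrative snippet (CombineSketch is not part of the job, and the composite key and sum are hard-coded as an example) shows how one aggregated map record becomes the reduce-side key and value:

public class CombineSketch {
    public static void main(String[] args) {
        // One aggregated map output: composite key plus the summed count of its "1" values.
        String compositeKey = "he:2:1:4";   // word "he", file 2.txt, line 1, column 4
        int sum = 1;
        String[] str = compositeKey.split(":");
        String outKey = str[0];                                                    // "he"
        String outValue = str[1] + ":" + sum + "(" + str[2] + "," + str[3] + ")";  // "2:1(1,4)"
        System.out.println(outKey + "\t" + outValue);
    }
}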

The reducer's main job is to bring together each word and all of its file/position entries and, finally, to write them to the output file in the format word  { file:count(line,col); file:count(line,col); ... }.
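Likewise, the reduce step boils down to string concatenation. A standalone sketch (ReduceSketch is only illustrative, with the values hard-coded to those the word "he" would receive) of the final formatting:

public class ReduceSketch {
    public static void main(String[] args) {
        String word = "he";
        // All combiner values that arrived for this word.
        String[] values = {"2:1(1,4)", "2:1(3,2)", "3:1(3,4)"};
        StringBuilder fileList = new StringBuilder();
        for (String value : values) {
            fileList.append(value).append(";");
        }
        System.out.println(word + "\t{" + fileList + "}");   // he  {2:1(1,4);2:1(3,2);3:1(3,4);}
    }
}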

The main() function sets the input and output paths, clears the previous output, and configures the mapper, combiner, and reducer for the job.
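The output-clearing step matters because MapReduce refuses to start if the output directory already exists. Here is a standalone sketch of just that step (ClearOutputSketch is an illustrative name; it assumes the same local file-system setting and the same args[1] output-path convention as the full driver below):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ClearOutputSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "file:///");   // run against the local file system
        FileSystem fs = FileSystem.get(conf);
        // Recursively remove the old output directory so the job can be rerun.
        fs.delete(new Path(args[1]), true);
    }
}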

3. Complete Code

     

package wordcount;

import java.io.IOException;
import  java.util.*;
import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WordCount {
	
	public static int a = 1;  // line counter within the file being mapped; reset in setup() for each new file
	
	public static class TokenizerMapper extends Mapper<Object, Text, Text, Text> {

		private FileSplit split;
		private Text valueInfo = new Text();
		private Text keyInfo = new Text();

		// Name of the file backing the current split, without its extension (e.g. "1" for 1.txt).
		public String fileName = "";

		public void setup(Context context) throws IOException, InterruptedException {
			InputSplit inputSplit = context.getInputSplit();
			String fileFormat = ((FileSplit) inputSplit).getPath().getName();
			fileName = fileFormat.substring(0, fileFormat.lastIndexOf("."));
			a = 1;  // restart the line counter for every new file
			System.out.println("fileName: " + fileName);
		}
      
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			int b = 1;                                   // column of the current word within this line
			split = (FileSplit) context.getInputSplit();
			StringTokenizer stk = new StringTokenizer(value.toString());

			String[] st = value.toString().split(" ");
			System.out.println("Line sentence : " + value.toString() + " " + st.length);
			while (stk.hasMoreElements()) {
				// Composite key: word:fileName:line:column; the value is the count "1".
				keyInfo.set(stk.nextToken() + ":" + fileName + ":" + a + ":" + b);
				if (b == st.length) a++;                 // last word of the line: advance the line counter
				valueInfo.set("1");
				context.write(keyInfo, valueInfo);
				b++;
			}
		}
  }
  
  // Unused helper (never called by the job): formats a "{docID:position};" entry
  // for every character position of val and counts how many strings were formatted.
  public int cNum = 1;
  private String setLocationFormat(String docID, String val) {
	  String res = "";
	  for (int i = 0; i < val.length(); i++)
		  res += "{" + docID + ":" + i + "};";
	  cNum++;
	  return res;
  }
  
  public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {

	Text info = new Text();

	public void reduce(Text key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
	  // Sum the "1" values emitted for this composite word:file:line:column key.
	  int sum = 0;
	  for (Text val : values) {
		sum += Integer.parseInt(val.toString());
	  }
	  // str[0] = word, str[1] = file, str[2] = line, str[3] = column.
	  String[] str = key.toString().split(":");
	  key.set(str[0]);  // the bare word becomes the key seen by the reducer
	  System.out.println("str[0]: " + str[0] + "   " + "str[1]: " + str[1]);
	  // Value format: file:count(line,column).
	  info.set(str[1] + ":" + sum + "(" + str[2] + "," + str[3] + ")");
	  context.write(key, info);
	}
  }
  
  public static class InvertedIndexReduce extends Reducer<Text,Text,Text,Text>{
	  
	  private Text result = new Text();
	  public void reduce(Text key,Iterable<Text> values,Context context)
	  		throws IOException, InterruptedException {
		  // Concatenate every "file:count(line,col)" entry for this word and wrap it in braces.
		  String fileList = "";
		  for (Text value : values) {
			  fileList += value.toString() + ";";
		  }
		  result.set("{" + fileList + "}");
		  context.write(key, result);
	  }
  }
  
  // Unused helper (not wired into the MapReduce job): per-document word counting with a TreeMap.
  public static void wordDeal(String wordOfDoc, String docID, TreeMap<String, TreeMap<String, Integer>> tmp) {
	  wordOfDoc = wordOfDoc.toLowerCase();
	  if (!tmp.containsKey(wordOfDoc)) {
		  // First time this word is seen: start its per-document count at 1.
		  TreeMap<String, Integer> tmpST = new TreeMap<String, Integer>();
		  tmpST.put(docID, 1);
		  tmp.put(wordOfDoc, tmpST);
	  } else {
		  // Word already known: start a count for a new document, or increment the existing one.
		  TreeMap<String, Integer> tmpST = tmp.get(wordOfDoc);
		  Integer count = tmpST.get(docID);
		  count = (count == null) ? 1 : count + 1;
		  tmpST.put(docID, count);
		  tmp.put(wordOfDoc, tmpST);
	  }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Local-mode settings so the example runs without a cluster; switch to the
    // commented-out HDFS/JobTracker settings to run on a cluster instead.
    //conf.set("fs.default.name", "hdfs://master:9000");
    //conf.set("mapred.job.tracker", "hdfs://master:9001");
    conf.set("fs.default.name", "file:///");
    conf.set("mapred.job.tracker", "local");
    
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    
    // Remove any previous output directory so the job can be rerun without failing.
    FileSystem fs = FileSystem.get(conf);
    fs.delete(new Path(args[1]), true);
    
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    

    
    job.setMapperClass(TokenizerMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    
    // The combiner collapses each composite "word:file:line:col" key down to the bare word.
    job.setCombinerClass(InvertedIndexCombiner.class);
    
    job.setReducerClass(InvertedIndexReduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
   
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}



4. Output Results

Because the map-side key already carries the line and column, every occurrence forms its own key, so each entry below has a count of 1 and the positions appear one per entry rather than grouped per file as in the sample output of section 1.

a{1:1(3,3);2:1(3,3);3:1(3,3);}
banana{1:1(3,4);2:1(3,4);}
hadoop{3:1(1,1);}
haoop{2:1(2,1);}
he{2:1(1,4);2:1(3,2);3:1(3,4);}
hello{3:1(1,4);3:1(3,1);}
i{2:1(1,1);}
is{2:1(1,2);2:1(1,5);2:1(2,2);3:1(1,2);3:1(1,5);3:1(2,2);3:1(3,2);1:1(1,2);1:1(1,5);1:1(2,2);1:1(3,2);}
it{1:1(1,1);1:1(1,4);1:1(2,3);1:1(3,1);2:1(2,3);2:1(3,1);3:1(2,3);}
what{3:1(1,3);3:1(2,1);2:1(1,3);1:1(1,3);1:1(2,1);}