1. 程式人生 > >MapReduce框架學習(4)——倒排索引程式實戰

MapReduce框架學習(4)——倒排索引程式實戰

參考: JeffreyZhou的部落格園
《Hadoop權威指南》第四版

0 倒排索引(Inverted Index)

前面我們執行過WordCount例子,得到的單詞計數結果,如果輸入3篇文件,得到的結果是這3個文件所有的單詞總數計數,得到如下這樣

WordCount

但是,如果我想知道“hello”這個單詞在各個文件中的計數情況呢?也就是最後得到的結果是:
倒排索引
理解一下,上面的結果,是根據文件來查單詞的頻率,下面是根據單詞來查在文件中出現的頻率,所以稱為倒排索引(Inverted Index)。
  那麼,這個結果又是咋形成的呢?

4.1 輸入輸出過程

  1. 首先是map過程,輸入的是文字,一條條的行記錄,輸出呢?應該包含:單詞,所在文件編號,單詞數。那麼第一個問題來了,map的輸入是Key-value,這有三個引數,誰是key是,誰是value呢?不夠分啊。分析一下,數量是需要累計的,所以單詞數肯定在value裡,單詞在key裡,文件編號呢?這個引數不能進行累加等操作,不同檔案內的相同單詞也不能累加,所以它應該放在key中。所以這就是一個複合鍵,value則是預設的數量1。map後的輸出應該是這樣:
key value
Hello;T1 1
world;T1 1
Hello;T1 1
Bye;T3 1
  1. combine過程,此時的combine的輸入就應該是剛才map定義的複合鍵型別了,在此時將上述的key-value格式進行一輪合併,這個輸出應該不改變資料型別,照樣傳到下一環節,這一輪的輸出應該是:
key value
Hello;T1 2
world;T1 1
Bye;T3 3

注:
此處與參考教程中有點不同,上面的按照combine的原理進行推理的,但按照原始碼,其輸出應該是:

key value
Hello;T1 T1:2
world;T1 T1:1
Bye;T3 T3:3
  1. reduce過程,此時只需要按照相同的key(此處為複合鍵中的單詞),將不同map的value結果進行合併處理,就可以得到最終結果:
key value
Hello T1:2;T2:1
world T1:1;T2:2
Bye T2:1;T3:3

那麼各個環節的資料格式變換也看到了,接下來就用程式碼來實現各個環節吧。

有一點需要說明:以下程式中有些程式碼已經`deprecated`,現在java語法已經有更好的實現方法,但本例中還是照抄過來,學習其思路和框架後,再進行修改。
在學習中,不用糾結於具體的語法,而且其邏輯思路。

4.2 map類

前面說到了,這個key是複合的,所以常用的幾種基本型別已經滿足不了我了,先來設定一個複合鍵MyType.class

public static class Mytype implements WritableComparable<MyType> {
	public MyType() {}
     
 // 單詞
     private String word;
     public void setWord(String word) {this.word = word;}
     public String getWord() {return word;}
 // 文件編號
	 private String filePath;
     public void setFile(String filePath) {this.filePath = filePath;}
     public String getFile() {return filePath;}

 // 序列化
      @Override
      public void write(DataOutput out)  throws IOException {
      	out.writeUTF(word);
      	out.writeUTF(filePath);
      }
 
     
 // 
      @Override
	  public void readFile(DataInput in) throws IOException {
	  	word = in.readUTF();
	  	filePath = in.readUTF();
	  }
     
 // 比較器
      @Override
      public int compareTo(MYtype arg0) {
      	if (word != aeg0.word) {
      		return word.compareTo(arg0.word);
      	return filePath.compareTo(arg0.filePath);
      	}
	  }

然後,再來寫map函式:

public static class InvertedIndexMapper extends Mapper<Object, Text, MyType, Text> {
	public void map(Object key, Text value, Context context) 
	throws IOException, InterruptedException {
		FileSplit split = (FileSplit)context.getInputSplit();
		StringTokenizer itr = new StringTonizer(value.toString());
		while(itr.hasMoreTokens()) {
			MyType key = new MyType();
			key.setWord(itr.nextToken());
			key.setFile(split.getPath().toUri().getPath().replace("/user/hadoop/input/",""));
			context.write(key,new Text("1"));
		}
	}
}

4.3 Combine類

public static class InvertedIndexCombiner extends Reducer<MyType,Text,MyType,Text> {
	public void reduce(MyType key, Text values, Context context) 
	throws IOException,InterruptException {
		int sum = 0;
		for (Text value : values) {
			sum += Integer.parseInt(value.toString());
		}
		context.write(key,new Text(key.getFile()+":"+sum));
	}
}

4.4 Reduce類

public static class InvertedIndexReducer extends Reducer<MyType, Text, Text, Text> {
	public void reduce(MyType key, Iterable<Text> values, Context context) 
	throws IOException,InterruptionException {
		Text result = new Text();
		String fileList = new String();
		for (Text value : values) {
			fileList += value.toString() + ";";
		}
		result.set(fileList);
		context.write(new Text(key.getWord()),result);
	}
}

4.5 Job配置

public static void main(String[] args) throws IOException {
	Configuration conf = new Configuration();
	// System.out.println("url:"+conf.get("fs.defaultFS"))
	job = Job.getInstance(conf,"MyInvertedIndex");
	
	job.setJarByClass(MyInvertedIndex.class);
	job.setMapperClass(InvertedIndexMapper.class);
	job.setMapOutputKeyClass(MyType.class);
	job.setMapOutputValueClass(Text.class);
		
	job.setCombinerClass(InvertedIndexCombiner.class);
	job.setReducerClass(InvertedIndexReducer.class);

	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(Text.class);

// 檢測輸出目錄output是否已存在,若存在,則刪除
	// Path path = new Path("output");
	// FileSystem hdfs = new FileSystem.get(conf);
	// if (hdfs.exists(path))
		// hdfs.delete(path,true);

	FileInputFormat.addInputPath(job,new Path("input"));
	FileOutputFormat.addOutputPath(job,new Path("output"));

	job.waitForCompletion(true);
}

4.x 後記

  1. 為什麼自定義的Combine類中,reduce方法傳入的引數是(Iterable values),上一環節map的輸出明明是 new Text("1")。。。再接著看Reduce環節的reduce方法,發現裡面也是 Iterable<Text> values,想明白了,可能這中間還有一個操作,將上一環節傳來的序列化Text(value)變為可迭代資料。