MapReduce框架學習(4)——倒排索引程式實戰
阿新 • • 發佈:2018-11-20
- 參考: JeffreyZhou的部落格園
- 《Hadoop權威指南》第四版
0 倒排索引(Inverted Index)
前面我們執行過WordCount例子,得到的單詞計數結果,如果輸入3篇文件,得到的結果是這3個文件所有的單詞總數計數,得到如下這樣
但是,如果我想知道“hello”這個單詞在各個文件中的計數情況呢?也就是最後得到的結果是:
理解一下,上面的結果,是根據文件來查單詞的頻率,下面是根據單詞來查在文件中出現的頻率,所以稱為倒排索引(Inverted Index)。
那麼,這個結果又是咋形成的呢?
4.1 輸入輸出過程
- 首先是map過程,輸入的是文字,一條條的行記錄,輸出呢?應該包含:單詞,所在文件編號,單詞數。那麼第一個問題來了,map的輸入是Key-value,這有三個引數,誰是key是,誰是value呢?不夠分啊。分析一下,數量是需要累計的,所以單詞數肯定在value裡,單詞在key裡,文件編號呢?這個引數不能進行累加等操作,不同檔案內的相同單詞也不能累加,所以它應該放在key中。所以這就是一個複合鍵,value則是預設的數量1。map後的輸出應該是這樣:
key | value |
---|---|
Hello;T1 | 1 |
world;T1 | 1 |
Hello;T1 | 1 |
Bye;T3 | 1 |
… | … |
- combine過程,此時的combine的輸入就應該是剛才map定義的複合鍵型別了,在此時將上述的key-value格式進行一輪合併,這個輸出應該不改變資料型別,照樣傳到下一環節,這一輪的輸出應該是:
key | value |
---|---|
Hello;T1 | 2 |
world;T1 | 1 |
Bye;T3 | 3 |
… | … |
注:
此處與參考教程中有點不同,上面的按照combine的原理進行推理的,但按照原始碼,其輸出應該是:
key | value |
---|---|
Hello;T1 | T1:2 |
world;T1 | T1:1 |
Bye;T3 | T3:3 |
… | … |
- reduce過程,此時只需要按照相同的key(此處為複合鍵中的單詞),將不同map的value結果進行合併處理,就可以得到最終結果:
key | value |
---|---|
Hello | T1:2;T2:1 |
world | T1:1;T2:2 |
Bye | T2:1;T3:3 |
… | … |
那麼各個環節的資料格式變換也看到了,接下來就用程式碼來實現各個環節吧。
有一點需要說明:以下程式中有些程式碼已經`deprecated`,現在java語法已經有更好的實現方法,但本例中還是照抄過來,學習其思路和框架後,再進行修改。
在學習中,不用糾結於具體的語法,而且其邏輯思路。
4.2 map類
前面說到了,這個key是複合的,所以常用的幾種基本型別已經滿足不了我了,先來設定一個複合鍵MyType.class
。
public static class Mytype implements WritableComparable<MyType> {
public MyType() {}
// 單詞
private String word;
public void setWord(String word) {this.word = word;}
public String getWord() {return word;}
// 文件編號
private String filePath;
public void setFile(String filePath) {this.filePath = filePath;}
public String getFile() {return filePath;}
// 序列化
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(word);
out.writeUTF(filePath);
}
//
@Override
public void readFile(DataInput in) throws IOException {
word = in.readUTF();
filePath = in.readUTF();
}
// 比較器
@Override
public int compareTo(MYtype arg0) {
if (word != aeg0.word) {
return word.compareTo(arg0.word);
return filePath.compareTo(arg0.filePath);
}
}
然後,再來寫map函式:
public static class InvertedIndexMapper extends Mapper<Object, Text, MyType, Text> {
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
FileSplit split = (FileSplit)context.getInputSplit();
StringTokenizer itr = new StringTonizer(value.toString());
while(itr.hasMoreTokens()) {
MyType key = new MyType();
key.setWord(itr.nextToken());
key.setFile(split.getPath().toUri().getPath().replace("/user/hadoop/input/",""));
context.write(key,new Text("1"));
}
}
}
4.3 Combine類
public static class InvertedIndexCombiner extends Reducer<MyType,Text,MyType,Text> {
public void reduce(MyType key, Text values, Context context)
throws IOException,InterruptException {
int sum = 0;
for (Text value : values) {
sum += Integer.parseInt(value.toString());
}
context.write(key,new Text(key.getFile()+":"+sum));
}
}
4.4 Reduce類
public static class InvertedIndexReducer extends Reducer<MyType, Text, Text, Text> {
public void reduce(MyType key, Iterable<Text> values, Context context)
throws IOException,InterruptionException {
Text result = new Text();
String fileList = new String();
for (Text value : values) {
fileList += value.toString() + ";";
}
result.set(fileList);
context.write(new Text(key.getWord()),result);
}
}
4.5 Job配置
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
// System.out.println("url:"+conf.get("fs.defaultFS"))
job = Job.getInstance(conf,"MyInvertedIndex");
job.setJarByClass(MyInvertedIndex.class);
job.setMapperClass(InvertedIndexMapper.class);
job.setMapOutputKeyClass(MyType.class);
job.setMapOutputValueClass(Text.class);
job.setCombinerClass(InvertedIndexCombiner.class);
job.setReducerClass(InvertedIndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 檢測輸出目錄output是否已存在,若存在,則刪除
// Path path = new Path("output");
// FileSystem hdfs = new FileSystem.get(conf);
// if (hdfs.exists(path))
// hdfs.delete(path,true);
FileInputFormat.addInputPath(job,new Path("input"));
FileOutputFormat.addOutputPath(job,new Path("output"));
job.waitForCompletion(true);
}
4.x 後記
- 為什麼自定義的Combine類中,reduce方法傳入的引數是(Iterable
values),上一環節map的輸出明明是 new Text("1")
。。。再接著看Reduce環節的reduce方法,發現裡面也是Iterable<Text> values
,想明白了,可能這中間還有一個操作,將上一環節傳來的序列化Text(value)變為可迭代資料。