1. 程式人生 > >MapReduce 案例之倒排索引

MapReduce 案例之倒排索引

MapReduce 案例之倒排索引
1. 倒排索引
倒排索引是文件檢索系統中最常用的資料結構,被廣泛地應用於全文搜尋引擎。 它主要是用來儲存某個單詞(或片語) 在一個文件或一組文件中的儲存位置的對映,即提供了一種根據內容來查詢文件的方式。由於不是根據文件來確定文件所包含的內容,而是進行相反的操作,因而稱為倒排索引( Inverted Index)。

2. 例項描述
通常情況下,倒排索引由一個單詞(或片語)以及相關的文件列表組成,文件列表中的文件或者是標識文件的 ID 號,或者是指文件所在位置的 URL。如下圖所示:

è¿éåå¾çæè¿°

從上圖可以看出,單詞 1 出現在{文件 1,文件 5,文件 13, ……}中,單詞 2 出現在{文件 2,文件 3,文件 5, ……}中,而單詞 3 出現在{文件 2,文件 10,文件 16, ……}中。在實際應用中,還需要給每個文件新增一個權值,用來指出每個文件與搜尋內容的相關度,如下圖所示:

è¿éåå¾çæè¿°

最常用的是使用詞頻作為權重,即記錄單詞在文件中出現的次數。以英文為例,如下圖所示,索引檔案中的“ MapReduce”一行表示:“ MapReduce”這個單詞在文字 T0 中 出現過 1 次,T1 中出現過 1 次,T2 中出現過 2 次。當搜尋條件為“ MapReduce”、“ is”、“ Simple” 時,對應的集合為: {T0, T1, T2}∩{T0, T1}∩{T0, T1}={T0, T1},即文件 T0 和 T1 包 含了所要索引的單詞,而且只有 T0 是連續的。
è¿éåå¾çæè¿°

3. 設計思路

3.1 Map過程

首先使用預設的 TextInputFormat 類對輸入檔案進行處理,得到文字中每行的偏移量及其內容。顯然, Map 過程首先必須分析輸入的key/value對,得到倒排索引中需要的三個資訊:單詞、文件 URL 和詞頻,如下圖所示。

è¿éåå¾çæè¿°

這裡存在兩個問題:第一, key/value對只能有兩個值,在不使用Hadoop 自定義資料型別的情況下,需要根據情況將其中兩個值合併成一個值,作為 key 或 value 值; 
第二,通過一個 Reduce 過程無法同時完成詞頻統計和生成文件列表,所以必須增加一個 Combine 過程完成詞頻統計。 
這裡將單詞和 URL 組成 key 值(如“ MapReduce: file1.txt”),將詞頻作為value,這樣做的好處是可以利用 MapReduce 框架自帶的Map 端排序,將同一文件的相同單詞的詞頻組成列表,傳遞給 Combine 過程,實現類似於 WordCount 的功能。

3.2 Combine 過程


經過 map 方法處理後, Combine 過程將 key 值相同 value 值累加,得到一個單詞在文件中的詞頻。 如果直接將圖所示的輸出作為 Reduce 過程的輸入,在 Shuffle 過程時將面臨一個問題:所有具有相同單詞的記錄(由單詞、 URL 和詞頻組成)應該交由同一個Reducer 處理,但當前的 key 值無法保證這一點,所以必須修改 key 值和 value 值。這次將單詞作為 key 值, URL 和詞頻組成 value 值(如“ file1.txt: 1”)。這樣做的好處是可以利用 MapReduce 框架預設的 HashPartitioner 類完成 Shuffle 過程,將相同單詞的所有記錄傳送給同一個 Reducer 進行處理。

è¿éåå¾çæè¿°

3.3 Reduce 過程

經過上述兩個過程後, Reduce 過程只需將相同 key 值的 value 值組合成倒排索引檔案所需的格式即可,剩下的事情就可以直接交給 MapReduce 框架進行處理了。

è¿éåå¾çæè¿°

3.4 程式程式碼

  • Map程式

package cn.demo.hadoop.mr.invertedIndex;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text>{

    private static Text keyInfo = new Text();// 儲存單詞和 URL 組合  
    private static final Text valueInfo = new Text("1");// 儲存詞頻,初始化為1  

    @Override  
    protected void map(LongWritable key, Text value, Context context)  
            throws IOException, InterruptedException {  

        String line = value.toString();  
        String[] fields = line.split(" ");// 得到欄位陣列  

        FileSplit fileSplit = (FileSplit) context.getInputSplit();// 得到這行資料所在的檔案切片  
        String fileName = fileSplit.getPath().getName();// 根據檔案切片得到檔名  

        for (String field : fields) {  
            // key值由單詞和URL組成,如“MapReduce:file1”  
            keyInfo.set(field + ":" + fileName);  
            context.write(keyInfo, valueInfo);  
        }  
    }  


}

  • combine程式
package cn.demo.hadoop.mr.invertedIndex;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text>{

    private static Text info = new Text();  

    // 輸入: <MapReduce:file3 {1,1,...}>  
    // 輸出:<MapReduce file3:2>  
    @Override  
    protected void reduce(Text key, Iterable<Text> values, Context context)  
            throws IOException, InterruptedException {  
        int sum = 0;// 統計詞頻  
        for (Text value : values) {  
            sum += Integer.parseInt(value.toString());  
        }  

        int splitIndex = key.toString().indexOf(":");  
        // 重新設定 value 值由 URL 和詞頻組成  
        info.set(key.toString().substring(splitIndex + 1) + ":" + sum);  
        // 重新設定 key 值為單詞  
        key.set(key.toString().substring(0, splitIndex));  

        context.write(key, info);  
    }  


}
  • reduce程式
package cn.demo.hadoop.mr.invertedIndex;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text>{
    private static Text result = new Text();  

    // 輸入:<MapReduce file3:2>  
    // 輸出:<MapReduce file1:1;file2:1;file3:2;>  
    @Override  
    protected void reduce(Text key, Iterable<Text> values, Context context)  
            throws IOException, InterruptedException {  
        // 生成文件列表  
        String fileList = new String();  
        for (Text value : values) {  
            fileList += value.toString() + ";";  
        }  

        result.set(fileList);  
        context.write(key, result);  
    }  


}
  • 主程式

package cn.demo.hadoop.mr.invertedIndex;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InvertedIndexRunner {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(InvertedIndexRunner.class);

        job.setMapperClass(InvertedIndexMapper.class);
        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(InvertedIndexReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\ziliao\\data\\InvertedIndex\\input"));
        // 指定處理完成之後的結果所儲存的位置
        FileOutputFormat.setOutputPath(job, new Path("D:\\ziliao\\data\\InvertedIndex\\output"));

        // 向 yarn 叢集提交這個 job
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }

}