1. 程式人生 > >使用Hadoop 實現文件倒排索引

使用Hadoop 實現文件倒排索引

        文件倒排索引主要是統計每個單詞在各個文件中出現的頻數,因此要以單詞為key,value為文件以及該單詞在此文件頻數,即輸出資料的格式形如:

< word1,[doc1,3] [doc2,4] ... >    :表示word1這個單詞在doc1文件中出現了3次,在doc2文件中出現了4次。

        整個程式的輸入是一系列檔案,比如file01.txt, file02.txt, file03.txt ....,首先要將這些檔案上傳到hadoop hdfs中作為程式的輸入。上傳過程以及Java類的編譯等可以參考這篇部落格:執行Hadoop示例程式WordCount,這裡不再詳細介紹。本程式的原始碼在文章最後面。

一、程式執行的大體思路

        由於文件倒排索引考察的是一個單詞和文件的關係,而系統預設的LineRecordReader是按照每行的偏移量作為map輸入時的key值,每行的內容作為map的value值,這裡的key值(即行偏移量對我們的意義不大),我們這裡考慮將一個文件的名字作為關鍵字,而每一行的值作為value,這樣處理起來比較方便,(即:map的輸入形式為<fileName, a line>,主要是通過一個自定義的RecordReader類來實現,下面會有介紹)。整個程式資料處理流程如下面所示:


        map類的主要作用是處理程式的輸入,這裡的輸入形式是<fileName,a line>,即輸入的關鍵字key是檔名如file01.txt,值value為一行資料,map的任務是將這一行資料進行分詞,並以圖中第一部分的形式進行輸出。

        combine類的主要作用是將map輸出的相同的key的value進行合併(相加),這樣有利於減少資料傳輸,combine是在本節點進行的。

        partition的主要作用是對combine的輸出進行分割槽,分割槽的目的是使key值相同的資料被分到同一個節點,這樣在進行reduce操作的時候僅需要本地的資料就足夠,不需要通過網路向其他節點尋找資料。上圖中的 "partitionby word1 rather than word1#doc1" 意思是將word1作為分割槽時的關鍵字,而不是word1#doc1,因為我們在之前的輸出的關鍵字的形式是word1#doc1

的不是word1這樣系統會預設按照進行word1#doc1分割槽,而我們最終想要的結果是按照word1分割槽的,所以需要我們自定義patition類。

        reduce的操作主要是將結果進行求和整理,並使結果符合我們所要的形式。

2、程式和各個類的設計說明

        這部分按照程式執行的順序依次介紹每個類的設計和作用,有些子類繼承了父類,但是並沒有重新實現父類的方法,這裡不詳細介紹這些方法。

2.1、FileNameRecordReader類

        FileNameRecordReader類繼承自RecordReader,是RecordReader類的自定義實現,主要作用是將記錄所在的檔名作為key,而不是記錄行所在檔案的偏移,獲取檔名所用的語句為:
fileName = ((FileSplit) arg0).getPath().getName();

2.2、FileNameInputFormat類

        因為我們重寫了RecordReader類,這裡要重寫FileInputFormat類來使用我們的自定義FileNameRecordReader,這個類的主要作用就是返回一個FileNameRecordReader類的例項。

2.3、InvertedIndexMapper類

        這個類繼承自Mapper,主要方法有setup和map方法,setup方法的主要作用是在執行map前初始化一個stopwords的list,主要在map處理輸入的單詞時,如果該單詞在stopwords的list中,則跳過該單詞,不進行處理。stopwords剛開始是以一個文字檔案的形式存放在hdfs中,程式在剛開始執行的時候通過Hadoop Configuration將這個文字檔案設定為CacheFile供各個節點共享,並在執行map前,初始化一個stopwords列表。
InvertedIndexMapper的主要操作是map,這個方法將讀入的一行資料進行分詞操作,並以<key: word1#doc1  value: 1>的鍵值對形式,向外寫資料,在map方法中,寫出的value都是1。InvertedIndexMapper類的類圖如下圖2所示。

2.4、SumCombiner類

        這個類主要是將前面InvertedIndexMapper類的輸出結果進行合併,如果一個單詞在一個文件中出現了多次,則將value的值設定為出現的次數和。

2.5、NewPartitioner類

        分割槽類主要是將前面的輸出進行分割槽,即選擇合適的節點,分割槽類一般使用關鍵字key進行分割槽,但是我們這裡的關鍵字為word1#doc1,我們最終是想讓word相同的記錄在同一臺節點上,故NewPartitioner的任務是利用word進行分割槽。


2.5、InvertedIndexReducer類

         InvertedIndexReducerreduce的輸入形式為:<key: word1#doc1  value: 2>  <key: word1#doc2  value: 1> <key: word2#doc1  value: 1>,如第一個圖中所示可見同一個單詞會作為多次輸入,傳遞給reduce,而最終的結果要求只輸出一次單詞,而不同的文件如doc1,doc2要作為這個單詞的value輸出,我們的reduce在實現此功能時,設定兩個變數CurrentItem和postingList,其中CurrentItem儲存每次每次讀入的key,初始值為空,postingList是一個列表,表示這個key對於的出現的文件以及在此文件中出現的次數。因為同一個key可能被讀入多次,每次在讀入key時,同上一個CurrentItem進行比較,如果跟上一個CurrentItem相同,表示讀入的是同一個key,進而將新讀入的key的文件追加到postingList中;如果根上一個CurrentItem不同,表示相同的單詞以及讀完了,這時候我們要統計上一個CurrentItem出現的總次數,以及含有此item的總的文章數,這些資訊我們之前都存放在postingList中,只要遍歷此時的postingList就能得到上述資訊,並在得到資訊之後重置CurrentItem和postingList。具體見程式碼實現。其類圖如上圖所示。

3、執行結果截圖

我編譯以及執行使用的命令如下,大家可以根據自己目錄情況適當調整

  javac -classpath ~/hadoop-1.2.1/hadoop-core-1.2.1.jar -d ./ InvertedIndexer.java 
  jar -cfv inverted.jar -C ./* .
  hadoop jar ./inverted.jar InvertedIndexer input output

  #執行結束後顯示
  hadoop fs -cat output/part-r-00000
結果截圖:

4、源程式

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.ArrayList;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InvertedIndexer {
	
	public static class FileNameInputFormat extends FileInputFormat<Text, Text> {
		@Override
		public RecordReader<Text, Text> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException,
				InterruptedException {
			FileNameRecordReader fnrr = new FileNameRecordReader();
			fnrr.initialize(split, context);
			return fnrr;
		}
	}

	public static class FileNameRecordReader extends RecordReader<Text, Text> {
		String fileName;
		LineRecordReader lrr = new LineRecordReader();

		@Override
		public Text getCurrentKey() throws IOException, InterruptedException {
			return new Text(fileName);
		}

		@Override
		public Text getCurrentValue() throws IOException, InterruptedException {
			return lrr.getCurrentValue();
		}

		@Override
		public void initialize(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {
			lrr.initialize(arg0, arg1);
			fileName = ((FileSplit) arg0).getPath().getName();
		}

		public void close() throws IOException {
			lrr.close();
		}

		public boolean nextKeyValue() throws IOException, InterruptedException {
			return lrr.nextKeyValue();
		}

		public float getProgress() throws IOException, InterruptedException {
			return lrr.getProgress();
		}
	}

	public static class InvertedIndexMapper extends Mapper<Text, Text, Text, IntWritable> {
		private Set<String> stopwords;
		private Path[] localFiles;
		private String pattern = "[^\\w]";

		public void setup(Context context) throws IOException,InterruptedException {
			stopwords = new TreeSet<String>();
			Configuration conf = context.getConfiguration();
			localFiles = DistributedCache.getLocalCacheFiles(conf);
			for (int i = 0; i < localFiles.length; i++) {
				String line;
				BufferedReader br = new BufferedReader(new FileReader(localFiles[i].toString()));
				while ((line = br.readLine()) != null) {
					StringTokenizer itr = new StringTokenizer(line);
					while (itr.hasMoreTokens()) {
						stopwords.add(itr.nextToken());
					}
				}
				br.close();
			}
		}

		protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
			String temp = new String();
			String line = value.toString().toLowerCase();
			line = line.replaceAll(pattern, " ");
			StringTokenizer itr = new StringTokenizer(line);
			for (; itr.hasMoreTokens();) {
				temp = itr.nextToken();
				if (!stopwords.contains(temp)) {
					Text word = new Text();
					word.set(temp + "#" + key);
					context.write(word, new IntWritable(1));
				}
			}
		}
	}

	public static class SumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
		private IntWritable result = new IntWritable();

		public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}

	public static class NewPartitioner extends HashPartitioner<Text, IntWritable> {
		public int getPartition(Text key, IntWritable value, int numReduceTasks) {
			String term = new String();
			term = key.toString().split("#")[0]; // <term#docid>=>term
			return super.getPartition(new Text(term), value, numReduceTasks);
		}
	}

	public static class InvertedIndexReducer extends Reducer<Text, IntWritable, Text, Text> {
		private Text word1 = new Text();
		private Text word2 = new Text();
		String temp = new String();
		static Text CurrentItem = new Text(" ");
		static List<String> postingList = new ArrayList<String>();

		public void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			String keyWord = key.toString().split("#")[0];
			int needBlank = 15-keyWord.length();
			for(int i=0;i<needBlank;i++){
				keyWord += " ";
			}
			word1.set(keyWord);
			
			temp = key.toString().split("#")[1];	//key的形式為word1#doc1,所以temp為doc1
			for (IntWritable val : values) {	//得到某個單詞在一個檔案中的總數
				sum += val.get();
			}
			word2.set("[" + temp + "," + sum + "]"); //word2的格式為:[doc1,3]
			if (!CurrentItem.equals(word1) && !CurrentItem.equals(" ")) {
				StringBuilder out = new StringBuilder();
				long count = 0;
				double fileCount = 0;
				for (String p : postingList) {
					out.append(p);
					out.append(" ");
					count = count + Long.parseLong(p.substring(p.indexOf(",") + 1,p.indexOf("]")));
					fileCount++;
				}
				out.append("[total," + count + "] ");
				double average = count/fileCount;
				out.append("[average,"+String.format("%.3f", average)+"].");
				
				if (count > 0)
					context.write(CurrentItem, new Text(out.toString()));
				postingList = new ArrayList<String>();
			}
			CurrentItem = new Text(word1);
			postingList.add(word2.toString());
		}

		public void cleanup(Context context) throws IOException,InterruptedException {
			StringBuilder out = new StringBuilder();
			long count = 0;
			for (String p : postingList) {
				out.append(p);
				out.append(" ");
				count = count + Long.parseLong(p.substring(p.indexOf(",") + 1,p.indexOf("]")));
			}
			out.append("[total," + count + "].");
			if (count > 0)
				context.write(CurrentItem, new Text(out.toString()));
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		DistributedCache.addCacheFile(new URI("hdfs://namenode:9000/user/hadoop/stop_word/stop_word.txt"),conf);
		Job job = new Job(conf, "inverted index");
		job.setJarByClass(InvertedIndexer.class);
		
		job.setInputFormatClass(FileNameInputFormat.class);
		
		job.setMapperClass(InvertedIndexMapper.class);
		job.setCombinerClass(SumCombiner.class);
		job.setReducerClass(InvertedIndexReducer.class);
		job.setPartitionerClass(NewPartitioner.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

4、參考文獻

《深入理解大資料 大資料處理與程式設計實戰》主編:黃宜華老師(南京大學)