1. 程式人生 > >利用MapReduce來實現文件全域性搜尋引擎

利用MapReduce來實現文件全域性搜尋引擎

                        利用MapReduce來實現全域性搜尋引擎

根據內容來檢索文件:統計每個單詞在各個文件中出現的次數,藉此實現全文檢索的功能。

預備檔案:

hadoop中分三步走:

1.mapper對文件初步處理,獲得每個單詞以及該單詞所在文件的路徑,並將每個單詞的出現次數初步設定為1;

    輸出格式 : 單詞||文件uri   1;

2.combiner對於每個文件同樣的單詞初步的合計統計次數並輸出到reducer

     合併每個檔案單詞出現的次數,也就是詞頻 

     輸出格式: 單詞  uri-------詞頻

 3.reducer經過shuffle處理形成最終的檔案

    輸出格式:     單詞    uri-------詞頻;uri-------詞頻;

程式碼展示:

package demo01.hadoop;

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class InvertedIndex extends Configured implements Tool {

	@Override
	public int run(String[] args) throws Exception {

		if (args.length != 2) {

			System.out.println("args error!");
			return -1;
		}
		Path src = new Path(args[0]);
		Path desc = new Path(args[1]);

		Configuration conf = getConf();
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(desc)) {
			fs.delete(desc, true);
		}

		Job job = Job.getInstance(conf,"倒排索引");
		job.setJarByClass(getClass());

		job.setMapperClass(MyMapper.class);	
		job.setMapOutputKeyClass(Text.class);            //這是reducer的東西
		job.setMapOutputValueClass(Text.class);
		
		
		job.setCombinerClass(MyCombiner.class);
		
		//job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class); 
		
		FileInputFormat.addInputPath(job, src);
		FileOutputFormat.setOutputPath(job, desc);

		return job.waitForCompletion(true) ? 0 : 1;

	}

	public static void main(String[] args) throws Exception {

		int code = ToolRunner.run(new InvertedIndex(), args);
		System.exit(code);

	}

	/**
	 * 
	 * @author hp 輸出格式 : key單詞:文件uri value每個單詞設定出現次數為1;
	 *
	 */

	public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

		String uri;
		Text key2 = new Text();
		Text value2 = new Text();

		@Override
		public void setup(Mapper<LongWritable, Text, Text,Text>.Context context)
				throws IOException, InterruptedException {

			FileSplit split = (FileSplit) context.getInputSplit();
			this.uri = split.getPath().toString();

		}

		@Override
		public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text,Text>.Context context)
				throws IOException, InterruptedException {

			String[] strs = value.toString().split("\\s+");
			for (String str : strs) {

				key2.set(str + "||" + uri);
				value2.set("1");
				
				context.write(key2, value2);
			}

		}
	}

	/**
	 * 合併每個檔案單詞出現的次數,也就是詞頻 輸出格式: key單詞 value uri+每個文件中的詞頻
	 */

	public static class MyCombiner extends Reducer<Text, Text, Text, Text> {

		Text key4 = new Text();
		Text value4 = new Text();

		
		@Override
		public void reduce(Text key3, Iterable<Text> value3,
				Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {

			
			int sum = 0;
			for (Text v3 : value3) {

				sum += Integer.parseInt(v3.toString());

			}
			

			String word = key3.toString().substring(0, key3.toString().indexOf("||"));
			key4.set(word);
			int pos = key3.toString().length();
			String uri = key3.toString().substring(key3.toString().indexOf("||") + 2, pos);
            
			value4.set(uri + "-------" +sum);
			context.write(key4, value4);

		}

	}
	
	public static class MyReducer extends Reducer<Text, Text, Text,Text>{
		
		Text key6 = new Text();
		Text value6 = new Text();
		
		@Override
		public void reduce(Text key5, Iterable<Text> value5, Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			
			StringBuffer sb = new StringBuffer();
			for (Text v5 : value5) {
				
				sb.append(v5 + ";");
			}
			
			key6.set(key5.toString());
			value6.set(sb.toString());	
			context.write(key6, value6);
		}
		
	}

}

歡迎提出見解跟指導