1. 程式人生 > >mapreduce演算法之倒排索引

mapreduce演算法之倒排索引

package mapreduce;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class indexSearch {


    public static class InvertedIndexMapper extends Mapper<Object, Text, Text, Text>{


        private Text keyInfo = new Text();  // 儲存單詞和URI的組合
        private Text valueInfo = new Text(); //儲存詞頻
        private FileSplit split;  // 儲存split物件。
        @Override
        protected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            //獲得<key,value>對所屬的FileSplit物件。
            split = (FileSplit) context.getInputSplit();
            System.out.println("偏移量"+key);
            System.out.println("值"+value);
            //StringTokenizer是用來把字串擷取成一個個標記或單詞的,預設是空格或多個空格(\t\n\r等等)擷取
            StringTokenizer itr = new StringTokenizer( value.toString());
            while( itr.hasMoreTokens() ){
                // key值由單詞和URI組成。
                keyInfo.set( itr.nextToken()+":"+split.getPath().getName().toString());
                //詞頻初始為1
                valueInfo.set("1");
                context.write(keyInfo, valueInfo);
            }
            System.out.println("key"+keyInfo);
            System.out.println("value"+valueInfo);
        }
    }


    public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text>{
        private Text info = new Text();
        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {


            //統計詞頻

            int sum = 0;
            for (Text value : values) {
                sum += Integer.parseInt(value.toString() );
            }


            int splitIndex = key.toString().indexOf(":");


            //重新設定value值由URI和詞頻組成
            info.set( key.toString().substring( splitIndex + 1) +":"+sum );


            //重新設定key值為單詞
            key.set( key.toString().substring(0,splitIndex));


            context.write(key, info);
            System.out.println("key"+key);
            System.out.println("value"+info);
        }
    }


    public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text>{


        private Text result = new Text();


        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {


            //生成文件列表
            String fileList = new String();
            for (Text value : values) {
                fileList += value.toString()+";";
            }
            result.set(fileList);


            context.write(key, result);
        }


    }


    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();


            Job job = Job.getInstance(conf,"InvertedIndex");
            job.setJarByClass(indexSearch.class);


            //實現map函式,根據輸入的<key,value>對生成中間結果。
            job.setMapperClass(InvertedIndexMapper.class);


            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);


            job.setCombinerClass(InvertedIndexCombiner.class);
            job.setReducerClass(InvertedIndexReducer.class);


            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);


            //我把那兩個檔案上傳到這個index目錄下了
            FileInputFormat.addInputPath(job, new Path("hdfs://192.168.120.128:9000/input/"));
            //把結果輸出到out_index+時間戳的目錄下
            FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.120.128:9000/out_index"+System.currentTimeMillis()+"/"));


            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (IllegalStateException e) {
            e.printStackTrace();
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }
}