
MapReduce Series (6) --- Building an Inverted Index

1. Overview

Suppose we have three files, a.txt, b.txt and c.txt, with the following contents:

a.txt:

tian jun
li lei
han meimei
li lei
han meimei

b.txt:

li lei
han meimei
tian jun
gege
jiejie
tian jun
gege
jiejie

c.txt:

gege
jiejie
han meimei
tian jun
han meimei
tian jun

Counting how many times each word appears in each file gives us the inverted index. The expected result looks like this:

gege    b.txt-->2,c.txt-->1
han     a.txt-->2,b.txt-->1,c.txt-->2
jiejie  b.txt-->2,c.txt-->1
jun     c.txt-->2,b.txt-->2,a.txt-->1
lei     b.txt-->1,a.txt-->2
li      a.txt-->2,b.txt-->1
meimei  a.txt-->2,b.txt-->1,c.txt-->2
tian    b.txt-->2,c.txt-->2,a.txt-->1

Approach:

An MR job groups records that share the same key. Exploiting this, we can combine each word with the name of the file it appears in to form the key (for example "tian--a.txt"), which already gets us most of the way to the result above. A single MR job cannot easily produce the final per-word layout, though, so we add a second job: it swaps key and value, reformats the old key, and aggregates the postings for each word. Two chained MR jobs are enough to meet the requirement.
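To make the data flow concrete, here is roughly what happens to one word (the numbers come from the expected output above). The first job emits "word--filename" keys with a count; the second job's mapper splits that key on "--" and replaces the tab with "-->"; the second job's reducer then concatenates the postings for each word:

job 1 output:    gege--b.txt  2        gege--c.txt  1
job 2 map:       gege  b.txt-->2       gege  c.txt-->1
job 2 reduce:    gege  b.txt-->2,c.txt-->1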

2. Code Implementation

inverIndexStepOne.java

package inverIndex;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * Created by tianjun on 2017/3/20.
 */
public class inverIndexStepOne {

    static class InverIndexStepOneMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        Text k = new Text();
        IntWritable v = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(" ");
            // the key combines the word with the name of the file this split comes from
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            String filename = inputSplit.getPath().getName();
            for (String word : words) {
                k.set(word + "--" + filename);
                context.write(k, v);
            }
        }
    }

    static class InverIndexStepOneReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {

        String os = System.getProperty("os.name").toLowerCase();
        if (os.contains("windows")) {
            System.setProperty("HADOOP_USER_NAME", "root");
        }

        Configuration conf = new Configuration();

        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "mini01");
        conf.set("fs.defaultFS", "hdfs://mini01:9000/");

        // local mode is the default
        // conf.set("mapreduce.framework.name","local");
        // conf.set("mapreduce.jobtracker.address","local");
        // conf.set("fs.defaultFS","file:///");

        Job wcjob = Job.getInstance(conf);

        wcjob.setJar("F:/myWorkPlace/java/dubbo/demo/dubbo-demo/mr-demo1/target/mr.demo-1.0-SNAPSHOT.jar");
        // setJarByClass does not work when submitting from this local client, so setJar is used instead
        // wcjob.setJarByClass(Rjoin.class);

        wcjob.setMapperClass(InverIndexStepOneMapper.class);
        wcjob.setReducerClass(InverIndexStepOneReducer.class);

        // output key/value types of our Mapper
        wcjob.setMapOutputKeyClass(Text.class);
        wcjob.setMapOutputValueClass(IntWritable.class);

        // output key/value types of our Reducer
        wcjob.setOutputKeyClass(Text.class);
        wcjob.setOutputValueClass(IntWritable.class);

        // if no InputFormat is set, TextInputFormat is used by default
        // wcjob.setInputFormatClass(CombineFileInputFormat.class);
        // CombineFileInputFormat.setMaxInputSplitSize(wcjob,4194304);
        // CombineFileInputFormat.setMinInputSplitSize(wcjob,2097152);

        FileSystem fs = FileSystem.get(new URI("hdfs://mini01:9000"), new Configuration(), "root");
        Path path = new Path("hdfs://mini01:9000/wc/index/stepone");
        if (fs.exists(path)) {
            fs.delete(path, true);
        }

        // where the input data lives
        FileInputFormat.setInputPaths(wcjob, new Path("hdfs://mini01:9000/input/index"));
        // where the results are stored
        FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://mini01:9000/wc/index/stepone"));

        boolean res = wcjob.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
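Because the step-one reducer only sums integer counts (an associative, commutative operation), it could also be registered as a combiner so that partial sums are computed map-side and less data is shuffled. This is an optional tweak that is not part of the listing above; a minimal sketch against the same wcjob object:

// optional: reuse the summing reducer as a combiner to cut shuffle volume
wcjob.setCombinerClass(InverIndexStepOneReducer.class);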

inverIndexStepTwo.java

package inverIndex;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * Created by tianjun on 2017/3/20.
 */
public class inverIndexStepTwo {

    static class inverIndexStepTwoMapper extends Mapper<LongWritable,Text,Text,Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
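            // each input line produced by step one has the form: word--filename<TAB>count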
            String line = value.toString();
            String[] word_file = line.split("--");
            String temp = word_file[1].replace("\t","-->");
            context.write(new Text(word_file[0]),new Text(temp));
        }
    }

    static class inverIndexStepTwoReducer extends Reducer<Text,Text,Text,Text>{
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            for(Text value : values){
                if(sb.length()!=0){
                    sb.append(",");
                }
                sb.append(value.toString());

            }
            context.write(key,new Text(sb.toString()));
        }
    }

    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {

        String os = System.getProperty("os.name").toLowerCase();
        if (os.contains("windows")) {
            System.setProperty("HADOOP_USER_NAME", "root");
        }

        Configuration conf = new Configuration();

        conf.set("mapreduce.framework.name","yarn");
        conf.set("yarn.resourcemanager.hostname","mini01");
        conf.set("fs.defaultFS","hdfs://mini01:9000/");

//            local mode is the default
//        conf.set("mapreduce.framework.name","local");
//        conf.set("mapreduce.jobtracker.address","local");
//        conf.set("fs.defaultFS","file:///");


        Job wcjob = Job.getInstance(conf);

        wcjob.setJar("F:/myWorkPlace/java/dubbo/demo/dubbo-demo/mr-demo1/target/mr.demo-1.0-SNAPSHOT.jar");

        //setJarByClass does not work when submitting from this local client, so setJar is used instead
//        wcjob.setJarByClass(Rjoin.class);

        wcjob.setMapperClass(inverIndexStepTwoMapper.class);
        wcjob.setReducerClass(inverIndexStepTwoReducer.class);

        //output key/value types of our Mapper
        wcjob.setMapOutputKeyClass(Text.class);
        wcjob.setMapOutputValueClass(Text.class);


        //output key/value types of our Reducer
        wcjob.setOutputKeyClass(Text.class);
        wcjob.setOutputValueClass(Text.class);


        //if no InputFormat is set, TextInputFormat is used by default
//        wcjob.setInputFormatClass(CombineFileInputFormat.class);
//        CombineFileInputFormat.setMaxInputSplitSize(wcjob,4194304);
//        CombineFileInputFormat.setMinInputSplitSize(wcjob,2097152);


        FileSystem fs = FileSystem.get(new URI("hdfs://mini01:9000"), new Configuration(), "root");
        Path path = new Path("hdfs://mini01:9000/wc/index/steptwo");
        if (fs.exists(path)) {
            fs.delete(path, true);
        }

        //where the input data lives (the output of step one)
//        FileInputFormat.setInputPaths(wcjob, new Path("hdfs://mini01:9000/input/index"));
        FileInputFormat.setInputPaths(wcjob, new Path("hdfs://mini01:9000/wc/index/stepone"));
        //where the final results are stored
//        FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://mini01:9000/wc/index/stepone"));
        FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://mini01:9000/wc/index/steptwo"));

        boolean res = wcjob.waitForCompletion(true);
        System.exit(res ? 0 : 1);

    }
}

Running these two jobs in sequence produces the inverted index described above.
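To check the result without leaving the IDE, one option is to stream the output file from HDFS with the same FileSystem API used in the drivers. The class below is a hypothetical helper, not part of the original code, and the part-r-00000 file name assumes the default single reducer:

package inverIndex;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.net.URI;

/**
 * Hypothetical helper: prints the final inverted index from HDFS,
 * assuming the default single reducer wrote it to part-r-00000.
 */
public class printStepTwoResult {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("hdfs://mini01:9000"), new Configuration(), "root");
        try (FSDataInputStream in = fs.open(new Path("/wc/index/steptwo/part-r-00000"))) {
            // copy the file contents to stdout without closing System.out
            IOUtils.copyBytes(in, System.out, 4096, false);
        }
    }
}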