
The File Inverted Index Algorithm and Its Hadoop Implementation

What is an inverted index of files?

Simply put, it is an algorithm used by search engines. Through an inverted index, we can quickly retrieve the list of documents that contain a given word. An inverted index consists of two main parts: the "words" and the corresponding "inverted files" (the documents in which each word appears).
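As a minimal sketch in plain Java (the document names and contents are made up for illustration), an inverted index can be thought of as a map from each word to the set of documents containing it:

import java.util.*;

public class TinyInvertedIndex
{
    public static void main(String[] args)
    {
        // hypothetical corpus: document name -> content
        Map<String, String> docs = new LinkedHashMap<>();
        docs.put("doc1", "the quick brown fox");
        docs.put("doc2", "the lazy dog");

        // inverted index: word -> documents containing that word
        Map<String, Set<String>> index = new TreeMap<>();
        for (Map.Entry<String, String> e : docs.entrySet())
        {
            for (String word : e.getValue().split("\\s+"))
            {
                index.computeIfAbsent(word, w -> new TreeSet<>()).add(e.getKey());
            }
        }
        System.out.println(index); // {brown=[doc1], dog=[doc2], fox=[doc1], lazy=[doc2], quick=[doc1], the=[doc1, doc2]}
    }
}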

1. The MapReduce Design

The whole process consists of three stages: map, combiner, and reduce. The key/value types of each stage are shown in the table below:

             InputKey    InputValue    OutputKey    OutputValue
Map          Object      Text          Text         Text
Combiner     Text        Text          Text         Text
Reduce       Text        Text          Text         Text

The input files are read with the default TextInputFormat. The three stages do the following:

Map: tokenize the content of each line and output the key "word:document" with the occurrence count as the value, here the Text string "1";

Combiner: for each input key, parse the values as int and sum them, move the document name from the key into the value, and output the key "word" with the value "document:count;…";

Reduce: for each input key, split every value at the colon, sum the extracted occurrence counts while counting the number of documents, and compute the average number of occurrences; the output key is the word followed by a tab and the average count, and the output value is "document:count;…".
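As a concrete illustration (a hypothetical input, not from the original article), suppose there are two files, a.txt containing "江湖 英雄 江湖" and b.txt containing "江湖". Since the file extension is stripped, the records flowing through the three stages would look roughly like this:

Map output:       (江湖:a, 1)   (英雄:a, 1)   (江湖:a, 1)   (江湖:b, 1)
Combiner output:  (江湖, a:2)   (英雄, a:1)   (江湖, b:1)
Reduce output:    (江湖\t1.50, a:2;b:1;)   (英雄\t1.00, a:1;)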

2. MapReduce Code Snippets

The Map code is as follows:
public static class Map extends Mapper<Object,Text,Text,Text>
{
    private Text valueInfo = new Text();
    private Text keyInfo = new Text();
    private FileSplit split;
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException
    {
        split = (FileSplit) context.getInputSplit();
        StringTokenizer stk = new StringTokenizer(value.toString()); // tokenize the line
        while (stk.hasMoreElements())                                // while there are tokens left
        {
            String name = split.getPath().getName();                 // name of the input file
            int splitIndex = name.indexOf(".");                      // position of the dot in the file name
            keyInfo.set(stk.nextToken() + ":" + name.substring(0, splitIndex)); // key = "word:filename" (extension stripped)
            valueInfo.set("1");                                      // value = "1"
            context.write(keyInfo, valueInfo);                       // emit the record
        }
    }
}
The Combiner code is as follows:
public static class Combiner extends Reducer<Text,Text,Text,Text>
{
    Text info = new Text();
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException
    {
        int sum = 0;
        for (Text value : values)
        {
            sum += Integer.parseInt(value.toString()); // sum the occurrences of this word in this document
        }
        int splitIndex = key.toString().indexOf(":");  // position of the colon in the key
        info.set(key.toString().substring(splitIndex + 1) + ":" + sum); // value = "document:count"
        key.set(key.toString().substring(0, splitIndex));               // key = "word"
        context.write(key, info);                                       // emit the record
    }
}

The Reduce code is as follows:
public static class Reduce extends Reducer<Text,Text,Text,Text>
{
    private Text result = new Text();
    public void reduce(Text key, Iterable<Text> values, Context contex)
            throws IOException, InterruptedException
    {
        String fileList = new String();
        double sum = 0, cnt = 0;
        for (Text value : values)
        {
            cnt++;                                          // count the documents the word appears in
            fileList += value.toString() + ";";             // separate "document:count" pairs with semicolons
            int splitIndex = value.toString().indexOf(":");
            sum += Integer.parseInt(value.toString().substring(splitIndex + 1)); // total number of occurrences
        }
        sum /= cnt;                                         // average occurrences per document
        result.set(fileList);                               // value = document list
        key.set(key.toString() + '\t' + String.format("%.2f", sum)); // key = word, tab, average count
        contex.write(key, result);                          // emit the record
    }
}
The final output key is therefore the word followed by a tab and its average occurrence count, and the value is "document:count;…".

Development environment: IntelliJ IDEA + Maven + Java 1.8
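For reference, a sketch of the Maven dependencies this project needs (the version numbers are assumptions, not from the original article; they must match your cluster's Hadoop/HBase release, and since the HTable constructor and Put.add API used below were removed in HBase 2.0, an older hbase-client is required):

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.3</version>  <!-- assumed version -->
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>0.98.24-hadoop2</version>  <!-- assumed version -->
    </dependency>
</dependencies>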

The inverted index was built over a collection of wuxia novels; a screenshot of the "江湖" entry in the output file is shown below.
The complete code is as follows:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.hbase.client.HTable;


public class InvertedIndex
{
    private static Configuration conf2 = null;
    static
    {
        conf2 = HBaseConfiguration.create();
    }

    public static void addData(String tableName, String rowKey, String family,
                               String qualifier, String value )throws Exception
    {
        try
        {
            HTable table = new HTable(conf2, tableName);
            Put put = new Put(Bytes.toBytes(rowKey));
            put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
            table.put(put);
            System.out.println("insert success!");
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }

    public static class Map extends Mapper<Object,Text,Text,Text>
    {
        private Text valueInfo = new Text();
        private Text keyInfo = new Text();
        private FileSplit split;
        public void map(Object key, Text value,Context context) throws IOException, InterruptedException
        {
            split = (FileSplit) context.getInputSplit();
            StringTokenizer stk = new StringTokenizer(value.toString());
            while (stk.hasMoreElements())
            {
                String name = split.getPath().getName();
                int splitIndex = name.indexOf(".");
                keyInfo.set(stk.nextToken() + ":" + name.substring(0, splitIndex));
                valueInfo.set("1");
                context.write(keyInfo, valueInfo);
            }
        }
    }

    public static class Combiner extends Reducer<Text,Text,Text,Text>
    {
        Text info = new Text();
        public void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException
        {
            int sum = 0;
            for (Text value : values)
            {
                sum += Integer.parseInt(value.toString());
            }
            int splitIndex = key.toString().indexOf(":");
            info.set(key.toString().substring(splitIndex+1) + ":" + sum);
            key.set(key.toString().substring(0,splitIndex));
            context.write(key, info);
        }
    }

    public static class Reduce extends Reducer<Text,Text,Text,Text>
    {
        private Text result = new Text();
        public void reduce(Text key, Iterable<Text> values,Context contex) throws IOException, InterruptedException
        {
            // build the document list
            String fileList = new String();
            double sum = 0 , cnt = 0;
            for (Text value : values)
            {
                cnt++;
                fileList += value.toString() + ";";
                int splitIndex = value.toString().indexOf(":");
                sum += Integer.parseInt(value.toString().substring(splitIndex+1));
            }
            sum /= cnt;

            result.set(fileList);
            //key.set(key.toString() + '\t' + String.format("%.2f", sum));
            try
            {
                addData("test", key.toString(), "BigData", "aveNum", String.format("%.2f", sum));
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
            contex.write(key, result);
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException
    {
        Configuration conf = new Configuration();  // configuration object
        Job job = new Job(conf,"InvertedIndex");   // create the job
        job.setJarByClass(InvertedIndex.class);    // main class of the job

        job.setMapperClass(Map.class);  // mapper settings
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setCombinerClass(Combiner.class);  // combiner setting

        job.setReducerClass(Reduce.class);  // reducer settings
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        //FileInputFormat.addInputPath(job, new Path("/data/wuxia_novels/"));//路徑設定
        //FileOutputFormat.setOutputPath(job, new Path("/user/2016st28/exp2/"));
        FileInputFormat.addInputPath(job, new Path("/input/exp2/"));  // input/output paths
        FileOutputFormat.setOutputPath(job, new Path("/output/test/"));

        System.exit(job.waitForCompletion(true)?0:1);
    }
}
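The job can be built and submitted along the following lines (a sketch, not from the original article: the jar name and the local input directory are hypothetical, and the HBase table written by addData() must already exist):

# create the HBase table used by addData() first, in the HBase shell:
#   create 'test', 'BigData'

mvn clean package
hdfs dfs -mkdir -p /input/exp2
hdfs dfs -put novels/*.txt /input/exp2/          # hypothetical local directory of input texts
hadoop jar target/inverted-index-1.0.jar InvertedIndex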