程式人生 > hadoop MapReduce —— 輸出每個單詞所對應的檔案

hadoop MapReduce —— 輸出每個單詞所對應的檔案

下面是四個輸入檔案及其內容。

程式碼實現:

Mapper:
package cn.tedu.invert;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class InvertMapper extends Mapper<LongWritable, Text, Text, Text> { @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 獲取檔名 FileSplit fileSplit = (FileSplit)context.getInputSplit(); String pathName
= fileSplit.getPath().getName(); // 將檔案中的內容提取 String[] words = value.toString().split(" "); // 每一個單詞都對應著自己所在檔案的檔名 for(String word:words){ context.write(new Text(word), new Text(pathName)); } } }
Reducer:
package cn.tedu.invert;

import java.io.IOException; import java.util.HashSet; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class InvertReducer extends Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 雜湊表不存重複元素,將重複的檔名去掉 HashSet<String> set = new HashSet<>(); for (Text text : values) { set.add(text.toString()); } StringBuilder sb = new StringBuilder(); for (String str : set) { sb.append(str.toString()).append(" "); } context.write(key, new Text(sb.toString())); } }

Driver:

package cn.tedu.invert;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InvertDriver {

    // Original hard-coded HDFS locations, kept as defaults so running with no
    // arguments behaves exactly as before.
    private static final String DEFAULT_INPUT =
            "hdfs://192.168.74.129:9000/text/invert";
    private static final String DEFAULT_OUTPUT =
            "hdfs://192.168.74.129:9000/result/invert_result";

    /**
     * Configures and submits the inverted-index job.
     *
     * @param args optional overrides: args[0] = input path, args[1] = output path
     * @throws Exception if job setup or submission fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "JobName");
        job.setJarByClass(InvertDriver.class);
        job.setMapperClass(InvertMapper.class);
        job.setReducerClass(InvertReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Paths come from the command line when given, otherwise the defaults.
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        // Propagate success/failure to the shell; the original silently
        // returned on failure, hiding it from callers and scripts.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

結果: