
Hadoop MultipleInputs.addInputPath: Reading Multiple Input Paths

MultipleInputs.addInputPath

Purpose

It lets a single job read from multiple input paths, and each path can be bound to its own map class.

Usage

MultipleInputs.addInputPath(Job job, Path path, Class<? extends InputFormat> inputFormatClass, Class<? extends Mapper> mapperClass)
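
For instance, one path can be read as plain text lines while another is read as tab-separated key/value lines, each handled by its own mapper. A minimal sketch of the calls (MapperA and MapperB are hypothetical mapper classes used only for illustration):

// Sketch only: MapperA / MapperB are placeholder mapper classes.
MultipleInputs.addInputPath(job, new Path("/data/plain"), TextInputFormat.class, MapperA.class);
MultipleInputs.addInputPath(job, new Path("/data/tabbed"), KeyValueTextInputFormat.class, MapperB.class);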

Example

We use word count as the example.
F:\hadooptest\wordcount\input1 contains a word.txt in which words are separated by spaces:

aa bb cc

dd ee ff 

aa  bb  ff

F:\hadooptest\wordcount\input2 contains a word.txt in which words are separated by ##:

aa##bb##cc
ee##gg##kk

Code

package com.myhadoop.multiple;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Created by kaishun on 2017/7/31.
 */
public class TestMultipleInputs {

    // Mapper for input1: words are separated by whitespace.
    public static class MapA extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("\\s+");
            for (String s : strs) {
                word.set(s);
                context.write(word, one);
            }
        }
    }

    // Mapper for input2: words are separated by "##".
    public static class MapB extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("##");
            for (String s : strs) {
                word.set(s);
                context.write(word, one);
            }
        }
    }

    // Sums the counts emitted by both mappers.
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJobName("MultipleWordCount");
        job.setJarByClass(TestMultipleInputs.class);
        // Multiple inputs, each bound to its own mapper.
        MultipleInputs.addInputPath(job, new Path("F:\\hadooptest\\wordcount\\input1"), TextInputFormat.class, MapA.class);
        MultipleInputs.addInputPath(job, new Path("F:\\hadooptest\\wordcount\\input2"), TextInputFormat.class, MapB.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // All keys are aggregated in a single reducer.
        job.setReducerClass(Reduce.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
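
Both mappers emit (Text, IntWritable), and setNumReduceTasks(1) sends every key to the same reducer, so the counts from both directories end up merged in one output file. args[0] is the output directory and must not already exist. Assuming the class is packaged into a jar (the jar name below is only an example), the job could be submitted like this:

hadoop jar multiple-wordcount.jar com.myhadoop.multiple.TestMultipleInputs F:\hadooptest\wordcount\output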

Output

aa  3
bb  3
cc  2
dd  1
ee  2
ff  2
gg  1
kk  1