1. 程式人生 > MapReduce處理多個不同的輸入檔案

MapReduce處理多個不同的輸入檔案

使用MultipleInputs類為不同的輸入檔案路徑分別指定輸入格式(InputFormat)以及對應的Mapper
現有兩份資料
phone
123,good number
124,common number
125,bad number

user
zhangsan,123
lisi,124
wangwu,125

現在需要把user和phone按照phone number關聯(join)起來,得到下面的結果:
zhangsan,123,good number
lisi,124,common number
wangwu,125,bad number

分析思路(不同檔案之間的每行資料有相同的key(phone number)。map階段以該key輸出各檔案的記錄,經過shuffle後,相同key的不同value會在reduce階段形成一個集合,在reduce階段對該集合裡的value進行組合,進而得到想要的結果)

還是相當於兩張表的一對一join操作。join時為value設定一個Bean(該JavaBean實現WritableComparable介面),key為外來鍵值(phone number)

join的優化,詳見http://blog.csdn.net/u010366796/article/details/44649933,設定KeyBean(外鍵和標識flag屬性),進行排序

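本例原本打算通過value進行排序,即在value的JavaBean中實現compareTo()方法,使phone表的記錄位於首位。但需注意:MapReduce在shuffle階段只依據key排序,並不會對同一key下的value呼叫compareTo(),value到達reducer的順序沒有保證。若確實需要依賴順序,應使用二次排序(組合key加上分組比較器);否則reducer應依據flag欄位判斷每筆記錄來自哪張表。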

1.為value實現JavaBean(實現WritableComparable介面)

package test.mr.multiinputs;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Custom Writable pairing a string payload with a flag that identifies the input table the
 * record came from. Flag values used by the mappers of this job: 0 = phone table, 1 = user table.
 *
 * <p>Instances order by flag first, then by value. When this type is used as a map output
 * <em>value</em>, the framework does not call {@link #compareTo}; MapReduce only sorts keys.
 * Reducers must therefore inspect {@link #getFlag()} instead of relying on value order.
 *
 * <p>{@link #equals} and {@link #hashCode} are consistent with {@link #compareTo}. This lets the
 * type also behave correctly if it is used as a key, because the default hash partitioner
 * relies on {@code hashCode()}.
 */
public class FlagString implements WritableComparable<FlagString> {
    private String value;
    private int flag; // marker: 0 = phone table, 1 = user table

    /** No-arg constructor; Hadoop creates Writables this way before calling readFields. */
    public FlagString() {
        super();
    }

    /**
     * @param value the payload column
     * @param flag  source-table marker (0 = phone table, 1 = user table)
     */
    public FlagString(String value, int flag) {
        super();
        this.value = value;
        this.flag = flag;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }

    /** Serializes the flag, then the value; the order must match {@link #readFields}. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(flag);
        out.writeUTF(value);
    }

    /** Deserializes fields in the same order {@link #write} emitted them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.flag = in.readInt();
        this.value = in.readUTF();
    }

    /** Orders by flag ascending, then by value lexicographically. */
    @Override
    public int compareTo(FlagString o) {
        if (flag != o.getFlag()) {
            return flag < o.getFlag() ? -1 : 1;
        }
        return value.compareTo(o.getValue());
    }

    /** Two instances are equal when both flag and value are equal (consistent with compareTo). */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof FlagString)) {
            return false;
        }
        FlagString other = (FlagString) obj;
        return flag == other.flag
                && (value == null ? other.value == null : value.equals(other.value));
    }

    /** Deterministic across JVMs (String and int hashes are specified), so safe for partitioning. */
    @Override
    public int hashCode() {
        return 31 * flag + (value == null ? 0 : value.hashCode());
    }
}


2.多map類,map1(實現對phone表文件操作)

package test.mr.multiinputs;

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper for the phone table.
 *
 * <p>Each non-blank input line is split on the configured delimiter. Lines that yield exactly
 * two fields emit {@code (field0, FlagString(field1, 0))}, i.e. phone number to description,
 * tagged as a phone-table record. All other lines are skipped.
 */
public class MultiMap1 extends Mapper<LongWritable, Text, Text, FlagString> {
    /** Flag marking records that come from the phone table. */
    private static final int PHONE_FLAG = 0;

    private String delimiter; // field separator, read from the job configuration (default ",")
    private Pattern delimiterPattern; // the delimiter compiled as a literal, not as a regex

    @Override
    protected void setup(
            Mapper<LongWritable, Text, Text, FlagString>.Context context)
            throws IOException, InterruptedException {
        delimiter = context.getConfiguration().get("delimiter", ",");
        // Quote the delimiter so characters such as "|" or "." are matched literally
        // instead of being interpreted as regular-expression syntax.
        delimiterPattern = Pattern.compile(Pattern.quote(delimiter));
    }

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, FlagString>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.length() == 0) {
            return;
        }
        String[] fields = delimiterPattern.split(line);
        if (fields.length == 2) {
            context.write(new Text(fields[0].trim()),
                    new FlagString(fields[1].trim(), PHONE_FLAG));
        }
    }
}


2.map2(實現對user表文件操作)

package test.mr.multiinputs;

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper for the user table.
 *
 * <p>Each non-blank input line is split on the configured delimiter. Lines that yield exactly
 * two fields emit {@code (field1, FlagString(field0, 1))}, i.e. phone number to user name,
 * tagged as a user-table record. All other lines are skipped.
 */
public class MultiMap2 extends Mapper<LongWritable, Text, Text, FlagString> {
    /** Flag marking records that come from the user table. */
    private static final int USER_FLAG = 1;

    private String delimiter; // field separator, read from the job configuration (default ",")
    private Pattern delimiterPattern; // the delimiter compiled as a literal, not as a regex

    @Override
    protected void setup(
            Mapper<LongWritable, Text, Text, FlagString>.Context context)
            throws IOException, InterruptedException {
        delimiter = context.getConfiguration().get("delimiter", ",");
        // Quote the delimiter so characters such as "|" or "." are matched literally
        // instead of being interpreted as regular-expression syntax.
        delimiterPattern = Pattern.compile(Pattern.quote(delimiter));
    }

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, FlagString>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.length() == 0) {
            return;
        }
        String[] fields = delimiterPattern.split(line);
        if (fields.length == 2) {
            // The join key (phone number) is the second column of the user table.
            context.write(new Text(fields[1].trim()),
                    new FlagString(fields[0].trim(), USER_FLAG));
        }
    }
}


3.reduce類

package test.mr.multiinputs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reduce side of the join: for each phone number, pairs every user record with every phone
 * record and writes {@code user<delim>phoneNumber<delim>phoneDescription}.
 *
 * <p>Records are classified by {@link FlagString#getFlag()} (0 = phone, 1 = user), never by
 * position. MapReduce does not sort values, so the phone record is not guaranteed to arrive
 * first. Keys with no phone record or no user record produce no output (inner join).
 */
public class MultiRedu extends Reducer<Text, FlagString, NullWritable, Text> {
    private static final int PHONE_FLAG = 0;
    private static final int USER_FLAG = 1;

    private String delimiter; // output field separator, from the job configuration (default ",")

    @Override
    protected void setup(
            Reducer<Text, FlagString, NullWritable, Text>.Context context)
            throws IOException, InterruptedException {
        delimiter = context.getConfiguration().get("delimiter", ",");
    }

    @Override
    protected void reduce(Text key, Iterable<FlagString> values,
            Reducer<Text, FlagString, NullWritable, Text>.Context context)
            throws IOException, InterruptedException {
        // Output format: userValue<delim>key<delim>phoneValue
        List<String> phoneValues = new ArrayList<String>();
        List<String> userValues = new ArrayList<String>();
        for (FlagString value : values) {
            // The framework may reuse the same FlagString object across iterations; copying
            // out the (immutable) String keeps each buffered entry independent.
            if (value.getFlag() == PHONE_FLAG) {
                phoneValues.add(value.getValue());
            } else if (value.getFlag() == USER_FLAG) {
                userValues.add(value.getValue());
            }
        }

        String joinKey = key.toString();
        Text outValue = new Text();
        for (String userValue : userValues) {
            for (String phoneValue : phoneValues) {
                outValue.set(userValue + delimiter + joinKey + delimiter + phoneValue);
                context.write(NullWritable.get(), outValue);
            }
        }
    }
}


4.job類(關鍵!!實現多檔案的輸入格式等)

001.package test.mr.multiinputs; 002. 003.import org.apache.hadoop.conf.Configuration; 004.import org.apache.hadoop.fs.Path; 005.import org.apache.hadoop.io.NullWritable; 006.import org.apache.hadoop.io.Text; 007.import org.apache.hadoop.mapreduce.Job; 008.import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; 009.import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 010.import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 011.import org.apache.hadoop.util.Tool; 012.import org.apache.hadoop.util.ToolRunner; 013. 014./* 015.* MultipleInputs類指定不同的輸入檔案路徑以及輸入文化格式 016.現有兩份資料 017.phone 018.123,good number 019.124,common number 020.123,bad number 021. 022.user 023.zhangsan,123 024.lisi,124 025.wangwu,125 026. 027.現在需要把user和phone按照phone number連線起來。得到下面的結果 028.zhangsan,123,good number 029.lisi,123,common number 030.wangwu,125,bad number 031.*/ 032.public class MultiMapMain extends Configuration implements Tool { 033.private String input1 = null; // 定義的多個輸入檔案 034.private String input2 = null; 035.private String output = null; 036.private String delimiter = null; 037. 038.@Override 039.public void setConf(Configuration conf) { 040. 041.} 042. 043.@Override 044.public Configuration getConf() { 045.return new Configuration(); 046.} 047. 048.@Override 049.public int run(String[] args) throws Exception { 050.setArgs(args); 051.checkParam();// 對引數進行檢測 052. 053.Configuration conf = new Configuration(); 054.Job job = new Job(conf); 055.job.setJarByClass(MultiMapMain.class); 056. 057.job.setMapOutputKeyClass(Text.class); 058.job.setMapOutputValueClass(FlagString.class); 059. 060.job.setReducerClass(MultiRedu.class); 061.job.setOutputKeyClass(NullWritable.class); 062.job.setOutputValueClass(Text.class); 063. 064.// MultipleInputs類新增檔案路徑 065.MultipleInputs.addInputPath(job, new Path(input1), 066.TextInputFormat.class, MultiMap1.class); 067.MultipleInputs.addInputPath(job, new Path(input2), 068.TextInputFormat.class, MultiMap2.class); 069. 
070.FileOutputFormat.setOutputPath(job, new Path(output)); 071.job.waitForCompletion(true); 072.return 0; 073.} 074. 075.private void checkParam() { 076.if (input1 == null || "".equals(input1.trim())) { 077.System.out.println("no input phone-data path"); 078.userMaunel(); 079.System.exit(-1); 080.} 081.if (input2 == null || "".equals(input2.trim())) { 082.System.out.println("no input user-data path"); 083.userMaunel(); 084.System.exit(-1); 085.} 086.if (output == null || "".equals(output.trim())) { 087.System.out.println("no output path"); 088.userMaunel(); 089.System.exit(-1); 090.} 091.if (delimiter == null || "".equals(delimiter.trim())) { 092.System.out.println("no delimiter"); 093.userMaunel(); 094.System.exit(-1); 095.} 096. 097.} 098. 099.// 使用者手冊 100.private void userMaunel() { 101.System.err.println("Usage:"); 102.System.err.println("-i1 input    phone data path."); 103.System.err.println("-i2 input    user data path."); 104.System.err.println("-o output    output data path."); 105.System.err.println("-delimiter data delimiter    default comma."); 106.} 107. 108.// 對屬性進行賦值 109.// 設定輸入的格式:-i1 xxx(輸入目錄) -i2 xxx(輸入目錄) -o xxx(輸出目錄) -delimiter x(分隔符) 110.private void setArgs(String[] args) { 111.for (int i = 0; i < args.length; i++) { 112.if ("-i1".equals(args[i])) { 113.input1 = args[++i]; // 將input1賦值為第一個檔案的輸入路徑 114.} else if ("-i2".equals(args[i])) { 115.input2 = args[++i]; 116.} else if ("-o".equals(args[i])) { 117.output = args[++i]; 118.} else if ("-delimiter".equals(args[i])) { 119.delimiter = args[++i]; 120.} 121.} 122.} 123. 124.public static void main(String[] args) throws Exception { 125.Configuration conf = new Configuration(); 126.ToolRunner.run(conf, new MultiMapMain(), args); // 呼叫run方法 127.} 128.}