
Using Hadoop's DATAJOIN Package to Join Data from Different Sources

For details, see "Hadoop in Action".
A few issues came up along the way; each was resolved with help from Stack Overflow:
(1) How to supply multiple input files

  1. Put all the input files into one directory and pass that directory's path as the input path.
  2. Register each file explicitly: MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MapClass.class); (a fuller sketch of this approach follows below)
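
A minimal sketch of option 2 with two inputs. The paths /in/customers-2012 and /in/orders-2012 are hypothetical placeholders, and the calls would go inside run() before the job is submitted:

    // Each call binds one input path to an InputFormat and a Mapper.
    // Paths here are placeholders, not from the original post.
    MultipleInputs.addInputPath(job, new Path("/in/customers-2012"),
            TextInputFormat.class, MapClass.class);
    MultipleInputs.addInputPath(job, new Path("/in/orders-2012"),
            TextInputFormat.class, MapClass.class);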

(2) TaggedWritable must define a no-argument constructor, because the reduce side later creates instances of it via reflection (see the sketch below).
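
This is roughly what the framework does when it materializes each value on the reduce side (a simplified illustration, not the actual Hadoop source):

    // Hadoop first creates an empty instance reflectively, then fills it in.
    // Without a public no-arg constructor, newInstance() fails at runtime.
    TaggedWritable value =
            (TaggedWritable) ReflectionUtils.newInstance(TaggedWritable.class, conf);
    value.readFields(in);  // populate the empty instance from the stream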

(3) When data.readFields is called, data may be null and its concrete type is unknown. So in TaggedWritable's write method, store the class name of data before serializing it, then check that name in readFields and reconstruct the right type via reflection (the pattern is shown in isolation below).
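
The pattern in isolation, lifted from the full listing below (same logic, shown here for readability):

    // write: record the runtime class name so readFields knows what to build
    @Override
    public void write(DataOutput out) throws IOException {
        this.tag.write(out);
        out.writeUTF(this.data.getClass().getName());
        this.data.write(out);
    }

    // readFields: data may still be null here, so rebuild it from the name
    public void readFields(DataInput in) throws IOException {
        this.tag.readFields(in);
        String dataClz = in.readUTF();
        if (this.data == null || !this.data.getClass().getName().equals(dataClz)) {
            try {
                this.data = (Writable) ReflectionUtils.newInstance(Class.forName(dataClz), null);
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        }
        this.data.readFields(in);
    }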

The full code is as follows:

package Chapter5;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DataJoin extends Configured implements Tool {

    public static class TaggedWritable extends TaggedMapOutput {

        private Writable data;

        // No-arg constructor: required because the reduce side
        // instantiates this class via reflection (issue 2 above).
        public TaggedWritable() {
            this.tag = new Text();
        }

        public TaggedWritable(Writable data) {
            this.tag = new Text("");
            this.data = data;
        }

        // data may be null here and its concrete type is unknown,
        // so read back the class name written by write() (issue 3 above).
        public void readFields(DataInput in) throws IOException {
            this.tag.readFields(in);
            String dataClz = in.readUTF();
            if (this.data == null
                    || !this.data.getClass().getName().equals(dataClz)) {
                try {
                    this.data = (Writable) ReflectionUtils
                            .newInstance(Class.forName(dataClz), null);
                } catch (ClassNotFoundException e) {
                    e.printStackTrace();
                }
            }
            this.data.readFields(in);
        }

        // Store the class name of data before serializing it,
        // so readFields() can reconstruct the right type.
        @Override
        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            out.writeUTF(this.data.getClass().getName());
            this.data.write(out);
        }

        @Override
        public Writable getData() {
            return data;
        }

        public void setData(Writable data) {
            this.data = data;
        }
    }

    public static class MapClass extends DataJoinMapperBase {

        // Group key: the join key, i.e. the first comma-separated field.
        @Override
        protected Text generateGroupKey(TaggedMapOutput aRecord) {
            String line = ((Text) aRecord.getData()).toString();
            String[] tokens = line.split(",");
            return new Text(tokens[0]);
        }

        // Tag: the data-source name, taken from the input file name
        // up to the first '-'.
        @Override
        protected Text generateInputTag(String inputFile) {
            String datasource = inputFile.split("-")[0];
            return new Text(datasource);
        }

        @Override
        protected TaggedMapOutput generateTaggedMapOutput(Object value) {
            TaggedMapOutput res = new TaggedWritable((Text) value);
            res.setTag(this.inputTag);
            return res;
        }
    }

    public static class Reduce extends DataJoinReducerBase {

        // Inner join: skip groups that do not appear in at least two
        // sources, then concatenate the non-key fields of each record.
        @Override
        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
            if (tags.length < 2) return null;
            String res = "";
            for (int i = 0; i < values.length; i++) {
                if (i > 0) res += ",";
                TaggedWritable tmp = (TaggedWritable) values[i];
                String line = ((Text) tmp.getData()).toString();
                String[] tokens = line.split(",", 2);
                res += tokens[1];
            }
            TaggedWritable retv = new TaggedWritable(new Text(res));
            retv.setTag((Text) tags[0]);
            return retv;
        }
    }

    public int run(String[] args) throws Exception {
        Configuration configuration = getConf();
        JobConf job = new JobConf(configuration, DataJoin.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setJobName("DataJoin");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TaggedWritable.class);
        job.set("mapred.textoutputformat.separator", ",");
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DataJoin(), args);
        System.exit(res);
    }
}
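
To try it out, note that generateInputTag derives the data-source tag from the part of the file name before the first '-', so the input files must be named accordingly. A hypothetical run (the jar name, paths, and sample records below are placeholders, not from the original post):

    hadoop jar datajoin.jar Chapter5.DataJoin /user/me/join-in /user/me/join-out

    # /user/me/join-in/customers-part0
    1,Stephanie Leung,555-555-5555

    # /user/me/join-in/orders-part0
    1,B,88.25,2008-05-20

    # expected joined output (key, then the non-key fields of each source)
    1,Stephanie Leung,555-555-5555,B,88.25,2008-05-20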