MapReduce API Programming Exercise (Friend-of-Friend Recommendation)
阿新 • Published 2018-12-11
1. The main method (driver)
package com.bjsxt.FOF;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyFOF {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration(true);
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyFOF.class);
        job.setJobName("ooxx");

        // Input directory on HDFS
        Path filein = new Path("/user/local");
        FileInputFormat.addInputPath(job, filein);

        // Output directory on HDFS; if it already exists, remove it first
        // (recursive delete, since the one-argument delete(Path) is deprecated)
        Path fileout = new Path("/data");
        if (fileout.getFileSystem(conf).exists(fileout)) {
            fileout.getFileSystem(conf).delete(fileout, true);
        }
        FileOutputFormat.setOutputPath(job, fileout);

        // Map side
        // job.setInputFormatClass(cls);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Optional hooks, left disabled in this exercise:
        // job.setPartitionerClass(MyPartitioner.class);
        // job.setSortComparatorClass(MyComparator.class);
        // job.setGroupingComparatorClass(MyGrouping.class);
        // job.setCombinerClass(MyCombiner.class);

        // Reduce side
        job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(2);

        job.waitForCompletion(true);
    }
}
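The driver leaves job.setCombinerClass(MyCombiner.class) commented out, and the exercise never shows MyCombiner. Note that a plain summing combiner would be wrong for this job, because it would fold the 0 that marks a direct friendship into an ordinary count. Below is a minimal sketch of what a correct combiner could look like; the class body is an assumption of mine, not part of the original exercise.

package com.bjsxt.FOF;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical combiner: pre-aggregates map output on the map side.
// A 0 value marks a direct friendship and must survive combining,
// so whenever one is seen we forward a single 0 instead of a sum.
public class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    IntWritable cval = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        boolean direct = false;
        for (IntWritable v : values) {
            if (v.get() == 0) {
                direct = true;
            }
            sum += v.get();
        }
        if (direct) {
            cval.set(0);   // keep the direct-friend marker for the reducer
        } else {
            cval.set(sum); // partial count of common friends
        }
        context.write(key, cval);
    }
}

This stays correct even when different combiners see different slices of a pair's values: as long as any slice contained a 0, the reducer still receives a 0 and filters the pair out.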
2. The map method
package com.bjsxt.FOF;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    Text mkey = new Text();
    IntWritable mval = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Equal keys form one group; each group triggers one call to the reduce method.
        // Sample value: tom hello hadoop cat
        // (the first token is the user, the rest are that user's direct friends)
        String[] split = StringUtils.split(value.toString(), ' ');
        for (int i = 1; i < split.length; i++) {
            // The user and each of their friends are direct friends: emit flag 0
            mkey.set(getfof(split[0], split[i]));
            mval.set(0);
            context.write(mkey, mval);
            for (int j = i + 1; j < split.length; j++) {
                // Any two friends of the same user share one common friend: emit 1
                mkey.set(getfof(split[i], split[j]));
                mval.set(1);
                context.write(mkey, mval);
            }
        }
    }

    // Order the two names so that the same pair always yields the same key
    public static String getfof(String s1, String s2) {
        if (s1.compareTo(s2) < 0) {
            return s1 + ":" + s2;
        } else {
            return s2 + ":" + s1;
        }
    }
}
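To make the map output concrete, here is a small local trace (a hypothetical helper class in the same package, not part of the exercise) that runs the sample line from the comment above through the same pair-generation logic, without needing a Hadoop cluster:

package com.bjsxt.FOF;

// Prints the key/value pairs the mapper would write for one input line.
public class MapTrace {
    public static void main(String[] args) {
        String[] split = "tom hello hadoop cat".split(" ");
        for (int i = 1; i < split.length; i++) {
            System.out.println(MyMapper.getfof(split[0], split[i]) + "\t0"); // direct friends
            for (int j = i + 1; j < split.length; j++) {
                System.out.println(MyMapper.getfof(split[i], split[j]) + "\t1"); // common friend
            }
        }
        // Output:
        // hello:tom    0
        // hadoop:hello 1
        // cat:hello    1
        // hadoop:tom   0
        // cat:hadoop   1
        // cat:tom      0
    }
}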
3. The reduce method
package com.bjsxt.FOF;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    IntWritable rval = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Equal keys form one group; each group triggers one call to this method,
        // which iterates over the group's values,
        // e.g. key hadoop:hello with values such as 0, 1, 1, ...
        int flag = 0;
        int sum = 0;
        for (IntWritable v : values) {
            if (v.get() == 0) {
                // A 0 means the pair are already direct friends
                flag = 1;
            }
            sum += v.get();
        }
        // Write only after the whole group has been consumed, and only for pairs
        // that are not direct friends; sum is their number of common friends
        if (flag == 0) {
            rval.set(sum);
            context.write(key, rval);
        }
    }
}
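Likewise, a quick local trace of the reduce logic shows why the flag matters; the two sample groups below are illustrative assumptions, not actual job output:

import java.util.Arrays;
import java.util.List;

// Mirrors MyReducer's filtering: drop direct friends, count common friends.
public class ReduceTrace {
    static void reduce(String key, List<Integer> values) {
        int flag = 0, sum = 0;
        for (int v : values) {
            if (v == 0) flag = 1;
            sum += v;
        }
        if (flag == 0) System.out.println(key + "\t" + sum);
        else System.out.println(key + "\t(direct friends, dropped)");
    }

    public static void main(String[] args) {
        reduce("cat:hadoop", Arrays.asList(1, 1)); // two common friends -> recommended
        reduce("hello:tom", Arrays.asList(0, 1));  // direct friends -> filtered out
    }
}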