大資料入門(12):MapReduce 倒排索引(註:下方程式碼實際為依流量排序的 SortMR 範例)
阿新 • 發佈:2018-11-10
package com.hadoop.hdfs.mr.flowsort; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.hadoop.hdfs.mr.flowsum.FlowBean; public class SortMR { public static class SortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable>{ protected void map(LongWritable key, Text value, Context context) throws IOException ,InterruptedException { String line = value.toString(); String[] fields = line.split("\t"); String phoneNB = fields[0]; long u_flow = Long.parseLong(fields[1]); long d_flow = Long.parseLong(fields[2]); context.write(new FlowBean(phoneNB, u_flow, d_flow), NullWritable.get()); }; } public static class SortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean>{ protected void reduce(FlowBean bean, java.lang.Iterable<NullWritable> values, Context context) throws IOException ,InterruptedException { String phoneNB = bean.getPhoneNB(); context.write(new Text(phoneNB), bean); }; } public static void main(String[] args) throws Exception { Job job = Job.getInstance(new Configuration()); job.setJarByClass(SortMR.class); job.setMapperClass(SortMapper.class); job.setReducerClass(SortReducer.class); job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(FlowBean.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); int status = job.waitForCompletion(true)?0:1; System.exit(status); } }