大資料入門(11)mr自定義分組和切片劃分
阿新 • 發佈:2018-11-10
public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE>{ private static HashMap<String,Integer> areaMap = new HashMap<String,Integer>(); static{ areaMap.put("135", 0); areaMap.put("136", 1); areaMap.put("137", 2); areaMap.put("138", 3); areaMap.put("139", 4); } @Override public int getPartition(KEY key, VALUE value, int arg2) { // TODO Auto-generated method stub //從key中拿到手機號,查詢手機歸屬地字典,不同的省份返回不同的組號 String num = key.toString().substring(0, 3); int code = areaMap.get(num)==null?5:areaMap.get(num); return code; } } /*********************************FlowSumArea.java*********************************************************/ package com.hadoop.hdfs.mr.areapartition; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.hadoop.hdfs.mr.flowsum.FlowBean; public class FlowSumArea { public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub //第一行資料 String line = value.toString(); //切分 String[] fields = line.split("\t"); //拿到需要的資料 String phoneNB = fields[1]; long u_flow = Long.parseLong(fields[7]); long d_flow = Long.parseLong(fields[8]); //封裝資料為kv形式 context.write(new Text(phoneNB), new FlowBean(phoneNB, u_flow, d_flow)); } } public static class FlowSumAreaReduce extends Reducer<Text, FlowBean, Text, FlowBean>{ protected void reduce(Text text, Iterable<FlowBean> values,Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub long u_count = 0; long d_count = 
0 ; for (FlowBean bean : values){ u_count+=bean.getUp_flow(); d_count+=bean.getD_flow(); } context.write(new Text(text), new FlowBean(text.toString(), u_count, d_count)); } } public static void main(String[] args) throws Exception { Job job = Job.getInstance(new Configuration()); job.setJarByClass(FlowSumArea.class); job.setMapperClass(FlowSumAreaMapper.class); job.setReducerClass(FlowSumAreaReduce.class); //設定自定義的分組邏輯定義 job.setPartitionerClass(AreaPartitioner.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //設定reduce 的任務併發數,跟分組的數量保持一致 job.setNumReduceTasks(6); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); int status = job.waitForCompletion(true)?0:1; System.exit(status); } }