Big Data - Hadoop Ecosystem (18) - MapReduce Framework Principles - WritableComparable Sorting and GroupingComparator Grouping
1. Sorting Overview
2. Sorting Classification
3. WritableComparable Case Study
This file is the output of the earlier post 大数据-Hadoop生态(12)-Hadoop序列化和源码追踪 (Hadoop Serialization and Source Code Tracing). As you can see, the file is sorted lexicographically by key, i.e., by phone number.
The fields are, in order: phone number, upstream traffic, downstream traffic, and total traffic.
The requirement is to sort by total traffic (descending, as the compareTo implementation below shows).
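For reference, an input line has this shape (tab-separated; this sample line also appears as a comment in the Mapper below):

13470253144	180	180	360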
The bean class needs to support serialization and deserialization and implement the Comparable interface.
package com.nty.writableComparable;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 16:33
 */

/**
 * Implement the WritableComparable interface.
 * Previously, serializing the bean only required implementing Writable;
 * now we additionally implement Comparable.
 *
 * public interface WritableComparable<T> extends Writable, Comparable<T>
 *
 * So we can either implement Writable and Comparable separately,
 * or implement the combined WritableComparable interface.
 */
public class Flow implements WritableComparable<Flow> {

    private long upflow;
    private long downflow;
    private long total;

    public long getUpflow() {
        return upflow;
    }

    public void setUpflow(long upflow) {
        this.upflow = upflow;
    }

    public long getDownflow() {
        return downflow;
    }

    public void setDownflow(long downflow) {
        this.downflow = downflow;
    }

    public long getTotal() {
        return total;
    }

    public void setTotal(long total) {
        this.total = total;
    }

    // Convenience setter for all three fields at once
    public void setFlow(long upflow, long downflow) {
        this.upflow = upflow;
        this.downflow = downflow;
        this.total = upflow + downflow;
    }

    @Override
    public String toString() {
        return upflow + "\t" + downflow + "\t" + total;
    }

    // Override compareTo: descending order by total traffic
    @Override
    public int compareTo(Flow o) {
        return Long.compare(o.total, this.total);
    }

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upflow);
        out.writeLong(downflow);
        out.writeLong(total);
    }

    // Deserialization; fields must be read in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        upflow = in.readLong();
        downflow = in.readLong();
        total = in.readLong();
    }
}
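As a quick sanity check of the comparator, here is a minimal sketch (the FlowSortDemo class and its sample values are illustrative, not part of the original job) showing that compareTo orders Flow objects by total traffic, descending, just as the shuffle will order the map output keys:

package com.nty.writableComparable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative demo only: verify Flow.compareTo sorts by total traffic, descending
public class FlowSortDemo {
    public static void main(String[] args) {
        List<Flow> flows = new ArrayList<>();

        Flow a = new Flow();
        a.setFlow(100, 200);   // total = 300
        Flow b = new Flow();
        b.setFlow(400, 500);   // total = 900
        flows.add(a);
        flows.add(b);

        // Collections.sort uses compareTo, mirroring how the framework sorts map output keys
        Collections.sort(flows);

        for (Flow f : flows) {
            System.out.println(f);   // prints "400 500 900" first, then "100 200 300"
        }
    }
}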
Mapper class
package com.nty.writableComparable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 16:47
 */
public class FlowMapper extends Mapper<LongWritable, Text, Flow, Text> {

    private Text phone = new Text();

    private Flow flow = new Flow();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Sample line: 13470253144	180	180	360
        // Split the line
        String[] fields = value.toString().split("\t");

        // Assign values
        phone.set(fields[0]);
        flow.setFlow(Long.parseLong(fields[1]), Long.parseLong(fields[2]));

        // Write out: the Flow bean is the key, so the framework sorts by it
        context.write(flow, phone);
    }
}
Reducer class
package com.nty.writableComparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 16:47
 */
// Note the output types: key and value are swapped back to (phone, flow)
public class FlowReducer extends Reducer<Flow, Text, Text, Flow> {

    @Override
    protected void reduce(Flow key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Flows with the same total traffic compare as equal and land in one
        // group, so iterate over all phone numbers sharing that total
        for (Text value : values) {
            context.write(value, key);
        }
    }
}
Driver class
package com.nty.writableComparable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * author nty
 * date time 2018-12-12 16:47
 */
public class FlowDriver {

    public static void main(String[] args) throws Exception {
        // 1. Get a Job instance
        Configuration configuration = new Configuration();
        Job instance = Job.getInstance(configuration);

        // 2. Set the jar path
        instance.setJarByClass(FlowDriver.class);

        // 3. Set the Mapper and Reducer
        instance.setMapperClass(FlowMapper.class);
        instance.setReducerClass(FlowReducer.class);

        // 4. Set the output types
        instance.setMapOutputKeyClass(Flow.class);
        instance.setMapOutputValueClass(Text.class);
        instance.setOutputKeyClass(Text.class);
        instance.setOutputValueClass(Flow.class);

        // 5. Set the input and output paths
        FileInputFormat.setInputPaths(instance, new Path("d:\\Hadoop_test"));
        FileOutputFormat.setOutputPath(instance, new Path("d:\\Hadoop_test_out"));

        // 6. Submit
        boolean b = instance.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
Result
4. GroupingComparator Case Study
The input fields are order ID, product ID, and product price:
0000001	Pdt_01	222.8
0000002	Pdt_05	722.4
0000001	Pdt_02	33.8
0000003	Pdt_06	232.8
0000003	Pdt_02	33.8
0000002	Pdt_03	522.8
0000002	Pdt_04	122.4
Requirement: find the most expensive product in each order.
Analysis:
1) Use the order ID together with the product price as the key. In the Map phase, sort by order ID in ascending order; for equal order IDs, sort by price in descending order.
2) In the Reduce phase, use a GroupingComparator to group records by order ID alone; the first record in each group is then the most expensive product, as illustrated below.
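Applied to the sample input above, the map-side sort produces the key sequence below; the grouping comparator then draws group boundaries on order ID, so reduce() is invoked once per order and the first key it receives is the most expensive product:

0000001	Pdt_01	222.8	<- first of group 0000001
0000001	Pdt_02	33.8
0000002	Pdt_05	722.4	<- first of group 0000002
0000002	Pdt_03	522.8
0000002	Pdt_04	122.4
0000003	Pdt_06	232.8	<- first of group 0000003
0000003	Pdt_02	33.8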
First, define the bean class and implement the serialization, deserialization, and comparison methods.
package com.nty.groupingComparator;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 18:07
 */
public class Order implements WritableComparable<Order> {

    private String orderId;
    private String productId;
    private double price;

    public String getOrderId() {
        return orderId;
    }

    public Order setOrderId(String orderId) {
        this.orderId = orderId;
        return this;
    }

    public String getProductId() {
        return productId;
    }

    public Order setProductId(String productId) {
        this.productId = productId;
        return this;
    }

    public double getPrice() {
        return price;
    }

    public Order setPrice(double price) {
        this.price = price;
        return this;
    }

    @Override
    public String toString() {
        return orderId + "\t" + productId + "\t" + price;
    }

    @Override
    public int compareTo(Order o) {
        // Sort by order ID first, ascending
        int compare = this.orderId.compareTo(o.getOrderId());
        if (0 == compare) {
            // Same order: compare by price, descending
            return Double.compare(o.getPrice(), this.price);
        }
        return compare;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeUTF(productId);
        out.writeDouble(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.productId = in.readUTF();
        this.price = in.readDouble();
    }
}
Mapper class
package com.nty.groupingComparator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 18:07
 */
public class OrderMapper extends Mapper<LongWritable, Text, Order, NullWritable> {

    private Order order = new Order();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Sample line: 0000001	Pdt_01	222.8
        // Split the line
        String[] fields = value.toString().split("\t");

        // Populate the order bean
        order.setOrderId(fields[0]).setProductId(fields[1]).setPrice(Double.parseDouble(fields[2]));

        // Write out; the whole Order bean is the key, the value is empty
        context.write(order, NullWritable.get());
    }
}
GroupingComparator class
package com.nty.groupingComparator;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * author nty
 * date time 2018-12-12 18:08
 */
public class OrderGroupingComparator extends WritableComparator {

    // Tell the parent class the concrete key type, and ask it to create
    // key instances for deserialization (the "true" flag)
    public OrderGroupingComparator() {
        super(Order.class, true);
    }

    // Be careful to override the right method: there are three overloads,
    // and the one needed here takes WritableComparable parameters.
    // The default compare delegates to the keys' own compareTo, but our sort
    // rule and grouping rule differ, so the grouping rule is defined here.
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Order oa = (Order) a;
        Order ob = (Order) b;
        // Group by order ID only
        return oa.getOrderId().compareTo(ob.getOrderId());
    }
}
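As a quick illustration (the GroupingDemo class is hypothetical, not part of the original job; its sample values come from the input above), two records from the same order compare as equal under the grouping comparator even though the sort comparator would separate them:

package com.nty.groupingComparator;

// Illustrative demo only: records from the same order compare as equal under
// the grouping comparator even though their prices differ, so they land in
// the same reduce() call.
public class GroupingDemo {
    public static void main(String[] args) {
        OrderGroupingComparator cmp = new OrderGroupingComparator();

        Order x = new Order().setOrderId("0000001").setProductId("Pdt_01").setPrice(222.8);
        Order y = new Order().setOrderId("0000001").setProductId("Pdt_02").setPrice(33.8);

        // Prints 0, meaning "same group"; the sort comparator
        // (Order.compareTo) would return non-zero for these two
        System.out.println(cmp.compare(x, y));
    }
}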
Reducer class
package com.nty.groupingComparator;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 18:07
 */
public class OrderReducer extends Reducer<Order, NullWritable, Order, NullWritable> {

    @Override
    protected void reduce(Order key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Within each group the keys are already sorted by price, descending,
        // so the first key is the most expensive product; no need to iterate
        context.write(key, NullWritable.get());
    }
}
Driver class
package com.nty.groupingComparator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 18:07
 */
public class OrderDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get a Job instance
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2. Set the jar path
        job.setJarByClass(OrderDriver.class);

        // 3. Set the Mapper and Reducer
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);

        // 4. Set the custom grouping comparator
        job.setGroupingComparatorClass(OrderGroupingComparator.class);

        // 5. Set the output types
        job.setMapOutputKeyClass(Order.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Order.class);
        job.setOutputValueClass(NullWritable.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("d:\\Hadoop_test"));
        FileOutputFormat.setOutputPath(job, new Path("d:\\Hadoop_test_out"));

        // 7. Submit
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
Output
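Given the sample input above, the job should emit one line per order, each carrying that order's most expensive product:

0000001	Pdt_01	222.8
0000002	Pdt_05	722.4
0000003	Pdt_06	232.8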