Using a custom bean as the key in MapReduce (MR) to output the maximum value of each sorted group
Contents
- Requirement: use a custom bean as the key in MR and output the maximum value of each sorted group
- Approach: override the GroupingComparator in MR
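1. Requirement: use a custom bean as the key in MR and output the maximum value of each sorted group
Scenario: given several orders, find for each order the price of its most expensive item.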
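To make the expected result concrete, here is a minimal plain-Java baseline that computes the same answer without Hadoop. The class name `MaxPerOrderBaseline`, the sample records, and the `orderId,productId,amount` line layout are assumptions for illustration only (the layout is inferred from the mapper, which reads the first and third comma-separated fields).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java baseline (no Hadoop): the answer the MapReduce job is expected to produce.
class MaxPerOrderBaseline {

    // Each line is assumed to look like "orderId,productId,amount".
    static Map<String, Double> maxAmountPerOrder(List<String> lines) {
        Map<String, Double> max = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            String orderId = fields[0];
            double amount = Double.parseDouble(fields[2]);
            // Keep the larger of the stored amount and the new one.
            max.merge(orderId, amount, Double::max);
        }
        return max;
    }

    public static void main(String[] args) {
        // Hypothetical sample records, not taken from the original data set.
        List<String> lines = Arrays.asList(
                "order1,pdtA,25.5",
                "order1,pdtB,99.0",
                "order2,pdtC,12.0",
                "order2,pdtA,7.5");
        System.out.println(maxAmountPerOrder(lines));
    }
}
```

This baseline holds every order id in memory on one machine. The MR approach described next gets the same result from the framework's sort and grouping steps instead.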
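2. Approach: override the GroupingComparator in MR
- Define the sort order: the OrderBean class overrides compareTo. Beans are ordered by order id first; when the order ids are equal, they are ordered by price, descending.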
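    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;

    public class OrderBean implements WritableComparable<OrderBean> {

        private Text itemid;
        private DoubleWritable amount;

        // The framework creates key instances by reflection, so a no-arg constructor is required.
        public OrderBean() {
        }

        public OrderBean(Text itemid, DoubleWritable amount) {
            set(itemid, amount);
        }

        public void set(Text itemid, DoubleWritable amount) {
            this.itemid = itemid;
            this.amount = amount;
        }

        public Text getItemid() {
            return itemid;
        }

        public DoubleWritable getAmount() {
            return amount;
        }

        // Order id ascending; for equal order ids, amount descending.
        @Override
        public int compareTo(OrderBean o) {
            int cmp = this.itemid.compareTo(o.getItemid());
            if (cmp == 0) {
                cmp = -this.amount.compareTo(o.getAmount());
            }
            return cmp;
        }

        // Fields must be written and read back in the same order.
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(itemid.toString());
            out.writeDouble(amount.get());
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            String readUTF = in.readUTF();
            double readDouble = in.readDouble();
            this.itemid = new Text(readUTF);
            this.amount = new DoubleWritable(readDouble);
        }

        @Override
        public String toString() {
            return itemid.toString() + "\t" + amount.get();
        }
    }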
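The effect of this ordering can be checked without a cluster. The sketch below is a plain-Java stand-in, not the Hadoop class itself: `OrderKeySortSketch`, its nested `Key` class, and the sample values are hypothetical, and `String`/`double` replace `Text`/`DoubleWritable`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java stand-in for OrderBean's ordering (no Hadoop types).
class OrderKeySortSketch {

    static final class Key {
        final String itemid;
        final double amount;

        Key(String itemid, double amount) {
            this.itemid = itemid;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return itemid + "\t" + amount;
        }
    }

    // Same rule as OrderBean.compareTo: id ascending, then amount descending.
    static final Comparator<Key> SORT_ORDER = (a, b) -> {
        int cmp = a.itemid.compareTo(b.itemid);
        if (cmp == 0) {
            cmp = -Double.compare(a.amount, b.amount);
        }
        return cmp;
    };

    // Returns a sorted copy and leaves the input list untouched.
    static List<Key> sorted(List<Key> keys) {
        List<Key> copy = new ArrayList<>(keys);
        copy.sort(SORT_ORDER);
        return copy;
    }

    public static void main(String[] args) {
        // Hypothetical sample keys.
        List<Key> keys = Arrays.asList(
                new Key("order2", 12.0),
                new Key("order1", 25.5),
                new Key("order1", 99.0));
        for (Key k : sorted(keys)) {
            System.out.println(k);
        }
    }
}
```

After sorting, the first key seen for each order id is the one with the largest amount, which is the property the reducer relies on.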
- Define the partitioner: the ItemIdPartitioner class partitions beans by order id.
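    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable> {

        @Override
        public int getPartition(OrderBean bean, NullWritable value, int numReduceTasks) {
            // Beans with the same order id are sent to the same partition.
            // The number of partitions matches the number of reduce tasks set by the user.
            return (bean.getItemid().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }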
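The partition arithmetic can be tried in isolation. This is a minimal sketch in which the class `HashPartitionSketch` and the sample ids are hypothetical, and `String.hashCode()` stands in for `Text.hashCode()`.

```java
// Plain-Java illustration of the hash-partition formula (no Hadoop types).
class HashPartitionSketch {

    // Same arithmetic as ItemIdPartitioner.getPartition, applied to a String id.
    // Masking with Integer.MAX_VALUE clears the sign bit, so a negative hash
    // code cannot produce a negative partition number.
    static int partitionFor(String orderId, int numReduceTasks) {
        return (orderId.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 3; // hypothetical setting
        for (String id : new String[] {"order1", "order2", "order1"}) {
            System.out.println(id + " -> partition " + partitionFor(id, numReduceTasks));
        }
    }
}
```

The same id always maps to the same partition, so all beans of one order reach the same reduce task. Because `Text` computes its hash from its encoded bytes rather than the way `String` does, the actual partition numbers on a cluster may differ from what this sketch prints.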
- Define the GroupingComparator: the reduce side uses this component to treat beans of the same order as one group.
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    public class ItemidGroupingComparator extends WritableComparator {

        // Pass in the class of the key bean; true tells the framework to create
        // key instances by reflection.
        protected ItemidGroupingComparator() {
            super(OrderBean.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            OrderBean abean = (OrderBean) a;
            OrderBean bbean = (OrderBean) b;
            // When comparing two beans, look only at the order id.
            return abean.getItemid().compareTo(bbean.getItemid());
        }
    }
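The following plain-Java sketch imitates what grouping does to an already sorted key stream: a new group starts whenever the grouping comparator reports a difference from the previous key, and only the first key of each group is kept. `GroupingSketch`, its `Key` class, and the sample values are hypothetical. The real framework works on serialized records, so this is only a model of the behaviour.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Models reduce-side grouping on plain Java objects (no Hadoop types).
class GroupingSketch {

    static final class Key {
        final String itemid;
        final double amount;

        Key(String itemid, double amount) {
            this.itemid = itemid;
            this.amount = amount;
        }
    }

    // Sort order: id ascending, then amount descending (as in OrderBean.compareTo).
    static final Comparator<Key> SORT_ORDER = (a, b) -> {
        int cmp = a.itemid.compareTo(b.itemid);
        return cmp != 0 ? cmp : -Double.compare(a.amount, b.amount);
    };

    // Grouping rule: id only (as in ItemidGroupingComparator.compare).
    static final Comparator<Key> GROUP_BY_ID = (a, b) -> a.itemid.compareTo(b.itemid);

    // Sorts a copy of the keys, then keeps the first key of every group.
    static List<Key> firstOfEachGroup(List<Key> keys, Comparator<Key> grouping) {
        List<Key> sorted = new ArrayList<>(keys);
        sorted.sort(SORT_ORDER);
        List<Key> firsts = new ArrayList<>();
        Key previous = null;
        for (Key current : sorted) {
            if (previous == null || grouping.compare(previous, current) != 0) {
                firsts.add(current); // a new group starts here
            }
            previous = current;
        }
        return firsts;
    }

    public static void main(String[] args) {
        // Hypothetical sample keys.
        List<Key> keys = Arrays.asList(
                new Key("order1", 25.5), new Key("order1", 99.0), new Key("order2", 12.0));
        System.out.println("groups by id only:   " + firstOfEachGroup(keys, GROUP_BY_ID).size());
        System.out.println("groups by full key:  " + firstOfEachGroup(keys, SORT_ORDER).size());
    }
}
```

When no grouping comparator is registered, Hadoop groups with the sort comparator, which corresponds to the second call: every distinct (id, amount) pair becomes its own group and the reducer would emit every record rather than one per order.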
Driver class SecondarySort
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class SecondarySort {

        static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

            // Reused across map() calls; context.write serializes it immediately.
            OrderBean bean = new OrderBean();

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String line = value.toString();
                String[] fields = line.split(",");
                // fields[0] is the order id, fields[2] is the amount.
                bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2])));
                context.write(bean, NullWritable.get());
            }
        }

        static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

            // By the time reduce() is called, all beans with the same order id are
            // treated as one group, and the bean with the largest amount comes first.
            @Override
            protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
                    throws IOException, InterruptedException {
                context.write(key, NullWritable.get());
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(SecondarySort.class);

            job.setMapperClass(SecondarySortMapper.class);
            job.setReducerClass(SecondarySortReducer.class);

            job.setOutputKeyClass(OrderBean.class);
            job.setOutputValueClass(NullWritable.class);

            FileInputFormat.setInputPaths(job, new Path("hdfs://shizhan01:9000/secondarysort/input"));
            FileOutputFormat.setOutputPath(job, new Path("hdfs://shizhan01:9000/secondarysort/output3"));

            // Register the custom GroupingComparator class here.
            job.setGroupingComparatorClass(ItemidGroupingComparator.class);
            // Register the custom Partitioner class here.
            job.setPartitionerClass(ItemIdPartitioner.class);

            job.setNumReduceTasks(1);
            job.waitForCompletion(true);
        }
    }
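To see how the pieces wired up in the driver fit together, the sketch below runs the whole flow locally on plain Java collections: parse each line, route it to a partition, sort each partition, then emit the first key per order id. It is only a model under stated assumptions: `SecondarySortLocalSketch`, the sample lines, and the `orderId,productId,amount` layout are hypothetical, and `String.hashCode()` replaces `Text.hashCode()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Local model of the job: map -> partition -> sort -> group -> reduce.
class SecondarySortLocalSketch {

    // Returns, for each simulated reduce task, the lines it would emit.
    static List<List<String>> run(List<String> lines, int numReduceTasks) {
        // Map + partition: parse each line and route it by order id.
        List<List<String[]>> partitions = new ArrayList<>();
        for (int i = 0; i < numReduceTasks; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String line : lines) {
            String[] f = line.split(",");
            int p = (f[0].hashCode() & Integer.MAX_VALUE) % numReduceTasks;
            partitions.get(p).add(new String[] {f[0], f[2]});
        }

        List<List<String>> output = new ArrayList<>();
        for (List<String[]> part : partitions) {
            // Sort: order id ascending, then amount descending.
            part.sort((a, b) -> {
                int cmp = a[0].compareTo(b[0]);
                return cmp != 0 ? cmp
                        : -Double.compare(Double.parseDouble(a[1]), Double.parseDouble(b[1]));
            });
            // Group + reduce: emit only the first key of each order id.
            List<String> emitted = new ArrayList<>();
            String previousId = null;
            for (String[] key : part) {
                if (!key[0].equals(previousId)) {
                    emitted.add(key[0] + "\t" + Double.parseDouble(key[1]));
                }
                previousId = key[0];
            }
            output.add(emitted);
        }
        return output;
    }

    public static void main(String[] args) {
        // Hypothetical sample records.
        List<String> lines = Arrays.asList(
                "order1,pdtA,25.5", "order1,pdtB,99.0", "order2,pdtC,12.0", "order2,pdtA,7.5");
        System.out.println(run(lines, 1));
    }
}
```

With one reduce task, as in `job.setNumReduceTasks(1)`, all results land in a single output list sorted by order id. With more tasks, each order still appears exactly once, but the results are spread across the partitions.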
Input data before processing:
Result after processing: