1. 程式人生 > MR中自定義bean作為key,輸出每組排序中的最大值。

MR中自定義bean作為key,輸出某組排序中最大值。

目錄

  • 需求:MR中自定義bean作為key,輸出某組排序中最大值。
  • 方案:重寫MR中groupingcomparator方法

1.需求:MR中自定義bean作為key,輸出某組排序中最大值。

   場景:在多個訂單的明細資料中,求出每個訂單裡金額最大的那筆商品價格。

2.方案:重寫MR中 groupingcomparator 方法

  •       定義排序:OrderBean 類,重寫 compareTo 方法:先按訂單 id 比較;訂單相同時再比較金額,並按降序(desc)排列,使金額最大者排在該訂單的最前面。

public class OrderBean

implements WritableComparable<OrderBean>{

 

            private Text itemid;

            private DoubleWritable amount;

 

            public

OrderBean() {

            }

 

            public OrderBean(Text itemid, DoubleWritable amount) {

                       set(

itemid, amount);

 

            }

 

            public void set(Text itemid, DoubleWritable amount) {

 

                       this.itemid = itemid;

                       this.amount = amount;

 

            }

 

 

 

            public Text getItemid() {

                       return itemid;

            }

 

            public DoubleWritable getAmount() {

                       return amount;

            }

 

            @Override

            public int compareTo(OrderBean o) {

                       int cmp = this.itemid.compareTo(o.getItemid());

                       if (cmp == 0) {

                                   cmp = -this.amount.compareTo(o.getAmount());

                       }

                       return cmp;

            }

 

            @Override

            public void write(DataOutput out) throws IOException {

                       out.writeUTF(itemid.toString());

                       out.writeDouble(amount.get());

                      

            }

 

            @Override

            public void readFields(DataInput in) throws IOException {

                       String readUTF = in.readUTF();

                       double readDouble = in.readDouble();

                      

                       this.itemid = new Text(readUTF);

                       this.amount= new DoubleWritable(readDouble);

            }

 

 

            @Override

            public String toString() {

 

                       return itemid.toString() + "\t" + amount.get();

                      

            }

 

}

     

  •      定義分區:ItemIdPartitioner 類,按照訂單 id 計算分區,使相同訂單的 bean 進入同一個 reduce task。

public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable>{

            @Override

            public int getPartition(OrderBean bean, NullWritable value, int numReduceTasks) {

                       //相同id的訂單bean,會發往相同的partition

                       //而且,產生的分割槽數,是會跟使用者設定的reduce task數保持一致

                       return (bean.getItemid().hashCode() & Integer.MAX_VALUE) % numReduceTasks;

            }

}

 

 

  •      定義 GroupingComparator:ItemidGroupingComparator 類,在 reduce 端只比較訂單 id,將訂單相同的 bean 視為同一組。

public class ItemidGroupingComparator extends WritableComparator {

 

         //傳入作為key的bean的class型別,以及制定需要讓框架做反射獲取例項物件

         protected ItemidGroupingComparator() {

                  super(OrderBean.class, true);

         }

        

 

         @Override

         public int compare(WritableComparable a, WritableComparable b) {

                  OrderBean abean = (OrderBean) a;

                  OrderBean bbean = (OrderBean) b;

                 

                  //比較兩個bean時,指定只比較bean中的orderid

                  return abean.getItemid().compareTo(bbean.getItemid());

                 

         }

 

}

 

      主執行類(Driver):SecondarySort

public class SecondarySort {

           

            static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{

                      

                       OrderBean bean = new OrderBean();

                      

                       @Override

                       protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

 

                                   String line = value.toString();

                                   String[] fields = StringUtils.split(line, ",");

                                  

                                   bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2])));

                                  

                                   context.write(bean, NullWritable.get());

                                  

                       }

                      

            }

           

            static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{

                      

                      

                       //到達reduce時,相同id的所有bean已經被看成一組,且金額最大的那個一排在第一位

                       @Override

                       protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

                                   context.write(key, NullWritable.get());

                       }

            }

           

           

            public static void main(String[] args) throws Exception {

                      

                       Configuration conf = new Configuration();

                       Job job = Job.getInstance(conf);

                      

                       job.setJarByClass(SecondarySort.class);

                      

                       job.setMapperClass(SecondarySortMapper.class);

                       job.setReducerClass(SecondarySortReducer.class);

                      

                      

                       job.setOutputKeyClass(OrderBean.class);

                       job.setOutputValueClass(NullWritable.class);

                      

                       FileInputFormat.setInputPaths(job, new Path("hdfs://shizhan01:9000/secondarysort/input"));

                       FileOutputFormat.setOutputPath(job, new Path("hdfs://shizhan01:9000/secondarysort/output3"));

                      

                       //在此設定自定義的Groupingcomparator

                       job.setGroupingComparatorClass(ItemidGroupingComparator.class);

                       //在此設定自定義的partitioner

                       job.setPartitionerClass(ItemIdPartitioner.class);

                      

                       job.setNumReduceTasks(1);

                      

                       job.waitForCompletion(true);

                      

            }

 

}

    預處理資料:

                      

   處理後結果: