
Custom GroupingComparator -- finding the transaction with the largest amount in each order

Source code:
https://gitee.com/tanghongping/hadoopMapReduce/tree/master/src/com/thp/bigdata/secondarySort

Order id      Product id  Amount
Order_0000001 Pdt_01 222.8
Order_0000001 Pdt_05 25.8
Order_0000002 Pdt_03 522.8
Order_0000002 Pdt_04 122.4
Order_0000002 Pdt_05 722.4
Order_0000003 Pdt_01 222.8

The task: for each order, find the single transaction with the largest amount.
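With the sample data above, the expected result is one line per order (the output format comes from OrderBean's toString(), shown later):

Order_0000001	222.8
Order_0000002	722.4
Order_0000003	222.8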

Analysis:
Records with the same order id must reach the same reducer, otherwise the largest transaction in each order cannot be determined.
So we write a custom Partitioner that routes all records with the same order id to the same reducer.
However, the records handed to that reducer, although they carry the same order id, are still distinct beans, and distinct beans cannot be treated as one key. That is exactly what the custom GroupingComparator below solves: it makes the reduce side group beans by order id alone, so each order's records arrive at reduce() as a single key group.

OrderBean:

package com.thp.bigdata.secondarySort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/**
 * Order bean: an order id plus the transaction amount.
 * @author 湯小萌
 *
 */
public class OrderBean implements WritableComparable<OrderBean>{
	private Text itemId;		// order id
	private DoubleWritable mount;	// transaction amount
	
	public OrderBean() {}
	
	public OrderBean(Text itemId, DoubleWritable mount) {
		set(itemId, mount);
	}
	public void set(Text itemId, DoubleWritable mount) {
		this.itemId = itemId;
		this.mount = mount;
	}
	
	public Text getItemId() {
		return itemId;
	}
	public void setItemId(Text itemId) {
		this.itemId = itemId;
	}
	public DoubleWritable getMount() {
		return mount;
	}
	public void setMount(DoubleWritable mount) {
		this.mount = mount;
	}

	@Override
	public String toString() {
		return itemId + "\t" + mount.get();
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(itemId.toString());
		out.writeDouble(mount.get());
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.itemId = new Text(in.readUTF());
		this.mount = new DoubleWritable(in.readDouble());
	}
	
	// Note:
	// this method defines the sort order
	/**
	 * Called when in-memory data spills to disk,
	 * and again when spill files are merged.
	 */
	@Override
	public int compareTo(OrderBean o) {
		int cmp = this.itemId.compareTo(o.getItemId());
		if(cmp == 0) {
			// the minus sign reverses the comparison: descending order, largest amount first
			cmp = -this.mount.compareTo(o.mount);
		}
		return cmp;
	}
	
}
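A quick sanity check of this ordering (a hypothetical snippet assuming the class and imports above; it is not part of the job):

OrderBean a = new OrderBean(new Text("Order_0000002"), new DoubleWritable(722.4));
OrderBean b = new OrderBean(new Text("Order_0000002"), new DoubleWritable(122.4));
OrderBean c = new OrderBean(new Text("Order_0000001"), new DoubleWritable(222.8));
System.out.println(a.compareTo(b) < 0);   // true: within an order, the larger amount sorts first
System.out.println(c.compareTo(a) < 0);   // true: across orders, sorting is by order id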

ItemIdPartitioner:

package com.thp.bigdata.secondarySort;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Custom Partitioner:
 * 		routes beans with the same order id to the same partition for processing.
 * @author 湯小萌
 *
 */
public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable> {
	
	/**
	 * OrderBeans with the same order id are sent to the same partition.
	 * The number of partitions produced matches the number of reduce tasks
	 * configured by the user; numPartitions is that configured value.
	 */
	@Override
	public int getPartition(OrderBean bean, NullWritable value, int numPartitions) {
		return (bean.getItemId().hashCode() & Integer.MAX_VALUE) % numPartitions;
	}

}
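A hypothetical check (again assuming the classes above) that two beans from the same order always land in the same partition, here with the three reduce tasks configured in the driver below:

ItemIdPartitioner partitioner = new ItemIdPartitioner();
OrderBean b1 = new OrderBean(new Text("Order_0000001"), new DoubleWritable(222.8));
OrderBean b2 = new OrderBean(new Text("Order_0000001"), new DoubleWritable(25.8));
int p1 = partitioner.getPartition(b1, NullWritable.get(), 3);
int p2 = partitioner.getPartition(b2, NullWritable.get(), 3);
System.out.println(p1 == p2);   // true: the partition depends only on the order id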

ItemIdGroupingComparator:

package com.thp.bigdata.secondarySort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Reduce-side grouping comparator: makes OrderBeans that share an order id
 * look like a single key when groups are formed for reduce().
 * @author 湯小萌
 *
 */
public class ItemIdGroupingComparator extends WritableComparator {
	
	// This constructor is required.
	// It registers OrderBean as the key class and, via the 'true' flag, tells the
	// framework to create key instances by reflection for comparison.
	protected ItemIdGroupingComparator() {
		super(OrderBean.class, true);
	}
	
	
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		OrderBean aBean = (OrderBean) a;
		OrderBean bBean = (OrderBean) b;
		// beans with the same orderId are treated as the same key
		return aBean.getItemId().compareTo(bBean.getItemId());
	}
}
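A hypothetical check that two beans sharing an order id compare as equal for grouping (the snippet must live in the same package, since the constructor is protected):

WritableComparator groupingComparator = new ItemIdGroupingComparator();
OrderBean a = new OrderBean(new Text("Order_0000002"), new DoubleWritable(722.4));
OrderBean b = new OrderBean(new Text("Order_0000002"), new DoubleWritable(122.4));
System.out.println(groupingComparator.compare(a, b) == 0);   // true: same order id, same reduce group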

The MapReduce job:

package com.thp.bigdata.secondarySort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SecondarySort {
	
	
	/**
	 * Sample input:
	 * 	Order_0000001,Pdt_01,222.8
	 * 	Order_0000001,Pdt_05,25.8
	 * 	Order_0000002,Pdt_05,325.8
	 * 	Order_0000002,Pdt_03,522.8
	 * 	Order_0000002,Pdt_04,122.4
	 * 	Order_0000003,Pdt_01,222.8
	 *
	 * Because OrderBean defines compareTo, records are sorted during the shuffle phase.
	 * Next, the custom Partitioner assigns partitions: the goal is to send all
	 * OrderBeans with the same order id to the same partition for processing.
	 * Each partition then holds only OrderBeans sharing an order id, yet they are
	 * still distinct keys. The grouping comparator "tricks" the reduce side into
	 * treating all OrderBeans with the same id as one key, so reduce() keeps only
	 * the first key of each group -- which, after the shuffle sort, is the
	 * OrderBean with the largest amount for that order id.
	 */
	static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
		// Reusing a single bean instance is safe: context.write() serializes it immediately.
		OrderBean bean = new OrderBean();
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] fields = line.split(",");
			// fields[0] = order id, fields[2] = transaction amount
			bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2])));
			context.write(bean, NullWritable.get());
		}
	}
	
	static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
		// The grouping comparator hands reduce() one group per order id; the key is
		// the first bean of the group, i.e. the one with the largest amount.
		@Override
		protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
				throws IOException, InterruptedException {
			context.write(key, NullWritable.get());
		}
	}
	
	public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(SecondarySort.class);
		
		job.setMapperClass(SecondarySortMapper.class);
		job.setReducerClass(SecondarySortReducer.class);
		
		
		job.setOutputKeyClass(OrderBean.class);
		job.setOutputValueClass(NullWritable.class);
		
		FileInputFormat.setInputPaths(job, new Path("f:/order/input"));
		FileOutputFormat.setOutputPath(job, new Path("f:/order/output"));
		
		job.setGroupingComparatorClass(ItemIdGroupingComparator.class);
		job.setPartitionerClass(ItemIdPartitioner.class);
		job.setNumReduceTasks(3);
		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
	
	
}
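Because the driver calls setNumReduceTasks(3), the output directory f:/order/output will contain three part files (part-r-00000 through part-r-00002). Which file an order lands in is decided by ItemIdPartitioner; each order contributes exactly one line, formatted by OrderBean's toString().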