
MapReduce - Custom Key - Secondary Sort

This example picks up where the previous TopK example left off: it resolves the problem left open at the end of that post and introduces a new technique, namely how to define custom input/output data types. It also gives a rough sketch of the idea behind secondary sort in MapReduce, but does not cover it in depth; the goal here is only to outline the basic steps for defining a custom key type. When I first encountered secondary sort I fell into a conceptual trap, and to record that process I will cover secondary sort in detail in the next post, where, to make the point, I describe it as a "three-way sort"; see "MapReduce - Three-Way Sort - The Secondary Sort I Once Couldn't Figure Out".

Basic steps for defining a custom Key:
Every custom key should implement the WritableComparable interface, since it must be both serializable and comparable, and it must override the following methods:
// Deserialization: rebuild the custom key from the binary data read off the stream
public void readFields(DataInput in) throws IOException
// Serialization: turn the custom key into the binary data sent over the stream
public void write(DataOutput out) throws IOException
// Key comparison, used for sorting in the map and reduce phases, and for grouping in the reduce phase
public int compareTo(IntPair o)
In addition, the newly defined class should override two more methods:
// The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce); see the sketch after this list
public int hashCode()
public boolean equals(Object right)
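
For context, the hashCode() above is what the default HashPartitioner uses to decide which reduce task a key is sent to. The sketch below mirrors its logic (a minimal illustration only; the class name HashLikePartitioner is hypothetical and you do not need to write this for this example):

import org.apache.hadoop.mapreduce.Partitioner;

// Minimal sketch of what Hadoop's default HashPartitioner does with a custom key:
// the key's hashCode() (masked to stay non-negative) modulo the number of reduce
// tasks picks the partition, so equal keys always land on the same reducer.
public class HashLikePartitioner<K, V> extends Partitioner<K, V> {
	@Override
	public int getPartition(K key, V value, int numReduceTasks) {
		return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
	}
}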

Following the steps above, here is the implementation code:

Custom Key:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class SecondSortClass implements WritableComparable<SecondSortClass> {
	/**
	 * Fields contained in the custom type; in this example both fields are used for sorting.
	 * Later examples will define fields that serve other purposes as well.
	 */
	private int first;
	private String second;
	public SecondSortClass() {}
	
	public SecondSortClass(int first, String second) {
		this.first = first;
		this.second = second;
	}
	/**
	 * Deserialization: rebuild the custom key from the binary data read off the stream
	 */
	@Override
	public void readFields(DataInput input) throws IOException {
		this.first = input.readInt();
		this.second = input.readUTF();
	}
	/**
	 * Serialization: turn the custom key into the binary data sent over the stream
	 */
	@Override
	public void write(DataOutput output) throws IOException {
		output.writeInt(first);
		output.writeUTF(second);
	}
	
	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + first;
		result = prime * result + ((second == null) ? 0 : second.hashCode());
		return result;
	}
	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		SecondSortClass other = (SecondSortClass) obj;
		if (first != other.first)
			return false;
		if (second == null) {
			if (other.second != null)
				return false;
		} else if (!second.equals(other.second))
			return false;
		return true;
	}
	/**
	 * Used for sorting in the map and reduce phases, and for grouping in the reduce phase.
	 * Both fields are compared in descending order, so the largest value sorts first.
	 */
	@Override
	public int compareTo(SecondSortClass o) {
		if(this.first != o.getFirst()) {
			// Integer.compare avoids the overflow that plain subtraction can cause
			return -Integer.compare(this.first, o.getFirst());
		} else if( !this.second.equals(o.getSecond())) {
			return -this.second.compareTo(o.getSecond());
		}
		return 0;
	}
	public int getFirst() {
		return first;
	}
	public void setFirst(int first) {
		this.first = first;
	}
	public String getSecond() {
		return second;
	}
	public void setSecond(String second) {
		this.second = second;
	}
}
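
As a quick sanity check, write() and readFields() can be verified against each other with a plain-Java round trip through a byte stream (a standalone sketch, not part of the job; the class name SecondSortClassRoundTrip is hypothetical):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class SecondSortClassRoundTrip {
	public static void main(String[] args) throws Exception {
		SecondSortClass original = new SecondSortClass(12345, "mr5");

		// Serialize the key just as the framework would during the shuffle
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		original.write(new DataOutputStream(bytes));

		// Deserialize into a fresh instance and compare with the original
		SecondSortClass copy = new SecondSortClass();
		copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

		System.out.println(original.equals(copy));          // expected: true
		System.out.println(original.compareTo(copy) == 0);  // expected: true
	}
}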
Map phase:
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SecondMapper extends Mapper<LongWritable, Text, SecondSortClass, Text> {

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String line = value.toString().trim();
		if(line.length() > 0) {
			String[] arr = line.split(",");
			if(arr.length == 3) {
				context.write(new SecondSortClass(Integer.valueOf(arr[2]),arr[1]), new Text(arr[1] + "," + arr[2]));
			}
		}
	}
}
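
To make the mapping concrete, this small standalone trace shows what the mapper emits for one sample record, using the same parsing logic (the class name MapperTrace is hypothetical):

public class MapperTrace {
	public static void main(String[] args) {
		String line = "5,mr5,12345";            // one input record: uid,name,cost
		String[] arr = line.split(",");

		// The mapper builds the composite key from (cost, name) ...
		SecondSortClass key = new SecondSortClass(Integer.valueOf(arr[2]), arr[1]);
		// ... and emits "name,cost" as the value
		String value = arr[1] + "," + arr[2];

		System.out.println(key.getFirst() + " " + key.getSecond()); // 12345 mr5
		System.out.println(value);                                  // mr5,12345
	}
}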
Reduce phase:
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


public class SecondReducer extends Reducer<SecondSortClass, Text, NullWritable, Text> {
	int len;


	/**
	 * Called once when the Reduce task starts
	 */
	@Override
	protected void setup( Context context)
			throws IOException, InterruptedException {
		/**
		 * Use the context to read the TopK value K that was passed in when the job was launched
		 */
		len = context.getConfiguration().getInt("K", 10);
	}
	@Override
	protected void reduce(SecondSortClass key, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
//		for(Text val: values) {
//			if(len <= 0) {
//				break;
//			}
//			context.write(NullWritable.get(), val);
//			len --;
//		}
		// Keys arrive sorted in descending order of cost, so the first K records are the TopK
		if(len > 0) {
			context.write(NullWritable.get(), values.iterator().next());
			len --;
		}
	}
}
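
Because compareTo() also compares the second field, every distinct (cost, name) pair forms its own reduce group here, so the values iterator usually holds a single record. A full secondary sort would additionally register a grouping comparator that looks only at the first field, roughly like the sketch below (not used in this example; the class name FirstGroupingComparator is hypothetical):

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Groups reducer input by the first field only, so all values that share the same cost
// arrive in a single reduce() call while still being sorted by the full key.
public class FirstGroupingComparator extends WritableComparator {
	protected FirstGroupingComparator() {
		super(SecondSortClass.class, true); // true: create key instances for deserialization
	}

	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		SecondSortClass left = (SecondSortClass) a;
		SecondSortClass right = (SecondSortClass) b;
		return -Integer.compare(left.getFirst(), right.getFirst()); // descending, matching compareTo()
	}
}

It would be registered in the driver with job.setGroupingComparatorClass(FirstGroupingComparator.class); the next post covers this in detail.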

Driver class:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class JobMain {
	public static void main(String[] args) throws Exception{
		Configuration configuration = new Configuration();
		/**
		 * Put the command-line argument into the Configuration; the map or reduce
		 * tasks can then read it back through the Configuration. This is one of the
		 * ways of passing parameters into a Hadoop job (an alternative is sketched below).
		 */
		configuration.set("K", args[2]);
		Job job = new Job(configuration, "third-sort-job");
		job.setJarByClass(JobMain.class);
		job.setMapperClass(SecondMapper.class);
		job.setMapOutputKeyClass(SecondSortClass.class);
		job.setMapOutputValueClass(Text.class);
		job.setReducerClass(SecondReducer.class);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		Path outputDir = new Path(args[1]);
		FileSystem fs = FileSystem.get(configuration);
		if(fs.exists(outputDir)) {
			fs.delete(outputDir, true);
		}
		FileOutputFormat.setOutputPath(job, outputDir);
		System.exit(job.waitForCompletion(true)? 0: 1);
	}
}
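
As noted in the driver comment, setting a value on the Configuration is only one way to pass parameters into a job. Another common option is to let GenericOptionsParser pick up -D properties from the command line, roughly as in the fragment below (an alternative driver sketch, not the one used above; the class name JobMainWithOptions is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;

public class JobMainWithOptions {
	public static void main(String[] args) throws Exception {
		Configuration configuration = new Configuration();
		// Strips generic options such as "-D K=3" out of args and applies them to the Configuration
		String[] remainingArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();

		// remainingArgs[0] and remainingArgs[1] are then the input and output paths;
		// the reducer still reads K with context.getConfiguration().getInt("K", 10)
		System.out.println("K      = " + configuration.getInt("K", 10));
		System.out.println("input  = " + remainingArgs[0]);
		System.out.println("output = " + remainingArgs[1]);
	}
}

Such a driver would then be launched with something like: ./hadoop jar mr.jar com.seven.mapreduce.test1.JobMainWithOptions -D K=3 /input/two /output/two14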

Run command (the three arguments are the input path, the output path, and the TopK value K):

./hadoop jar mr.jar com.seven.mapreduce.test1.JobMain /input/two /output/two14 3

Sample input data (columns: uid,name,cost):

1,mr1,3234
2,mr2,123
3,mr3,9877
4,mr4,348
5,mr5,12345
6,mr6,6646
7,mr7,98
8,mr8,12345

Result: