hadoop 二次排序和一個java實現
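需要二次排序的原因:MapReduce框架會自動對對映器生成的鍵進行排序,即歸約器啟動之前,所有鍵是有序的,但是同一個鍵對應的各個值的順序是不確定的;二次排序指的就是對這些值進行排序。歸約器的輸入形如:$\langle K, (V_1, V_2, \ldots, V_n) \rangle$,即一個鍵對應多個值,這些值是無序的;排序後得到有序的值,如下:$\langle K, (S_1, S_2, \ldots, S_n) \rangle$
其中,$S_1, S_2, \ldots, S_n$ 是 $V_1, V_2, \ldots, V_n$ 按照升序或者降序排列後的結果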
歸約器對於二次排序的兩種解決方案:
1.讓歸約器讀取並快取給定鍵的所有值,在歸約器中完成排序;缺點是不可伸縮,受限於歸約器的記憶體
2.使用MapReduce框架對歸約器的值排序,方法是建立組合鍵:例如,A是鍵,B和C是值,選擇B作為次鍵,這樣A和B組成組合鍵,C作為值,將排序交給MapReduce框架完成,這樣不用在記憶體中排序,是可伸縮的方案
定製外掛:
1.分區器:根據對映器的輸出鍵決定將每條對映器輸出傳送到哪個歸約器;本例只對自然鍵計算hash值,再對歸約器個數取模(普通的hash取模,並非一致性hash),從而保證自然鍵相同的資料進入同一個歸約器
2.分組比較器:按照自然鍵對一個歸約器中的資料分組,使自然鍵相同的所有值在同一次reduce呼叫中處理,程式碼見下文:
以year,month,temperature為例,MapReduce框架對於二次排序整體的處理流程是:
1.對映器建立 $\langle K, V \rangle$ 對,其中K是組合鍵(year-month與temperature),V是temperature的值,組合鍵中的year-month部分是自然鍵
2.通過分區器外掛,將自然鍵相同的所有資料傳送給同一個歸約器
3.通過組合鍵的排序(DateTemperaturePair.compareTo)保證溫度按照順序到達歸約器,再通過分組比較器把自然鍵相同的資料歸入同一次reduce呼叫
顯然,MapReduce框架完成了排序,而不用在記憶體中操作
程式碼如下,除了mapper、reducer和主作業流程以外,還有其餘3個檔案:一個分區器、一個分組比較器、一個中間鍵類:
DateTemperatureGroupingComparator.java分組比較器:
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator for the secondary sort.
 *
 * <p>It decides which sorted keys are handed to the same {@code reduce()} call. Two
 * {@code DateTemperaturePair} keys belong to the same group when their natural key
 * (yearMonth) is equal; the secondary key (temperature) is ignored here. Routing keys to
 * reducers is the partitioner's job, not this class's.
 */
public class DateTemperatureGroupingComparator extends WritableComparator {

    /**
     * Registers {@code DateTemperaturePair} as the key class. The {@code true} flag asks
     * the parent class to create key instances, so that serialized keys can be
     * deserialized and passed to {@link #compare(WritableComparable, WritableComparable)}.
     */
    public DateTemperatureGroupingComparator() {
        super(DateTemperaturePair.class, true);
    }

    /**
     * Compares two keys by yearMonth only.
     *
     * @param a first key, expected to be a {@code DateTemperaturePair}
     * @param b second key, expected to be a {@code DateTemperaturePair}
     * @return the String comparison of the two yearMonth values
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        final String leftYearMonth = ((DateTemperaturePair) a).getYearMonth();
        final String rightYearMonth = ((DateTemperaturePair) b).getYearMonth();
        return leftYearMonth.compareTo(rightYearMonth);
    }
}
DateTemperaturePartitioner.java分區器:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Partitioner for the secondary sort.
 *
 * <p>It picks the reducer from the natural key (yearMonth) only, so every temperature of a
 * given year-month reaches the same reducer regardless of the secondary key.
 *
 * <p>The map output value is ignored, so its type is left as a type parameter {@code V}.
 * Previously the value type was fixed to {@code Text}, while the mapper emits
 * {@code IntWritable}. The compiler-generated bridge method of a fixed value type casts
 * the value to that type, which fails with {@code ClassCastException} for any other value
 * class. With {@code V} the bridge casts only to {@code Object}.
 *
 * @param <V> map output value type (unused for partitioning)
 */
public class DateTemperaturePartitioner<V> extends
        Partitioner<DateTemperaturePair, V> {

    /**
     * Returns the reducer index for a map output record.
     *
     * @param dataTemperaturePair composite map-output key; only its yearMonth is used
     * @param value map-output value (ignored)
     * @param numPartitions number of partitions (reduce tasks)
     * @return a partition index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(DateTemperaturePair dataTemperaturePair, V value,
            int numPartitions) {
        // Uses String.hashCode of the natural key. The remainder lies in
        // (-numPartitions, numPartitions), so Math.abs of it is always a valid index,
        // even for negative hash codes.
        return Math.abs(dataTemperaturePair.getYearMonth().hashCode() % numPartitions);
    }
}
DateTemperaturePair.java中間鍵類
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
public class DateTemperaturePair
/* java不支援多重繼承,使用implements繼承介面,不同介面之間用逗號隔開 */
implements Writable, WritableComparable<DateTemperaturePair> {
private String yearMonth; //自然鍵
private String day;
protected Integer temperature; //次鍵
/* 用這個方法指出如何對DateTemperaturePair排序 */
public int compareTo(DateTemperaturePair o) {
/* 呼叫String的字串比較方法compareTo */
int compareValue = this.yearMonth.compareTo(o.getYearMonth());
if (compareValue == 0) {
compareValue = temperature.compareTo(o.getTemperature());
}
/* 這樣實現降序排列 */
return -1 * compareValue;
}
/* DataOutput用於將java基本型別轉換成二進位制字元流 */
public void write(DataOutput dataOutput) throws IOException {
Text.writeString(dataOutput, yearMonth);
dataOutput.writeInt(temperature);
}
public void readFields(DataInput dataInput) throws IOException {
this.yearMonth = Text.readString(dataInput);
this.temperature = dataInput.readInt();
}
@Override
public String toString() {
return yearMonth.toString();
}
public String getYearMonth() {
return yearMonth;
}
public void setYearMonth(String text) {
this.yearMonth = text;
}
public void setDay(String day) {
this.day = day;
}
public Integer getTemperature() {
return temperature;
}
public void setTemperature(Integer temperature) {
this.temperature = temperature;
}
}
SecondarySortingMapper.java對映器:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper for the secondary sort.
 *
 * <p>It parses {@code year,month,day,temperature} CSV lines. Its output key is the custom
 * composite key {@code DateTemperaturePair} (yearMonth + temperature), and the temperature
 * is also emitted as the value. The framework then sorts the temperatures of each
 * year-month during the shuffle.
 *
 * <p>Fields are trimmed, so stray spaces or a trailing carriage return do not break
 * parsing. Blank lines are ignored. Lines with fewer than four fields, or with a
 * non-integer temperature, are skipped and counted under the {@code SecondarySort}
 * counter group instead of failing the whole task.
 */
public class SecondarySortingMapper extends
        Mapper<LongWritable, Text, DateTemperaturePair, IntWritable> {

    private static final String COUNTER_GROUP = "SecondarySort";

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.isEmpty()) {
            return;
        }

        String[] tokens = line.split(",");
        if (tokens.length < 4) {
            context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
            return;
        }

        int temperature;
        try {
            temperature = Integer.parseInt(tokens[3].trim());
        } catch (NumberFormatException e) {
            context.getCounter(COUNTER_GROUP, "BAD_TEMPERATURE").increment(1);
            return;
        }

        DateTemperaturePair reduceKey = new DateTemperaturePair();
        reduceKey.setYearMonth(tokens[0].trim() + "-" + tokens[1].trim());
        reduceKey.setDay(tokens[2].trim());
        reduceKey.setTemperature(temperature);
        context.write(reduceKey, new IntWritable(temperature));
    }
}
SecondarySortingReducer.java歸約器:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reducer for the secondary sort.
 *
 * <p>Both the input key and the output key are the custom composite key
 * {@code DateTemperaturePair}. The values of one natural key are expected to arrive
 * already ordered by the framework's sort, so this class only joins them.
 */
public class SecondarySortingReducer extends
        Reducer<DateTemperaturePair, IntWritable, DateTemperaturePair, Text> {

    /**
     * Emits the key with its temperatures joined into one comma-separated string, in the
     * order the values arrive.
     *
     * @param key composite key of the group
     * @param values temperatures of the group; expected to be non-empty, because an empty
     *     iterable makes {@code setLength(-1)} throw StringIndexOutOfBoundsException
     * @param context task context used to emit the result
     */
    @Override
    protected void reduce(DateTemperaturePair key,
            Iterable<IntWritable> values, Context context) throws IOException,
            InterruptedException {
        // A mutable buffer avoids building a new String for every appended value.
        final StringBuilder csv = new StringBuilder();
        for (final IntWritable reading : values) {
            csv.append(reading).append(',');
        }
        // Drop the trailing separator.
        csv.setLength(csv.length() - 1);

        final Text joined = new Text(csv.toString());
        context.write(key, joined);
    }
}
SecondarySort.java主作業流程:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SecondarySort extends Configured implements Tool {
public int run(String[] args) throws Exception {
/* 設定作業,指導hadoop獲取jar包 */
Job job = new Job();
job.setJarByClass(SecondarySort.class);
job.setJobName("SecondarySort");
/* 獲取input路徑和output路徑 */
Path inputPath = new Path(args[0]);
Path outputPath = new Path(args[1]);
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
/* 設定mapper的輸出鍵和輸出值 */
job.setMapOutputKeyClass(DateTemperaturePair.class);
job.setMapOutputValueClass(IntWritable.class);
/* 設定reducer的輸出鍵和輸出值 */
job.setOutputKeyClass(DateTemperaturePair.class);
job.setOutputValueClass(IntWritable.class);
/* 指定要使用的mapper,reducer,分割槽器,比較器 */
job.setMapperClass(SecondarySortingMapper.class);
job.setReducerClass(SecondarySortingReducer.class);
job.setPartitionerClass(DateTemperaturePartitioner.class);
job.setGroupingComparatorClass(DateTemperatureGroupingComparator.class);
boolean status = job.waitForCompletion(true);
return status ? 0 : 1;
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
throw new IllegalArgumentException(
"!!!!!!!!!!!!!! Usage!!!!!!!!!!!!!!: SecondarySort"
+ "<input-path> <output-path>");
}
int returnStatus = ToolRunner.run(new SecondarySort(), args);
System.exit(returnStatus);
}
}
輸入檔案:
[root@host ~]# hdfs dfs -cat /sample_input.txt
2000,12,04,10
2000,11,01,20
2000,12,02,-20
2000,11,02,30
2000,11,24,-40
2012,12,21,30
2012,12,22,-20
2012,12,23,60
2012,12,24,70
2012,12,25,10
2013,01,22,80
2013,01,23,90
2013,01,24,70
2013,01,20,-10
執行作業命令:
hadoop jar SecondarySort.jar SecondarySort /sample_input.txt output
作業執行結果如下,可以看到每個年月內的temperature已經按降序排列(年月本身也按降序輸出):
[root@host ~]# hdfs dfs -cat output/*
2013-01 90,80,70,-10
2012-12 70,60,30,10,-20
2000-12 10,-20
2000-11 30,20,-40