1. 程式人生 > >十二道MR習題 – 1 – 排序

十二道MR習題 – 1 – 排序

eas string package mapred 什麽 iterable 都是 tput except

題目:

一個文件,大小約為100G。文件的每一行都是一個數字,要求對文件中的所有數字進行排序。

對於這個題目,了解過Hadoop的同學可以笑而不語了。即使用spark實現也是非常簡單的事情。

先說下如何用Hadoop實現。實際上也沒什麽好說的:Map任務逐行讀入數字,而後在Reduce中輸出就可以了,簡單粗暴到令人發指。

看下代碼好了:

package com.zhyea.dev;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException;
public class NumberSort { public static class SplitterMapper extends Mapper<Object, Text, IntWritable, IntWritable> { private static final IntWritable intWritable = new IntWritable(); @Override public void map(Object key, Text value, Context context) {
try { int num = Integer.valueOf(value.toString()); intWritable.set(num); context.write(intWritable, intWritable); } catch (Exception e) { e.printStackTrace(); } } } public static class IntegrateReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> { @Override public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) { try { context.write(key, key); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "number-sort"); job.setJarByClass(NumberSort.class); job.setMapperClass(SplitterMapper.class); job.setReducerClass(IntegrateReducer.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

在map方法中,輸出值的Value部分我選擇了一個IntWritable的值。Value值的類型也是可以設置為NullWritable的,不過這樣map任務執行起來會比較慢,雖然reduce任務執行的會快一些,但是終究是得不償失。

在我們的程序裏沒有執行任何排序的動作,但是輸出的結果是有序的,這是因為在shuffle階段已經完成了排序(一次快速排序,一次歸並排序)。

再來看看用spark是如何完成的:

object NumSortJob {


  def main(args: Array[String]): Unit = {
    val inputPath = args(0)
    val outputPath = args(1)
    val conf = new SparkConf().setAppName("Num Sort")
    val sc = new SparkContext(conf)
    val data = sc.hadoopFile[LongWritable, Text, TextInputFormat](inputPath)

    data.map(p => p._2.toString.toInt).distinct().sortBy[Int](p => p).coalesce(1, true).saveAsTextFile(outputPath)
  }

}

spark則需要主動進行排序。即使選擇了使用sortBasedShuffle,它的排序也僅止於mapper端的排序,結果集不一定是有序的。

#########

十二道MR習題 – 1 – 排序