
Implementing Secondary Sort with Hadoop and with Spark

0. Preface (note: this part has little to do with the title; feel free to skip straight to Part 1)

If it's irrelevant, why have a preface at all? I didn't want to open yet another post just to jot down a few thoughts and plans, so I'll write a little here instead:

A while back I bought a few books, intending to study and dig into big data, but internships, exams, and my graduation project left me no time. Now that winter break is here, I can finally sit down and learn something useful.

The algorithm in this post comes from the book Data Algorithms: Recipes for Scaling Up with Hadoop and Spark. The book's code has the right idea but is incomplete, and it only covers the Java side; I've built on it and added a Scala part, written, of course, for the Spark version.

Enough rambling; on to the main topic.

1. Input, expected output, and approach

The input is SecondarySort.txt, with the following contents:

2000,12,04,10
2000,11,01,20
2000,12,02,-20
2000,11,07,30
2000,11,24,-40
2012,12,21,30
2012,12,22,-20
2012,12,23,60
2012,12,24,70
2012,12,25,10
2013,01,23,90
2013,01,24,70
2013,01,20,-10

Each line means:

year,month,day,temperature

Expected output:

2013-01 90,70,-10
2012-12 70,60,30,10,-20
2000-12 10,-20
2000-11 30,20,-40

Each line means:

year-month temperature1,temperature2,temperature3,...

Year-month keys run in descending order from top to bottom, and temperatures run in descending order from left to right.

Approach:

Discard the day field, which we don't need.

Use the year and month together as a composite key and sort the keys in descending order.

For each year-month key, sort the corresponding temperature values in descending order and join them. (One record is traced through these steps below.)
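
To make the data flow concrete, here is how one record moves through the pipeline (a sketch of the mechanism, not literal framework output): the line 2000,12,02,-20 is mapped to the composite key (2000-12, -20) with value -20; the partitioner looks only at 2000-12, so every December 2000 record reaches the same reducer; the shuffle sorts keys by year-month descending and then temperature descending; and the grouping comparator treats all keys sharing 2000-12 as one group, so a single reduce call receives the values 10,-20 already in descending order and merely joins them.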

2. Implementing secondary sort with a MapReduce program in Java

The classes to implement are: besides the usual SecondarySortingMapper, SecondarySortingReducer, and SecondarySortDriver, there are two plug-in classes (DateTemperatureGroupingComparator and DateTemperaturePartitioner) and one custom type (DateTemperaturePair).

The implementation follows (note that I've stripped the package declaration from each file, so add your own if you want to use the code):

SecondarySortDriver.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SecondarySortDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Configuration configuration = getConf();
        Job job = Job.getInstance(configuration, "SecondarySort");
        job.setJarByClass(SecondarySortDriver.class);

        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Map output key/value types
        job.setMapOutputKeyClass(DateTemperaturePair.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Reduce output key/value types; the reducer emits Text values,
        // so the output value class must be Text (not IntWritable)
        job.setOutputKeyClass(DateTemperaturePair.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(SecondarySortingMapper.class);
        job.setReducerClass(SecondarySortingReducer.class);
        job.setPartitionerClass(DateTemperaturePartitioner.class);
        job.setGroupingComparatorClass(DateTemperatureGroupingComparator.class);

        boolean status = job.waitForCompletion(true);
        return status ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            throw new IllegalArgumentException(
                    "Usage: SecondarySortDriver <input-path> <output-path>");
        }
        int returnStatus = ToolRunner.run(new SecondarySortDriver(), args);
        System.exit(returnStatus);
    }
}
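
To try the job out, package the classes into a jar and submit it with something like hadoop jar secondarysort.jar SecondarySortDriver <input-path> <output-path> (the jar name here is just a placeholder; the driver prints the usage line above if the two paths are missing).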

DateTemperaturePair.java

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Composite key; WritableComparable already extends Writable,
// so implementing it alone is sufficient.
public class DateTemperaturePair implements
        WritableComparable<DateTemperaturePair> {

    private String yearMonth;
    private String day;
    protected Integer temperature;

    // Compare by year-month first, then by temperature; negating the
    // result turns the natural ascending order into descending order.
    public int compareTo(DateTemperaturePair o) {
        int compareValue = this.yearMonth.compareTo(o.getYearMonth());
        if (compareValue == 0) {
            compareValue = temperature.compareTo(o.getTemperature());
        }
        return -1 * compareValue;
    }

    // day is never compared and is unused after the map side,
    // so it is not serialized.
    public void write(DataOutput dataOutput) throws IOException {
        Text.writeString(dataOutput, yearMonth);
        dataOutput.writeInt(temperature);
    }

    public void readFields(DataInput dataInput) throws IOException {
        this.yearMonth = Text.readString(dataInput);
        this.temperature = dataInput.readInt();
    }

    @Override
    public String toString() {
        return yearMonth;
    }

    public String getYearMonth() {
        return yearMonth;
    }

    public void setYearMonth(String text) {
        this.yearMonth = text;
    }

    public String getDay() {
        return day;
    }

    public void setDay(String day) {
        this.day = day;
    }

    public Integer getTemperature() {
        return temperature;
    }

    public void setTemperature(Integer temperature) {
        this.temperature = temperature;
    }
}
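
A quick throwaway check of the comparator's direction (my addition, not from the book; it assumes the class above is on the classpath):

public class PairOrderCheck {
    public static void main(String[] args) {
        DateTemperaturePair colder = new DateTemperaturePair();
        colder.setYearMonth("2000-11");
        colder.setTemperature(20);
        DateTemperaturePair warmer = new DateTemperaturePair();
        warmer.setYearMonth("2000-11");
        warmer.setTemperature(30);
        // compareTo > 0 means "sorts after": within the same month the
        // lower temperature comes later, i.e. the order is descending.
        System.out.println(colder.compareTo(warmer)); // prints 1
    }
}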

SecondarySortingMapper.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SecondarySortingMapper extends
        Mapper<LongWritable, Text, DateTemperaturePair, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Input line format: YYYY,MM,DD,temperature
        String[] tokens = value.toString().split(",");
        String yearMonth = tokens[0] + "-" + tokens[1];
        String day = tokens[2];
        int temperature = Integer.parseInt(tokens[3]);

        DateTemperaturePair reduceKey = new DateTemperaturePair();
        reduceKey.setYearMonth(yearMonth);
        reduceKey.setDay(day);
        reduceKey.setTemperature(temperature);
        context.write(reduceKey, new IntWritable(temperature));
    }
}

DateTemperaturePartitioner.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class DateTemperaturePartitioner extends
        Partitioner<DateTemperaturePair, IntWritable> {

    @Override
    public int getPartition(DateTemperaturePair pair, IntWritable temperature,
            int numberOfPartitions) {
        // Partition on the year-month alone so that all records for a given
        // month go to the same reducer, regardless of temperature.
        return Math.abs(pair.getYearMonth().hashCode() % numberOfPartitions);
    }
}
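
The partitioner is one half of the secondary-sort contract: because it hashes only the year-month, two records from the same month can never land on different reducers. The Math.abs is needed because String.hashCode can be negative, which would make the raw modulo negative as well; (pair.getYearMonth().hashCode() & Integer.MAX_VALUE) % numberOfPartitions is a common equivalent idiom.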

DateTemperatureGroupingComparator.java

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class DateTemperatureGroupingComparator extends WritableComparator {

    public DateTemperatureGroupingComparator() {
        super(DateTemperaturePair.class, true);
    }

    // Group reduce input by year-month only, ignoring the temperature
    // half of the composite key.
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        DateTemperaturePair pair1 = (DateTemperaturePair) a;
        DateTemperaturePair pair2 = (DateTemperaturePair) b;
        return pair1.getYearMonth().compareTo(pair2.getYearMonth());
    }
}
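
This comparator is the other half of the trick. The sort order (DateTemperaturePair.compareTo) arranges the shuffled keys by year-month descending and then temperature descending; the grouping comparator then tells the framework to treat every key with the same year-month as a single reduce group. The net effect is one reduce call per month whose values arrive already sorted from highest temperature to lowest.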

SecondarySortingReducer.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SecondarySortingReducer extends
        Reducer<DateTemperaturePair, IntWritable, DateTemperaturePair, Text> {

    @Override
    protected void reduce(DateTemperaturePair key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        // The shuffle has already delivered the values in descending order,
        // so all that is left is to concatenate them.
        StringBuilder sortedTemperatureList = new StringBuilder();
        for (IntWritable temperature : values) {
            sortedTemperatureList.append(temperature);
            sortedTemperatureList.append(",");
        }
        // Drop the trailing comma
        sortedTemperatureList.deleteCharAt(sortedTemperatureList.length() - 1);
        context.write(key, new Text(sortedTemperatureList.toString()));
    }
}
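
One small caveat: with the default TextOutputFormat, each output line is the key's toString() followed by a tab and then the value, so the actual output file separates the year-month from the temperature list with a tab rather than the single space shown in the expected output above.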

3. Implementing secondary sort with a Spark program in Scala

As you'd expect, this code is quite a bit more concise:

SecondarySort.scala

package spark

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions

object SecondarySort {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SecondarySort").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    //val file = sc.textFile("hdfs://localhost:9000/Spark/SecondarySort/Input/SecondarySort2.txt")
    val file = sc.textFile("e:\\SecondarySort.txt")
    // Key each record by (year, month), group the temperatures per key,
    // sort the keys in descending order, then sort each month's temperatures
    // in descending order. Note the toInt: sorting the raw strings would
    // compare them lexicographically (e.g. "9" > "70"), not numerically.
    val rdd = file.map(line => line.split(","))
      .map(x => ((x(0), x(1)), x(3).toInt))
      .groupByKey()
      .sortByKey(false)
      .map(x => (x._1._1 + "-" + x._1._2, x._2.toList.sortWith(_ > _)))
    rdd.foreach(x => {
      val buf = new StringBuilder()
      for (a <- x._2) {
        buf.append(a)
        buf.append(",")
      }
      buf.deleteCharAt(buf.length() - 1)
      println(x._1 + " " + buf.toString())
    })
    sc.stop()
  }
}
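
Two caveats, both general Spark behavior rather than anything specific to this example: groupByKey materializes all of a key's values in memory, so on genuinely large data something like repartitionAndSortWithinPartitions is the more scalable route to per-key sorted values; and because foreach runs on the executors, the println output is not guaranteed to come out in key order (on a real cluster it would not appear in the driver console at all). For a small, already-reduced result like this one, collecting with rdd.collect() before printing sidesteps both issues.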