1. 程式人生 > >Hadoop和Spark分別實現二次排序

Hadoop和Spark分別實現二次排序

將下列資料中每個分割槽中的第一列順序排列,第二列倒序排列。

Text 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
2021
5051
5052
5053
5054
6051
6053
6052
6056
6057
7058
6061
7054
7055
7056
7057
7058
1055
8067
9043
3044
5067
5087
4077
2011
1055
2084
7045
9055
9144
7844
7632
8823
9134
5611
3323
2411

使用Hadoop

寫法一,參考《Hadoop權威指南》改寫:

IntPair類

 Java Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package com.hadoop.mr.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;

publicclass IntPair implements WritableComparable<IntPair> {
    
private IntWritable first;
    
private IntWritable second;

    
public
void set(IntWritable first, IntWritable second) {
        
this.first = first;
        
this.second = second;
    }
    
//注意:需要新增無參的構造方法,否則反射時會報錯。
public IntPair() {
        set(
new IntWritable(), new IntWritable());
    }
    
public IntPair(int first, int second) {
        set(
new IntWritable(first), new IntWritable(second));
    }

    
public IntPair(IntWritable first, IntWritable second) {
        set(first, second);
    }

    
public IntWritable getFirst() {
        
return first;
    }

    
publicvoid setFirst(IntWritable first) {
        
this.first = first;
    }

    
public IntWritable getSecond() {
        
return second;
    }

    
publicvoid setSecond(IntWritable second) {
        
this.second = second;
    }

    @Override
    
publicvoid write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    @Override
    
publicvoid readFields(DataInput inthrows IOException {
        first.readFields(
in);
        second.readFields(
in);
    }

    @Override
    
publicint hashCode() {
        
return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    
publicboolean equals(Object o) {
        
if (o instanceof IntPair) {
            IntPair tp = (IntPair) o;
            
return first.equals(tp.first) && second.equals(tp.second);
        }
        
return false;
    }

    @Override
    
publicString toString() {
        
return first + "\t" + second;
    }

    @Override
    
publicint compareTo(IntPair tp) {
        
int cmp = first.compareTo(tp.first);
        
if (cmp != 0) {
            
return cmp;
        }
        
return second.compareTo(tp.second);
    }
}

Secondary類

 Java Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package com.hadoop.mr.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

publicclass SecondarySort {
    
staticclass TheMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> {
        @Override
        
protectedvoid map(LongWritable key, Text value, Context context)
                
throws IOException, InterruptedException {
            
String[] fields = value.toString().split("\t");
            
int field1 = Integer.parseInt(fields[0]);
            
int field2 = Integer.parseInt(fields[1]); 
            context.write(
new IntPair(field1,field2), NullWritable.get());
        }
    }
    
    
staticclass TheReducer extends Reducer<IntPair, NullWritable,IntPair, NullWritable> {
        
//private static final Text SEPARATOR = new Text("------------------------------------------------");
        @Override
        
protectedvoid reduce(IntPair key, Iterable<NullWritable> values, Context context)
                
throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    
publicstaticclass FirstPartitioner extends Partitioner<IntPair, NullWritable> {

        @Override
        
publicint getPartition(IntPair key, NullWritable value,
                
int numPartitions) {
            
return Math.abs(key.getFirst().get()) % numPartitions;
        }
        
    }
    
    
//如果不新增這個類,預設第一列和第二列都是升序排序的。這個類的作用是使第一列升序排序,第二列降序排序
publicstaticclass KeyComparator extends WritableComparator {
        
//無參構造器必須加上,否則報錯。
protected KeyComparator() {
            
super(IntPair.class, true);
        }
        @Override
        
publicint compare(WritableComparable a, WritableComparable b) {
            IntPair ip1 = (IntPair) a;
            IntPair ip2 = (IntPair) b;
            
//第一列按升序排序
int cmp = ip1.getFirst().compareTo(ip2.getFirst());
            
if (cmp != 0) {
                
return cmp;
            }
            
//在第一列相等的情況下,第二列按倒序排序
return -ip1.getSecond().compareTo(ip2.getSecond());
        }
    }
    
/*  public static class GroupComparator extends WritableComparator {
        //無參構造器必須加上,否則報錯。
        protected GroupComparator() {
            super(IntPair.class, true);
        }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            IntPair ip1 = (IntPair) a;
            IntPair ip2 = (IntPair) b;
            return ip1.getFirst().compareTo(ip2.getFirst());
        }
    }*/
//入口程式
publicstaticvoid main(String[] args) throws Exception {
        Configuration conf = 
new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SecondarySort.
class);
        
//設定Mapper的相關屬性
        job.setMapperClass(TheMapper.class);
        
//當Mapper中的輸出的key和value的型別和Reduce輸出的key和value的型別相同時,以下兩句可以省略。
//job.setMapOutputKeyClass(IntPair.class);
//job.setMapOutputValueClass(NullWritable.class);
    
        FileInputFormat.setInputPaths(job, 
new Path(args[0]));
        
        
//設定分割槽的相關屬性
        job.setPartitionerClass(FirstPartitioner.class);
        
//在map中對key進行排序
        job.setSortComparatorClass(KeyComparator.class);
        
//job.setGroupingComparatorClass(GroupComparator.class);
//設定Reducer的相關屬性
        job.setReducerClass(TheReducer.class);
        job.setOutputKeyClass(IntPair.
class);
        job.setOutputValueClass(NullWritable.
class);
        FileOutputFormat.setOutputPath(job, 
new Path(args[1]));
        
//設定Reducer數量
int reduceNum = 1;
        
if(args.length >= 3 && args[2] != null){
            reduceNum = Integer.parseInt(args[
2]);
        }
        job.setNumReduceTasks(reduceNum);
        job.waitForCompletion(true);
    }
    
}

打成secsort.jar包,從hdfs上的/test/secsortdata獲取資料檔案,mapreduce輸出目錄是/test/secsortresult8,啟動1個reduce:

hadoop jar secsort.jar /test/secsortdata /test/secsortresult8 1

測試結果:


可以發現第一列(key)是順序排列的,對於相同key的values,是倒序排列的。

如果使用兩個reduce會怎樣?

hadoop jar secsort.jar /test/secsortdata /test/secsortresult9 2

測試結果:



那如果將程式碼中的GroupComparator的註釋以及第100行的註釋去掉,結果會怎麼樣呢?


如上圖,它只會輸出每個key中的第一個value值。

寫法二,參考自http://www.superwu.cn/2013/08/18/492/:

程式碼:

IntPair類可以改寫成:

 Java Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package com.hadoop.mr.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

publicclass IntPair implements WritableComparable<IntPair> {
    
privateint first = 0;
    
privateint second = 0;

    
publicvoid set(int first, int second) {
        
this.first = first;
        
this.second = second;
    }

    
// 注意:需要新增無參的構造方法,否則反射時會報錯。
public IntPair() {

    }

    
public IntPair(int first, int second) {
        set(first, second);
    }

    
publicint getFirst() {
        
return first;
    }

    
publicvoid setFirst(int first) {
        
this.first = first;
    }

    
publicint getSecond() {
        
return second;
    }

    
publicvoid setSecond(int second) {
        
this.second = second;
    }

    @Override
    
publicvoid write(DataOutput out) throws IOException {
        out.write(first);
        out.write(second);
    }

    @Override
    
publicvoid readFields(DataInput inthrows IOException {
        first = 
in.readInt();
        second = 
in.readInt();
    }

    @Override
    
publicint hashCode() {
        
return first + "".hashCode() + second + "".hashCode();
    }

    @Override
    
publicboolean equals(Object right) {
        
if (right instanceof IntPair) {
            IntPair r = (IntPair) right;
            
return r.getFirst() == first && r.getSecond() == second;
        } 
else {
            
return false;
        }
    }

    
// 這裡的程式碼是關鍵,因為對key排序時,呼叫的就是這個compareTo方法
    @Override
    
publicint compareTo(IntPair o) {
        
if (first != o.getFirst()) {
            
return first - o.getFirst();
        } 
elseif (second != o.getSecond()) {
            
return o.getSecond() - second;
        } 
else {
            
return0;
        }
    }
}

Secondary類可以改寫成:

 Java Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package com.hadoop.mr.sort;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

publicclass SecondarySort {
    
staticclass TheMapper extends
            Mapper<LongWritable, Text, IntPair, IntWritable> {
        
privatefinal IntPair outKey = new IntPair();
        
privatefinal IntWritable outValue = new IntWritable();

        @Override
        
protectedvoid map(LongWritable key, Text value, Context context)
                
throws IOException, InterruptedException {
            
// 預設以” \t\n\r\f”(前有一個空格,引號不是)為分割符
            StringTokenizer itr = new StringTokenizer(value.toString(), "\t");
            
int first = 0;
            
int second = 0;
            
if (itr.hasMoreTokens()) {
                first = Integer.parseInt(itr.nextToken());
                
if (itr.hasMoreTokens()) {
                    second = Integer.parseInt(itr.nextToken());
                }
                outKey.set(first, second);
                outValue.set(second);
                context.write(outKey, outValue);
            }
        }
    }

    
staticclass TheReducer extends
            Reducer<IntPair, IntWritable, Text, IntWritable> {
        
privatestaticfinal Text SEPARATOR = new Text("------------------------------------------------");
        
privatefinal Text first = new Text();
        @Override
        
protectedvoid reduce(IntPair inKey, Iterable<IntWritable> inValues, Context context)
                
throws IOException, InterruptedException {
            first.set(Integer.toString(inKey.getFirst()));
            
for(IntWritable value: inValues) {
              context.write(first, value);
            }
            context.write(SEPARATOR, null);
        }

    }

    
publicstaticclass FirstPartitioner extends
            Partitioner<IntPair, IntWritable> {
        @Override
        
publicint getPartition(IntPair key, IntWritable value,int numPartitions) {
            
return Math.abs(key.getFirst()* 127) % numPartitions;
        }
    }

    
/**
     * 在分組比較的時候,只比較原來的key,而不是組合key。
     */
publicstaticclass GroupComparator implements RawComparator<IntPair> {
      @Override
      
publicint compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        
return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, b2, s2, Integer.SIZE/8);
      }
      @Override
      
publicint compare(IntPair o1, IntPair o2) {
        
int first1 = o1.getFirst();
        
int first2 = o2.getFirst();
        
return first1 - first2;
      }
    }
    
    
// 入口程式
publicstaticvoid main(String[] args) throws Exception {
        Configuration conf = 
new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SecondarySort.
class);
        
// 設定Mapper的相關屬性
        job.setMapperClass(TheMapper.class);
        
// 當Mapper中的輸出的key和value的型別和Reduce輸出的key和value的型別相同時,以下兩句可以省略。
        job.setMapOutputKeyClass(IntPair.class);
        job.setMapOutputValueClass(IntWritable.
class);

        FileInputFormat.setInputPaths(job, 
new Path(args[0]));
        
// 設定分割槽的相關屬性
        job.setPartitionerClass(FirstPartitioner.class);
        job.setGroupingComparatorClass(GroupComparator.
class);
        
// 設定Reducer的相關屬性
        job.setReducerClass(TheReducer.class);
        job.setOutputKeyClass(Text.
class);
        job.setOutputValueClass(IntWritable.
class);
        FileOutputFormat.setOutputPath(job, 
new Path(args[1]));
        
// 設定Reducer數量
int reduceNum = 1;
        
if (args.length >= 3 && args[2] != null) {
            reduceNum = Integer.parseInt(args[
2]);
        }
        job.setNumReduceTasks(reduceNum);
        job.waitForCompletion(true);
    }
}

測試結果:

 hdfs dfs -cat /test/secsortresult18/part-r-*

 Text  
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
10      55
10      55
------------------------------------------------
20      84
20      21
20      11
------------------------------------------------
24      11
------------------------------------------------
30      44
------------------------------------------------
40      77
------------------------------------------------
50      87
50      67
50      54
50      53
50      52
50      51
------------------------------------------------
56      11
------------------------------------------------
60      61
60      57
60      56
60      53
60      52
60      51
------------------------------------------------
70      58
70      58
70      57
70      56
70      55
70      54
70      45
------------------------------------------------
76      32
------------------------------------------------
78      44
------------------------------------------------
80      67
------------------------------------------------
88      23
------------------------------------------------
90      55
90      43
------------------------------------------------
33      23
------------------------------------------------
91      44
91      34
------------------------------------------------

使用Spark來實現二次排序

 Scala Code 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.spark.secondApp
import org.apache.spark.{SparkContext, SparkConf}

object SecondarySort {
  
def main(args: Array[String]) {
    val conf = 
new SparkConf().setAppName(" Secondary Sort ").setMaster("local")
    val sc = 
new SparkContext(conf)
    val file = sc.textFile(
"hdfs://worker02:9000/test/secsortdata")
    val rdd = file.map(line => line.split(
"\t")).
      map(x => (x(
0),x(1))).groupByKey().
      sortByKey(true).map(x => (x._1,x._2.toList.sortWith(_>_)))
    val rdd2 = rdd.flatMap{
      x =>
      val len = x._2.length
      val array = 
new Array[(String,String)](len)
      
for(i <- 0 until len) {
        array(i) = (x._1,x._2(i))
      }
      array  
    }
    sc.stop()
  }
}

將8~12行復制到spark-shell中執行後,再使用rdd2.collect,結果如下:


上圖中第一列升序排列,第二列降序排列。

Hadoop實現二次排序需要近200行程式碼,而Spark只需要20多行程式碼。

原文連結:http://blog.csdn.net/u014729236/article/details/46327335

相關推薦

HadoopSpark分別實現排序

將下列資料中每個分割槽中的第一列順序排列,第二列倒序排列。 Text  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 2021 5051

Spark:Java實現 排序

測試資料 1 5 2 4 3 6 1 3 2 1 輸出結果 1 3 1 5 2 1 2 4 3 6 實現思路: 1.實現自定義的key,要實現Ordered介面和Serializable介面,在key中實現自己對多個列的排序演算法 2.將包含文

分別使用HadoopSpark實現排序

零、序(注意本部分與標題無太大關係,可直接翻到第一部分)   既然沒用為啥會有序?原因不想再開一篇文章,來抒發點什麼感想或者計劃了,就在這裡寫點好了:   前些日子買了幾本書,打算學習和研究大資料方面的知識,一直因為實習、考試、畢業設計等問題搞得沒有時間,現在進入了寒

Spark實現排序

1、HDFS檔案說明 檔案為普通的文字檔案,無壓縮,\001分割,共3列,一次為province_id,city_id,city_uv需要按照province_id升序,city_uv降序操作2、程式碼var data = sc.textFile("/home/hdfs/te

大資料技術學習筆記之Hadoop框架基礎5-Hadoop高階特性HA及排序思想

一、回顧     -》shuffle流程         -》input:讀取mapreduce輸入的          &nbs

結合案例講解MapReduce重要知識點 ------- 使用自定義MapReduce資料型別實現排序

自定義資料型別SSData import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableCompa

43.top10熱門品類之使用Scala實現排序

本文為《Spark大型電商專案實戰》 系列文章之一,主要介紹使用Scala實現二次排序。 程式碼實現 在Scala IDE中的包com.erik.sparkproject中建立SortKey.sca

Hadoop之MapReduce自定義排序流程例項詳解

一、概述 MapReduce框架對處理結果的輸出會根據key值進行預設的排序,這個預設排序可以滿足一部分需求,但是也是十分有限的。在我們實際的需求當中,往往有要對reduce輸出結果進行二次排序的需求。對於二次排序的實現,網路上已經有很多人分享過了,但是對二次排序的實現的

hadoop 排序一個java實現

需要二次排序的原因:mapreduce架構自動對對映器生成的鍵進行排序,即歸約器啟動之前,所有鍵是有序的,但是值是隨機的,二次排序指的是對值進行排序。歸約器輸入形如:,即一個key對應多個值,這些值是無序的,排序後得到有序的值,如下: 其中,S按照升序或者降序排列

Python Hadoop Mapreduce 實現Hadoop Streaming分組排序

需求:公司給到一份全國各門店銷售資料,要求:1.按門店市場分類,將同一市場的門店放到一起;2.將各家門店按銷售額從大到小,再按利潤從大到小排列 一 需求一:按市場對門店進行分組 分組(partition) Hadoop streaming框架預設情況下會以’/t

hadoop排序的原理實現

預設情況下,Map輸出的結果會對Key進行預設的排序,但是有時候需要對Key排序的同時還需要對Value進行排序,這時候就要用到二次排序了。下面我們來說說二次排序 1、二次排序原理   我們把二次排序分為以下幾個階段   Map起始階段     在Map階段,使用jo

一起學Hadoop——排序演算法的實現

二次排序,從字面上可以理解為在對key排序的基礎上對key所對應的值value排序,也叫輔助排序。一般情況下,MapReduce框架只對key排序,而不對key所對應的值排序,因此value的排序經常是不固定的。但是我們經常會遇到同時對key和value排序的需求,例如Hadoop權威指南中的求一年的高高氣溫

Hadoop MapReduce排序演算法與實現之演算法解析

MapReduce二次排序的原理     1.在Mapper階段,會通過inputFormat的getSplits來把資料集分割成split public abstract class Input

hadoop排序 (Map/Reduce中分割槽分組的問題)

1.二次排序概念:首先按照第一欄位排序,然後再對第一欄位相同的行按照第二欄位排序,注意不能破壞第一次排序的結果 。如: 輸入檔案:20 21 50 51 50 52 50 53 50 54 60 51 60 53 60 52 60 56 60 57 70 58 60 61 70 54 70 55 70 56 

《資料演算法-Hadoop/Spark大資料處理技巧》讀書筆記(一)——排序

寫在前面: 在做直播的時候有同學問Spark不是用Scala語言作為開發語言麼,的確是的,從網上查資料的話也會看到大把大把的用Scala編寫的Spark程式,但是仔細看就會發現這些用Scala寫的文章

spark學習記錄(七、排序分組取TopN問題)

1.二次排序 例題:將兩列數字按第一列升序,如果第一列相同,則第二列升序排列 資料檔案:https://download.csdn.net/download/qq_33283652/10894807 將資料封裝成物件,對物件進行排序,然後取出value public class Se

Spark 排序自定義key 實現(Java)

楔子 spark java版本的二次排序 實現 資料如下 2::4 2::10 3::6 1::5 按照 第一列 和第二列 倒敘排列 實現如下的結果 3::6 2::10 2::4 1::5 demo GitHub 位置 的 Second

Hadoop鏈式MapReduce、多維排序、倒排索引、自連線演算法、排序、Join效能優化、處理員工資訊Join實戰、URL流量分析、TopN及其排序、求平均值最大最小值、資料清洗ETL、分析氣

Hadoop Mapreduce 演算法彙總  第52課:Hadoop鏈式MapReduce程式設計實戰...1 第51課:Hadoop MapReduce多維排序解析與實戰...2 第50課:HadoopMapReduce倒排索引解析與實戰...3 第49課:Hado

MapReduce排序原理實現

/** * 自己定義的key類應該實現WritableComparable介面 */ public class IntPair implements WritableComparable<IntPair>{ int first;//第一個成員變數 int second;//第二個成員變數 p

Hadoop 排序實現

業務場景:通常情況下,在MR操作中到達Reduce中的key值都是按照指定的規則進行排序,在單一key的情況下一切都進行的很自然,直到我們要求資料不再單純的按key進行排序,以如下資料舉例: Key   ->      value: 100  ->      2