1. 程式人生 > >Hadoop入門案例 全排序之TotalOrderPartitioner工具類+自動取樣

Hadoop入門案例 全排序之TotalOrderPartitioner工具類+自動取樣

程式碼

package com.myhadoop.mapreduce.test;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.InputSampler;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner
; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop
.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; /** * Created by kaishun on 2017/6/10. */ public class TotalOrderSort extends Configured implements Tool{ public static class myMap extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text>{ public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{ String[] split = value.toString
().split("\\s+"); for (int i = 0; i <split.length ; i++) { Text word = new Text(split[i]); context.write(word,new Text("")); } } } public static class myReduce extends Reducer<Text,Text,Text,Text>{ public void reduce(Text key, Iterable<Text> values,Context context) throws IOException,InterruptedException { context.write(key, new Text("")); } } @Override public int run(String[] args) throws Exception { Job job = Job.getInstance(getConf()); job.setJarByClass(TotalSort.class); job.setJobName("TotalSortTest"); job.setInputFormatClass(KeyValueTextInputFormat.class); job.setNumReduceTasks(3); //因為map和reduce的輸出是同樣的型別,所以輸出一個就可以了 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapperClass(myMap.class); job.setReducerClass(myReduce.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 設定分割槽檔案,即取樣後放在的檔案的檔名,不是完整路徑 TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[2])); //取樣器:三個引數 /* 第一個引數 freq: 表示來一個樣本,將其作為取樣點的概率。如果樣本數目很大 *第二個引數 numSamples:表示取樣點最大數目為,我這裡設定10代表我的取樣點最大為10,如果超過10,那麼每次有新的取樣點生成時 * ,會刪除原有的一個取樣點,此引數大資料的時候儘量設定多一些 * 第三個引數 maxSplitSampled:表示的是最大的分割槽數:我這裡設定100不會起作用,因為我設定的分割槽只有4個而已 */ InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.01, 10, 100); //把分割槽檔案放在hdfs上,對程式沒什麼效果,方便我們檢視而已 FileInputFormat.addInputPath(job, new Path("/test/sort")); //將取樣點寫入到分割槽檔案中,這個必須要 InputSampler.writePartitionFile(job, sampler); job.setPartitionerClass(TotalOrderPartitioner.class); boolean success = job.waitForCompletion(true); return success ? 0:1; } public static void main(String[] args) throws Exception { int ret = ToolRunner.run(new TotalSortTest(), args); System.exit(ret); } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88

注意的地方

 InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.01, 10, 100);中三個引數要注意
  • 1
InputSampler.Sampler<Text, Text> 只能是Text,Text的型別
  • 1
3. TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[2]));用來給TotalOrderPartitioner初始化賦值,job.setPartitionerClass(TotalOrderPartitioner.class); 進行分割槽,就不需要自己寫分割槽函數了  
  • 1
4. job.setInputFormatClass(KeyValueTextInputFormat.class); 注意裡面是KeyValueTextInputFormat.class,而不是TextInputFormat.class
  • 1
5. 在叢集上,次程式才能體現出來
  • 1

6.由於我這裡,map的輸入和輸出都是用的(Text,Text),所以我只需要設定

job.setOutputKeyClass(Text.class); 
job.setOutputValueClass(Text.class);
  • 1
  • 2

如果不一樣,那麼 應該設定4個,前兩個為map的輸出型別,後兩個為reduce的輸出型別

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(NullWritable.class);

MapReduce全排序的方法1:

  每個map任務對自己的輸入資料進行排序,但是無法做到全域性排序,需要將資料傳遞到reduce,然後通過reduce進行一次總的排序,但是這樣做的要求是隻能有一個reduce任務來完成。

  並行程度不高,無法發揮分散式計算的特點。

MapReduce全排序的方法2:

  針對方法1的問題,現在介紹方法2來進行改進;

  使用多個partition對map的結果進行分割槽,且分割槽後的結果是有區間的,將多個分割槽結果拼接起來,就是一個連續的全域性排序檔案。

    

  Hadoop自帶的Partitioner的實現有兩種,一種為HashPartitioner, 預設的分割槽方式,計算公式 hash(key)%reducernum,另一種為TotalOrderPartitioner, 為排序作業建立分割槽,分割槽中資料的範圍需要通過分割槽檔案來指定。

  分割槽檔案可以人為建立,如採用等距區間,如果資料分佈不均勻導致作業完成時間受限於個別reduce任務完成時間的影響。

  也可以通過抽樣器,先對資料進行抽樣,根據資料分佈生成分割槽檔案,避免資料傾斜。

這裡實現一個通過隨機抽樣來生成分割槽檔案,然後對資料進行全排序,根據分割槽檔案的範圍分配到不同的reducer中。

示例程式碼:

複製程式碼
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

import java.io.IOException;

/**
 * Created by Edward on 2016/10/4.
 */
public class TotalSort {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //access hdfs's user
        System.setProperty("HADOOP_USER_NAME","root");

        Configuration conf = new Configuration();
        conf.set("mapred.jar", "D:\\MyDemo\\MapReduce\\Sort\\out\\artifacts\\TotalSort\\TotalSort.jar");

        FileSystem fs = FileSystem.get(conf);

        /*RandomSampler 引數說明
        * @param freq Probability with which a key will be chosen.
        * @param numSamples Total number of samples to obtain from all selected splits.
        * @param maxSplitsSampled The maximum number of splits to examine.
        */
        InputSampler.RandomSampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.1, 10, 10);

        //設定分割槽檔案, TotalOrderPartitioner必須指定分割槽檔案
        Path partitionFile = new Path( "_partitions");
        TotalOrderPartitioner.setPartitionFile(conf, partitionFile);

        Job job = Job.getInstance(conf);
        job.setJarByClass(TotalSort.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class); //資料檔案預設以\t分割
        job.setMapperClass(Mapper.class);
        job.setReducerClass(Reducer.class);
        job.setNumReduceTasks(4);  //設定reduce任務個數,分割槽檔案以reduce個數為基準,拆分成n段

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setPartitionerClass(TotalOrderPartitioner.class);

        FileInputFormat.addInputPath(job, new Path("/test/sort"));

        Path path = new Path("/test/wc/output");

        if(fs.exists(path))//如果目錄存在,則刪除目錄
        {
            fs.delete(path,true);
        }
        FileOutputFormat.setOutputPath(job, path);

        //將隨機抽樣資料寫入分割槽檔案
        InputSampler.writePartitionFile(job, sampler);

        boolean b = job.waitForCompletion(true);
        if(b)
        {
            System.out.println("OK");
        }

    }
}
複製程式碼

測試資料:

複製程式碼
1    1
2    1
3    1
4    1
5    1
6    1
7    1
8    1
9    1
10    2
11    2
12    2
13    2
14    2
15    2
16    2
17    2
18    2
19    2
20    2
...
5999    4
6000    4
6001    4
6002    4
6003    4
6004    4
6005    4
6006    4
6007    4
6008    4
6009    4
6010    4
複製程式碼

抽樣生成的分割槽檔案為:

# hadoop fs -text  /user/root/_partitions

 2673 (null)
 4441 (null)
 5546 (null)

生成的抽樣檔案為sequence file通過 -text開啟檢視

生成的排序結果檔案:

檔案內容:

 hadoop fs -cat /test/wc/output/part-r-00000

複製程式碼
...
2668    4
2669    4
267     3
2670    4
2671    4
2672    4
複製程式碼

hadoop fs -cat /test/wc/output/part-r-00001

複製程式碼
...
4431    4
4432    4
4433    4
4434    4
4435    4
4436    4
4437    4
4438    4
4439    4
444     3
4440    4
複製程式碼

hadoop fs -cat /test/wc/output/part-r-00002

複製程式碼
...
554     3
5540    4
5541    4
5542    4
5543    4
5544    4
5545    4
複製程式碼

hadoop fs -cat /test/wc/output/part-r-00003

複製程式碼
...
99      2
990     3
991     3
992     3
993     3
994     3
995     3
996     3
997     3
998     3
999     3
複製程式碼

測試文字

ba bac
df gh hgg dft dfa dfga df fdaf qqq we fsf aa bb ab
rr
ty ioo zks huawei mingtong jyzt beijing shanghai shenzhen wuhan nanning guilin 
zhejiang hanzhou anhui hefei xiaoshan xiaohao anqian zheli guiyang
  • 1
  • 2
  • 3
  • 4
  • 5

原理分析

利用mapReduce中map到reduce端的shuffle進行排序,MapReduce只能保證各個分割槽內部有序,但不能保證全域性有序,於是我還自定義了分割槽,在map後、shuffle之前,我先將小於c的放在0分割槽,c-f的放在1分割槽,其餘的放在2分割槽,這樣,首先保證了分割槽與分割槽之間是整體有序,然後各個分割槽進行各自的shuffle,使其分割槽內部有序。

程式碼

package com.myhadoop.mapreduce.test;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;

/**
 * Created by kaishun on 2017/6/10.
 */
public class TotalSortTest extends Configured implements Tool{

    public static class myMap extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text>{

        public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
            String[] split = value.toString().split("\\s+");
            for (int i = 0; i <split.length ; i++) {
                Text word = new Text(split[i]);
                context.write(word,new Text(""));
            }
        }
    }
    public static class myReduce extends Reducer<Text,Text,Text,Text>{

        public void reduce(Text key, Iterable<Text> values,Context context) throws IOException,InterruptedException
        {
                context.write(key, new Text(""));

        }
    }
    public static class Partition extends Partitioner<Text,Text>{
        @Override
        public int getPartition(Text value1, Text value2, int i) {

            if(value1.toString().compareTo("c")<0){
                return 0;
            }else if(value1.toString().compareTo("f")<0){
                return 1;
            }
            return 2;
        }
    }



    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(TotalSort.class);
        job.setJobName("TotalSortTest");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setPartitionerClass(Partition.class);
        job.setMapperClass(myMap.class);
        job.setReducerClass(myReduce.class);
        job.setNumReduceTasks(3);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean success = job.waitForCompletion(true);
        return success ? 0:1;
    }
    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new TotalSortTest(), args);
        System.exit(ret);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 相關推薦

    Hadoop入門案例 排序TotalOrderPartitioner工具+自動取樣

    程式碼 package com.myhadoop.mapreduce.test; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache

    hdfs使用隨機取樣器進行分區劃分實現排序totalOrderPartitioner,sampler

    問題描述現在有個sequenceFile檔案裡面記錄著年份和溫度,key是年份value是溫度,找出每年的最高氣溫然後按照年份遞增排序。因為reducer預設會對key進行排序,解決辦法有兩種:第一種使用一個reducer,這樣做會造成效能問題,因為所有的key都發往了一臺機

    Android開發中有用工具--Log工具

    util lena 日誌 日誌信息 stat 們的 常常 我們 imp 在開發的過程中。我們常常會使用Log來輸出日誌,幫助我們來調試程序 可是有時候並不能全然滿足我們的須要 ,比方我想知道這個日誌信息是來自於哪一個包 哪一個類 所以我們封裝一個這個Log類。方便我們的

    集合框架Arrays工具的asList()方法的使用

    oar out for -s toa 包含 arr toarray 組元 一:字符串數組轉集合 1 String[] str = { "ds", "df", "da", "dfg" }; 2 List<String> array = new A

    JavaWeb基礎JdbcUtils工具2.0

    數據庫 機會 pro throws password 開源項目 基礎 static 性能 使用c3p0連接池來改版JdbcUtils工具 1. 使用c3p0連接池獲取連接,使代碼更加簡單 1 /** 2 * 使用c3p0連接池做小工具 3 * JdbcUtils

    Java常用(五)集合工具Collections

    可變集合 並發 nts oid element 出現的次數 ole 最小 概述 前言    Java提供了一個操作Set、List和Map等集合的工具類:Collections,該工具類提供了大量方法對集合進行排序、查詢和修改等操作,     還提供了將集合對象置為不可

    java學習筆記DBUtils工具

    return on() 變參 ner where 占位符 使用步驟 args user DBUtils工具類 一.介紹   DBUtils是Apache組織開源的數據庫工具類。 二.使用步驟   ①.創建QueryRunner對象   ②.調用update()方法或者que

    7-Python3從入門到實戰—基礎數據型(字典-Dictionary)

    from ref 不存在 gpo http 必須 ems href int Python從入門到實戰系列——目錄 字典的定義 字典是另一種可變容器模型,且可存儲任意類型對象;使用鍵-值(key-value)存儲,具有極快的查找速度; 字典的每個鍵值(key=>val

    MapReduceJob工具開發

    大數據 Hadoop MapReduce Java [toc] MapReduce之Job工具類開發 在MapReduce程序寫Mapper和Reducer的驅動程序時,有很多代碼都是重復性代碼,因此可以將其提取出來寫成一個工具類,後面再寫MapReduce程序時都會使用這個工具類。 Job

    Java並發CountDownLatch工具

    進行 ole print 對象 exception 線程 獲取 lean CA 一、CountDownLatch工具類介紹 CountDownLatch類是Java並發工具常用的四大工具之一,CountDownLatch允許一個或者多個線程等待

    排序算法工具

    ner nbsp red 並且 param copy tar pre code /** * 排序算法工具類 */ public class GeneratedArray { /** * * 生成隨機長度數組[min,max) *

    java日期工具DATE

     java日期工具類 import java.text.SimpleDateFormat; import java.util.*; public class DateUtil { /** * 獲取SimpleDateFormat * @param parttern 日

    java操作JDBCOracle工具

    /** * JDBC之Oracle工具類 * * @author: Rodge * @time: 2018年10月4日 下午4:06:15 * @version: V1.0.0 */ public class JDBCUtilForOracle { private static fi

    java操作JDBCMySQL工具

    /** * JDBC之MySQL工具類 * * @author: Rodge * @time: 2018年10月4日 下午4:03:42 * @version: V1.0.0 */ public class JDBCUtilForMySQL { private static fin

    Java集合工具

    Collections工具類 collections是集合中的一個工具類,封裝的都是靜態方法。 1.sort(List)方法可對list中的元素進行排序。 2.而實現sort(list,comparator)方法的呼叫,要新建class,實現Comparator介面,並複寫compare

    檔案上傳至阿里雲OssUtil工具的使用

    什麼是Oss? 阿里雲物件儲存服務(Object Storage Service,簡稱OSS)為您提供基於網路的資料存取服務。使用OSS,您可以通過網路隨時儲存和呼叫包括文字、圖片、音訊和視訊等在內的各種非結構化資料檔案。 簡單來說,Oss支援任意型別的檔案遠端儲存(檔案型

    快速建立SpringBoot2.x應用工具自動建立web應用、SpringBoot2.x的依賴預設Maven版本

    快速建立SpringBoot2.x應用之工具類自動建立web應用簡介:使用構建工具自動生成專案基本架構   1、工具自動建立:http://start.spring.io/   2、訪問地址:http://localhost:8080/ 開始下載,解壓,匯入Maven專案 匯入後:

    多執行緒學習筆記六併發工具CountDownLatch和CyclicBarrier

    目錄 簡介 CountDownLatch 示例 實現分析 CountDownLatch與Thread.join() CyclicBarrier 實現分析 CountDownLatch和CyclicBarrier區別 簡介

    Lucene5學習LuceneUtils工具簡單封裝

    週六花了整整一下午,將Lucene5中有關索引的常見操作進行了簡單封裝,廢話不多說,上程式碼: package com.yida.framework.lucene5.util; import java.io.IOException; import java.util.c

    Pinyin4jPinyin4jUtils工具

    import org.junit.Test; import net.sourceforge.pinyin4j.PinyinHelper; import net.sourceforge.pinyin4j.format.HanyuPinyinCaseType; import net.sourceforge.pin