
The Functions MapReduce Uses to Partition Data, and Their Counterparts in Streaming

MapReduce has three steps that divide a large dataset and feed the data to the mappers and reducers.

InputSplit

The first is the InputSplit, which cuts the data into chunks and hands them to the mappers.

By default, splitting follows the blocks of the data file: one block corresponds to one mapper, and the mapper is preferentially started on the machine that holds the block.

To change how the input is split, override the getSplits method of the InputFormat class (a sketch follows the link below).

https://hadoop.apache.org/docs/r2.7.2/api/org/apache/hadoop/mapred/InputFormat.html
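As a rough sketch (it uses the newer org.apache.hadoop.mapreduce API, and the class name is invented for this example), the simplest way to change splitting is to turn it off, so that each file becomes exactly one split and therefore one mapper; overriding getSplits() directly works the same way when finer control is needed:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Hypothetical example: a TextInputFormat that never splits a file,
// so the inherited getSplits() emits one split (and hence one mapper) per file.
public class WholeFileTextInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
}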

In streaming:

-inputformat JavaClassName | Optional | Class you supply should return key/value pairs of Text class. If not specified, TextInputFormat is used as the default
-outputformat JavaClassName | Optional | Class you supply should take key/value pairs of Text class. If not specified, TextOutputFormat is used as the default

These two options specify the InputFormat and OutputFormat classes to use.

Partition

The partitioner decides which reducer each map output record is sent to; a custom partitioner normally extends the org.apache.hadoop.mapreduce.Partitioner class, as in the sketch below.
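A minimal sketch of such a subclass (the class name and the key format are invented here): route each record to a reducer chosen by the part of its key before the first '-'.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical example: partition by the first '-'-separated field of the key.
public class FirstFieldPartitioner extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        String firstField = key.toString().split("-", 2)[0];
        // Clear the sign bit so the modulo always yields a valid partition index.
        return (firstField.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

In a plain Java job it would be registered with job.setPartitionerClass(FirstFieldPartitioner.class).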

Grouping

This concept is harder to grasp: just before the data reaches a reducer, it is grouped one more time. Each group is handed to a single reduce() call, and the key the reducer sees is the key of the first record in that group.

https://stackoverflow.com/questions/14728480/what-is-the-use-of-grouping-comparator-in-hadoop-map-reduce

In the accepted answer, a-1 and a-2 are merged by the grouping comparator into one group keyed by a-1, which the reducer then processes in a single call; a comparator in that spirit is sketched below.
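A grouping comparator along those lines might look like the following sketch (the class name is invented, and keys are assumed to be Text values such as "a-1"): two keys compare as equal whenever the part before the '-' matches, so the values of a-1 and a-2 reach the same reduce() call.

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical example: group map output by the part of the key before the first '-'.
public class NaturalKeyGroupingComparator extends WritableComparator {
    protected NaturalKeyGroupingComparator() {
        super(Text.class, true); // true: deserialize keys into Text objects before compare()
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        String left = a.toString().split("-", 2)[0];
        String right = b.toString().split("-", 2)[0];
        return left.compareTo(right);
    }
}

In a plain Java job it would be set with job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class).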

So how are partitioning and grouping handled in streaming?

In streaming, the partitioner class can be specified with a command-line option:

-partitioner JavaClassName | Optional | Class that determines which reduce a key is sent to

It can also be specified with another set of options whose syntax mirrors the sort command:

-D stream.map.output.field.separator=. \
-D stream.num.map.output.key.fields=4 \
-D mapred.text.key.partitioner.options=-k1,2 \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \

This sets the field separator to '.', makes the first four fields of each line the key, and partitions on fields 1 and 2 of that key; note that mapred.text.key.partitioner.options only takes effect when the job's partitioner is KeyFieldBasedPartitioner, as on the last line above.

The sort order of keys within each reducer's input can be controlled with the same sort-style syntax, which is interpreted by org.apache.hadoop.mapred.lib.KeyFieldBasedComparator (enabled via -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator):

-D mapreduce.partition.keycomparator.options='-k1,2 -k3,3nr -k4,4nr'

For comparison, the sort command in Linux:

sort -k1 -k2n -k3nr  # sort by the first column, then by the second column as a number, then by the third column as a number in reverse order

Grouping has no corresponding implementation in streaming mode, but partitioning can be used in its place.

Appendix:

Parameter | Optional/Required | Description
-input directoryname or filename | Required | Input location for mapper
-output directoryname | Required | Output location for reducer
-mapper executable or JavaClassName | Required | Mapper executable
-reducer executable or JavaClassName | Required | Reducer executable
-file filename | Optional | Make the mapper, reducer, or combiner executable available locally on the compute nodes
-inputformat JavaClassName | Optional | Class you supply should return key/value pairs of Text class. If not specified, TextInputFormat is used as the default
-outputformat JavaClassName | Optional | Class you supply should take key/value pairs of Text class. If not specified, TextOutputFormat is used as the default
-partitioner JavaClassName | Optional | Class that determines which reduce a key is sent to
-combiner streamingCommand or JavaClassName | Optional | Combiner executable for map output
-cmdenv name=value | Optional | Pass environment variable to streaming commands
-inputreader | Optional | For backwards-compatibility: specifies a record reader class (instead of an input format class)
-verbose | Optional | Verbose output
-lazyOutput | Optional | Create output lazily. For example, if the output format is based on FileOutputFormat, the output file is created only on the first call to output.collect (or Context.write)
-numReduceTasks | Optional | Specify the number of reducers
-mapdebug | Optional | Script to call when map task fails
-reducedebug | Optional | Script to call when reduce task fails