1. 程式人生 > >用hadoop streaming 來執行mapreduce的指令碼

用hadoop streaming 來執行mapreduce的指令碼

2013-02-01

周海漢/文

2013.2.1

http://abloz.com

tom white的《hadoop the_definitive_guide 3nd edition》附錄C裡面講到用streaming方式來處理氣象的原始資料。由於氣象原始資料是小檔案壓縮成的gz2檔案,需要將檔案解壓,合併成一個大檔案,再按年份壓縮存放。這樣便於後續的mapreduce處理。

white用的是streaming執行bash指令碼 load_ncdc_map.sh。該指令碼將NLineInputFormat格式的文字檔案作為輸入,內容是一個個存放在S3中的氣象資料。解壓,合併,上傳到hadoop。

#!/usr/bin/env bash
# NLineInputFormat gives a single line: key is offset, value is S3 URI read offset s3file # Retrieve file from S3 to local disk echo "reporter:status:Retrieving $s3file" >&2 $HADOOP_INSTALL/bin/hadoop fs -get $s3file . # Un-bzip and un-tar the local file target=`basename $s3file .tar.bz2` mkdir -p
$target echo "reporter:status:Un-tarring $s3file to $target" >&2 tar jxf `basename $s3file` -C $target # Un-gzip each station file and concat into one file echo "reporter:status:Un-gzipping $target" >&2 for file in $target/*/* do gunzip -c $file >> $target.all echo "reporter:status:Processed
$file" >&2 done # Put gzipped version into HDFS echo "reporter:status:Gzipping $target and putting in HDFS" >&2 gzip -c $target.all | $HADOOP_INSTALL/bin/hadoop fs -put - gz/$target.gz

執行方式:

% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar 
-D mapred.reduce.tasks=0 
-D mapred.map.tasks.speculative.execution=false 
-D mapred.task.timeout=12000000 
-input ncdc_files.txt 
-inputformat org.apache.hadoop.mapred.lib.NLineInputFormat 
-output output 
-mapper load_ncdc_map.sh 
-file load_ncdc_map.sh

其中ncdc_files.txt格式:

s3n://hadoopbook/ncdc/raw/isd-1901.tar.bz2
s3n://hadoopbook/ncdc/raw/isd-1902.tar.bz2
...

可見streaming方式的mapreduce十分強大,可以執行指令碼和第三方可執行程式,讀取輸入檔案後標準輸入作為可執行檔案輸入,可執行檔案的輸出作為標準輸出。

仿其原理,我們可以做一個bash指令碼來統計或排序大檔案。

要統計排序的文字:README.txt

[[email protected] ~]$ ls -l README.txt
-rw-r--r-- 1 zhouhh zhouhh 1399 Feb 1 10:53 README.txt

[[email protected] ~]$ wc README.txt
34 182 1399 README.txt
[[email protected] ~]$ hadoop fs -ls
Found 3 items
-rw-r--r-- 2 zhouhh supergroup 9358 2013-01-10 17:52 /user/zhouhh/fsimage
drwxr-xr-x - zhouhh supergroup 0 2013-02-01 10:30 /user/zhouhh/gz
-rw-r--r-- 2 zhouhh supergroup 65 2012-09-26 14:10 /user/zhouhh/test中文.txt
[[email protected] ~]$ hadoop fs -put README.txt .
[[email protected] ~]$ hadoop fs -ls
Found 4 items
-rw-r--r-- 2 zhouhh supergroup 1399 2013-02-01 10:56 /user/zhouhh/README.txt
-rw-r--r-- 2 zhouhh supergroup 9358 2013-01-10 17:52 /user/zhouhh/fsimage
drwxr-xr-x - zhouhh supergroup 0 2013-02-01 10:30 /user/zhouhh/gz
-rw-r--r-- 2 zhouhh supergroup 65 2012-09-26 14:10 /user/zhouhh/test中文.txt
[[email protected] ~]$ hadoop fs -ls README.txt
Found 1 items
-rw-r--r-- 2 zhouhh supergroup 1399 2013-02-01 10:56 /user/zhouhh/README.txt

排序:

[[email protected] ~]$ hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-streaming-*.jar -input README.txt -output wordcount1 -mapper /bin/cat -reducer /bin/sort
[[email protected] ~]$ hadoop fs -ls wordcount/part*
Found 1 items
-rw-r--r-- 2 zhouhh supergroup 1433 2013-02-01 11:20 /user/zhouhh/wordcount/part-00000

統計行數,字數

[[email protected] ~]$ hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-streaming-*.jar -input README.txt -output wordcount1 -mapper /bin/cat -reducer /usr/bin/wc

[[email protected] ~]$ hadoop fs -cat wordcount1/p*
34 182 1433

這裡可以發現一個問題,streaming會給輸入檔案的換行0a增加09位元組(tab),變成090a,所以統計檔案的位元組數會是原檔案位元組數+行數,並且檔案內容也改變了。 該問題導致sort後文件改變。用diff比較每行都多了一個09(tab)位元組。

原理: mapper和reducer都是可執行檔案,它們從標準輸入讀入資料(一行一行讀), 並把計算結果發給標準輸出。Streaming工具會建立一個Map/Reduce作業, 並把它傳送給合適的叢集,同時監視這個作業的整個執行過程。

如果一個可執行檔案被用於mapper,則在mapper初始化時, 每一個mapper任務會把這個可執行檔案作為一個單獨的程序啟動。 mapper任務執行時,它把輸入切分成行並把每一行提供給可執行檔案程序的標準輸入。 同時,mapper收集可執行檔案程序標準輸出的內容,並把收到的每一行內容轉化成key/value對,作為mapper的輸出。 預設情況下,一行中第一個tab之前的部分作為key,之後的(不包括tab)作為value。 如果沒有tab,整行作為key值,value值為null。這可以定製。

如果一個可執行檔案被用於reducer,每個reducer任務會把這個可執行檔案作為一個單獨的程序啟動。 Reducer任務執行時,它把輸入切分成行並把每一行提供給可執行檔案程序的標準輸入。 同時,reducer收集可執行檔案程序標準輸出的內容,並把每一行內容轉化成key/value對,作為reducer的輸出。 預設情況下,一行中第一個tab之前的部分作為key,之後的(不包括tab)作為value。這可以定製。

對於需要複製到叢集其他機器的檔案,用-file filename 指定。

這是Map/Reduce框架和streaming mapper/reducer之間的基本通訊協議。

我們可以看到,原檔案和streaming後的檔案不同:

part of the two file of hex code: sort README.txt :

原始檔:

0000000: 0a0a 0a0a 0a0a 0a61 6c67 6f72 6974 686d  .......algorithm
0000010: 732e 2020 5468 6520 666f 726d 2061 6e64  s.  The form and
0000020: 206d 616e 6e65 7220 6f66 2074 6869 7320   manner of this
0000030: 4170 6163 6865 2053 6f66 7477 6172 6520  Apache Software
0000040: 466f 756e 6461 7469 6f6e 0a61 6e64 206f  Foundation.and o
0000050: 7572 2077 696b 692c 2061 743a 0a62 7920  ur wiki, at:.by
0000060: 6d6f 7274 6261 792e 6f72 672e 0a63 6865  mortbay.org..che
0000070: 636b 2079 6f75 7220 636f 756e 7472 7927  ck your country'

streaming README.txt and reduce sort:

0000000: 090a 090a 090a 090a 090a 090a 090a 616c  ..............al
0000010: 676f 7269 7468 6d73 2e20 2054 6865 2066  gorithms.  The f
0000020: 6f72 6d20 616e 6420 6d61 6e6e 6572 206f  orm and manner o
0000030: 6620 7468 6973 2041 7061 6368 6520 536f  f this Apache So
0000040: 6674 7761 7265 2046 6f75 6e64 6174 696f  ftware Foundatio
0000050: 6e09 0a61 6e64 206f 7572 2077 696b 692c  n..and our wiki,

這是因為streaming將每一行當成key+tab+value,而value為空。 參考: http://hadoop.apache.org/docs/r0.19.2/cn/streaming.html

如非註明轉載, 均為原創. 本站遵循知識共享CC協議,轉載請註明來源