Hadoop Streaming 做大資料處理詳解
--------------------------------------------------------------------------
以下內容摘自寒小陽老師大資料課程內容
--------------------------------------------------------------------------
Hadoop Streaming
Hadoop streaming是Hadoop的一個工具, 它幫助使用者建立和執行一類特殊的map/reduce作業, 這些特殊的map/reduce作業是由一些可執行檔案或指令碼檔案充當mapper或者reducer。例如在hadoop環境下的命令列可以執行:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper /bin/cat \
-reducer /bin/wc
看不懂彆著急,咱們馬上來分析一下上述的程式碼。
Streaming工作原理
在上面的例子裡,mapper和reducer都是可執行檔案,它們從標準輸入讀入資料(一行一行讀), 並把計算結果發給標準輸出。Streaming工具會建立一個Map/Reduce作業, 並把它傳送給合適的叢集,同時監視這個作業的整個執行過程。
如果一個可執行檔案被用於mapper,則在mapper初始化時, 每一個mapper任務會把這個可執行檔案作為一個單獨的程序啟動。
mapper任務執行時,它把輸入切分成行並把每一行提供給可執行檔案程序的標準輸入。 同時,mapper收集可執行檔案程序標準輸出的內容,並把收到的每一行內容轉化成key/value對,作為mapper的輸出。
預設情況下,一行中第一個tab之前的部分作為key,之後的(不包括tab)作為value。 如果沒有tab,整行作為key值,value值為null。不過,這可以定製,在下文中將會討論如何自定義key和value的切分方式。
如果一個可執行檔案被用於reducer,每個reducer任務會把這個可執行檔案作為一個單獨的程序啟動。
Reducer任務執行時,它把輸入切分成行並把每一行提供給可執行檔案程序的標準輸入。 同時,reducer收集可執行檔案程序標準輸出的內容,並把每一行內容轉化成key/value對,作為reducer的輸出。
預設情況下,一行中第一個tab之前的部分作為key,之後的(不包括tab)作為value。在下文中將會討論如何自定義key和value的切分方式。
這是Map/Reduce框架和streaming mapper/reducer之間的基本通訊協議。
使用者也可以使用java類作為mapper或者reducer。上面的例子與這裡的程式碼等價:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper org.apache.hadoop.mapred.lib.IdentityMapper \
-reducer /bin/wc
使用者可以設定stream.non.zero.exit.is.failure true 或false 來表明streaming task的返回值非零時是 Failure 還是Success。預設情況,streaming task返回非零時表示失敗。
將檔案打包到提交的作業中
我們要開始講關鍵點了,並不是每位同學都對java熟悉程度這麼高。沒關係,hadoop允許我們用指令碼語言完成處理過程,並把檔案打包提交到作業中,完成大資料的處理。
任何可執行檔案都可以被指定為mapper/reducer。這些可執行檔案不需要事先存放在叢集上;如果在叢集上還沒有,則需要用-file選項讓framework把可執行檔案作為作業的一部分,一起打包提交。例如:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper myPythonScript.py \
-reducer /bin/wc \
-file myPythonScript.py
上面的例子描述了一個使用者把可執行python檔案作為mapper。 其中的選項“-file myPythonScirpt.py”使可執行python檔案作為作業提交的一部分被上傳到叢集的機器上。
除了可執行檔案外,其他mapper或reducer需要用到的輔助檔案(比如字典,配置檔案等)也可以用這種方式打包上傳。例如:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper myPythonScript.py \
-reducer /bin/wc \
-file myPythonScript.py \
-file myDictionary.txt
Streaming選項與用法
只使用Mapper的作業
有時只需要map函式處理輸入資料。這時只需把mapred.reduce.tasks設定為零,Map/reduce框架就不會建立reducer任務,mapper任務的輸出就是整個作業的最終輸出。
為了做到向下相容,Hadoop Streaming也支援“-reduce None”選項,它與“-jobconf mapred.reduce.tasks=0”等價。
為作業指定其他外掛
和其他普通的Map/Reduce作業一樣,使用者可以為streaming作業指定其他外掛:
-inputformat JavaClassName -outputformat JavaClassName -partitioner JavaClassName -combiner JavaClassName 用於處理輸入格式的類要能返回Text型別的key/value對。如果不指定輸入格式,則預設會使用TextInputFormat。 因為TextInputFormat得到的key值是LongWritable型別的(其實key值並不是輸入檔案中的內容,而是value偏移量), 所以key會被丟棄,只把value用管道方式發給mapper。
使用者提供的定義輸出格式的類需要能夠處理Text型別的key/value對。如果不指定輸出格式,則預設會使用TextOutputFormat類。
Hadoop Streaming中的大檔案和檔案
任務使用-cacheFile和-cacheArchive選項在叢集中分發檔案和檔案,選項的引數是使用者已上傳至HDFS的檔案或檔案的URI。這些檔案和檔案在不同的作業間快取。使用者可以通過fs.default.name.config配置引數的值得到檔案所在的host和fs_port。
這個是使用-cacheFile選項的例子:
-cacheFile hdfs://host:fs_port/user/testfile.txt#testlink
在上面的例子裡,url中#後面的部分是建立在任務當前工作目錄下的符號連結的名字。這裡的任務的當前工作目錄下有一個“testlink”符號連結,它指向testfile.txt檔案在本地的拷貝。如果有多個檔案,選項可以寫成:
-cacheFile hdfs://host:fs_port/user/testfile1.txt#testlink1 -cacheFile hdfs://host:fs_port/user/testfile2.txt#testlink2
-cacheArchive選項用於把jar檔案拷貝到任務當前工作目錄並自動把jar檔案解壓縮。例如:
-cacheArchive hdfs://host:fs_port/user/testfile.jar#testlink3
在上面的例子中,testlink3是當前工作目錄下的符號連結,它指向testfile.jar解壓後的目錄。
下面是使用-cacheArchive選項的另一個例子。其中,input.txt檔案有兩行內容,分別是兩個檔案的名字:testlink/cache.txt和testlink/cache2.txt。“testlink”是指向檔案目錄(jar檔案解壓後的目錄)的符號連結,這個目錄下有“cache.txt”和“cache2.txt”兩個檔案。
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input "/user/me/samples/cachefile/input.txt" \
-mapper "xargs cat" \
-reducer "cat" \
-output "/user/me/samples/cachefile/out" \
-cacheArchive 'hdfs://hadoop-nn1.example.com/user/me/samples/cachefile/cachedir.jar#testlink' \
-jobconf mapred.map.tasks=1 \
-jobconf mapred.reduce.tasks=1 \
-jobconf mapred.job.name="Experiment"
再來看一樣過程和內容
$ ls test_jar/
cache.txt cache2.txt
$ jar cvf cachedir.jar -C test_jar/ .
added manifest
adding: cache.txt(in = 30) (out= 29)(deflated 3%)
adding: cache2.txt(in = 37) (out= 35)(deflated 5%)
$ hadoop dfs -put cachedir.jar samples/cachefile
$ hadoop dfs -cat /user/me/samples/cachefile/input.txt
testlink/cache.txt
testlink/cache2.txt
$ cat test_jar/cache.txt
This is just the cache string
$ cat test_jar/cache2.txt
This is just the second cache string
$ hadoop dfs -ls /user/me/samples/cachefile/out
Found 1 items
/user/me/samples/cachefile/out/part-00000 <r 3> 69
$ hadoop dfs -cat /user/me/samples/cachefile/out/part-00000
This is just the cache string
This is just the second cache string
為作業指定附加配置引數
使用者可以使用“-jobconf =”增加一些配置變數。例如:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper org.apache.hadoop.mapred.lib.IdentityMapper\
-reducer /bin/wc \
-jobconf mapred.reduce.tasks=2
上面的例子中,-jobconf mapred.reduce.tasks=2表明用兩個reducer完成作業。
其他選項
Streaming 作業的其他選項如下表:
選項 | 可選/必須 | 描述 |
---|---|---|
-cluster name | 可選 | 在本地Hadoop叢集與一個或多個遠端叢集間切換 |
-dfs host:port or local | 可選 | 覆蓋作業的HDFS配置 |
-jt host:port or local | 可選 | 覆蓋作業的JobTracker配置 |
-additionalconfspec specfile | 可選 | 用一個類似於hadoop-site.xml的XML檔案儲存所有配置,從而不需要用多個"-jobconf name=value"型別的選項單獨為每個配置變數賦值 |
-cmdenv name=value | 可選 | 傳遞環境變數給streaming命令 |
-cacheFile fileNameURI | 可選 | 指定一個上傳到HDFS的檔案 |
-cacheArchive fileNameURI | 可選 | 指定一個上傳到HDFS的jar檔案,這個jar檔案會被自動解壓縮到當前工作目錄下 |
-inputreader JavaClassName | 可選 | 為了向下相容:指定一個record reader類(而不是input format類) |
-verbose | 可選 | 詳細輸出 |
使用-cluster 實現“本地”Hadoop和一個或多個遠端Hadoop叢集間切換。預設情況下,使用hadoop-default.xml和hadoop-site.xml;當使用-cluster 選項時,會使用$HADOOP_HOME/conf/hadoop-.xml。
下面的選項改變temp目錄:
-jobconf dfs.data.dir=/tmp
下面的選項指定其他本地temp目錄:
-jobconf mapred.local.dir=/tmp/local
-jobconf mapred.system.dir=/tmp/system
-jobconf mapred.temp.dir=/tmp/temp
更多有關jobconf的細節請參考:http://wiki.apache.org/hadoop/JobConfFile
在streaming命令中設定環境變數:
-cmdenv EXAMPLE_DIR=/home/example/dictionaries/
高階功能與其他例子
使用自定義的方法切分行來形成Key/Value對
之前已經提到,當Map/Reduce框架從mapper的標準輸入讀取一行時,它把這一行切分為key/value對。 在預設情況下,每行第一個tab符之前的部分作為key,之後的部分作為value(不包括tab符)。
但是,使用者可以自定義,可以指定分隔符是其他字元而不是預設的tab符,或者指定在第n(n>=1)個分割符處分割而不是預設的第一個。例如:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper org.apache.hadoop.mapred.lib.IdentityMapper \
-reducer org.apache.hadoop.mapred.lib.IdentityReducer \
-jobconf stream.map.output.field.separator=. \
-jobconf stream.num.map.output.key.fields=4
在上面的例子,“-jobconf stream.map.output.field.separator=.”指定“.”作為map輸出內容的分隔符,並且從在第四個“.”之前的部分作為key,之後的部分作為value(不包括這第四個“.”)。 如果一行中的“.”少於四個,則整行的內容作為key,value設為空的Text物件(就像這樣建立了一個Text:new Text(""))。
同樣,使用者可以使用“-jobconf stream.reduce.output.field.separator=SEP”和“-jobconf stream.num.reduce.output.fields=NUM”來指定reduce輸出的行中,第幾個分隔符處分割key和value。
一個實用的Partitioner類 (二次排序,-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 選項)
Hadoop有一個工具類org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner, 它在應用程式中很有用。Map/reduce框架用這個類切分map的輸出, 切分是基於key值的字首,而不是整個key。例如:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper org.apache.hadoop.mapred.lib.IdentityMapper \
-reducer org.apache.hadoop.mapred.lib.IdentityReducer \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-jobconf stream.map.output.field.separator=. \
-jobconf stream.num.map.output.key.fields=4 \
-jobconf map.output.key.field.separator=. \
-jobconf num.key.fields.for.partition=2 \
-jobconf mapred.reduce.tasks=12
其中,-jobconf stream.map.output.field.separator=. 和-jobconf stream.num.map.output.key.fields=4是前文中的例子。Streaming用這兩個變數來得到mapper的key/value對。
上面的Map/Reduce 作業中map輸出的key一般是由“.”分割成的四塊。但是因為使用了 -jobconf num.key.fields.for.partition=2 選項,所以Map/Reduce框架使用key的前兩塊來切分map的輸出。其中,-jobconf map.output.key.field.separator=. 指定了這次切分使用的key的分隔符。這樣可以保證在所有key/value對中, key值前兩個塊值相同的所有key被分到一組,分配給一個reducer。
這種高效的方法等價於指定前兩塊作為主鍵,後兩塊作為副鍵。 主鍵用於切分塊,主鍵和副鍵的組合用於排序。一個簡單的示例如下:
Map的輸出(key)
11.12.1.2
11.14.2.3
11.11.4.1
11.12.1.1
11.14.2.2
切分給3個reducer(前兩塊的值用於切分)
11.11.4.1
-----------
11.12.1.2
11.12.1.1
-----------
11.14.2.3
11.14.2.2
在每個切分後的組內排序(四個塊的值都用於排序)
11.11.4.1
-----------
11.12.1.1
11.12.1.2
-----------
11.14.2.2
11.14.2.3
Hadoop聚合功能包的使用(-reduce aggregate 選項)
Hadoop有一個工具包“Aggregate”( https://svn.apache.org/repos/asf/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate)。 “Aggregate”提供一個特殊的reducer類和一個特殊的combiner類, 並且有一系列的“聚合器”(“aggregator”)(例如“sum”,“max”,“min”等)用於聚合一組value的序列。 使用者可以使用Aggregate定義一個mapper外掛類, 這個類用於為mapper輸入的每個key/value對產生“可聚合項”。 combiner/reducer利用適當的聚合器聚合這些可聚合項。
要使用Aggregate,只需指定“-reducer aggregate”:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper myAggregatorForKeyCount.py \
-reducer aggregate \
-file myAggregatorForKeyCount.py \
-jobconf mapred.reduce.tasks=12
python程式myAggregatorForKeyCount.py例子:
#!/usr/bin/python import sys; def generateLongCountToken(id): return "LongValueSum:" + id + "\t" + "1" def main(argv): line = sys.stdin.readline(); try: while line: line = line[:-1]; fields = line.split("\t"); print generateLongCountToken(fields[0]); line = sys.stdin.readline(); except "end of file": return None if __name__ == "__main__": main(sys.argv)
欄位的選取(類似於unix中的 'cut' 命令)
Hadoop的工具類org.apache.hadoop.mapred.lib.FieldSelectionMapReduce幫助使用者高效處理文字資料, 就像unix中的“cut”工具。工具類中的map函式把輸入的key/value對看作欄位的列表。 使用者可以指定欄位的分隔符(預設是tab), 可以選擇欄位列表中任意一段(由列表中一個或多個欄位組成)作為map輸出的key或者value。 同樣,工具類中的reduce函式也把輸入的key/value對看作欄位的列表,使用者可以選取任意一段作為reduce輸出的key或value。例如:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper org.apache.hadoop.mapred.lib.FieldSelectionMapReduce\
-reducer org.apache.hadoop.mapred.lib.FieldSelectionMapReduce\
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-jobconf map.output.key.field.separa=. \
-jobconf num.key.fields.for.partition=2 \
-jobconf mapred.data.field.separator=. \
-jobconf map.output.key.value.fields.spec=6,5,1-3:0- \
-jobconf reduce.output.key.value.fields.spec=0-2:5- \
-jobconf mapred.reduce.tasks=12
選項“-jobconf map.output.key.value.fields.spec=6,5,1-3:0-”指定了如何為map的輸出選取key和value。Key選取規則和value選取規則由“:”分割。 在這個例子中,map輸出的key由欄位6,5,1,2和3組成。輸出的value由所有欄位組成(“0-”指欄位0以及之後所有欄位)。
選項“-jobconf reduce.output.key.value.fields.spec=0-2:0-”(譯者注:此處應為”0-2:5-“)指定如何為reduce的輸出選取value。 本例中,reduce的輸出的key將包含欄位0,1,2(對應於原始的欄位6,5,1)。 reduce輸出的value將包含起自欄位5的所有欄位(對應於所有的原始欄位)。