Hadoop Tutorial: Streaming
Hadoop Streaming is a utility that comes with the Hadoop distribution. It allows you to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer.
Python Example
For Hadoop Streaming, we consider the word-count problem. Any job in Hadoop must have two phases: a mapper and a reducer. We have written the mapper and the reducer as Python scripts so they can run under Hadoop. The same logic could equally be written in Perl or Ruby.
Mapper Phase Code
```python
#!/usr/bin/python3
import sys

# Input is taken from standard input
for myline in sys.stdin:
    # Remove whitespace on either side
    myline = myline.strip()
    # Break the line into words
    words = myline.split()
    # Iterate over the words list
    for myword in words:
        # Write the results to standard output
        print('%s\t%s' % (myword, 1))
```
Make sure this file has execution permission (chmod +x /home/expert/hadoop-1.2.1/mapper.py).
Reducer Phase Code
```python
#!/usr/bin/python3
import sys

current_word = ""
current_count = 0
word = ""

# Input is taken from standard input
for myline in sys.stdin:
    # Remove whitespace on either side
    myline = myline.strip()
    # Split the input we got from mapper.py
    word, count = myline.split('\t', 1)
    # Convert the count variable to an integer
    try:
        count = int(count)
    except ValueError:
        # Count was not a number, so silently ignore this line
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # Write the result to standard output
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# Do not forget to output the last word if needed!
if current_word == word:
    print('%s\t%s' % (current_word, current_count))
```
Save the mapper and reducer code in the files mapper.py and reducer.py in the Hadoop home directory. Make sure these files have execution permission (chmod +x mapper.py and chmod +x reducer.py). Since Python is indentation-sensitive, take care to preserve the indentation exactly when copying the code.
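Because both scripts are plain stdin/stdout programs, the whole job can be dry-run without a cluster. The sketch below reimplements the mapper and reducer logic in-process and uses sorted() to stand in for Hadoop's shuffle-and-sort phase; the sample input lines are made up for illustration.

```python
# Local dry-run of the streaming word count: map -> sort (shuffle) -> reduce.

def map_lines(lines):
    # Same logic as mapper.py: emit one "word\t1" pair per word.
    for line in lines:
        for word in line.strip().split():
            yield '%s\t%s' % (word, 1)

def reduce_pairs(sorted_lines):
    # Same logic as reducer.py: sum the counts of consecutive equal keys.
    current_word, current_count = None, 0
    for line in sorted_lines:
        word, count = line.split('\t', 1)
        count = int(count)
        if word == current_word:
            current_count += count
        else:
            if current_word is not None:
                yield '%s\t%d' % (current_word, current_count)
            current_word, current_count = word, count
    if current_word is not None:
        yield '%s\t%d' % (current_word, current_count)

sample = ["the quick brown fox", "the lazy dog the fox"]
# sorted() simulates Hadoop's shuffle-and-sort between map and reduce.
result = list(reduce_pairs(sorted(map_lines(sample))))
print(result)
```

An equivalent shell-level check is to pipe a text file through mapper.py, sort, and reducer.py.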
Execution of the WordCount Program
```
$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar \
    -input input_dirs \
    -output output_dir \
    -mapper <path>/mapper.py \
    -reducer <path>/reducer.py
```
Here "\" is used for line continuation, to keep the command readable.
For example,
```
./bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar \
    -input myinput \
    -output myoutput \
    -mapper /home/expert/hadoop-1.2.1/mapper.py \
    -reducer /home/expert/hadoop-1.2.1/reducer.py
```
How Streaming Works
In the example above, both the mapper and the reducer are Python scripts that read input from standard input and emit output to standard output. The utility creates a Map/Reduce job, submits the job to an appropriate cluster, and monitors the progress of the job until it completes.
When a script is specified for the mappers, each mapper task launches the script as a separate process when the mapper is initialized. As the mapper task runs, it converts its inputs into lines and feeds those lines to the standard input (STDIN) of the process. Meanwhile, the mapper collects the line-oriented output from the standard output (STDOUT) of the process and converts each line into a key/value pair, which is collected as the output of the mapper. By default, the prefix of a line up to the first tab character is the key, and the rest of the line (excluding the tab character) is the value. If there is no tab character in the line, the entire line is taken as the key and the value is null. This behavior can, however, be customized as needed.
When a script is specified for the reducers, each reducer task launches the script as a separate process when the reducer is initialized. As the reducer task runs, it converts its input key/value pairs into lines and feeds those lines to the standard input (STDIN) of the process. Meanwhile, the reducer collects the line-oriented output from the standard output (STDOUT) of the process and converts each line into a key/value pair, which is collected as the output of the reducer. By default, the prefix of a line up to the first tab character is the key, and the rest of the line (excluding the tab character) is the value. This, too, can be customized to specific requirements.
Important Commands
Parameters | Description |
---|---|
-input directory/file-name | Input location for mapper. (Required) |
-output directory-name | Output location for reducer. (Required) |
-mapper executable or script or JavaClassName | Mapper executable. (Required) |
-reducer executable or script or JavaClassName | Reducer executable. (Required) |
-file file-name | Makes the mapper, reducer, or combiner executable available locally on the compute nodes. |
-inputformat JavaClassName | Class you supply should return key/value pairs of Text class. If not specified, TextInputFormat is used as the default. |
-outputformat JavaClassName | Class you supply should take key/value pairs of Text class. If not specified, TextOutputFormat is used as the default. |
-partitioner JavaClassName | Class that determines which reduce a key is sent to. |
-combiner streamingCommand or JavaClassName | Combiner executable for map output. |
-cmdenv name=value | Passes the environment variable to streaming commands. |
-inputreader | For backwards-compatibility: specifies a record reader class (instead of an input format class). |
-verbose | Verbose output. |
-lazyOutput | Creates output lazily. For example, if the output format is based on FileOutputFormat, the output file is created only on the first call to output.collect (or Context.write). |
-numReduceTasks | Specifies the number of reducers. |
-mapdebug | Script to call when map task fails. |
-reducedebug | Script to call when reduce task fails. |
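As an illustration of -cmdenv, a streaming script can read the passed variable from its environment. The sketch below is a hypothetical filter mapper, refactored into a function so the logic is easy to test locally; MIN_LEN is a made-up variable name, assumed to be supplied as -cmdenv MIN_LEN=4 on the streaming command line.

```python
import os

def filter_map(lines, env=os.environ):
    # Hypothetical mapper body: emit only words of at least MIN_LEN characters.
    # In a real job, MIN_LEN would arrive via: -cmdenv MIN_LEN=4
    min_len = int(env.get('MIN_LEN', '1'))
    out = []
    for line in lines:
        for word in line.strip().split():
            if len(word) >= min_len:
                out.append('%s\t%s' % (word, 1))
    return out

print(filter_map(["a tiny example line"], {'MIN_LEN': '4'}))
```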
Original article: https://www.tutorialspoint.com/...