1. 程式人生 > >hadoop streaming引數配置

hadoop streaming引數配置

Streaming簡介

Hadoop Streaming 是Hadoop提供的一個程式設計工具,Streamining框架允許任何可執行檔案或者指令碼檔案作為Mapper和Reducer在Hadoop MapReduce中使用,方便已有程式向Hadoop平臺移植。因此可以說對於hadoop的擴充套件性意義重大。

Streamining的原理:mapper和reducer會從標準輸入中讀取資料,一行一行處理後傳送給標準輸出,Streming 工具會建立MapReduce作業,傳送給各個tasktracker,同時監控整個作業的執行過程。

如果一個檔案(可執行檔案或指令碼)作為mapper,mapper初始化時,每一個mapper任務會把該檔案作為一個單獨程序啟動,mapper任務執行時,它把輸入切分成行並把每一行提供給可執行檔案程序的標準輸入,同時,mapper收集可執行檔案程序標準輸出的內容,並把收到的每一行內容轉化成key/value對,作為mapper的輸出,預設情況下,一行中第一個tab之前的部分作為key,之後的作為value。如果沒有tab,整行作為key,value值為null。

基本用法

HADOOPHOME/bin/hadoopjarHADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar [options]

options:

 - -input: 輸入檔案路徑
 - -output:輸出檔案路徑
 - -mapper: 使用者自己寫的mapper程式,可以是可執行檔案或指令碼
 -  -reducer: 使用者自己寫的reducer程式,可以是可執行檔案或指令碼
 - -file:打包檔案到提交的作業中,可以是mapper或者reducer要用的輸入檔案,如配置檔案、字典等
 - -partitioner:使用者自定義的partitioner程式
 - -combiner:使用者自定義的combiner程式
 - -D:作業的一些屬性(以前用的是-joncof)

例1:

${BIN_HADOOP} fs -rm -r "${OUTPUT}"
${BIN_HADOOP} jar "${jarPack}" \
    -input "${INPUT}/part*" \
    -output "${OUTPUT}" \
    -mapper "${PYTHON_BIN}  map.py" \
    -reducer "${PYTHON_BIN} reduce.py" \
    -jobconf mapred.job.name="*****" \
    -jobconf mapred.job.priority=VERY_HIGH \
    -jobconf mapred.job.map.capacity=200
\
-jobconf mapred.job.reduce.capacity=200 \ -jobconf mapred.reduce.tasks=200 \ -jobconf mapred.min.split.size=20480000000 \ -jobconf mapred.map.memory.limit=2000 \ -jobconf mapred.map.over.capacity.allowed=true \ -cacheArchive "${PYTHON_PACK}#taskenv" \ -file map.py \ -file init.conf

上面例子上傳了當前環境的python包(就是將當前環境python環境下的bin include lib share壓縮tar),然後上傳到HDFS(PYTHON_PACK),啟動Streamining時,使用-cacheArchive選項將其分發到計算結點並解壓到taskenv目錄,PYTHON_BIN=”taskenv/python27/bin/python”,本地打包時要進入目錄app而不是在app的上層目錄打包,否則要通過app/app/mapper.pl才能訪問到mapper.pl檔案。

例2:

${BIN_HADOOP} fs -rm -r "${OUTPUT}"
${BIN_HADOOP} jar "${jarPack}" \
    -D mapreduce.map.memory.mb=2800 \
    -D mapred.map.tasks=400 \
    -D stream.non.zero.exit.is.failure=false \
    -file map.py \
    -input "${INPUT}" \
    -output "${OUTPUT}" \
    -mapper map.py \
    -jobconf mapred.map.over.capacity.allowed=true \
    -cacheFile "${PYTHON_FILE}#tasklink" \
    -jobconf mapred.job.map.capacity=20 \
    -jobconf mapreduce.job.name="****"

上例上傳了一個檔案,其中在map.py可以通過(“./tasklink”)直接訪問${PYTHON_FILE}檔案。

Streamining不足

  • Hadoop Streamining 預設只能處理文字資料,無法直接處理二進位制資料進行處理
  • Streaming中的mapper和reducer預設只能標準輸出寫資料,不能方便的處理多路輸出

基礎配置引數介紹

下面引數中-files,-archives,-libjars為hadoop命令

  • -file將客戶端本地檔案打成jar包上傳到HDFS然後分發到計算節點
  • -cacheFile將HDFS檔案分發到計算節點
  • -cacheArchive將HDFS壓縮檔案分發到計算節點並解壓,也可以指定符號連結
  • -files:將指定的本地/hdfs檔案分發到各個Task的工作目錄下,不對檔案進行任何處理,例如:-files hdfs://host:fs_port/user/testfile.txt#testlink ;
  • -archives選項允許使用者複製jar包到當前任務的工作路徑,並且自動解壓jar包,例如:-archives hdfs://host:fs_port/user/testfile.jar#testlink3 。在這個例子中,符號連結testlink3建立在當前任務的工作路徑中。符號連結指向存放解壓jar包的檔案路徑
  • -libjars:指定待分發的jar包,Hadoop將這些jar包分發到各個節點上後,會將其自動新增到任務的CLASSPATH環境變數中
  • -input:指定作業輸入,可以是檔案或者目錄,可以使用*萬用字元,也可以使用多次指定多個檔案或者目錄作為輸入
  • -output:指定作業輸出目錄,並且必須不存在,並且必須有建立該目錄的許可權,-output只能使用一次
  • -mapper:指定mapper可執行程式或Java類,必須指定且唯一
  • -reducer:指定reducer可執行程式或Java類
  • -numReduceTasks: 指定reducer的個數,如果設定為0或者-reducer NONE 則沒有reducer程式,mapper的輸出直接作為整個作業的輸出

其他引數配置

一般用 -jobconf | -D NAME=VALUE 指定作業引數,指定 mapper or reducer 的 task 官方上說要用 -jobconf 但是這個引數已經過時,不可以用了,官方說要用 -D, 注意這個-D是要作為最開始的配置出現的,因為是在maper 和 reducer 執行之前,就需要硬性指定好的,所以要出現在引數的最前面 ./bin/hadoop jar hadoop-0.19.2-streaming.jar -D ………-input …….. 類似這樣

  • mapred.job.name=”jobname” 設定作業名,特別建議
  • mapred.job.priority=VERY_HIGH|HIGH|NORMAL|LOW|VERY_LOW 設定作業優先順序
  • * mapred.job.map.capacity=M設定同時最多執行M個map任務*
  • mapred.job.reduce.capacity=N 設定同時最多執行N個reduce任務
  • mapred.map.tasks 設定map任務個數
  • mapred.reduce.tasks 設定reduce任務個數
  • mapred.job.groups 作業可執行的計算節點分組
  • mapred.task.timeout 任務沒有響應(輸入輸出)的最大時間
  • mapred.compress.map.output 設定map的輸出是否壓縮
  • mapred.map.output.compression.codec 設定map的輸出壓縮方式
  • mapred.output.compress 設定map的輸出是否壓縮
  • mapred.output.compression.codec 設定reduce的輸出壓縮方式
  • stream.map.output.field.separator 指定map輸出分隔符,預設情況下Streaming框架將map輸出的每一行的第一個’\t’之前的部分作為key,之後的部分作為value,key\value再作為reduce的輸入;
  • stream.num.map.output.key.fields 設定分隔符的位置,如果設定為2,指定在第2個分隔符處分割,也就是第二個之前作為key,之後作為value,如果分隔符少於2個,則以整行為key,value為空
  • stream.reduce.output.field.separator 指定reduce輸出分隔符
  • stream.num.reduce.output.key.fields 指定reduce輸出分隔符位置

其中sort和partition的引數用的比較多
map.output.key.field.separator: map中key內部的分隔符
num.key.fields.for.partition: 分桶時,key按前面指定的分隔符分隔之後,用於分桶的key佔的列數。通俗地講,就是partition時候按照key中的前幾列進行劃分,相同的key會被打到同一個reduce裡。
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 前兩個引數,要配合partitioner選項使用!

stream.map.output.field.separator: map中的key與value分隔符
stream.num.map.output.key.fields: map中分隔符的位置
stream.reduce.output.field.separator: reduce中key與value的分隔符
stream.num.reduce.output.key.fields: reduce中分隔符的位置

另外還有壓縮引數

Job輸出結果是否壓縮

mapred.output.compress

是否壓縮,預設值false。

mapred.output.compression.type

壓縮型別,有NONE, RECORD和BLOCK,預設值RECORD。

mapred.output.compression.codec

壓縮演算法,預設值org.apache.hadoop.io.compress.DefaultCodec。

map task輸出是否壓縮

mapred.compress.map.output

是否壓縮,預設值false

mapred.map.output.compression.codec

壓縮演算法,預設值org.apache.hadoop.io.compress.DefaultCodec

另外,Hadoop本身還自帶一些好用的Mapper和Reducer:

1、Hadoop聚集功能:Aggregate提供一個特殊的reducer類和一個特殊的combiner類,並且有一系列的“聚合器”(例如:sum、max、min等)用於聚合一組value的序列。可以使用Aggredate定義一個mapper外掛類,這個類用於為mapper輸入的每個key/value對產生“可聚合項”。combiner/reducer利用適當的聚合器聚合這些可聚合項。要使用Aggregate,只需指定”-reducer aggregate”。
2、欄位的選取(類似於Unix中的’cut’):Hadoop的工具類org.apache.hadoop.mapred.lib.FieldSelectionMapReduc幫助使用者高效處理文字資料,就像unix中的“cut”工具。工具類中的map函式把輸入的key/value對看作欄位的列表。 使用者可以指定欄位的分隔符(預設是tab),可以選擇欄位列表中任意一段(由列表中一個或多個欄位組成)作為map輸出的key或者value。 同樣,工具類中的reduce函式也把輸入的key/value對看作欄位的列表,使用者可以選取任意一段作為reduce輸出的key或value。

二次排序 Partitioner

一個典型的Map-Reduce的過程包括:Input->Map->Patition->Reduce->Output,Partition負責把Map任務輸出的中間結果按key分發給不同的Reduce任務進行處理。Hadoop提供了一個非常實用的partitioner類KeyFieldBasedPartitioner,使用方法:

-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 

一般會配合

-D map.output.key.field.separator  ###key內部的分隔符
-D num.key.fields.for.partition    ###對key分出來的前即部分做parititon,而不是整個key

使用,但是map.output.key.field.separator對tab鍵好像不管用,待驗證

例3:

測試資料:
1,2,1,1,1

1,2,2,1,1

1,3,1,1,1

1,3,2,1,1

1,3,3,1,1

1,2,3,1,1

1,3,1,1,1

1,3,2,1,1

1,3,3,1,1

-D stream.map.output.field.separator=, /    

-D stream.num.map.output.key.fields=4 /    

-D map.output.key.field.separator=, /    

-D num.key.fields.for.partition=2 /    

-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner /

結果資料:
$ hadoop fs –cat /app/test/test_result/part-00003

1,2,1,1 1

1,2,2,1 1

1,2,3,1 1

$ hadoop fs –cat /app/test/test_result/part-00004

1,3,1,1 1

1,3,1,1 1

1,3,2,1 1

1,3,2,1 1

1,3,3,1 1

1,3,3,1 1

例4

測試資料:
D&10.2.3.40 1
D&11.22.33.33 1
W&www.renren.com 1
W&www.baidu.com 1
D&10.2.3.40 1
然後通過傳遞命令引數
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner //指定要求二次排序
-jobconf map.output.key.field.separator=’&’ //這裡如果不加兩個單引號的話我的命令會死掉
-jobconf num.key.fields.for.partition=1 //這裡指第一個 & 符號來分割,保證不會出錯

-combiner??
Comparator類

相關推薦

hadoop streaming引數配置

Streaming簡介 Hadoop Streaming 是Hadoop提供的一個程式設計工具,Streamining框架允許任何可執行檔案或者指令碼檔案作為Mapper和Reducer在Hadoop MapReduce中使用,方便已有程式向Hadoop平臺移

hadoop streaming 引數設定

Hadoop Streaming用法 Usage: $HADOOP_HOME/bin/hadoop jar \ $HADOOP_HOME/hadoop-streaming.jar [options] options: (1)-input:輸入檔案路徑 (2)-output:輸出檔案路徑 (3)-mapper:

Hadoop叢集引數配置原則

dfs.datanode.handler.count datanode上用於處理RPC的執行緒數。預設為3,較大叢集,可適當調大些,比如8。需要注意的是,每新增一個執行緒,需要的記憶體增加。tasktracker.http.threads HTTP server上的執行緒數

hadoop 簡單入門與streaming常用配置引數說明

1. Hadoop包含兩核心部分 hdfs Hadoop distribute file system -- hadoop分散式檔案系統,儲存資料 Namenode、Datanode 常用命令形式:hadoop fs -ls  /  hadoop fs -mkdi

Hadoop 學習研究(四):MapReduce shuffle過程剖詳解及引數配置調優

MapReduce簡介    在Hadoop  MapReduce中,框架會確保reduce收到的輸入資料是根據key排序過的。資料從Mapper輸出到Reducer接收,是一個很複雜的過程,框架

HDP 2.2 ( Hadoop 2.6 ) 叢集的記憶體引數配置引數調優 (Yarn/MapReduce2)

近期在根據叢集上的各節點的物理機配置對叢集的記憶體引數進行調整。  因此較系統的學習了一下hadoop裡對資源調配的各元件的相關引數的含義。 作為示例的配置叢集版本是2.6, hortonworks 2.2.  首先要理解, hadoop 中 yarn 作為資源管理器,

hadoop 引數配置

Hadoop引數彙總 @(hadoop)[配置] linux引數 以下引數最好優化一下: 檔案描述符ulimit -n 使用者最大程序 nproc (hbase需要 hbse book) 關閉swap分割槽 設定合理的預讀取緩衝區 Linux的核心的IO排程器

Hadoop --- 入門之配置引數

Hadoop引數彙總 @(hadoop)[配置] linux引數 以下引數最好優化一下: 檔案描述符ulimit -n 使用者最大程序 nproc (hbase需要 hbse book) 關閉swap分割槽 設定合理的預讀取緩衝區 Linux的核心的IO排

hadoop三個配置檔案的引數含義說明

hadoop常用埠配置 1. HDFS埠 引數 描述 預設 配置檔案 例子值 fs.default.name namenode RPC互動埠

hadoop streaming anaconda python 計算平均值

sdn cat pipe cal 存在 格式 ins too stream 原始Liunx 的python版本不帶numpy ,安裝了anaconda 之後,使用hadoop streaming 時無法調用anaconda python , 後來發現是參數沒設置好。。。

hadoop +streaming 排序總結

.lib fields 排序 1.4 stream 想要 output 廣泛 sep 參考http://blog.csdn.net/baidu_zhongce/article/details/49210787 hadoop用於對key的排序和分桶的設置選項比較多,在公司中

hadoop 使用ip配置導致hdfs啟動失敗

hadoop $2 value servers 無法 col reg dfs property dataNode 有守護進行,但hdfs web頁面上顯示沒有live node。 錯誤日誌: 2017-06-21 17:44:59,513 ERROR org.apa

hadoop-env.sh配置

hadoop-env.sh配置1.hadoop的hadoop-env.sh配置,主要配置內容如下,指定jdk目錄export JAVA_HOME=/usr/java/jdk1.7.0_79本文出自 “素顏” 博客,請務必保留此出處http://suyanzhu.blog.51cto.com/8050189/1

Hadoop Streaming開發要點

而不是 使用 節點 多次 spa cal hive 程序 col 一.shell腳本中的相關配置 1 HADOOP_CMD="/usr/local/src/hadoop-1.2.1/bin/hadoop" 2 STREAM_JAR_PATH="/usr/local/s

hadoop streaming 語法

capacity hdfs 壓縮 ups har 格式 -o art str 1、hadoop streaming 命令格式 $HADOOP_HOME/bin/hadoop jar hadoop-streaming.jar -D mapred.job.name="s

Hadoop環境 IDE配置(在eclipse中安裝hadoop-eclipse-plugin-2.7.3.jar插件)

map bubuko other 9.png 查看 3.2 div 集群 點擊 一、hadoop-eclipse-plugin-2.7.3.jar插件下載點擊下載 二、把插件放到eclipse的安裝目錄dropins下 三、eclipse上的配置 3.1 打開Wind

大數據Hadoop Streaming編程實戰之C++、Php、Python

大數據編程 PHP語言 Python編程 C語言的應用 Streaming框架允許任何程序語言實現的程序在HadoopMapReduce中使用,方便已有程序向Hadoop平臺移植。因此可以說對於hadoop的擴展性意義重大。接下來我們分別使用C++、Php、Python語言實現HadoopWo

Hadoop Streaming

earch IT fault target generate 完成 hadoop集群 問題 tor 原文地址:http://hadoop.apache.org/docs/r1.0.4/cn/streaming.html Hadoop Streaming Stre

Hadoop用戶配置ssh免密登錄

Linux Hadoop 一般生產環境 Hadoop組件都是由hadoop用戶來啟動,首先需要配置hadoop用戶ssh免密登錄 1.創建Hadoop用戶 [root@hadoop000 ~]# useradd hadoop [root@hadoop000 ~]# id hadoop uid=1102

hadoop環境安裝配置介紹與步驟

關系 防止 shadow 設置 操作 data 環境 con 目的 在上一篇的分享文章中我是給大家分享了運行部署hadoop的一些安裝準備工作,這篇接上一篇繼續為大家分享一些個人的學習經驗總結。我學習用的是大快發行版DKHadoop,所以所有的經驗分享都是以DKHadoop