1. 程式人生 > >hadoop 簡單入門與streaming常用配置引數說明

hadoop 簡單入門與streaming常用配置引數說明

1. Hadoop包含兩核心部分

  1. hdfs
    1. Hadoop distribute file system -- hadoop分散式檔案系統,儲存資料
    2. NamenodeDatanode
    3. 常用命令形式hadoop fs -ls  /  hadoop fs -mkdir
  2. MapReduce
    1. 分而治之;map:實現分治;reduce:實現合併
    2. 解決資料可分割的計算問題
    3. 程式設計介面:常用Streaming;組成Job配置檔案、map函式,reduce函式

2. hdfs結構圖

  1. Namenode儲存元資料,資料資訊,資料備份資訊
  2. Datanode 資料備份:本機架備份、異地備份

3. MapReduce排程框架

  1. JobClient負責根據使用者指定引數生成一個mapreduce作業,提交到JobTracker
  2. JobTracker: Master節點,將Job所有task排程到TaskTracker
  3. TaskTracker: 部署在每臺計算機節點的一個service

4.  MapReduce 執行層

  1. Map階段,讀入資料,通過partition聚集相同key資料,並寫到本機磁碟
  2. Reduce階段,不同reduce,讀入Map階段各maps相應輸出

5. streaming 作業

  1. streaming mapper
    1. 先啟動使用者提交作業時指定的一個外部程式,一般是指令碼
    2. 這個外部程式作為streaming mapper的子程序,streaming mapper讀取使用者輸入後,不再是呼叫map函式處理,而是通過管道寫到子程序的標準輸入
    3. 從子程序的標準輸出讀取資料,寫到磁碟上
  2. streaming 作業資料流向
    1. 父程序是Java,負責讀取資料通過管道傳送給子程序
    2. 通過管道把結果再讀取回來

      一份資料在兩個不同程序中傳遞兩次

6 mapreduce – shuffle

  1. map --> shuffle –> reduce
  2. shuffle 從多個節點傳遞到多個節點,而不是多個節點到一個節點
  3. shuffle 包含partition、combiner
    1. partition : 資料歸併,分割map每個節點的結果,按照key分別對映給不同的reduce,預設是HashPartition,which reducer == (key.hashCode & Integer.MAX_VALUE)% numReduceTasks
    2. 作用:計算(key, value)所屬分割槽 ; 把同一分割槽資料合併、聚集
  4. combiner: combiner屬於優化方案,由於頻寬限制,應該儘量map和reduce之間的資料傳輸數量。它在Map端把同一個key的鍵值對合並在一起並計算,計算規則與reduce一致,所以combiner也可以看作特殊的Reducer

7. streaming 常用配置項

  1. stream.map.output.field.separator // 該引數屬於streaming作業引數,設定map輸出的欄位分隔符,預設為“\t”,該分隔符只對下面的stream.num.map.output.key.fields引數生效
  2. stream.num.map.output.key.fields // 設定map輸出的前幾個欄位作為key,一般與第二項stream.map.output.field.separator 結合使用
  3. mapred.text.key.partitioner.options  // 設定key內某個欄位或者某個欄位範圍用做partition
  4. mapred.text.key.comparator.options // 設定key中需要比較的欄位或位元組範圍
  5. partitioner // 主要用於對鍵值進行劃分,負責將map的輸出結果根據key進行分割。Key用於確定不同的key落到不同的reduce上,通常對key進行Hash以後對reduce取mod,該key對應的紀錄最終將根據mod值落到對應的reduce上進行處理。HashPartitioner, IndexUpdatePartitioner, KeyFieldBasedPartitioner, SleepJob,預設的為 HashPartitioner,即對key直接進行hash分到對應的reduce,具體見第6部分。更高階一點的為 KeyFieldBasedPartitioner,該partitioner可以指定key中前幾個欄位用於分割
  6. HashPartition  最基本的Partitioner,如果不指定Partitioner的話則預設使用該類。輸出格式為最基本的key”\t“value
  7. KeyFieldBasedPartitioner 可以看做HashPartitioner的擴充套件,他將原有的對單欄位Key的hash擴充套件到可以靈活地對多欄位key進行分桶並排序,對應配置引數如下:
  8. -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner // 該引數表示作業啟用KeyFieldBasedParitioner
  9. -D map.ouput.key.field.separator // 該引數屬於KeyFieldBasedPartitioner引數,只有當啟用KeyFieldBasedParititioner時,該引數才會生效;該引數指明map輸出結果中欄位之間的分隔符(該分隔符只對下面的num.key.fields.for.partition引數生效);
  10. -D num.key.fields.for.partition // 該引數同樣屬於KeyFieldBasedPartitioner引數;該引數指明map輸出結果的key按上述分隔符切分後,前幾個欄位將用來做partition;該引數不能與mapred.text.key.partitioner.options共用;
  11. -D mapred.text.key.partitioner.options // 該引數同樣屬於KeyFieldBasedPartitioner引數; 該引數指明map輸出結果的key按上述分隔符切分後,使用哪些欄位用來做partition;該引數不能與num.key.fields.for.partition共用,一起使用則以num.key.fields.for.partition為準;
  12. KeyFieldBaseComparator // 可以靈活設定比較位置的高階比較器,但是它和沒有自己獨有的比較邏輯,而是使用預設Text的基於字典序或者通過-n來基於數字比較,直觀來說,partition指定key中的分割槽元素,KeyFieldBaseComparator用作指定key排序欄位以及排序規則,引數配置如下:
  13. -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator // 該引數表示作業啟用KeyFieldBasedComparator
  14. -D mapred.text.key.comparator.options="-k3,3 -k4nr" // 以key中第三個欄位正序,第四個欄位逆序比較排序
  15. stream.memory.limit // 任務記憶體限制
  16. mapred.reduce.tasks // 指定reducer個數
  17. cmdenv //傳遞給streaming命令的環境變數
  18. -input //HDFS目錄或檔案路徑, Mapper的輸入資料,檔案要在任務提交前手動上傳到HDFS
  19. -output // reducer輸出結果的HDFS存放路徑,  不能已存在,但指令碼中一定要配置,多次-input,指定多個輸入檔案
  20. -mapper // 可執行命令,mapper程式
  21. -reducer // 可執行命令, reduce程式,不需要reduce處理就不指定
  22. -file //本地mapper、reducer程式檔案、程式執行需要的其他檔案,將本地檔案分發給計算節點;檔案作為作業的一部分,一起被打包並提交,所有分發的檔案最終會被放置在datanode該job的同一個專屬目錄下:jobcache/job_xxx/jar
  23. -cacheFile //分發HDFS檔案
  24. -cacheArchive // 分發HDFS壓縮檔案、壓縮檔案內部具有目錄結構
  25. mapred.job.priority //作業優先順序
  26. mapred.job.map.capacity // 最多同時執行map任務數
  27. mapred.job.reduce.capacity //最多同時執行reduce任務數
  28. mapred.job.name // job name

8. key-partition 例項

  1. key 不等於 partition,也就是說,分桶規則跟map階段的key有可能不是一回事
  2. 假設,檔案A中內容如下:

                      

  • 第一種作業方式(部分引數):
./hadoop streaming
-D stream.map.output.field.separator=.
-D stream.num.map.output.key.fields=2

                                 

        只是將map的輸出結果按兩個欄位切分成了key和value;再分桶上我們可以看出,它是以前兩個欄位作為一個整體來進行分桶的,e.5與e.9沒有分在一個reduce

  • 第二種作業:
./hadoop streaming
-D stream.map.output.field.separator=.
-D stream.num.map.output.key.fields=2
-D map.output.key.field.separator=.
-D num.key.fields.for.partition=1
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner\

                                   

              這裡啟用了KeyFieldBasedPartitioner,並且制定分桶以key的第一個欄位為準;我們可以看出mapred依然採用的是前兩個欄位為key,但是在分桶上只對第一個欄位做了雜湊函式,因此這次e.5和e.9分到了一個reducer內

  • 第三種作業
./hadoop streaming
-D stream.map.output.field.separator=.
-D stream.num.map.output.key.fields=3
-D map.output.key.field.separator=.
-D num.key.fields.for.partition=1  
-D mapred.text.key.partitioner.options=-k2,3
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

       這次將key的長度改成3個欄位,分桶標準也變成key的第2、3個欄位,可以看出e.5.1被分在了一起,而第三個欄位不同的e.5.9被分到了其他的reducer中

  • 第四種作業
./hadoop streaming \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D stream.num.map.output.key.fields=4 \
-D stream.map.output.field.separator=. \
-D map.output.key.field.separator=. \
-D mapred.text.key.partitioner.options=-k1,2 \
-D mapred.text.key.comparator.options="-k3,3 -k4nr" \

                                    

         這次key的長度為4,分桶標準為key的第一、二個欄位,可以看出,e.5被分到一個桶內,而輸出結果,按照key的第三個欄位的正序,第四個欄位的逆序排列輸出