1. 程式人生 > >大資料面試題目第二部分

大資料面試題目第二部分

  1. 簡要描述如何安裝配置apache的一個開源Hadoop,只描述即可,無需列出具體步驟,列出具體步驟更好。
  1. 準備三臺客戶機(配置IP,配置主機名…)
  2. 安裝jdk,安裝hadoop
  3. 配置JAVA_HOME和HADOOP_HOME
  4. 使每個節點上的環境變數生效(source /etc/profile)
  5. 準備分發指令碼 xsync
    a) **在/user/atguigu/bin下建立指令碼:xsync
  6. 明確叢集的配置
  7. 修改配置檔案
    a) **core-site.xml
    b) **hadoop-env.sh
    c) **hdfs-site.xml
    d) **
    yarn-env.sh

    e) **yarn-site.xml
    f) **mapred-env.sh
    g) **mapred-site.xml
    h) **配置slaves
  8. 分發配置檔案
    a) **xsync /etc/hadoop
  9. 刪掉data和logs資料夾
  10. 配置ssh(hadoop102,hadoop103)
  11. 分發配置檔案
  12. 格式化hdfs(hdfs namenode -format)
  13. 群啟hdfs
  14. 群啟yarn
  1. Hadoop中需要哪些配置檔案,其作用是什麼?
    1)core-site.xml:
    (1)fs.defaultFS:hdfs://cluster1(域名),這裡的值指的是預設的HDFS路徑 。
    (2)hadoop.tmp.dir:/export/data/hadoop_tmp,這裡的路徑預設是NameNode、DataNode、secondaryNamenode等存放資料的公共目錄。使用者也可以自己單獨指定這三類節點的目錄。
    (3)ha.zookeeper.quorum:hadoop101:2181,hadoop102:2181,hadoop103:2181,這裡是ZooKeeper叢集的地址和埠。注意,數量一定是奇數,且不少於三個節點 。
    2)
    hadoop-env.sh
    : 只需設定jdk的安裝路徑,如:export JAVA_HOME=/usr/local/jdk。
    3)hdfs-site.xml:
    (1) dfs.replication:他決定著系統裡面的檔案塊的資料備份個數,預設為3個。
    (2) dfs.data.dir:datanode節點儲存在檔案系統的目錄 。
    (3) dfs.name.dir:是namenode節點儲存hadoop檔案系統資訊的本地系統路徑 。
    4)mapred-site.xml:
    mapreduce.framework.name: yarn指定mr執行在yarn上。
  2. 請列出正常工作的Hadoop叢集中Hadoop都分別需要啟動哪些程序,它們的作用分別是什麼?
    1)NameNode它是hadoop中的主伺服器,管理檔案系統名稱空間和對叢集中儲存的檔案的訪問,儲存有metadate。
    2)SecondaryNameNode它不是namenode的冗餘守護程序,而是提供週期檢查點和清理任務。幫助NN合併editslog,減少NN啟動時間。
    3)DataNode它負責管理連線到節點的儲存(一個叢集中可以有多個節點)。每個儲存資料的節點執行一個datanode守護程序。
    4)ResourceManager(JobTracker)JobTracker負責排程DataNode上的工作。每個DataNode有一個TaskTracker,它們執行實際工作。
    5)NodeManager(TaskTracker)執行任務。
    6)DFSZKFailoverController高可用時它負責監控NN的狀態,並及時的把狀態資訊寫入ZK。它通過一個獨立執行緒週期性的呼叫NN上的一個特定介面來獲取NN的健康狀態。FC也有選擇誰作為Active NN的權利,因為最多隻有兩個節點,目前選擇策略還比較簡單(先到先得,輪換)。
    7)JournalNode 高可用情況下存放namenode的editlog檔案。
  3. 簡述Hadoop的幾個預設埠及其含義。
    1)dfs.namenode.http-address:50070
    2)SecondaryNameNode輔助名稱節點埠號:50090
    3)dfs.datanode.address:50010
    4)fs.defaultFS:8020 或者9000
    5)yarn.resourcemanager.webapp.address:8088
    HDFS
  4. HDFS的儲存機制(讀寫流程)。
    HDFS儲存機制,包括HDFS的寫入過程和讀取過程兩個部分

1)客戶端向namenode請求上傳檔案,namenode檢查目標檔案是否已存在,父目錄是否存在。
2)namenode返回是否可以上傳。
3)客戶端請求第一個 block上傳到哪幾個datanode伺服器上。
4)namenode返回3個datanode節點,分別為dn1、dn2、dn3。
5)客戶端請求dn1上傳資料,dn1收到請求會繼續呼叫dn2,然後dn2呼叫dn3,將這個通訊管道建立完成。
6)dn1、dn2、dn3逐級應答客戶端
7)客戶端開始往dn1上傳第一個block(先從磁碟讀取資料放到一個本地記憶體快取),以packet為單位,dn1收到一個packet就會傳給dn2,dn2傳給dn3;dn1每傳一個packet會放入一個應答佇列等待應答
8)當一個block傳輸完成之後,客戶端再次請求namenode上傳第二個block的伺服器。(重複執行3-7步)

1)客戶端向namenode請求下載檔案,namenode通過查詢元資料,找到檔案塊所在的datanode地址。
2)挑選一臺datanode(就近原則,然後隨機)伺服器,請求讀取資料。
3)datanode開始傳輸資料給客戶端(從磁盤裡面讀取資料放入流,以packet為單位來做校驗)。
4)客戶端以packet為單位接收,先在本地快取,然後寫入目標檔案。
2. SecondaryNameNode 工作機制。
1)第一階段:namenode啟動
(1)第一次啟動namenode格式化後,建立fsimage和edits檔案。如果不是第一次啟動,直接載入編輯日誌和映象檔案到記憶體。
(2)客戶端對元資料進行增刪改的請求
(3)namenode記錄操作日誌,更新滾動日誌。
(4)namenode在記憶體中對資料進行增刪改查
2)第二階段:Secondary NameNode工作
(1)Secondary NameNode詢問namenode是否需要checkpoint。直接帶回namenode是否檢查結果。
(2)Secondary NameNode請求執行checkpoint。
(3)namenode滾動正在寫的edits日誌
(4)將滾動前的編輯日誌和映象檔案拷貝到Secondary NameNode
(5)Secondary NameNode載入編輯日誌和映象檔案到記憶體,併合並。
(6)生成新的映象檔案fsimage.chkpoint
(7)拷貝fsimage.chkpoint到namenode
(8)namenode將fsimage.chkpoint重新命名成fsimage
3. NameNode與SecondaryNameNode 的區別與聯絡?
1)機制流程同上;
2)區別
(1)NameNode負責管理整個檔案系統的元資料,以及每一個路徑(檔案)所對應的資料塊資訊。
(2)SecondaryNameNode主要用於定期合併名稱空間映象和名稱空間映象的編輯日誌。
3)聯絡:
(1)SecondaryNameNode中儲存了一份和namenode一致的映象檔案(fsimage)和編輯日誌(edits)。
(2)在主namenode發生故障時(假設沒有及時備份資料),可以從SecondaryNameNode恢復資料。
4. 服役新資料節點和退役舊節點步驟
1)節點上線操作:
當要新上線資料節點的時候,需要把資料節點的名字追加在 dfs.hosts 檔案中
(1)關閉新增節點的防火牆
(2)在 NameNode 節點的 hosts 檔案中加入新增資料節點的 hostname
(3)在每個新增資料節點的 hosts 檔案中加入 NameNode 的 hostname
(4)在 NameNode 節點上增加新增節點的 SSH 免密碼登入的操作
(5)在 NameNode 節點上的 dfs.hosts 中追加上新增節點的 hostname,
(6)在其他節點上執行重新整理操作:hdfs dfsadmin -refreshNodes
(7)在 NameNode 節點上,更改 slaves 檔案,將要上線的資料節點 hostname 追加到 slaves 檔案中
(8)啟動 DataNode 節點
(9)檢視 NameNode 的監控頁面看是否有新增加的節點
2)節點下線操作:
(1)修改/conf/hdfs-site.xml 檔案
(2)確定需要下線的機器,dfs.osts.exclude 檔案中配置好需要下架的機器,這個是阻止下架的機器去連線 NameNode。
(3)配置完成之後進行配置的重新整理操作./bin/hadoop dfsadmin -refreshNodes,這個操作的作用是在後臺進行 block 塊的移動。
(4)當執行三的命令完成之後,需要下架的機器就可以關閉了,可以檢視現在叢集上連線的節點,正在執行 Decommission,會顯示:Decommission Status : Decommission in progress 執行完畢後,會顯示:Decommission Status : Decommissioned
(5)機器下線完畢,將他們從excludes 檔案中移除。
5. Namenode掛了怎麼辦?
方法一:將SecondaryNameNode中資料拷貝到namenode儲存資料的目錄;
方法二:使用-importCheckpoint選項啟動namenode守護程序,從而將SecondaryNameNode中資料拷貝到namenode目錄中。
MapReduce

  1. 談談Hadoop序列化和反序列化及自定義bean物件實現序列化?
    1)序列化和反序列化
    序列化就是把記憶體中的物件,轉換成位元組序列(或其他資料傳輸協議)以便於儲存(持久化)和網路傳輸。
    反序列化就是將收到位元組序列(或其他資料傳輸協議)或者是硬碟的持久化資料,轉換成記憶體中的物件。
    Java的序列化是一個重量級序列化框架(Serializable),一個物件被序列化後,會附帶很多額外的資訊(各種校驗資訊,header,繼承體系等),不便於在網路中高效傳輸。所以,hadoop自己開發了一套序列化機制(Writable),精簡、高效。
    2)自定義bean物件要想序列化傳輸步驟及注意事項:。
    (1)必須實現Writable介面
    (2)反序列化時,需要反射呼叫空參建構函式,所以必須有空參構造
    (3)重寫序列化方法
    (4)重寫反序列化方法
    (5)注意反序列化的順序和序列化的順序完全一致
    (6)要想把結果顯示在檔案中,需要重寫toString(),且用”\t”分開,方便後續用
    (7)如果需要將自定義的bean放在key中傳輸,則還需要實現comparable介面,因為mapreduce框中的shuffle過程一定會對key進行排序

  2. FileInputFormat切片機制
    (1)簡單地按照檔案的內容長度進行切片
    (2)切片大小,預設等於block大小
    (3)切片時不考慮資料集整體,而是逐個針對每一個檔案單獨切片

  3. 自定義InputFormat流程
    (1)自定義一個類繼承FileInputFormat
    (2)改寫RecordReader,實現一次讀取一個完整檔案封裝為KV

  4. 如何決定一個job的map和reduce的數量?
    1)map數量
    splitSize=max{minSize,min{maxSize,blockSize}}
    map數量由處理的資料分成的block數量決定default_num = total_size / split_size;
    2)reduce數量
    reduce的數量job.setNumReduceTasks(x);x 為reduce的數量。不設定的話預設為 1。

  5. Maptask的個數由什麼決定?
    一個job的map階段MapTask並行度(個數),由客戶端提交job時的切片個數決定。

  6. MapTask工作機制

    (1)Read階段:Map Task通過使用者編寫的RecordReader,從輸入InputSplit中解析出一個個key/value。
    (2)Map階段:該節點主要是將解析出的key/value交給使用者編寫map()函式處理,併產生一系列新的key/value。
    (3)Collect收集階段:在使用者編寫map()函式中,當資料處理完成後,一般會呼叫OutputCollector.collect()輸出結果。在該函式內部,它會將生成的key/value分割槽(呼叫Partitioner),並寫入一個環形記憶體緩衝區中。
    (4)Spill階段:即“溢寫”,當環形緩衝區滿後,MapReduce會將資料寫到本地磁碟上,生成一個臨時檔案。需要注意的是,將資料寫入本地磁碟之前,先要對資料進行一次本地排序,並在必要時對資料進行合併、壓縮等操作。
    溢寫階段詳情:
    步驟1:利用快速排序演算法對快取區內的資料進行排序,排序方式是,先按照分割槽編號partition進行排序,然後按照key進行排序。這樣,經過排序後,資料以分割槽為單位聚集在一起,且同一分割槽內所有資料按照key有序。
    步驟2:按照分割槽編號由小到大依次將每個分割槽中的資料寫入任務工作目錄下的臨時檔案output/spillN.out(N表示當前溢寫次數)中。如果使用者設定了Combiner,則寫入檔案之前,對每個分割槽中的資料進行一次聚集操作。
    步驟3:將分割槽資料的元資訊寫到記憶體索引資料結構SpillRecord中,其中每個分割槽的元資訊包括在臨時檔案中的偏移量、壓縮前資料大小和壓縮後資料大小。如果當前記憶體索引大小超過1MB,則將記憶體索引寫到檔案output/spillN.out.index中。
    (5)Combine階段:當所有資料處理完成後,MapTask對所有臨時檔案進行一次合併,以確保最終只會生成一個數據檔案。
    當所有資料處理完後,MapTask會將所有臨時檔案合併成一個大檔案,並儲存到檔案output/file.out中,同時生成相應的索引檔案output/file.out.index。
    在進行檔案合併過程中,MapTask以分割槽為單位進行合併。對於某個分割槽,它將採用多輪遞迴合併的方式。每輪合併io.sort.factor(預設100)個檔案,並將產生的檔案重新加入待合併列表中,對檔案排序後,重複以上過程,直到最終得到一個大檔案。
    讓每個MapTask最終只生成一個數據檔案,可避免同時開啟大量檔案和同時讀取大量小檔案產生的隨機讀取帶來的開銷。

  7. ReduceTask工作機制。

    (1)Copy階段:ReduceTask從各個MapTask上遠端拷貝一片資料,並針對某一片資料,如果其大小超過一定閾值,則寫到磁碟上,否則直接放到記憶體中。
    (2)Merge階段:在遠端拷貝資料的同時,ReduceTask啟動了兩個後臺執行緒對記憶體和磁碟上的檔案進行合併,以防止記憶體使用過多或磁碟上檔案過多。
    (3)Sort階段:按照MapReduce語義,使用者編寫reduce()函式輸入資料是按key進行聚集的一組資料。為了將key相同的資料聚在一起,Hadoop採用了基於排序的策略。由於各個MapTask已經實現對自己的處理結果進行了區域性排序,因此,ReduceTask只需對所有資料進行一次歸併排序即可。
    (4)Reduce階段:reduce()函式將計算結果寫到HDFS上。

  8. 請描述mapReduce有幾種排序及排序發生的階段。
    1)排序的分類:
    (1)部分排序:
    MapReduce根據輸入記錄的鍵對資料集排序。保證輸出的每個檔案內部排序。
    (2)全排序:
    如何用Hadoop產生一個全域性排序的檔案?最簡單的方法是使用一個分割槽。但該方法在處理大型檔案時效率極低,因為一臺機器必須處理所有輸出檔案,從而完全喪失了MapReduce所提供的並行架構。
    替代方案:首先建立一系列排好序的檔案;其次,串聯這些檔案;最後,生成一個全域性排序的檔案。主要思路是使用一個分割槽來描述輸出的全域性排序。例如:可以為待分析檔案建立3個分割槽,在第一分割槽中,記錄的單詞首字母a-g,第二分割槽記錄單詞首字母h-n, 第三分割槽記錄單詞首字母o-z。
    (3)輔助排序:(GroupingComparator分組)
    Mapreduce框架在記錄到達reducer之前按鍵對記錄排序,但鍵所對應的值並沒有被排序。甚至在不同的執行輪次中,這些值的排序也不固定,因為它們來自不同的map任務且這些map任務在不同輪次中完成時間各不相同。一般來說,大多數MapReduce程式會避免讓reduce函式依賴於值的排序。但是,有時也需要通過特定的方法對鍵進行排序和分組等以實現對值的排序。
    (4)二次排序:
    在自定義排序過程中,如果compareTo中的判斷條件為兩個即為二次排序。
    2)自定義排序WritableComparable
    bean物件實現WritableComparable介面重寫compareTo方法,就可以實現排序
    @Override
    public int compareTo(FlowBean o) {
    // 倒序排列,從大到小
    return this.sumFlow > o.getSumFlow() ? -1 : 1;
    }
    3)排序發生的階段:
    (1)一個是在map side發生在spill後partition前。
    (2)一個是在reduce side發生在copy後 reduce前。

  9. 請描述mapReduce中shuffle階段的工作流程,如何優化shuffle階段?
    分割槽,排序,溢寫,拷貝到對應reduce機器上,增加combiner,壓縮溢寫的檔案。

  10. 請描述mapReduce中combiner的作用是什麼,一般使用情景,哪些情況不需要,及和reduce的區別?
    1)Combiner的意義就是對每一個maptask的輸出進行區域性彙總,以減小網路傳輸量。
    2)Combiner能夠應用的前提是不能影響最終的業務邏輯,而且,Combiner的輸出kv應該跟reducer的輸入kv型別要對應起來。
    3)Combiner和reducer的區別在於執行的位置。
    Combiner是在每一個maptask所在的節點執行;
    Reducer是接收全域性所有Mapper的輸出結果。

  11. Mapreduce的工作原理,請舉例子說明mapreduce是怎麼執行的?

  12. 如果沒有定義partitioner,那資料在被送達reducer前是如何被分割槽的?
    如果沒有自定義的 partitioning,則預設的 partition 演算法,即根據每一條資料的 key
    的 hashcode 值摸運算(%)reduce 的數量,得到的數字就是“分割槽號”。

  13. MapReduce 怎麼實現 TopN?
    可以自定義groupingcomparator,或者在map端對資料進行排序,然後再reduce輸出時,控制只輸出前n個數。就達到了topn輸出的目的。

  14. 有可能使 Hadoop 任務輸出到多個目錄中麼?如果可以,怎麼做?
    1)可以輸出到多個目錄中,採用自定義OutputFormat。
    2)實現步驟:
    (1)自定義outputformat,
    (2)改寫recordwriter,具體改寫輸出資料的方法write()

  15. 簡述hadoop實現join的幾種方法及每種方法的實現。
    1)reduce side join
    Map端的主要工作:為來自不同表(檔案)的key/value對打標籤以區別不同來源的記錄。然後用連線欄位作為key,其餘部分和新加的標誌作為value,最後進行輸出。
    Reduce端的主要工作:在reduce端以連線欄位作為key的分組已經完成,我們只需要在每一個分組當中將那些來源於不同檔案的記錄(在map階段已經打標誌)分開,最後進行合併就ok了。
    2)map join
    在map端快取多張表,提前處理業務邏輯,這樣增加map端業務,減少reduce端資料的壓力,儘可能的減少資料傾斜。
    具體辦法:採用distributedcache
    (1)在mapper的setup階段,將檔案讀取到快取集合中。
    (2)在驅動函式中載入快取。
    job.addCacheFile(new URI(“file:/e:/mapjoincache/pd.txt”));// 快取普通檔案到task執行節點

  16. 請簡述hadoop怎樣實現二級排序。
    對map端輸出的key進行排序,實現的compareTo方法。 在compareTo方法中排序的條件有二個。

  17. 參考下面的MR系統的場景:
    –hdfs塊的大小為128MB
    –輸入型別為FileInputFormat
    –有三個檔案的大小分別是:64KB 130MB 260MB
    Hadoop框架會把這些檔案拆分為多少塊?
    4塊:64K,130M,128M,132M

  18. Hadoop中RecordReader的作用是什麼?
    (1)以怎樣的方式從分片中讀取一條記錄,每讀取一條記錄都會呼叫RecordReader類;
    (2)系統預設的RecordReader是LineRecordReader
    (3)LineRecordReader是用每行的偏移量作為map的key,每行的內容作為map的value;
    (4)應用場景:自定義讀取每一條記錄的方式;自定義讀入key的型別,如希望讀取的key是檔案的路徑或名字而不是該行在檔案中的偏移量。

  19. 給你一個1G的資料檔案。分別有id,name,mark,source四個欄位,按照mark分組,id排序,手寫一個MapReduce?其中有幾個Mapper?
    在map端對mark排序,在reduce端對id分組。
    @Override
    public int compareTo(GroupBean o) {
    int result;

    result = this.id > o.id ? -1 : 1;
    
    return result;
    

    }

@Override
public int compare(WritableComparable a, WritableComparable b) {

	GroupBean aBean = (GroupBean) a;
	GroupBean bBean = (GroupBean) b;

	int result;
	if (aBean.getMark() > bBean. getMark()) {
		result = 1;
	} else if (aBean. getMark() < bBean. getMark()) {
		result = -1;
	} else {
		result = 0;
	}
	return result;
}

2)幾個mapper
(1)1024m/128m=8塊
Yarn

  1. 簡述Hadoop1與Hadoop2 的架構異同。
    加入了yarn解決了資源排程的問題。
    加入了對zookeeper的支援實現比較可靠的高可用。
  2. 為什麼會產生yarn,它解決了什麼問題,有什麼優勢?
    Yarn最主要的功能就是解決執行的使用者程式與yarn框架完全解耦。
    Yarn上可以執行各種型別的分散式運算程式(mapreduce只是其中的一種),比如mapreduce、storm程式,spark程式……
  3. MR作業提交全過程。
    1)作業提交過程之YARN

2)作業提交過程之MapReduce

3)作業提交過程之讀資料

4)作業提交過程之寫資料

  1. HDFS的資料壓縮演算法?及每種演算法的應用場景?
    1)gzip壓縮
    優點:壓縮率比較高,而且壓縮/解壓速度也比較快;hadoop本身支援,在應用中處理gzip格式的檔案就和直接處理文字一樣;大部分linux系統都自帶gzip命令,使用方便。
    缺點:不支援split。
    應用場景:當每個檔案壓縮之後在130M以內的(1個塊大小內),都可以考慮用gzip壓縮格式。例如說一天或者一個小時的日誌壓縮成一個gzip檔案,執行mapreduce程式的時候通過多個gzip檔案達到併發。hive程式,streaming程式,和java寫的mapreduce程式完全和文字處理一樣,壓縮之後原來的程式不需要做任何修改。
    2)Bzip2壓縮
    優點:支援split;具有很高的壓縮率,比gzip壓縮率都高;hadoop本身支援,但不支援native;在linux系統下自帶bzip2命令,使用方便。
    缺點:壓縮/解壓速度慢;不支援native。
    應用場景:適合對速度要求不高,但需要較高的壓縮率的時候,可以作為mapreduce作業的輸出格式;或者輸出之後的資料比較大,處理之後的資料需要壓縮存檔減少磁碟空間並且以後資料用得比較少的情況;或者對單個很大的文字檔案想壓縮減少儲存空間,同時又需要支援split,而且相容之前的應用程式(即應用程式不需要修改)的情況。
    3)Lzo壓縮
    優點:壓縮/解壓速度也比較快,合理的壓縮率;支援split,是hadoop中最流行的壓縮格式;可以在linux系統下安裝lzop命令,使用方便。
    缺點:壓縮率比gzip要低一些;hadoop本身不支援,需要安裝;在應用中對lzo格式的檔案需要做一些特殊處理(為了支援split需要建索引,還需要指定inputformat為lzo格式)。
    應用場景:一個很大的文字檔案,壓縮之後還大於200M以上的可以考慮,而且單個檔案越大,lzo優點越越明顯。
    4)Snappy壓縮
    優點:高速壓縮速度和合理的壓縮率。
    缺點:不支援split;壓縮率比gzip要低;hadoop本身不支援,需要安裝;
    應用場景:當Mapreduce作業的Map輸出的資料比較大的時候,作為Map到Reduce的中間資料的壓縮格式;或者作為一個Mapreduce作業的輸出和另外一個Mapreduce作業的輸入。

  2. Hadoop的排程器總結。
    目前,Hadoop作業排程器主要有三種:FIFO、Capacity Scheduler和Fair Scheduler。Hadoop2.7.2預設的資源排程器是Capacity Scheduler。
    具體設定詳見:yarn-default.xml檔案

    The class to use as the resource scheduler.
    yarn.resourcemanager.scheduler.class
    org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler

      1)先進先出排程器(FIFO)
    2)容量排程器(Capacity Scheduler)
    3)公平排程器(Fair Scheduler)

  3. mapreduce推測執行演算法及原理。
    1)作業完成時間取決於最慢的任務完成時間
    一個作業由若干個Map任務和Reduce任務構成。因硬體老化、軟體Bug等,某些任務可能執行非常慢。
    典型案例:系統中有99%的Map任務都完成了,只有少數幾個Map老是進度很慢,完不成,怎麼辦?
    2)推測執行機制:
    發現拖後腿的任務,比如某個任務執行速度遠慢於任務平均速度。為拖後腿任務啟動一個備份任務,同時執行。誰先執行完,則採用誰的結果。
    3)執行推測任務的前提條件
    (1)每個task只能有一個備份任務;
    (2)當前job已完成的task必須不小於0.05(5%)
    (3)開啟推測執行引數設定。Hadoop2.7.2 mapred-site.xml檔案中預設是開啟的。

    mapreduce.map.speculative
    true
    If true, then multiple instances of some map tasks
    may be executed in parallel.

mapreduce.reduce.speculative true If true, then multiple instances of some reduce tasks may be executed in parallel. 4)不能啟用推測執行機制情況 (1)任務間存在嚴重的負載傾斜; (2)特殊任務,比如任務向資料庫中寫資料。 5)演算法原理:

優化

  1. mapreduce 跑的慢的原因?
    Mapreduce 程式效率的瓶頸在於兩點:
    1)計算機效能
    CPU、記憶體、磁碟健康、網路
    2)I/O 操作優化
    (1)資料傾斜
    (2)map和reduce數設定不合理
    (3)reduce等待過久
    (4)小檔案過多
    (5)大量的不可分塊的超大檔案
    (6)spill次數過多
    (7)merge次數過多等。
  2. mapreduce 優化方法。
    1)資料輸入:
    (1)合併小檔案:在執行mr任務前將小檔案進行合併,大量的小檔案會產生大量的map任務,增大map任務裝載次數,而任務的裝載比較耗時,從而導致 mr 執行較慢。
    (2)採用ConbinFileInputFormat來作為輸入,解決輸入端大量小檔案場景。
    2)map階段
    (1)減少spill次數:通過調整io.sort.mb及sort.spill.percent引數值,增大觸發spill的記憶體上限,減少spill次數,從而減少磁碟 IO。
    (2)減少merge次數:通過調整io.sort.factor引數,增大merge的檔案數目,減少merge的次數,從而縮短mr處理時間。
    (3)在 map 之後先進行combine處理,減少 I/O。
    3)reduce階段
    (1)合理設定map和reduce數:兩個都不能設定太少,也不能設定太多。太少,會導致task等待,延長處理時間;太多,會導致 map、reduce任務間競爭資源,造成處理超時等錯誤。
    (2)設定map、reduce共存:調整slowstart.completedmaps引數,使map執行到一定程度後,reduce也開始執行,減少reduce的等待時間。
    (3)規避使用reduce,因為Reduce在用於連線資料集的時候將會產生大量的網路消耗。
    (4)合理設定reduc端的buffer,預設情況下,資料達到一個閾值的時候,buffer中的資料就會寫入磁碟,然後reduce會從磁碟中獲得所有的資料。也就是說,buffer和reduce是沒有直接關聯的,中間多個一個寫磁碟->讀磁碟的過程,既然有這個弊端,那麼就可以通過引數來配置,使得buffer中的一部分資料可以直接輸送到reduce,從而減少IO開銷:mapred.job.reduce.input.buffer.percent,預設為0.0。當值大於0的時候,會保留指定比例的記憶體讀buffer中的資料直接拿給reduce使用。這樣一來,設定buffer需要記憶體,讀取資料需要記憶體,reduce計算也要記憶體,所以要根據作業的執行情況進行調整。
    4)IO傳輸
    (1)採用資料壓縮的方式,減少網路IO的的時間。安裝Snappy和LZOP壓縮編碼器。
    (2)使用SequenceFile二進位制檔案
    5)資料傾斜問題
    (1)資料傾斜現象
    資料頻率傾斜——某一個區域的資料量要遠遠大於其他區域。
    資料大小傾斜——部分記錄的大小遠遠大於平均值。
    (2)如何收集傾斜資料
    在reduce方法中加入記錄map輸出鍵的詳細情況的功能。
    public static final String MAX_VALUES = “skew.maxvalues”;
    private int maxValueThreshold;

@Override
public void configure(JobConf job) {
maxValueThreshold = job.getInt(MAX_VALUES, 100);
}
@Override
public void reduce(Text key, Iterator values,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
int i = 0;
while (values.hasNext()) {
values.next();
i++;
}

 if (++i > maxValueThreshold) {
     log.info("Received " + i + " values for key " + key);
 }

}
(3)減少資料傾斜的方法
方法1:抽樣和範圍分割槽
可以通過對原始資料進行抽樣得到的結果集來預設分割槽邊界值。
方法2:自定義分割槽
另一個抽樣和範圍分割槽的替代方案是基於輸出鍵的背景知識進行自定義分割槽。例如,如果map輸出鍵的單詞來源於一本書。其中大部分必然是省略詞(stopword)。那麼就可以將自定義分割槽將這部分省略詞傳送給固定的一部分reduce例項。而將其他的都發送給剩餘的reduce例項。
方法3:Combine
使用Combine可以大量地減小資料頻率傾斜和資料大小傾斜。在可能的情況下,combine的目的就是聚合並精簡資料。
6)常用的調優引數
(1)資源相關引數
(a)以下引數是在使用者自己的mr應用程式中配置就可以生效(mapred-default.xml)
配置引數 引數說明
mapreduce.map.memory.mb 一個Map Task可使用的資源上限(單位:MB),預設為1024。如果Map Task實際使用的資源量超過該值,則會被強制殺死。
mapreduce.reduce.memory.mb 一個Reduce Task可使用的資源上限(單位:MB),預設為1024。如果Reduce Task實際使用的資源量超過該值,則會被強制殺死。
mapreduce.map.cpu.vcores 每個Map task可使用的最多cpu core數目,預設值: 1
mapreduce.reduce.cpu.vcores 每個Reduce task可使用的最多cpu core數目,預設值: 1
mapreduce.reduce.shuffle.parallelcopies 每個reduce去map中拿資料的並行數。預設值是5
mapreduce.reduce.shuffle.merge.percent buffer中的資料達到多少比例開始寫入磁碟。預設值0.66
mapreduce.reduce.shuffle.input.buffer.percent buffer大小佔reduce可用記憶體的比例。預設值0.7
mapreduce.reduce.input.buffer.percent 指定多少比例的記憶體用來存放buffer中的資料,預設值是0.0
(b)應該在yarn啟動之前就配置在伺服器的配置檔案中才能生效(yarn-default.xml)
配置引數 引數說明
yarn.scheduler.minimum-allocation-mb 1024 給應用程式container分配的最小記憶體
yarn.scheduler.maximum-allocation-mb 8192 給應用程式container分配的最大記憶體
yarn.scheduler.minimum-allocation-vcores 1 每個container申請的最小CPU核數
yarn.scheduler.maximum-allocation-vcores 32 每個container申請的最大CPU核數
yarn.nodemanager.resource.memory-mb 8192 給containers分配的最大實體記憶體
(c)shuffle效能優化的關鍵引數,應在yarn啟動之前就配置好(mapred-default.xml)
配置引數 引數說明
mapreduce.task.io.sort.mb 100 shuffle的環形緩衝區大小,預設100m
mapreduce.map.sort.spill.percent 0.8 環形緩衝區溢位的閾值,預設80%
(2)容錯相關引數(mapreduce效能優化)
配置引數 引數說明
mapreduce.map.maxattempts 每個Map Task最大重試次數,一旦重試引數超過該值,則認為Map Task執行失敗,預設值:4。
mapreduce.reduce.maxattempts 每個Reduce Task最大重試次數,一旦重試引數超過該值,則認為Map Task執行失敗,預設值:4。
mapreduce.task.timeout Task超時時間,經常需要設定的一個引數,該引數表達的意思為:如果一個task在一定時間內沒有任何進入,即不會讀取新的資料,也沒有輸出資料,則認為該task處於block狀態,可能是卡住了,也許永遠會卡主,為了防止因為使用者程式永遠block住不退出,則強制設定了一個該超時時間(單位毫秒),預設是600000。如果你的程式對每條輸入資料的處理時間過長(比如會訪問資料庫,通過網路拉取資料等),建議將該引數調大,該引數過小常出現的錯誤提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。

  1. HDFS小檔案優化方法。
    1)HDFS小檔案弊端
    HDFS上每個檔案都要在namenode上建立一個索引,這個索引的大小約為150byte,這樣當小檔案比較多的時候,就會產生很多的索引檔案,一方面會大量佔用namenode的記憶體空間,另一方面就是索引檔案過大是的索引速度變慢。
    2)解決方案
    1)Hadoop Archive:
    是一個高效地將小檔案放入HDFS塊中的檔案存檔工具,它能夠將多個小檔案打包成一個HAR檔案,這樣在減少namenode記憶體使用的同時。
    2)Sequence file:
    sequence file由一系列的二進位制key/value組成,如果key為檔名,value為檔案內容,則可以將大批小檔案合併成一個大檔案。
    3)CombineFileInputFormat:
    CombineFileInputFormat是一種新的inputformat,用於將多個檔案合併成一個單獨的split,另外,它會考慮資料的儲存位置。
    4)開啟JVM重用
    對於大量小檔案Job,可以開啟JVM重用會減少45%執行時間。
    JVM重用理解:一個map執行一個jvm,重用的話,在一個map在jvm上執行完畢後,jvm繼續執行其他jvm
    具體設定:mapreduce.job.jvm.numtasks值在10-20之間。
  2. MapReduce怎麼解決資料均衡問題,如何確定分割槽號?
    資料均衡問題指的就是某個節點或者某幾個節點的任務執行的比較慢,拖慢了整個Job的進度。實際上資料均衡問題就是資料傾斜問題,解決方案同解決資料傾斜的方案。
    MapReduce中分割槽預設是按hashcode來分的,使用者可以自定義分割槽類,需要繼承系統的Partitioner類,重寫getPartition()方法即可。
  3. Hadoop中job和Tasks之間的區別是什麼?
    編寫好的一個程式,我們稱為Mapreduce程式,一個Mapreduce程式就是一個Job,而一個Job裡面可以有一個或多個Task,Task又可以區分為Map Task和Reduce Task.