1. 程式人生 > >spark記錄單個task卡住的,導致作業不結束的問題

spark記錄單個task卡住的,導致作業不結束的問題

實際上是由於資料的傾斜問題,採用reparation將資料重分割槽就ok了,還有一點可以加入spark推測機制來容錯複雜的叢集網路環境,可能由於某個單節點存在異常,網路不穩定或是磁碟io滿了,使用推測顯得尤為重要。以下為常見可呼叫引數:

  1. 資源相關引數
    (1) mapreduce.map.memory.mb: 一個Map Task可使用的資源上限(單位:MB),預設為1024。如果Map Task實際使用的資源量超過該值,則會被強制殺死。
    (2) mapreduce.reduce.memory.mb: 一個Reduce Task可使用的資源上限(單位:MB),預設為1024。如果Reduce Task實際使用的資源量超過該值,則會被強制殺死。
    (3) mapreduce.map.java.opts: Map Task的JVM引數,你可以在此配置預設的java heap size等引數, e.g.
    “-Xmx1024m -verbose:gc -Xloggc:/tmp/@
    [email protected]
    ” (@[email protected]會被Hadoop框架自動換為相應的taskid), 預設值: “”
    (4) mapreduce.reduce.java.opts: Reduce Task的JVM引數,你可以在此配置預設的java heap size等引數, e.g.
    “-Xmx1024m -verbose:gc -Xloggc:/tmp/@[email protected]”, 預設值: “”
    (5) mapreduce.map.cpu.vcores: 每個Map task可使用的最多cpu core數目, 預設值: 1
    (6) mapreduce.map.cpu.vcores: 每個Reduce task可使用的最多cpu core數目, 預設值: 1
  2. 容錯相關引數
    (1) mapreduce.map.maxattempts: 每個Map Task最大重試次數,一旦重試引數超過該值,則認為Map Task執行失敗,預設值:4。
    (2) mapreduce.reduce.maxattempts: 每個Reduce Task最大重試次數,一旦重試引數超過該值,則認為Map Task執行失敗,預設值:4。
    (3) mapreduce.map.failures.maxpercent: 當失敗的Map Task失敗比例超過該值為,整個作業則失敗,預設值為0. 如果你的應用程式允許丟棄部分輸入資料,則該該值設為一個大於0的值,比如5,表示如果有低於5%的Map Task失敗(如果一個Map Task重試次數超過mapreduce.map.maxattempts,則認為這個Map Task失敗,其對應的輸入資料將不會產生任何結果),整個作業扔認為成功。
    (4) mapreduce.reduce.failures.maxpercent: 當失敗的Reduce Task失敗比例超過該值為,整個作業則失敗,預設值為0.
    (5) mapreduce.task.timeout: Task超時時間,經常需要設定的一個引數,該引數表達的意思為:如果一個task在一定時間內沒有任何進入,即不會讀取新的資料,也沒有輸出資料,則認為該task處於block狀態,可能是卡住了,也許永遠會卡主,為了防止因為使用者程式永遠block住不退出,則強制設定了一個該超時時間(單位毫秒),預設是300000。如果你的程式對每條輸入資料的處理時間過長(比如會訪問資料庫,通過網路拉取資料等),建議將該引數調大,該引數過小常出現的錯誤提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。
  3. 本地執行mapreduce 作業
    設定以下幾個引數:
    mapreduce.framework.name=local
    mapreduce.jobtracker.address=local
    fs.defaultFS=local
  4. 效率和穩定性相關引數
    (1) mapreduce.map.speculative: 是否為Map Task開啟推測執行機制,預設為false
    (2) mapreduce.reduce.speculative: 是否為Reduce Task開啟推測執行機制,預設為false
    (3) mapreduce.job.user.classpath.first & mapreduce.task.classpath.user.precedence:當同一個class同時出現在使用者jar包和hadoop jar中時,優先使用哪個jar包中的class,預設為false,表示優先使用hadoop jar中的class。
    (4) mapreduce.input.fileinputformat.split.minsize: 每個Map Task處理的資料量(僅針對基於檔案的Inputformat有效,比如TextInputFormat,SequenceFileInputFormat),預設為一個block大小,即 134217728。
    HBase 相關配置引數
    (1) hbase.rpc.timeout:rpc的超時時間,預設60s,不建議修改,避免影響正常的業務,在線上環境剛開始配置的是3秒,執行半天后發現了大量的timeout error,原因是有一個region出現瞭如下問題阻塞了寫操作:“Blocking updates … memstore size 434.3m is >= than blocking 256.0m size”可見不能太低。
    (2) ipc.socket.timeout:socket建立連結的超時時間,應該小於或者等於rpc的超時時間,預設為20s
    (3) hbase.client.retries.number:重試次數,預設為14,可配置為3
    (4) hbase.client.pause:重試的休眠時間,預設為1s,可減少,比如100ms
    (5) hbase.regionserver.lease.period:scan查詢時每次與server互動的超時時間,預設為60s,可不調整。
    Spark 相關配置引數
  5. 效率及穩定性相關引數
    建議開啟map(注意,在spark引擎中,也只有map和reduce兩種task,spark叫ShuffleMapTask和ResultTask)中間結果合併及推測執行功能:
    spark.shuffle.consolidateFiles=true

  6. 容錯相關引數
    建議將這些值調大,比如:
    spark.task.maxFailures=8
    spark.akka.timeout=300
    spark.network.timeout=300
    spark.yarn.max.executor.failures=100

3.推測機制引數
與以下幾個引數有關:
spark.speculation=trure
1. spark.speculation.interval 100:檢測週期,單位毫秒;
2. spark.speculation.quantile 0.75:完成task的百分比時啟動推測;
3. spark.speculation.multiplier 1.5:比其他的慢多少倍時啟動推測。