【譯】Yarn上常駐Spark-Streaming程序調優

分類:IT技術 時間:2017-10-01

作者從容錯、性能等方面優化了長時間運行在yarn上的spark-Streaming作業

 

 

對於長時間運行的Spark Streaming作業,一旦提交到YARN群集便需要永久運行,直到有意停止。任何中斷都會引起嚴重的處理延遲,並可能導致數據丟失或重復。YARN和Apache Spark都不是為了執行長時間運行的服務而設計的。但是,它們已經成功地滿足了近實時數據處理作業的常駐需求。成功並不一定意味著沒有技術挑戰。

這篇博客總結了在安全的YARN集群上,運行一個關鍵任務且長時間的Spark Streaming作業的經驗。您將學習如何將Spark Streaming應用程序提交到YARN群集,以避免在值班時候的不眠之夜。

 

Fault tolerance

 在YARN集群模式下,Spark驅動程序與Application Master(應用程序分配的第一個YARN容器)在同一容器中運行。此過程負責從YARN 驅動應用程序和請求資源(Spark執行程序)。重要的是,Application Master消除了在應用程序生命周期中運行的任何其他進程的需要。即使一個提交Spark Streaming作業的邊緣Hadoop節點失敗,應用程序也不會受到影響。

要以集群模式運行Spark Streaming應用程序,請確保為spark-submit命令提供以下參數:

spark-submit --master yarn --deploy-mode cluster

由於Spark驅動程序和Application Master共享一個JVM,Spark驅動程序中的任何錯誤都會阻止我們長期運行的工作。幸運的是,可以配置重新運行應用程序的最大嘗試次數。設置比默認值2更高的值是合理的(從YARN集群屬性yarn.resourcemanager.am.max嘗試中導出)。對我來說,4工作相當好,即使失敗的原因是永久性的,較高的值也可能導致不必要的重新啟動。

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4

如果應用程序運行數天或數周,而不重新啟動或重新部署在高度使用的群集上,則可能在幾個小時內耗盡4次嘗試。為了避免這種情況,嘗試計數器應該在每個小時都重置。

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h

 

另一個重要的設置是在應用程序發生故障之前executor失敗的最大數量。默認情況下是max(2 * num executors,3),非常適合批處理作業,但不適用於長時間運行的作業。該屬性具有相應的有效期間,也應設置。

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures={8 * num_executors} \
    --conf spark.yarn.executor.failuresValidityInterval=1h

對於長時間運行的作業,您也可以考慮在放棄作業之前提高任務失敗的最大數量。默認情況下,任務將重試4次,然後作業失敗。

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures={8 * num_executors} \
    --conf spark.yarn.executor.failuresValidityInterval=1h \
    --conf spark.task.maxFailures=8

 

Performance

當Spark Streaming應用程序提交到集群時,必須定義運行作業的YARN隊列。我強烈建議使用YARN Capacity Scheduler並將長時間運行的作業提交到單獨的隊列。沒有一個單獨的YARN隊列,您的長時間運行的工作遲早將被的大量Hive查詢搶占。

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures={8 * num_executors} \
    --conf spark.yarn.executor.failuresValidityInterval=1h \
    --conf spark.task.maxFailures=8 \
    --queue realtime_queue

 

Spark Streaming工作的另一個重要問題是保持處理時間的穩定性和高度可預測性。處理時間應保持在批次持續時間以下以避免延誤。我發現Spark的推測執行有很多幫助,特別是在繁忙的群集中。當啟用推測性執行時,批處理時間更加穩定。只有當Spark操作是冪等時,才能啟用推測模式。

spark-submit --master yarn --deploy-mode cluster \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures={8 * num_executors} \
    --conf spark.yarn.executor.failuresValidityInterval=1h \
    --conf spark.task.maxFailures=8 \
    --queue realtime_queue \
    --conf spark.speculation=true

 

Security

在安全的HDFS群集上,長時間運行的Spark Streaming作業由於Kerberos票據到期而失敗。沒有其他設置,當Spark Streaming作業提交到集群時,會發布Kerberos票證。當票證到期時Spark Streaming作業不能再從HDFS寫入或讀取數據。

在理論上(基於文檔),應該將Kerberos主體和keytab作為spark-submit命令傳遞:

spark-submit --master yarn --deploy-mode cluster \
     --conf spark.yarn.maxAppAttempts=4 \
     --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
     --conf spark.yarn.max.executor.failures={8 * num_executors} \
     --conf spark.yarn.executor.failuresValidityInterval=1h \
     --conf spark.task.maxFailures=8 \
     --queue realtime_queue \
     --conf spark.speculation=true \
     --principal user/hostname@domain \
     --keytab /path/to/foo.keytab

 

實際上,由於幾個錯誤(HDFS-9276, SPARK-11182)必須禁用HDFS緩存。如果沒有,Spark將無法從HDFS上的文件讀取更新的令牌。

spark-submit --master yarn --deploy-mode cluster \
     --conf spark.yarn.maxAppAttempts=4 \
     --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
     --conf spark.yarn.max.executor.failures={8 * num_executors} \
     --conf spark.yarn.executor.failuresValidityInterval=1h \
     --conf spark.task.maxFailures=8 \
     --queue realtime_queue \
     --conf spark.speculation=true \
     --principal user/hostname@domain \
     --keytab /path/to/foo.keytab \
     --conf spark.hadoop.fs.hdfs.impl.disable.cache=true

Mark Grover指出,這些錯誤只影響在HA模式下配置了NameNodes的HDFS集群。謝謝,馬克

 

 

Logging

訪問Spark應用程序日誌的最簡單方法是配置Log4j控制臺追加程序,等待應用程序終止並使用yarn logs -applicationId [applicationId]命令。不幸的是終止長時間運行的Spark Streaming作業來訪問日誌是不可行的。

我建議安裝和配置Elastic,Logstash和Kibana(ELK套裝)。ELK的安裝和配置是超出了這篇博客的範圍,但請記住記錄以下上下文字段:

  • YARN application id
  • YARN container hostname
  • Executor id (Spark driver is always 000001, Spark executors start from 000002)
  • YARN attempt (to check how many times Spark driver has been restarted)

Log4j配置使用Logstash特定的appender和布局定義應該傳遞給spark-submit命令:

spark-submit --master yarn --deploy-mode cluster \
     --conf spark.yarn.maxAppAttempts=4 \
     --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
     --conf spark.yarn.max.executor.failures={8 * num_executors} \
     --conf spark.yarn.executor.failuresValidityInterval=1h \
     --conf spark.task.maxFailures=8 \
     --queue realtime_queue \
     --conf spark.speculation=true \
     --principal user/hostname@domain \
     --keytab /path/to/foo.keytab \
     --conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
     --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties \
     --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties \
     --files /path/to/log4j.properties

最後,Spark Job的Kibana儀表板可能如下所示:

 

 

Monitoring

長時間運行的工作全天候運行,所以了解歷史指標很重要。Spark UI僅在有限數量的批次中保留統計信息,並且在重新啟動後,所有度量標準都消失了。再次,需要外部工具。我建議安裝Graphite用於收集指標和Grafana來建立儀表板。

首先,Spark需要配置為將指標報告給Graphite,準備metrics.properties文件:

 

*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=[hostname]
*.sink.graphite.port=[port]
*.sink.graphite.prefix=some_meaningful_name

driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource

 

Graceful stop

最後一個難題是如何以優雅的方式停止部署在YARN上的Spark Streaming應用程序。停止(甚至殺死)YARN應用程序的標準方法是使用命令yarn application -kill [applicationId]。這個命令會停止Spark Streaming應用程序,但這可能發生在批處理中。因此,如果該作業是從Kafka讀取數據然後在HDFS上保存處理結果,並最終提交Kafka偏移量,當作業在提交偏移之前停止工作時,您應該預見到HDFS會有重復的數據。

解決優雅關機問題的第一個嘗試是在關閉程序時回調Spark Streaming Context的停止方法。

sys.addShutdownHook {
    streamingContext.stop(stopSparkContext = true, stopGracefully = true)
}

令人失望的是,由於Spark應用程序幾乎立即被殺死,一個退出回調函數來不及完成已啟動的批處理任務。此外,不能保證JVM會調用shutdown hook。

在撰寫本博客文章時,唯一確認的YARN Spark Streaming應用程序的確切方法是通知應用程序關於計劃關閉,然後以編程方式停止流式傳輸(但不是關閉掛鉤)。命令yarn application -kill 如果通知應用程序在定義的超時後沒有停止,則應該僅用作最後手段。

可以使用HDFS上的標記文件(最簡單的方法)或使用驅動程序上公開的簡單Socket / HTTP端點(復雜方式)通知應用程序。

因為我喜歡KISS原理,下面你可以找到shell腳本偽代碼,用於啟動/停止Spark Streaming應用程序使用標記文件:

start() {
    hdfs dfs -touchz /path/to/marker/my_job_unique_name
    spark-submit ...
}

stop() {
    hdfs dfs -rm /path/to/marker/my_job_unique_name
    force_kill=true
    application_id=$(yarn application -list | grep -oe "application_[0-9]*_[0-9]*"`)
    for i in `seq 1 10`; do
        application_status=$(yarn application -status ${application_id} | grep "State : \(RUNNING\|ACCEPTED\)")
        if [ -n "$application_status" ]; then
            sleep 60s
        else
            force_kill=false
            break
        fi
    done
    $force_kill && yarn application -kill ${application_id}
}

在Spark Streaming應用程序中,後臺線程應該監視標記文件,當文件消失時停止上下文調用

streamingContext.stop(stopSparkContext = true, stopGracefully = true).

 

Summary

 可以看到,部署在YARN上的關鍵任務Spark Streaming應用程序的配置相當復雜。以上提出的技術,由一些非常聰明的開發人員經過漫長而冗長乏味的叠代學習。最終,部署在高可用的YARN集群上的長期運行的Spark Streaming應用非常穩定。

 

 

 

 

原文地址:http://mkuthan.github.io/blog/2016/09/30/spark-streaming-on-yarn/


Tags: 運行 長時 應用程序 Spark YARN Streaming

文章來源:


ads
ads

相關文章
ads

相關文章

ad