1. 程式人生 > >Spark on Yarn遇到的幾個問題

Spark on Yarn遇到的幾個問題

添加 shuffle tasks pil 生產 當前 lis file 被拒

1 概述

Spark的on Yarn模式。其資源分配是交給Yarn的ResourceManager來進行管理的。可是眼下的Spark版本號,Application日誌的查看,僅僅能通過Yarn的yarn logs命令實現。

在部署和執行Spark Application的過程中,假設不註意一些小的細節,或許會導致一些問題的出現。

2 防火墻

部署好Spark的包和配置文件,on yarn的兩種模式都無法執行,在NodeManager端的日誌都是說Connection Refused,連接不上Driver所在的client節點,可是client的80port能夠正常訪問!同一時候,在日誌中有類似信息出現:

Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

內存肯定是夠的。但就是無法獲取資源。檢查防火墻,果然client僅僅開啟的對80port的訪問,其它都禁止了!

假設你的程序在執行的時候也有類似連接被拒絕的情況。最好也是先檢查下防火墻的配置!

3 Spark Driver程序host的指定

部署完Spark後,分別使用yarn-cluster模式和yarn-client模式執行Spark自帶的計算pi的演示樣例。

Spark的一些配置文件除了一些基本屬性外,均未做配置,結果執行的時候兩種執行模式出現了不同的狀況。

yarn-cluster模式可以正常執行,yarn-client模式總是執行失敗。查看ResourceManager、NodeManager端的日誌。發現程序總是找不到ApplicationMaster,這就奇怪了!

而且,客戶端的Driver程序開啟的port。在NodeManager端訪問被拒絕。非Spark的其它MR任務。可以正常執行。

檢查client配置文件。發現原來在client的/etc/hosts文件裏。client的一個IP相應了多個Host,Driver程序會默認去取最後相應的那個Host,比方是hostB,可是在NodeManager端是配置的其它Host。hostA。所以導致程序無法訪問。為了不影響其它的程序使用client的Host列表,這裏在Spark配置文件spark-defaults.conf

中使用屬性spark.driver.host來指定yarn-client模式執行中和Yarn通信的DriverHost。此時yarn-client模式能夠正常執行。

上面配置完了之後,發現yarn-cluster模式又不能執行了!

想想原因,肯定是上面那個配置參數搞的鬼,凝視掉之後,yarn-cluster模式能夠繼續執行。原因是,yarn-cluster模式下。spark的入口函數是在client執行,可是Driver的其它功能是在ApplicationMaster中執行的。上面的那個配置相當於指定了ApplicationMaster的地址,實際上的ApplicationMaster在yarn-master模式下是由ResourceManager隨機指定的。

4 on Yarn日誌的查看

測試環境下,通過yarn logs -applicationId xxx能夠查看執行結束的Application的日誌,可是搞到還有一個環境下發現使用上述命令查看日誌時,總是提演示樣例如以下信息:

Logs not available at /tmp/nm/remote/logs/hadoop/logs/application_xxx_xxx

Log aggregation has not completed or is not enabled.

去相應的NodeManger文件夾下,確實找不到日誌文件。

可是/tmp/nm/remote/logs卻是在yarn-site.xml中指定了的文件夾。這個是對的,究竟什麽原因呢?難道是Yarn的日誌聚集沒有起作用?

去NodeManager上查看相應Application的日誌:

2014-08-04 09:14:47,513 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl: Starting aggregate log-file for app application_xxx_xxx at /tmp/nm/remote/logs/spark/logs/application_xxx_xxx/hostB.tmp

2014-08-04 09:14:47,525 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl: Uploading logs for container container_xxx_xxx_01_000007. Current good log dirs are /data/nm/log

2014-08-04 09:14:47,526 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl: Uploading logs for container container_xxx_xxx_000001. Current good log dirs are /data/nm/log

2014-08-04 09:14:47,526 INFO org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Deleting path : /data/nm/log/application_xxx_xxx

2014-08-04 09:14:47,607 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl: Finished aggregate log-file for app application_xxx_xxx

可見,日誌聚集確實起作用了,可是為什麽通過命令不能查看!

猛然見看到日誌中“/tmp/nm/remote/logs/spark/logs/ application_xxx_xxx/hostB.tmp”。日誌的路徑有問題,在使用yarn logs命令查看的時候。用的是hadoop用戶。實際Spark Application的提交運行用的是spark用戶,而yarn logs命令默認去找的是當前用戶的路徑,這就是查看不到日誌的原因。切換到spark用戶再查看,日誌最終出來了。

5 LZO相關問題

假設在Spark中使用了LZO作為EventLog的的壓縮算法等,就得實現安裝好LZO這個東東,否則會出現類似於例如以下的異常:

Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.

 at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:134)

 at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:174)

 at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45)

 ... 66 more

Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found

 at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1680)

 at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:127)

 ... 68 more

或者

[ERROR] [2014-08-05 10:34:41 933] com.hadoop.compression.lzo.GPLNativeCodeLoader [main] (GPLNativeCodeLoader.java:36) Could not load native gpl library

java.lang.UnsatisfiedLinkError: no gplcompression in java.library.path

解決的方法就是得安裝好LZO,而且在HDFS、SPARK中配置好相關的包、文件等,詳細步驟見:

http://find.searchhub.org/document/a128707a98fe4ec6

https://github.com/twitter/hadoop-lzo/blob/master/README.md

http://hsiamin.com/posts/2014/05/03/enable-lzo-compression-on-hadoop-pig-and-spark/

6 Spark Hive無法訪問Mysql的問題

生產環境下,節點之間肯定是有防火墻限制的。並且Hive的元數據庫Mysql,更是對請求的IP和用戶等限制的嚴格,假設在Spark集群中使用yarn-cluster模式進行提交Spark的Application。其執行時Driver是和ApplicationMaster執行在一起。由Yarn的ResourceManager負責分配到集群中的某個NodeManager節點上,假設在Hive-site.xml中僅僅配置了Mysql數據庫而沒有配置MetaStore的話,或許會遇到連接元數據庫失敗的問題,此時,就得看下Hive-site.xml的配置,是否Mysql的相關權限配置正確、MetaStore服務能否夠正常連接。

7 內存溢出問題

在Spark中使用hql方法運行hive語句時。因為其在查詢過程中調用的是Hive的獲取元數據信息、SQL解析,而且使用Cglib等進行序列化反序列化。中間可能產生較多的class文件,導致JVM中的持久代使用較多,假設配置不當,可能引起類似於例如以下的OOM問題:

Exception in thread "Thread-2" java.lang.OutOfMemoryError: PermGen space

原因是實際使用時。假設用的是JDK1.6版本號,Server模式的持久代默認大小是64M,Client模式的持久代默認大小是32M,而Driver端進行SQL處理時。其持久代的使用可能會達到90M,導致OOM溢出。任務失敗。

解決方法就是在Spark的conf文件夾中的spark-defaults.conf裏。添加對Driver的JVM配置。由於Driver才負責SQL的解析和元數據獲取。

配置例如以下:

spark.driver.extraJavaOptions -XX:PermSize=128M -XX:MaxPermSize=256M   

可是,上述情況是在yarn-cluster模式下出現。yarn-client模式執行時倒是正常的。原來在$SPARK_HOME/bin/spark-class文件裏已經設置了持久代大小:

JAVA_OPTS="-XX:MaxPermSize=256m $OUR_JAVA_OPTS"

當以yarn-client模式執行時。driver就執行在客戶端的spark-submit進程中。其JVM參數是取的spark-class文件裏的設置,所謂未出現持久代溢出現象。

總結一下Spark中各個角色的JVM參數設置:

(1)Driver的JVM參數:
-Xmx。-Xms,假設是yarn-client模式,則默認讀取spark-env文件裏的SPARK_DRIVER_MEMORY值,-Xmx,-Xms值一樣大小;假設是yarn-cluster模式。則讀取的是spark-default.conf文件裏的spark.driver.extraJavaOptions相應的JVM參數值。


PermSize,假設是yarn-client模式,則是默認讀取spark-class文件裏的JAVA_OPTS="-XX:MaxPermSize=256m $OUR_JAVA_OPTS"值;假設是yarn-cluster模式。讀取的是spark-default.conf文件裏的spark.driver.extraJavaOptions相應的JVM參數值。
GC方式,假設是yarn-client模式,默認讀取的是spark-class文件裏的JAVA_OPTS。假設是yarn-cluster模式,則讀取的是spark-default.conf文件裏的spark.driver.extraJavaOptions相應的參數值。
以上值最後均可被spark-submit工具中的--driver-java-options參數覆蓋。

(2)Executor的JVM參數:
-Xmx,-Xms。假設是yarn-client模式,則默認讀取spark-env文件裏的SPARK_EXECUTOR_MEMORY值,-Xmx。-Xms值一樣大小。假設是yarn-cluster模式,則讀取的是spark-default.conf文件裏的spark.executor.extraJavaOptions相應的JVM參數值。
PermSize。兩種模式都是讀取的是spark-default.conf文件裏的spark.executor.extraJavaOptions相應的JVM參數值。
GC方式。兩種模式都是讀取的是spark-default.conf文件裏的spark.executor.extraJavaOptions相應的JVM參數值。

(3)Executor數目及所占CPU個數
假設是yarn-client模式。Executor數目由spark-env中的SPARK_EXECUTOR_INSTANCES指定,每一個實例的數目由SPARK_EXECUTOR_CORES指定;假設是yarn-cluster模式。Executor的數目由spark-submit工具的--num-executors參數指定,默認是2個實例,而每一個Executor使用的CPU數目由--executor-cores指定,默覺得1核。
每一個Executor執行時的信息能夠通過yarn logs命令查看到,類似於例如以下:

14/08/13 18:12:59 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Setting up executor with commands: List($JAVA_HOME/bin/java, -server, -XX:OnOutOfMemoryError=kill %p, -Xms1024m -Xmx1024m , -XX:PermSize=256M -XX:MaxPermSize=256M -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintHeapAtGC -Xloggc:/tmp/spark_gc.log, -Djava.io.tmpdir=$PWD/tmp, -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.executor.CoarseGrainedExecutorBackend, akka.tcp://[email protected]:41606/user/CoarseGrainedScheduler, 1, sparktest2, 3, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)

當中。akka.tcp:[email protected]:[email protected]在節點,後面的1表示Executor編號。sparktest2表示ApplicationMaster的host,接著的3表示當前Executor所占用的CPU數目。

8 序列化異常

在Spark上運行hive語句的時候,出現類似於例如以下的異常:

org.apache.spark.SparkDriverExecutionException: Execution error
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:849)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1231)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassCastException: scala.collection.mutable.HashSet cannot be cast to scala.collection.mutable.BitSet
    at org.apache.spark.sql.execution.BroadcastNestedLoopJoin$$anonfun$7.apply(joins.scala:336)
    at org.apache.spark.rdd.RDD$$anonfun$19.apply(RDD.scala:813)
    at org.apache.spark.rdd.RDD$$anonfun$19.apply(RDD.scala:810)
    at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:845)

  排查其前後的日誌。發現大都是序列化的東西:

14/08/13 11:10:01 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Serialized task 8.0:3 as 20849 bytes in 0 ms
14/08/13 11:10:01 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Finished TID 813 in 25 ms on sparktest0 (progress: 3/200)

  而在spark-default.conf中,事先設置了序列化方式為Kryo:

spark.serializer org.apache.spark.serializer.KryoSerializer

依據異常信息,可見是HashSet轉為BitSet類型轉換失敗。Kryo把松散的HashSet轉換為了緊湊的BitSet,把序列化方式凝視掉之後,任務能夠正常運行。難道Spark的Kryo序列化做的還不到位?此問題須要進一步跟蹤。

9 Executor僵死問題

執行一個Spark任務,發現其執行速度遠遠慢於執行相同SQL語句的Hive的執行,甚至出現了OOM的錯誤,最後卡住達幾小時!

而且Executor進程在瘋狂GC。

截取其一Task的OOM異常信息:

技術分享

能夠看到這是在序列化過程中發生的OOM。

依據節點信息。找到相應的Executor進程,觀察其Jstack信息:

Thread 36169: (state = BLOCKED)
 - java.lang.Long.valueOf(long) @bci=27, line=557 (Compiled frame)
 - com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=5, line=113 (Compiled frame)
 - com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=4, line=103 (Compiled frame)
 - com.esotericsoftware.kryo.Kryo.readClassAndObject(com.esotericsoftware.kryo.io.Input) @bci=158, line=732 (Compiled frame)
 - com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=158, line=338 (Compiled frame)
 - com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=4, line=293 (Compiled frame)
 - com.esotericsoftware.kryo.Kryo.readObject(com.esotericsoftware.kryo.io.Input, java.lang.Class, com.esotericsoftware.kryo.Serializer) @bci=136, line=651 (Compiled frame)
 - com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(com.esotericsoftware.kryo.io.Input, java.lang.Object) @bci=143, line=605 (Compiled frame)
 - com.esotericsoftware.kryo.serializers.FieldSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=44, line=221 (Compiled frame)
 - com.esotericsoftware.kryo.Kryo.readObject(com.esotericsoftware.kryo.io.Input, java.lang.Class, com.esotericsoftware.kryo.Serializer) @bci=136, line=651 (Compiled frame)
 - com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(com.esotericsoftware.kryo.io.Input, java.lang.Object) @bci=143, line=605 (Compiled frame)
 - com.esotericsoftware.kryo.serializers.FieldSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=44, line=221 (Compiled frame)
 - com.esotericsoftware.kryo.Kryo.readClassAndObject(com.esotericsoftware.kryo.io.Input) @bci=158, line=732 (Compiled frame)
 - org.apache.spark.serializer.KryoDeserializationStream.readObject(scala.reflect.ClassTag) @bci=8, line=118 (Compiled frame)
 - org.apache.spark.serializer.DeserializationStream$$anon$1.getNext() @bci=10, line=125 (Compiled frame)
 - org.apache.spark.util.NextIterator.hasNext() @bci=16, line=71 (Compiled frame)
 - org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext() @bci=4, line=1031 (Compiled frame)
 - scala.collection.Iterator$$anon$13.hasNext() @bci=4, line=371 (Compiled frame)
 - org.apache.spark.util.CompletionIterator.hasNext() @bci=4, line=30 (Compiled frame)
 - org.apache.spark.InterruptibleIterator.hasNext() @bci=22, line=39 (Compiled frame)
 - scala.collection.Iterator$$anon$11.hasNext() @bci=4, line=327 (Compiled frame)
 - org.apache.spark.sql.execution.HashJoin$$anonfun$execute$1.apply(scala.collection.Iterator, scala.collection.Iterator) @bci=14, line=77 (Compiled frame)
 - org.apache.spark.sql.execution.HashJoin$$anonfun$execute$1.apply(java.lang.Object, java.lang.Object) @bci=9, line=71 (Interpreted frame)
 - org.apache.spark.rdd.ZippedPartitionsRDD2.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=48, line=87 (Interpreted frame)
 - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=262 (Interpreted frame)


有大量的BLOCKED線程,繼續觀察GC信息。發現大量的FULL GC。

分析。在插入Hive表的時候,實際上須要寫HDFS,在此過程的HashJoin時,伴隨著大量的Shuffle寫操作。JVM的新生代不斷GC,Eden Space寫滿了就往Survivor Space寫,同一時候超過一定大小的數據會直接寫到老生代。當新生代寫滿了之後,也會把老的數據搞到老生代。假設老生代空間不足了。就觸發FULL GC,還是空間不夠,那就OOM錯誤了,此時線程被Blocked。導致整個Executor處理數據的進程被卡住。

當處理大數據的時候,假設JVM配置不當就easy引起上述問題。解決辦法就是增大Executor的使用內存。合理配置新生代和老生代的大小,能夠將老生代的空間適當的調大點。


10 小節

問題是比較嚴重,Application都直接無法執行了。可是引起問題的解決辦法都比較小,歸根結底還是部署的時候環境較為復雜,不夠細致!再接再礪!

以後遇到相關的問題,會再這裏持續更新,方便自己,也方便遇到類似問題的朋友們!

-------------------------------------------------------------------------------

假設您看了本篇博客,認為對您有所收獲。請點擊下方的 [頂]

假設您想轉載本博客。請註明出處

假設您對本文有意見或者建議,歡迎留言

感謝您的閱讀。請關註我的興許博客

Spark on Yarn遇到的幾個問題