
Big Data Basics with Spark (1): Spark Submit, the Spark job submission process

Spark version: 2.1.1

1 Local analysis of Spark Submit

1.1 Observation

Submit command:

spark-submit --master local[10] --driver-memory 30g --class app.package.AppClass app-1.0.jar

Processes (two show up: the spark-class bash wrapper, and a small launcher JVM that builds the actual command):

hadoop 225653 0.0 0.0 11256 364 ? S Aug24 0:00 bash /$spark-dir/bin/spark-class org.apache.spark.deploy.SparkSubmit --master local[10] --driver-memory 30g --class app.package.AppClass app-1.0.jar
hadoop 225654 0.0 0.0 34424 2860 ? Sl Aug24 0:00 /$jdk_dir/bin/java -Xmx128m -cp /spark-dir/jars/* org.apache.spark.launcher.Main org.apache.spark.deploy.SparkSubmit --master local[10] --driver-memory 30g --class app.package.AppClass app-1.0.jar

1.2 Execution process

1.2.1 Script execution

-bash-4.1$ cat bin/spark-submit
#!/usr/bin/env bash

if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "[email protected]"

Note: this executes another script, spark-class, as follows:

 

-bash-4.1$ cat bin/spark-class

...

build_command() {
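  # Run launcher.Main in a small JVM; it prints the final command tokens
  # NUL-separated, then build_command appends the launcher's exit status.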
"$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "[email protected]"
printf "%d\0" $?
}

CMD=()
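# Read the NUL-separated tokens emitted by build_command into the CMD array.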
while IFS= read -d '' -r ARG; do
CMD+=("$ARG")
done < <(build_command "[email protected]")

...

CMD=("${CMD[@]:0:$LAST}")
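# CMD now holds the assembled "java ... org.apache.spark.deploy.SparkSubmit ..." command
# (the trailing exit-status token has just been stripped); replace this shell with it.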
exec "${CMD[@]}"

Note: this runs the java class org.apache.spark.launcher.Main, passing the arguments through, as follows:

 

1.2.2 Code execution

 

org.apache.spark.launcher.Main

...

        builder = new SparkSubmitCommandBuilder(help);
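        // SparkSubmitCommandBuilder parses the spark-submit arguments forwarded by spark-class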

...

    List<String> cmd = builder.buildCommand(env);

...

      List<String> bashCmd = prepareBashCommand(cmd, env);
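      // print each token of the generated command, NUL-separated, for spark-class to read back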

      for (String c : bashCmd) {

        System.out.print(c);

        System.out.print('\0');

      }

...

Note: Main uses SparkSubmitCommandBuilder to generate the Spark Submit command, as follows:

 

org.apache.spark.launcher.SparkSubmitCommandBuilder

...

  private List<String> buildSparkSubmitCommand(Map<String, String> env)

...

    addPermGenSizeOpt(cmd);

    cmd.add("org.apache.spark.deploy.SparkSubmit");

    cmd.addAll(buildSparkSubmitArgs());

    return cmd;

...

Note: this is where the local command is assembled, with org.apache.spark.deploy.SparkSubmit as the java main class; that class is examined below.
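As an aside, the same command-building machinery is also exposed as a public API, org.apache.spark.launcher.SparkLauncher, so an application can be submitted from code instead of through the shell scripts. A minimal sketch, assuming the spark-launcher artifact is on the classpath; the jar, main class, and resource settings simply reuse the sample values from 1.1 and are illustrative only:

import org.apache.spark.launcher.SparkLauncher

object LaunchApp {
  def main(args: Array[String]): Unit = {
    // Builds and forks the same "java ... org.apache.spark.deploy.SparkSubmit ..." command
    // that spark-submit would produce.
    val process = new SparkLauncher()
      .setAppResource("app-1.0.jar")                // sample jar from 1.1
      .setMainClass("app.package.AppClass")         // sample main class from 1.1
      .setMaster("local[10]")
      .setConf(SparkLauncher.DRIVER_MEMORY, "30g")
      .launch()
    process.waitFor()
  }
}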

 

org.apache.spark.deploy.SparkSubmit

  def main(args: Array[String]): Unit = {

    val appArgs = new SparkSubmitArguments(args) // parse the command-line arguments

    if (appArgs.verbose) {

      // scalastyle:off println

      printStream.println(appArgs)

      // scalastyle:on println

    }

    appArgs.action match {
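      // action defaults to SUBMIT unless --kill or --status was given on the command line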

      case SparkSubmitAction.SUBMIT => submit(appArgs)

      case SparkSubmitAction.KILL => kill(appArgs)

      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)

    }

  }

 

    private def submit(args: SparkSubmitArguments): Unit = {

    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args) // merge parameters from the command line, the properties file, system properties, etc.

 

    def doRunMain(): Unit = {

      ...

        runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)

      ...

    }

         ...

 

  private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)

      : (Seq[String], Seq[String], Map[String, String], String) = {

    if (deployMode == CLIENT || isYarnCluster) {

      childMainClass = args.mainClass
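      // start with the user's main class (overridden below for yarn-cluster)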

      ...

    if (isYarnCluster) {

      childMainClass = "org.apache.spark.deploy.yarn.Client"

      ...

 

  private def runMain(

      childArgs: Seq[String],

      childClasspath: Seq[String],

      sysProps: Map[String, String],

      childMainClass: String,

      verbose: Boolean): Unit = {

    // scalastyle:off println

    if (verbose) {

      printStream.println(s"Main class:\n$childMainClass")

      printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")

      printStream.println(s"System properties:\n${sysProps.mkString("\n")}")

      printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")

      printStream.println("\n")

    }

    // scalastyle:on println

 

    val loader =

      if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {

        new ChildFirstURLClassLoader(new Array[URL](0),

          Thread.currentThread.getContextClassLoader)

      } else {

        new MutableURLClassLoader(new Array[URL](0),

          Thread.currentThread.getContextClassLoader)

      }
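    // spark.driver.userClassPathFirst = true makes user jars take precedence over Spark's own classes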

    Thread.currentThread.setContextClassLoader(loader)

 

    for (jar <- childClasspath) {

      addJarToClasspath(jar, loader)

    }

 

    for ((key, value) <- sysProps) {

      System.setProperty(key, value)

    }

 

    var mainClass: Class[_] = null

 

    try {

      mainClass = Utils.classForName(childMainClass)

    } catch {

    ...

    val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)

    ...

      mainMethod.invoke(null, childArgs.toArray)

      ...

Note: SparkSubmit first parses the command-line arguments (for example mainClass), prepares the execution environment (system properties, classpath, and so on), then loads the user's mainClass with a new classloader (ChildFirstURLClassLoader when spark.driver.userClassPathFirst is true, otherwise MutableURLClassLoader) and invokes its main method via reflection. At that point the main method of the user's app.package.AppClass starts running.
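The final step is plain JDK reflection. A self-contained sketch of the same pattern (the class name and the arguments below are placeholders, not Spark internals):

// Load the class with the context classloader set above, find its static
// main(String[]) method, and invoke it -- the same pattern runMain uses.
val userMainClass = Class.forName("app.package.AppClass",
  true, Thread.currentThread.getContextClassLoader)
val userMainMethod = userMainClass.getMethod("main", classOf[Array[String]])
userMainMethod.invoke(null, Array("arg1", "arg2"))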

 

org.apache.spark.SparkConf

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {

 

  import SparkConf._

 

  /** Create a SparkConf that loads defaults from system properties and the classpath */

  def this() = this(true)

...

  if (loadDefaults) {

    loadFromSystemProperties(false)

  }

 

  private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {

    // Load any spark.* system properties

    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {

      set(key, value, silent)

    }

    this

  }

Note: this shows how Spark picks up its configuration: when the application constructs a SparkConf, every spark.* JVM system property (including the ones SparkSubmit just set in runMain) is copied into it.
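A minimal illustration of that behavior (the property values below are arbitrary examples, not defaults):

import org.apache.spark.SparkConf

// Simulate what runMain does: put spark.* settings into JVM system properties...
System.setProperty("spark.master", "local[2]")
System.setProperty("spark.app.name", "demo")

// ...then a plain `new SparkConf()` (loadDefaults = true) copies them in.
val conf = new SparkConf()
assert(conf.get("spark.master") == "local[2]")
assert(conf.get("spark.app.name") == "demo")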

 

1.2.3 --verbose

spark-submit --master local[*] --class app.package.AppClass --jars /$other-dir/other.jar  --driver-memory 1g --verbose app-1.0.jar

Sample output:

Main class:
app.package.AppClass
Arguments:

System properties:
spark.executor.logs.rolling.maxSize -> 1073741824
spark.driver.memory -> 1g
spark.driver.extraLibraryPath -> /$hadoop-dir/lib/native
spark.eventLog.enabled -> true
spark.eventLog.compress -> true
spark.executor.logs.rolling.time.interval -> daily
SPARK_SUBMIT -> true
spark.app.name -> app.package.AppClass
spark.driver.extraJavaOptions -> -XX:+PrintGCDetails -XX:+UseG1GC -XX:G1HeapRegionSize=32M -XX:+UseGCOverheadLimit -XX:+ExplicitGCInvokesConcurrent -XX:+HeapDumpOnOutOfMemoryError -XX:-UseCompressedClassPointers -XX:CompressedClassSpaceSize=3G -XX:+PrintGCTimeStamps -Xloggc:/export/Logs/hadoop/g1gc.log
spark.jars -> file:/$other-dir/other.jar
spark.sql.adaptive.enabled -> true
spark.submit.deployMode -> client
spark.executor.logs.rolling.maxRetainedFiles -> 10
spark.executor.extraClassPath -> /usr/lib/hadoop/lib/hadoop-lzo.jar
spark.eventLog.dir -> hdfs://myhdfs/spark/history
spark.master -> local[*]
spark.sql.crossJoin.enabled -> true
spark.driver.extraClassPath -> /usr/lib/hadoop/lib/hadoop-lzo.jar
Classpath elements:
file:/$other-dir/other.jar
file:/app-1.0.jar

 

Adding the --verbose flag at launch prints all of this runtime information (main class, arguments, system properties, and classpath elements), which is very helpful for troubleshooting.