1. 程式人生 > >Spark修煉之道(高階篇)——Spark原始碼閱讀:第一節 Spark應用程式提交流程

Spark修煉之道(高階篇)——Spark原始碼閱讀:第一節 Spark應用程式提交流程

作者:搖擺少年夢
微訊號: zhouzhihubeyond

spark-submit 指令碼應用程式提交流程

在執行Spar應用程式時,會將spark應用程式打包後使用spark-submit指令碼提交到Spark中執行,執行提交命令如下:

root@sparkmaster:/hadoopLearning/spark-1.5.0-bin-hadoop2.4/bin# 
./spark-submit --master spark://sparkmaster:7077 
--class SparkWordCount --executor-memory 1g
 /root/IdeaProjects/SparkWordCount/out
/artifacts/SparkWord Count_jar/SparkWordCount.jar hdfs://ns1/README.md hdfs://ns1/SparkWordCountResult

為弄清楚整個流程,我們先來分析一下spark-submit指令碼,spark-submit指令碼內容如下:

#!/usr/bin/env bash

SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

#spark-submit最終呼叫的是spark-class指令碼
#傳入的類是org.apache.spark.deploy.SparkSubmit #及其它傳入的引數,如deploy mode、executor-memory等 exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "[email protected]"

spark-class指令碼會載入spark配置的環境變數資訊、定位依賴包spark-assembly-1.5.0-hadoop2.4.0.jar檔案(以spark1.5.0為例)等,然後再呼叫org.apache.spark.launcher.Main正式啟動Spark應用程式的執行,具體如下:

# Figure out where Spark is installed
#定位SAPRK_HOME目錄
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"

#載入load-spark-env.sh,執行環境相關資訊
#例如配置檔案conf下的spark-env.sh等
. "$SPARK_HOME"/bin/load-spark-env.sh

# Find the java binary
# 定位JAVA_HOME目錄
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ `command -v java` ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Find assembly jar
#定位spark-assembly-1.5.0-hadoop2.4.0.jar檔案(以spark1.5.0為例)
#這意味著任務提交時無需將該JAR檔案打包
SPARK_ASSEMBLY_JAR=
if [ -f "$SPARK_HOME/RELEASE" ]; then
  ASSEMBLY_DIR="$SPARK_HOME/lib"
else
  ASSEMBLY_DIR="$SPARK_HOME/assembly/target/scala-$SPARK_SCALA_VERSION"
fi

num_jars="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" | wc -l)"
if [ "$num_jars" -eq "0" -a -z "$SPARK_ASSEMBLY_JAR" ]; then
  echo "Failed to find Spark assembly in $ASSEMBLY_DIR." 1>&2
  echo "You need to build Spark before running this program." 1>&2
  exit 1
fi
ASSEMBLY_JARS="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" || true)"
if [ "$num_jars" -gt "1" ]; then
  echo "Found multiple Spark assembly jars in $ASSEMBLY_DIR:" 1>&2
  echo "$ASSEMBLY_JARS" 1>&2
  echo "Please remove all but one jar." 1>&2
  exit 1
fi

SPARK_ASSEMBLY_JAR="${ASSEMBLY_DIR}/${ASSEMBLY_JARS}"

LAUNCH_CLASSPATH="$SPARK_ASSEMBLY_JAR"

# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="$SPARK_HOME/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

export _SPARK_ASSEMBLY="$SPARK_ASSEMBLY_JAR"

# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
#執行org.apache.spark.launcher.Main作為Spark應用程式的主入口
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "[email protected]")
exec "${CMD[@]}"

從上述程式碼中,可以看到,通過org.apache.spark.launcher.Main類啟動org.apache.spark.deploy.SparkSubmit的執行,SparkSubmit部分原始碼如下:

//SparkSubmit Main方法
def main(args: Array[String]): Unit = {
    //任務提交時設定的引數,見圖2
    val appArgs = new SparkSubmitAarguments(args)
    if (appArgs.verbose) {
      // scalastyle:off println
      printStream.println(appArgs)
      // scalastyle:on println
    }
    appArgs.action match {
      //任務提交時,執行submit(appArgs)
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }

這裡寫圖片描述
圖1 appArgs = new SparkSubmitAarguments(args)引數

進入submit方法:

  /**
   * Submit the application using the provided parameters.
   *
   * This runs in two steps. First, we prepare the launch environment by setting up
   * the appropriate classpath, system properties, and application arguments for
   * running the child main class based on the cluster manager and the deploy mode.
   * Second, we use this launch environment to invoke the main method of the child
   * main class.
   */
  private def submit(args: SparkSubmitArguments): Unit = {
   //執行引數等資訊,見圖2
    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
    //定義在submit方法中的方法doRunMain()
    def doRunMain(): Unit = {
      if (args.proxyUser != null) {
        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
          UserGroupInformation.getCurrentUser())
        try {
          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              //執行runMain方法
              runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
            }
          })
        } catch {
          case e: Exception =>
            // Hadoop's AuthorizationException suppresses the exception's stack trace, which
            // makes the message printed to the output by the JVM not very helpful. Instead,
            // detect exceptions with empty stack traces here, and treat them differently.
            if (e.getStackTrace().length == 0) {
              // scalastyle:off println
              printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
              // scalastyle:on println
              exitFn(1)
            } else {
              throw e
            }
        }
      } else {
        //執行runMain方法
        runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
      }
    }

     // In standalone cluster mode, there are two submission gateways:
     //   (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
     //   (2) The new REST-based gateway introduced in Spark 1.3
     // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
     // to use the legacy gateway if the master endpoint turns out to be not a REST server.
    if (args.isStandaloneCluster && args.useRest) {
      try {
        // scalastyle:off println
        printStream.println("Running Spark using the REST application submission protocol.")
        // scalastyle:on println
        //呼叫submit方法中的doRunMain方法
        doRunMain()
      } catch {
        // Fail over to use the legacy submission gateway
        case e: SubmitRestConnectionException =>
          printWarning(s"Master endpoint ${args.master} was not a REST server. " +
            "Falling back to legacy submission gateway instead.")
          args.useRest = false
          submit(args)
      }
    // In all other modes, just run the main class as prepared
    } else {
       //呼叫submit方法中的doRunMain方法
      doRunMain()
    }
  }

這裡寫圖片描述
圖2 任務提交時設定的引數,

從上面的程式碼可以看到,最終呼叫的是runMain方法,其原始碼如下:

/**
   * Run the main method of the child class using the provided launch environment.
   *
   * Note that this main class will not be the one provided by the user if we're
   * running cluster deploy mode or python applications.
   */
  private def runMain(
      childArgs: Seq[String],
      childClasspath: Seq[String],
      sysProps: Map[String, String],
      childMainClass: String,
      verbose: Boolean): Unit = {
    // scalastyle:off println
    if (verbose) {
      printStream.println(s"Main class:\n$childMainClass")
      printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
      printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
      printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
      printStream.println("\n")
    }
    // scalastyle:on println

    val loader =
      if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
        new ChildFirstURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      } else {
        new MutableURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      }
    Thread.currentThread.setContextClassLoader(loader)

    for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)
    }

    for ((key, value) <- sysProps) {
      System.setProperty(key, value)
    }

    var mainClass: Class[_] = null

    try {
      //複用反射載入childMainClass,這裡為SparkWordCount
      mainClass = Utils.classForName(childMainClass)
    } catch {
      case e: ClassNotFoundException =>
        e.printStackTrace(printStream)
        if (childMainClass.contains("thriftserver")) {
          // scalastyle:off println
          printStream.println(s"Failed to load main class $childMainClass.")
          printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
          // scalastyle:on println
        }
        System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
    }

    // SPARK-4170
    if (classOf[scala.App].isAssignableFrom(mainClass)) {
      printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
    }

    //呼叫反射機制載入main方法,即SparkWordCount中的main方法
    val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    def findCause(t: Throwable): Throwable = t match {
      case e: UndeclaredThrowableException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: InvocationTargetException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: Throwable =>
        e
    }

    try {
      //執行main方法,即執行SparkWordCount
      mainMethod.invoke(null, childArgs.toArray)
    } catch {
      case t: Throwable =>
        throw findCause(t)
    }
  }

mainMethod.invoke(null, childArgs.toArray)方法執行完畢後,進入SparkWordCount的main方法,執行Spark應用程式,如下圖
這裡寫圖片描述
至此,正式完成Spark應用程式執行的提交。

相關推薦

Spark修煉高階——Spark原始碼閱讀第十三節 Spark SQLSQLContext一)

作者:周志湖 1. SQLContext的建立 SQLContext是Spark SQL進行結構化資料處理的入口,可以通過它進行DataFrame的建立及SQL的執行,其建立方式如下: //sc為SparkContext val sqlContext

Spark修煉高階——Spark原始碼閱讀第十二 Spark SQL 處理流程分析

作者:周志湖 下面的程式碼演示了通過Case Class進行表Schema定義的例子: // sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLConte

Spark修煉高階——Spark原始碼閱讀第八節 Task執行

Task執行 在上一節中,我們提到在Driver端CoarseGrainedSchedulerBackend中的launchTasks方法向Worker節點中的Executor傳送啟動任務命令,該命令的接收者是CoarseGrainedExecutorBack

Spark修煉高階——Spark原始碼閱讀第一 Spark應用程式提交流程

作者:搖擺少年夢 微訊號: zhouzhihubeyond spark-submit 指令碼應用程式提交流程 在執行Spar應用程式時,會將spark應用程式打包後使用spark-submit指令碼提交到Spark中執行,執行提交命令如下: root@s

Spark修煉基礎——Linux大資料開發基礎第十三節Shell程式設計入門五)

本節主要內容 while expression do command command done (1)計數器格式 適用於迴圈次數已知或固定時 root@sparkslave02:~/ShellLearning/Chapter13# vim w

Spark修煉進階——Spark入門到精通第一 Spark 1.5.0叢集搭建

作者:周志湖 網名:搖擺少年夢 微訊號:zhouzhihubeyond 本節主要內容 作業系統環境準備 Hadoop 2.4.1叢集搭建 Spark 1.5.0 叢集部署 注:在利用CentOS 6.5作業系統安裝spark 1.5叢集過程中,

Spark修煉進階——Spark入門到精通第十四 Spark Streaming 快取、Checkpoint機制

作者:周志湖 微訊號:zhouzhihubeyond 主要內容 Spark Stream 快取 Checkpoint 案例 1. Spark Stream 快取 通過前面一系列的課程介紹,我們知道DStream是由一系列的RDD構成的,

Spark修煉進階——Spark入門到精通第十六 Spark Streaming與Kafka

作者:周志湖 主要內容 Spark Streaming與Kafka版的WordCount示例(一) Spark Streaming與Kafka版的WordCount示例(二) 1. Spark Streaming與Kafka版本的WordCount示例

Spark修煉進階——Spark入門到精通第十 Spark SQL案例實戰

作者:周志湖 放假了,終於能抽出時間更新部落格了……. 1. 獲取資料 本文通過將github上的Spark專案git日誌作為資料,對SparkSQL的內容進行詳細介紹 資料獲取命令如下: [[email protected] spa

Spark修煉進階——Spark入門到精通第十三節 Spark Streaming—— Spark SQL、DataFrame與Spark Streaming

主要內容 Spark SQL、DataFrame與Spark Streaming 1. Spark SQL、DataFrame與Spark Streaming import org.apache.spark.SparkConf import org

Spark修煉進階——Spark入門到精通第十五 Kafka 0.8.2.1 叢集搭建

作者:周志湖 微訊號:zhouzhihubeyond 本節為下一節Kafka與Spark Streaming做鋪墊 主要內容 1.kafka 叢集搭建 1. kafka 叢集搭建 kafka 安裝與配置 tar -zxvf kafka_2

Spark修煉進階——Spark入門到精通第九 Spark SQL執行流程解析

1.整體執行流程 使用下列程式碼對SparkSQL流程進行分析,讓大家明白LogicalPlan的幾種狀態,理解SparkSQL整體執行流程 // sc is an existing SparkContext. val sqlContext = new or

Spark修煉進階——Spark入門到精通第六 Spark程式設計模型三)

作者:周志湖 網名:搖擺少年夢 微訊號:zhouzhihubeyond 本節主要內容 RDD transformation(續) RDD actions 1. RDD transformation(續) (1)repartitionAnd

Spark修煉進階——Spark入門到精通第十 Spark Streaming一)

本節主要內容 Spark流式計算簡介 Spark Streaming相關核心類 入門案例 1. Spark流式計算簡介 Hadoop的MapReduce及Spark SQL等只能進行離線計算,無法滿足實時性要求較高的業務需求,例如實時推薦、實時

[轉載]我的WafBypassMisc

review uoj 行程 onclick aid 歷史 eth reads sim 厲害,滿滿的幹貨,讓我膜一下 現在位置: 首頁 > 文章 > Web安全 > 文章 > 代碼審計 > 正文 我的WafBypass之道(Misc篇)

從誌願軍“斷刀”再論敏捷

慢慢 失敗 多個 之一 朝鮮 無法 一次 mark 學習 從誌願軍“斷刀”再論敏捷之道(上篇) 作者:歐德張(原創) ??在現在的IT項目中,以往常用的是瀑布模型套路,這些年敏捷模式大受歡迎,關於敏捷,現在諸人開口PMI-ACP,閉口則SCRUM,又有諸多實踐、案例遵行其

程式設計師修煉通俗版——第七章

《程式設計師修煉之道》這本書中的內容挺不錯,裡面包含了很多精華,但一些句子很拗口,所以我就根據國人的閱讀習慣,在不改變原意的情況下對詞句稍加修改,標題中的“通俗版”就是這麼來的。 1、在討論使用者介面時,需求、政策和實現之間的區別會變得非常模糊。“系統必須能讓

程式碼簡潔判斷

第一個例子 if (state === 1) { return true } else if (state === 2) { return true } else if (state === 3) { return true } else if (state === 4){ r

Ruby程式設計師修煉第2版.epub

【下載地址】 這是一本深受好評的書。它不僅是一本純Ruby的書,也不僅是一本純Rails的書,而是一本為Rails程式設計師“優化”過的Ruby書。 本書從Ruby程式語言的基礎開始一直講到動態特性,其中包含大量的真實程式碼示例並附有詳細的註解,對日常使用Ruby進行程

Numpy 修煉 12—— genfromtxt函式

定義輸入 genfromtxt的唯一強制引數是資料的源。它可以是字串,字串列表或生成器。如果提供了單個字串,則假定它是本地或遠端檔案或具有read方法的開啟的類檔案物件的名稱,例如檔案或StringIO.StringIO物件。如果提供了字串列表或返回字串的生成器,則每個字串在檔案中被視為一行。當傳遞