Spark 2.3 Source Code Analysis, Section 1: The spark2-submit Application Submission Flow

This series, "Spark 2 Source Code Analysis", is written against the latest Spark 2.3.0 release. Please cite the source when reposting.

The application submission flow of the spark2-submit script

I. Contents

1. Demos of submitting a packaged application; note that on CDH, Spark 2.3.x uses spark2-submit instead

2. The spark-submit script loads the environment variables and jar packages, then starts the Spark launcher Main class;

3. The deploy SparkSubmit class is then started; its submit method runs doRunMain, which sets the system properties, application arguments, and child main class path

4. Finally, runMain is called to start the application

II. Details

Many demos can be found in spark-master/docs/submitting-applications.md:

# Run application locally on 8 cores
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local[8] \
  /path/to/examples.jar \
  100

# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000

# Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000

# Run on a YARN cluster
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode cluster \  # can be client for client mode
  --executor-memory 20G \
  --num-executors 50 \
  /path/to/examples.jar \
  1000

First, look at the content of the spark-submit script; it actually just calls the spark-class script:


#!/usr/bin/env bash

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

# spark-submit ultimately calls the spark-class script,
# passing in the class org.apache.spark.deploy.SparkSubmit
# together with the other arguments, such as deploy mode, executor-memory, etc.
exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

Next, the spark-class script loads Spark's environment configuration, locates the dependency jars, adds the launcher build directory when requested, and then calls org.apache.spark.launcher.Main, which builds the command that formally starts the Spark program. The script is as follows:

#!/usr/bin/env bash


if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

. "${SPARK_HOME}"/bin/load-spark-env.sh

# Find the java binary
# Locate the Java environment
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Find Spark jars.
# Locate the Spark dependency jars
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi

if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
  echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
  exit 1
else
  LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi

# Add the launcher build dir to the classpath if requested.
# Add the launcher build directory to the classpath when requested
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

# For tests
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi

# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
#
# The exit code of the launcher is appended to the output, so the parent shell removes it from the
# command array and checks the value to see if the launcher succeeded.
build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "[email protected]"
  printf "%d\0" $?
}

# Turn off posix mode since it does not allow process substitution
set +o posix
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <(build_command "$@")

COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}

# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi

if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi

CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"

Part of the code of Main.java is shown below:

class Main {
  public static void main(String[] argsArray) throws Exception {
    checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");

    List<String> args = new ArrayList<>(Arrays.asList(argsArray));
    String className = args.remove(0);

    boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
    AbstractCommandBuilder builder;
    if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
      try {
        builder = new SparkSubmitCommandBuilder(args);
      } catch (IllegalArgumentException e) {
        printLaunchCommand = false;
        System.err.println("Error: " + e.getMessage());
        System.err.println();

        MainClassOptionParser parser = new MainClassOptionParser();
        try {
          parser.parse(args);
        } catch (Exception ignored) {
          // Ignore parsing exceptions.
        }

        List<String> help = new ArrayList<>();
        if (parser.className != null) {
          help.add(parser.CLASS);
          help.add(parser.className);
        }
        help.add(parser.USAGE_ERROR);
        builder = new SparkSubmitCommandBuilder(help);
      }
    } else {
      builder = new SparkClassCommandBuilder(className, args);
    }

    ...
  }
}
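
As an aside, the launcher module that hosts this Main class also exposes a public programmatic API, org.apache.spark.launcher.SparkLauncher, which builds and starts the same kind of submission command from application code. A minimal Scala sketch follows; the jar path and application argument are placeholders, and it assumes SPARK_HOME is set in the environment.

import org.apache.spark.launcher.SparkLauncher

object LauncherApiDemo {
  def main(args: Array[String]): Unit = {
    // Build a command equivalent to the spark-submit demos above and run it as a child process.
    val sparkProcess = new SparkLauncher()
      .setAppResource("/path/to/examples.jar")            // placeholder jar path
      .setMainClass("org.apache.spark.examples.SparkPi")
      .setMaster("local[2]")
      .addAppArgs("100")
      .launch()
    sparkProcess.waitFor()
  }
}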

From the code above, you can see that the org.apache.spark.launcher.Main class is started first; it builds the launch command with which org.apache.spark.deploy.SparkSubmit is then executed. Part of the SparkSubmit source is shown below:


def main(args: Array[String]): Unit = {
    val submit = new SparkSubmit()
    submit.doSubmit(args)
  }

def doSubmit(args: Array[String]): Unit = {
    // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
    // be reset before the application starts.
    val uninitLog = initializeLogIfNecessary(true, silent = true)

    val appArgs = parseArguments(args)
    if (appArgs.verbose) {
      logInfo(appArgs.toString)
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
    }
  }
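
The action matched above is decided earlier by SparkSubmitArguments while parsing the command line: roughly, --kill selects KILL, --status selects REQUEST_STATUS, --version selects PRINT_VERSION, and everything else is an ordinary SUBMIT. The Scala sketch below only illustrates that mapping; it is not Spark's parsing code, and the names are invented for the example.

object ActionSketch extends Enumeration {
  val SUBMIT, KILL, REQUEST_STATUS, PRINT_VERSION = Value

  // Map a set of command-line flags to the action that doSubmit would dispatch on.
  def actionFor(flags: Seq[String]): Value =
    if (flags.contains("--kill")) KILL
    else if (flags.contains("--status")) REQUEST_STATUS
    else if (flags.contains("--version")) PRINT_VERSION
    else SUBMIT

  def main(args: Array[String]): Unit = {
    println(actionFor(Seq("--master", "yarn", "--class", "x.Y")))    // SUBMIT
    println(actionFor(Seq("--kill", "driver-20180101000000-0000")))  // KILL
  }
}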

  /**
   * Submit the application using the provided parameters.
   *
   * This runs in two steps. First, we prepare the launch environment by setting up
   * the appropriate classpath, system properties, and application arguments for
   * running the child main class based on the cluster manager and the deploy mode.
   * Second, we use this launch environment to invoke the main method of the child
   * main class.
   */
  @tailrec
  private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

    def doRunMain(): Unit = {
      if (args.proxyUser != null) {
        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
          UserGroupInformation.getCurrentUser())
        try {
          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
            }
          })
        } catch {
          case e: Exception =>
            // Hadoop's AuthorizationException suppresses the exception's stack trace, which
            // makes the message printed to the output by the JVM not very helpful. Instead,
            // detect exceptions with empty stack traces here, and treat them differently.
            if (e.getStackTrace().length == 0) {
              error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            } else {
              throw e
            }
        }
      } else {
        runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
      }
    }

    // Let the main class re-initialize the logging system once it starts.
    if (uninitLog) {
      Logging.uninitialize()
    }

    // In standalone cluster mode, there are two submission gateways:
    //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
    //   (2) The new REST-based gateway introduced in Spark 1.3
    // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
    // to use the legacy gateway if the master endpoint turns out to be not a REST server.
    if (args.isStandaloneCluster && args.useRest) {
      try {
        logInfo("Running Spark using the REST application submission protocol.")
      } catch {
        // Fail over to use the legacy submission gateway
        case e: SubmitRestConnectionException =>
          logWarning(s"Master endpoint ${args.master} was not a REST server. " +
            "Falling back to legacy submission gateway instead.")
          args.useRest = false
          submit(args, false)
      }
    // In all other modes, just run the main class as prepared
    } else {
      doRunMain()
    }
  }

Finally, the runMain method is called:


  /**
   * Run the main method of the child class using the provided launch environment.
   *
   * Note that this main class will not be the one provided by the user if we're
   * running cluster deploy mode or python applications.
   */
  private def runMain(
      childArgs: Seq[String],
      childClasspath: Seq[String],
      sparkConf: SparkConf,
      childMainClass: String,
      verbose: Boolean): Unit = {
    if (verbose) {
      logInfo(s"Main class:\n$childMainClass")
      logInfo(s"Arguments:\n${childArgs.mkString("\n")}")
      // sysProps may contain sensitive information, so redact before printing
      logInfo(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}")
      logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
      logInfo("\n")
    }

    val loader =
      if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
        new ChildFirstURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      } else {
        new MutableURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      }
    Thread.currentThread.setContextClassLoader(loader)

    for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)
    }

    var mainClass: Class[_] = null

    try {
      mainClass = Utils.classForName(childMainClass)
    } catch {
      case e: ClassNotFoundException =>
        logWarning(s"Failed to load $childMainClass.", e)
        if (childMainClass.contains("thriftserver")) {
          logInfo(s"Failed to load main class $childMainClass.")
          logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
        }
        throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
      case e: NoClassDefFoundError =>
        logWarning(s"Failed to load $childMainClass: ${e.getMessage()}")
        if (e.getMessage.contains("org/apache/hadoop/hive")) {
          logInfo(s"Failed to load hive class.")
          logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
        }
        throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
    }

    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      mainClass.newInstance().asInstanceOf[SparkApplication]
    } else {
      // SPARK-4170
      if (classOf[scala.App].isAssignableFrom(mainClass)) {
        logWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
      }
      new JavaMainApplication(mainClass)
    }

    @tailrec
    def findCause(t: Throwable): Throwable = t match {
      case e: UndeclaredThrowableException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: InvocationTargetException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: Throwable =>
        e
    }

    try {
      app.start(childArgs.toArray, sparkConf)
    } catch {
      case t: Throwable =>
        throw findCause(t)
    }
  }

The call to app.start(childArgs.toArray, sparkConf) enters the main method of org.apache.spark.examples.SparkPi, and the Spark application itself runs.
At this point, the whole Spark application submission process is complete.
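
For completeness, this is roughly what the child main class at the end of the chain looks like: a trimmed-down, SparkPi-style application that runMain wraps in JavaMainApplication and invokes via reflection. The object name and the numbers are illustrative only, not Spark's bundled example verbatim.

import scala.math.random

import org.apache.spark.sql.SparkSession

object MiniPi {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("Mini Pi").getOrCreate()
    val slices = if (args.nonEmpty) args(0).toInt else 2
    val n = 100000 * slices
    // Throw darts at the unit square and count how many land inside the unit circle.
    val count = spark.sparkContext.parallelize(1 to n, slices).map { _ =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x * x + y * y <= 1) 1 else 0
    }.reduce(_ + _)
    println(s"Pi is roughly ${4.0 * count / n}")
    spark.stop()
  }
}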
