Spark修煉之道(高階篇)——Spark原始碼閱讀:第一節 Spark應用程式提交流程
作者:搖擺少年夢
微訊號: zhouzhihubeyond
spark-submit 指令碼應用程式提交流程
在執行Spar應用程式時,會將spark應用程式打包後使用spark-submit指令碼提交到Spark中執行,執行提交命令如下:
root@sparkmaster:/hadoopLearning/spark-1.5.0-bin-hadoop2.4/bin#
./spark-submit --master spark://sparkmaster:7077
--class SparkWordCount --executor-memory 1g
/root/IdeaProjects/SparkWordCount/out /artifacts/SparkWord
Count_jar/SparkWordCount.jar hdfs://ns1/README.md
hdfs://ns1/SparkWordCountResult
為弄清楚整個流程,我們先來分析一下spark-submit指令碼,spark-submit指令碼內容如下:
#!/usr/bin/env bash
SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
#spark-submit最終呼叫的是spark-class指令碼
#傳入的類是org.apache.spark.deploy.SparkSubmit
#及其它傳入的引數,如deploy mode、executor-memory等
exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "[email protected]"
spark-class指令碼會載入spark配置的環境變數資訊、定位依賴包spark-assembly-1.5.0-hadoop2.4.0.jar檔案(以spark1.5.0為例)等,然後再呼叫org.apache.spark.launcher.Main正式啟動Spark應用程式的執行,具體如下:
# Figure out where Spark is installed
#定位SAPRK_HOME目錄
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
#載入load-spark-env.sh,執行環境相關資訊
#例如配置檔案conf下的spark-env.sh等
. "$SPARK_HOME"/bin/load-spark-env.sh
# Find the java binary
# 定位JAVA_HOME目錄
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
else
if [ `command -v java` ]; then
RUNNER="java"
else
echo "JAVA_HOME is not set" >&2
exit 1
fi
fi
# Find assembly jar
#定位spark-assembly-1.5.0-hadoop2.4.0.jar檔案(以spark1.5.0為例)
#這意味著任務提交時無需將該JAR檔案打包
SPARK_ASSEMBLY_JAR=
if [ -f "$SPARK_HOME/RELEASE" ]; then
ASSEMBLY_DIR="$SPARK_HOME/lib"
else
ASSEMBLY_DIR="$SPARK_HOME/assembly/target/scala-$SPARK_SCALA_VERSION"
fi
num_jars="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" | wc -l)"
if [ "$num_jars" -eq "0" -a -z "$SPARK_ASSEMBLY_JAR" ]; then
echo "Failed to find Spark assembly in $ASSEMBLY_DIR." 1>&2
echo "You need to build Spark before running this program." 1>&2
exit 1
fi
ASSEMBLY_JARS="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" || true)"
if [ "$num_jars" -gt "1" ]; then
echo "Found multiple Spark assembly jars in $ASSEMBLY_DIR:" 1>&2
echo "$ASSEMBLY_JARS" 1>&2
echo "Please remove all but one jar." 1>&2
exit 1
fi
SPARK_ASSEMBLY_JAR="${ASSEMBLY_DIR}/${ASSEMBLY_JARS}"
LAUNCH_CLASSPATH="$SPARK_ASSEMBLY_JAR"
# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
LAUNCH_CLASSPATH="$SPARK_HOME/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi
export _SPARK_ASSEMBLY="$SPARK_ASSEMBLY_JAR"
# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
#執行org.apache.spark.launcher.Main作為Spark應用程式的主入口
CMD=()
while IFS= read -d '' -r ARG; do
CMD+=("$ARG")
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "[email protected]")
exec "${CMD[@]}"
從上述程式碼中,可以看到,通過org.apache.spark.launcher.Main類啟動org.apache.spark.deploy.SparkSubmit的執行,SparkSubmit部分原始碼如下:
//SparkSubmit Main方法
def main(args: Array[String]): Unit = {
//任務提交時設定的引數,見圖2
val appArgs = new SparkSubmitAarguments(args)
if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
}
appArgs.action match {
//任務提交時,執行submit(appArgs)
case SparkSubmitAction.SUBMIT => submit(appArgs)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}
圖1 appArgs = new SparkSubmitAarguments(args)引數
進入submit方法:
/**
* Submit the application using the provided parameters.
*
* This runs in two steps. First, we prepare the launch environment by setting up
* the appropriate classpath, system properties, and application arguments for
* running the child main class based on the cluster manager and the deploy mode.
* Second, we use this launch environment to invoke the main method of the child
* main class.
*/
private def submit(args: SparkSubmitArguments): Unit = {
//執行引數等資訊,見圖2
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
//定義在submit方法中的方法doRunMain()
def doRunMain(): Unit = {
if (args.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
//執行runMain方法
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
})
} catch {
case e: Exception =>
// Hadoop's AuthorizationException suppresses the exception's stack trace, which
// makes the message printed to the output by the JVM not very helpful. Instead,
// detect exceptions with empty stack traces here, and treat them differently.
if (e.getStackTrace().length == 0) {
// scalastyle:off println
printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
// scalastyle:on println
exitFn(1)
} else {
throw e
}
}
} else {
//執行runMain方法
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
}
// In standalone cluster mode, there are two submission gateways:
// (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
// (2) The new REST-based gateway introduced in Spark 1.3
// The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
// to use the legacy gateway if the master endpoint turns out to be not a REST server.
if (args.isStandaloneCluster && args.useRest) {
try {
// scalastyle:off println
printStream.println("Running Spark using the REST application submission protocol.")
// scalastyle:on println
//呼叫submit方法中的doRunMain方法
doRunMain()
} catch {
// Fail over to use the legacy submission gateway
case e: SubmitRestConnectionException =>
printWarning(s"Master endpoint ${args.master} was not a REST server. " +
"Falling back to legacy submission gateway instead.")
args.useRest = false
submit(args)
}
// In all other modes, just run the main class as prepared
} else {
//呼叫submit方法中的doRunMain方法
doRunMain()
}
}
圖2 任務提交時設定的引數,
從上面的程式碼可以看到,最終呼叫的是runMain方法,其原始碼如下:
/**
* Run the main method of the child class using the provided launch environment.
*
* Note that this main class will not be the one provided by the user if we're
* running cluster deploy mode or python applications.
*/
private def runMain(
childArgs: Seq[String],
childClasspath: Seq[String],
sysProps: Map[String, String],
childMainClass: String,
verbose: Boolean): Unit = {
// scalastyle:off println
if (verbose) {
printStream.println(s"Main class:\n$childMainClass")
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
printStream.println("\n")
}
// scalastyle:on println
val loader =
if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
for ((key, value) <- sysProps) {
System.setProperty(key, value)
}
var mainClass: Class[_] = null
try {
//複用反射載入childMainClass,這裡為SparkWordCount
mainClass = Utils.classForName(childMainClass)
} catch {
case e: ClassNotFoundException =>
e.printStackTrace(printStream)
if (childMainClass.contains("thriftserver")) {
// scalastyle:off println
printStream.println(s"Failed to load main class $childMainClass.")
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
// scalastyle:on println
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
}
// SPARK-4170
if (classOf[scala.App].isAssignableFrom(mainClass)) {
printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}
//呼叫反射機制載入main方法,即SparkWordCount中的main方法
val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
}
def findCause(t: Throwable): Throwable = t match {
case e: UndeclaredThrowableException =>
if (e.getCause() != null) findCause(e.getCause()) else e
case e: InvocationTargetException =>
if (e.getCause() != null) findCause(e.getCause()) else e
case e: Throwable =>
e
}
try {
//執行main方法,即執行SparkWordCount
mainMethod.invoke(null, childArgs.toArray)
} catch {
case t: Throwable =>
throw findCause(t)
}
}
mainMethod.invoke(null, childArgs.toArray)方法執行完畢後,進入SparkWordCount的main方法,執行Spark應用程式,如下圖
至此,正式完成Spark應用程式執行的提交。
相關推薦
Spark修煉之道(高階篇)——Spark原始碼閱讀:第十三節 Spark SQL之SQLContext(一)
作者:周志湖 1. SQLContext的建立 SQLContext是Spark SQL進行結構化資料處理的入口,可以通過它進行DataFrame的建立及SQL的執行,其建立方式如下: //sc為SparkContext val sqlContext
Spark修煉之道(高階篇)——Spark原始碼閱讀:第十二節 Spark SQL 處理流程分析
作者:周志湖 下面的程式碼演示了通過Case Class進行表Schema定義的例子: // sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLConte
Spark修煉之道(高階篇)——Spark原始碼閱讀:第八節 Task執行
Task執行 在上一節中,我們提到在Driver端CoarseGrainedSchedulerBackend中的launchTasks方法向Worker節點中的Executor傳送啟動任務命令,該命令的接收者是CoarseGrainedExecutorBack
Spark修煉之道(高階篇)——Spark原始碼閱讀:第一節 Spark應用程式提交流程
作者:搖擺少年夢 微訊號: zhouzhihubeyond spark-submit 指令碼應用程式提交流程 在執行Spar應用程式時,會將spark應用程式打包後使用spark-submit指令碼提交到Spark中執行,執行提交命令如下: root@s
Spark修煉之道(基礎篇)——Linux大資料開發基礎:第十三節:Shell程式設計入門(五)
本節主要內容 while expression do command command done (1)計數器格式 適用於迴圈次數已知或固定時 root@sparkslave02:~/ShellLearning/Chapter13# vim w
Spark修煉之道(進階篇)——Spark入門到精通:第一節 Spark 1.5.0叢集搭建
作者:周志湖 網名:搖擺少年夢 微訊號:zhouzhihubeyond 本節主要內容 作業系統環境準備 Hadoop 2.4.1叢集搭建 Spark 1.5.0 叢集部署 注:在利用CentOS 6.5作業系統安裝spark 1.5叢集過程中,
Spark修煉之道(進階篇)——Spark入門到精通:第十四節 Spark Streaming 快取、Checkpoint機制
作者:周志湖 微訊號:zhouzhihubeyond 主要內容 Spark Stream 快取 Checkpoint 案例 1. Spark Stream 快取 通過前面一系列的課程介紹,我們知道DStream是由一系列的RDD構成的,
Spark修煉之道(進階篇)——Spark入門到精通:第十六節 Spark Streaming與Kafka
作者:周志湖 主要內容 Spark Streaming與Kafka版的WordCount示例(一) Spark Streaming與Kafka版的WordCount示例(二) 1. Spark Streaming與Kafka版本的WordCount示例
Spark修煉之道(進階篇)——Spark入門到精通:第十節 Spark SQL案例實戰(一)
作者:周志湖 放假了,終於能抽出時間更新部落格了……. 1. 獲取資料 本文通過將github上的Spark專案git日誌作為資料,對SparkSQL的內容進行詳細介紹 資料獲取命令如下: [[email protected] spa
Spark修煉之道(進階篇)——Spark入門到精通:第十三節 Spark Streaming—— Spark SQL、DataFrame與Spark Streaming
主要內容 Spark SQL、DataFrame與Spark Streaming 1. Spark SQL、DataFrame與Spark Streaming import org.apache.spark.SparkConf import org
Spark修煉之道(進階篇)——Spark入門到精通:第十五節 Kafka 0.8.2.1 叢集搭建
作者:周志湖 微訊號:zhouzhihubeyond 本節為下一節Kafka與Spark Streaming做鋪墊 主要內容 1.kafka 叢集搭建 1. kafka 叢集搭建 kafka 安裝與配置 tar -zxvf kafka_2
Spark修煉之道(進階篇)——Spark入門到精通:第九節 Spark SQL執行流程解析
1.整體執行流程 使用下列程式碼對SparkSQL流程進行分析,讓大家明白LogicalPlan的幾種狀態,理解SparkSQL整體執行流程 // sc is an existing SparkContext. val sqlContext = new or
Spark修煉之道(進階篇)——Spark入門到精通:第六節 Spark程式設計模型(三)
作者:周志湖 網名:搖擺少年夢 微訊號:zhouzhihubeyond 本節主要內容 RDD transformation(續) RDD actions 1. RDD transformation(續) (1)repartitionAnd
Spark修煉之道(進階篇)——Spark入門到精通:第十節 Spark Streaming(一)
本節主要內容 Spark流式計算簡介 Spark Streaming相關核心類 入門案例 1. Spark流式計算簡介 Hadoop的MapReduce及Spark SQL等只能進行離線計算,無法滿足實時性要求較高的業務需求,例如實時推薦、實時
[轉載]我的WafBypass之道(Misc篇)
review uoj 行程 onclick aid 歷史 eth reads sim 厲害,滿滿的幹貨,讓我膜一下 現在位置: 首頁 > 文章 > Web安全 > 文章 > 代碼審計 > 正文 我的WafBypass之道(Misc篇)
從誌願軍“斷刀”再論敏捷之道(上篇)
慢慢 失敗 多個 之一 朝鮮 無法 一次 mark 學習 從誌願軍“斷刀”再論敏捷之道(上篇) 作者:歐德張(原創) ??在現在的IT項目中,以往常用的是瀑布模型套路,這些年敏捷模式大受歡迎,關於敏捷,現在諸人開口PMI-ACP,閉口則SCRUM,又有諸多實踐、案例遵行其
程式設計師修煉之道(通俗版)——第七章
《程式設計師修煉之道》這本書中的內容挺不錯,裡面包含了很多精華,但一些句子很拗口,所以我就根據國人的閱讀習慣,在不改變原意的情況下對詞句稍加修改,標題中的“通俗版”就是這麼來的。 1、在討論使用者介面時,需求、政策和實現之間的區別會變得非常模糊。“系統必須能讓
程式碼簡潔之道(判斷篇)
第一個例子 if (state === 1) { return true } else if (state === 2) { return true } else if (state === 3) { return true } else if (state === 4){ r
Ruby程式設計師修煉之道(第2版).epub
【下載地址】 這是一本深受好評的書。它不僅是一本純Ruby的書,也不僅是一本純Rails的書,而是一本為Rails程式設計師“優化”過的Ruby書。 本書從Ruby程式語言的基礎開始一直講到動態特性,其中包含大量的真實程式碼示例並附有詳細的註解,對日常使用Ruby進行程
Numpy 修煉之道 (12)—— genfromtxt函式
定義輸入 genfromtxt的唯一強制引數是資料的源。它可以是字串,字串列表或生成器。如果提供了單個字串,則假定它是本地或遠端檔案或具有read方法的開啟的類檔案物件的名稱,例如檔案或StringIO.StringIO物件。如果提供了字串列表或返回字串的生成器,則每個字串在檔案中被視為一行。當傳遞