1. 程式人生 > >Spark原始碼解析(一):Spark執行流程和指令碼

Spark原始碼解析(一):Spark執行流程和指令碼

Spark執行流程

Spark帶註釋原始碼

對於整個Spark原始碼分析系列,我將帶有註釋的Spark原始碼和分析的檔案放在我的GitHub上Spark原始碼剖析歡迎大家fork和star

過程描述:

1.通過Shell指令碼啟動Master,Master類繼承Actor類,通過ActorySystem建立並啟動。

2.通過Shell指令碼啟動Worker,Worker類繼承Actor類,通過ActorySystem建立並啟動。

3.Worker通過Akka或者Netty傳送訊息向Master註冊並彙報自己的資源資訊(記憶體以及CPU核數等),以後就是定時彙報,保持心跳。

4.Master接受訊息後儲存(原始碼中通過持久化引擎持久化)併發送訊息表示Worker註冊成功,並且定時排程,移除超時的Worker。

5.通過Spark-Submit提交作業或者通過Spark Shell指令碼連線叢集,都會啟動一個Spark程序Driver。

6.Master拿到作業後根據資源篩選Worker並與Worker通訊,傳送資訊,主要包含Driver的地址等。

7.Worker進行收到訊息後,啟動Executor,Executor與Driver通訊。

8.Driver端計算作業資源,transformation在Driver 端完成,劃分各個Stage後提交Task給Executor。

9.Exectuor針對於每一個Task讀取HDFS檔案,然後計算結果,最後將計算的最終結果聚合到Driver端或者寫入到持久化元件中。

Spark啟動Shell指令碼

1.start-all.sh

可以中指令碼檔案看出,spark啟動事首選啟動spark-config.sh,然後啟動start-master.sh 最後啟動指令碼

2.start-master.sh指令碼

${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS1 \
--ip $SPARK_MASTER_IP--port $SPARK_MASTER_PORT--webui-port $SPARK_MASTER_WEBUI_PORT\
$ORIGINAL_ARGS

start-master.sh指令碼主要執行的是這一塊邏輯,主要使用spark-daemon.sh利用傳入的引數啟動Spark。

3.spark-daemon.sh指令碼

nohup nice -n"$SPARK_NICENESS""${SPARK_HOME}"/bin/spark-class $command"[email protected]">> "$log"2>&1 < /dev/null &

指令碼中啟動執行的主要指令碼命令是上述程式碼,主要通過${SPARK_HOME}/bin/spark-class啟動,接下來我們看一下spark-class

4.spark-class檔案

done< <("$RUNNER"-cp"$LAUNCH_CLASSPATH"org.apache.spark.launcher.Main "[email protected]")
exec "${CMD[@]}"

//這裡是$RUNNER代表的含義
if-n"${JAVA_HOME}"]; then
 RUNNER="${JAVA_HOME}/bin/java"
else
 if`command -v java`]; then
   RUNNER="java"
 else
   echo"JAVA_HOME is not set">&2
   exit1
 fi
fi
//這裡是$LAUNCH_CLASSPATH表示的含義
SPARK_ASSEMBLY_JAR=
if-f"${SPARK_HOME}/RELEASE"]; then
 ASSEMBLY_DIR="${SPARK_HOME}/lib"
else
 ASSEMBLY_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION"
fi

SPARK_ASSEMBLY_JAR="${ASSEMBLY_DIR}/${ASSEMBLY_JARS}"

LAUNCH_CLASSPATH="$SPARK_ASSEMBLY_JAR"

從上面的程式碼可以看出主要是通過獲取Java環境然後啟動Spark程式

Spark Submit指令碼

關於作業提交的主要指令碼是spark-submit指令碼,主要執行的程式碼是

exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"

這裡主要的用到的類是org.apache.spark.deploy.SparkSubmit,接下來我們將要看SparkSubmit程式碼,這裡的Spark程式碼基於Spark1.3.1

SparkSubmit類

/bin/spark-submit \
 --classcn.edu.hust.WordCount \
 --masterspark://207.184.161.138:7077 \
 --executor-memory20G \
 --total-executor-cores100\
/path/to/examples.jar \
 1000

下面是SparkSubmit類的時序圖:


具體敘述流程如下 :

1.SparkSubmit在main()方法中執行,然後根據提交的型別呼叫相應的方法,這裡是"Submit",呼叫submit()方法,submit()裡面進行一些判斷後,使用反射Class.forName(childMainClass, true, loader),然後呼叫invoke()方法來呼叫程式設計師自己寫的類,我們這裡是WordCount。

2.在WordCount類中,main()方法裡有呼叫SparkContext,SparkContext構造器使用createSparkEnv()方法,這個方法使用SparkEnv.createDriverEnv(conf, isLocal, listenerBus)方法建立SparkEnv物件;在SparkEnv類,呼叫create()方法來進行建立SparkEnv,在這個方法內部,有一個AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)的呼叫過程,主要用來產生Akka中的ActorSystem以及得到繫結的埠號。

3.在建立SparkEnv物件後,SparkContext構造器使用程式碼SparkContext.createTaskScheduler(this, master)建立TaskScheduler物件,這裡根據實際的提交模式來進行建立TaskScheduler物件,提交模式有:local、Mesos、Zookeeper、Simr、Spark,這裡模們主要分析Spark叢集下的模式;然後還需要建立一個SparkDeploySchedulerBackend物件;在建立TaskScheduler物件呼叫initialize()方法,這裡選擇排程模式,主要有兩種模式,FIFO和FAIR,預設的排程模式;最後呼叫taskScheduler的start()方法,裡面主要呼叫SparkDeploySchedulerBackend物件的start()方法,首先呼叫父類的start()方法產生一個用於和Executor通訊的DriverActor物件,然後裡面主要建立一個AppClient物件內部有ClientActor類物件,用於Driver和Master進行RPC通訊。

相關係列文章

微信公眾號

有興趣的同學可以關注一下小編喲!