Getting Started with Spark, Part 2: The Spark Job Submission Flow

Spark job submission flow

  To run a Spark application, you package it and submit it to Spark with the spark-submit script. The submit command looks like this:

./bin/spark-submit examples/src/main/r/dataframe.

1.1 To understand the whole flow, let's start with the spark-submit script. Its main job is the following:

The script invokes the SparkSubmit class:

/bin/spark-class org.apache.spark.deploy.SparkSubmit 
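1.2 The submission then enters the SparkSubmit.scala class, where the main method runs (see the figure below):

       a: Wrap the arguments passed in into the appArgs variable;

       b: Match on the kind of submitAction; the submit case is used as the example here.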
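The two steps in main can be sketched roughly as follows. This is a simplified illustration, not Spark's actual code: the AppArgs case class, the Action hierarchy, the parse rules, and the flag names below are assumptions made for the example, and Spark's real argument parser handles far more options.

```scala
// A minimal sketch of "parse the arguments, then match on the action".
// All names here are hypothetical stand-ins, not Spark's own classes.
object SubmitDispatchSketch {
  sealed trait Action
  case object Submit extends Action
  case object Kill extends Action
  case object RequestStatus extends Action

  // Stand-in for the parsed arguments held in appArgs.
  final case class AppArgs(action: Action, rest: Seq[String])

  // Step a: wrap the raw arguments (flag names assumed for illustration).
  def parse(args: Seq[String]): AppArgs = args match {
    case "--kill" +: rest   => AppArgs(Kill, rest)
    case "--status" +: rest => AppArgs(RequestStatus, rest)
    case other              => AppArgs(Submit, other)
  }

  // Step b: match on the action and dispatch to the right handler.
  def dispatch(appArgs: AppArgs): String = appArgs.action match {
    case Submit        => s"submit(${appArgs.rest.mkString(" ")})"
    case Kill          => "kill"
    case RequestStatus => "requestStatus"
  }

  def main(args: Array[String]): Unit =
    println(dispatch(parse(args.toSeq)))
}
```

Modelling the action as a sealed trait lets the compiler warn when a case is missing from the match, which is one reasonable way to keep the dispatch exhaustive.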


 1.3 Once the action is determined to be a submit action, the submit() method is called

         a: Submit the application using the provided parameters.

         b: This runs in two steps. First, we prepare the launch environment by setting up
         the appropriate classpath, system properties, and application arguments for
         running the child main class based on the cluster manager and the deploy mode.
         Second, we use this launch environment to invoke the main method of the child main class.

    In short, the method works in two steps: it first prepares the launch environment from the configured classpath, system properties, and application arguments, and then uses that launch environment to invoke the main method.
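The two-step shape described above can be sketched like this. It is only an illustration under stated assumptions: LaunchEnv, prepare, describe, and the class name example.HypotheticalClusterClient are invented for the example, and the rule used to pick the child main class is a simplification of what Spark does per cluster manager and deploy mode. The second step here only describes what would be launched instead of invoking anything.

```scala
// A minimal sketch of "prepare the launch environment, then use it".
// Names and rules are hypothetical; Spark's real logic is more involved.
object TwoStepSubmitSketch {
  // The four pieces that make up the launch environment.
  final case class LaunchEnv(
      childArgs: Seq[String],
      childClasspath: Seq[String],
      sysProps: Map[String, String],
      childMainClass: String)

  // Step 1: build the launch environment from the user's inputs.
  def prepare(
      appJar: String,
      mainClass: String,
      appArgs: Seq[String],
      deployMode: String): LaunchEnv = {
    val props = Map(
      "spark.submit.deployMode" -> deployMode,
      "spark.app.name" -> mainClass)
    if (deployMode == "client") {
      // Client mode: the user's class is the child main class.
      LaunchEnv(appArgs, Seq(appJar), props, mainClass)
    } else {
      // Cluster mode: a wrapper class (assumed name) is launched instead,
      // and the user's class is passed to it as an argument.
      LaunchEnv(Seq("--class", mainClass) ++ appArgs, Seq.empty, props,
        "example.HypotheticalClusterClient")
    }
  }

  // Step 2 (stand-in): report what would be invoked with this environment.
  def describe(env: LaunchEnv): String =
    s"${env.childMainClass}.main(${env.childArgs.mkString(", ")})"

  def main(args: Array[String]): Unit = {
    println(describe(prepare("app.jar", "com.example.App", Seq("10"), "client")))
    println(describe(prepare("app.jar", "com.example.App", Seq("10"), "cluster")))
  }
}
```

Keeping the environment in one immutable value makes it easy to print or inspect before anything is launched.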

            

1.4 Next, the runMain() method is called

      runMain() then uses reflection to invoke the application you wrote, which completes the job submission.

  /**
   * Run the main method of the child class using the provided launch environment.
   *
   * Note that this main class will not be the one provided by the user if we're
   * running cluster deploy mode or python applications.
   */
  private def runMain(
      childArgs: Seq[String],
      childClasspath: Seq[String],
      sysProps: Map[String, String],
      childMainClass: String,
      verbose: Boolean): Unit = {
    // scalastyle:off println
    if (verbose) {
      printStream.println(s"Main class:\n$childMainClass")
      printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
      printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
      printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
      printStream.println("\n")
    }
    // scalastyle:on println

    val loader =
      if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
        new ChildFirstURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      } else {
        new MutableURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      }
    Thread.currentThread.setContextClassLoader(loader)
    // Add each jar to the classpath
    for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)
    }
    // Set the system properties
    for ((key, value) <- sysProps) {
      System.setProperty(key, value)
    }

    var mainClass: Class[_] = null

    try {
      mainClass = Utils.classForName(childMainClass)
    } catch {
      case e: ClassNotFoundException =>
        e.printStackTrace(printStream)
        if (childMainClass.contains("thriftserver")) {
          // scalastyle:off println
          printStream.println(s"Failed to load main class $childMainClass.")
          printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
          // scalastyle:on println
        }
        System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
      case e: NoClassDefFoundError =>
        e.printStackTrace(printStream)
        if (e.getMessage.contains("org/apache/hadoop/hive")) {
          // scalastyle:off println
          printStream.println(s"Failed to load hive class.")
          printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
          // scalastyle:on println
        }
        System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
    }

    // SPARK-4170
    if (classOf[scala.App].isAssignableFrom(mainClass)) {
      printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
    }
    // Look up the main method via reflection
    val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
    // Check that the main method is static
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    @tailrec
    def findCause(t: Throwable): Throwable = t match {
      case e: UndeclaredThrowableException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: InvocationTargetException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: Throwable =>
        e
    }
    try {
      // Invoke the main method
      mainMethod.invoke(null, childArgs.toArray)
    } catch {
      case t: Throwable =>
        findCause(t) match {
          case SparkUserAppException(exitCode) =>
            System.exit(exitCode)

          case t: Throwable =>
            throw t
        }
    }
  }
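
The reflective call and the exception unwrapping at the end of runMain can be tried in isolation. The sketch below is an assumption-laden stand-in rather than Spark code: it uses java.lang.Integer.parseInt as the static target instead of a user main method, and the names ReflectiveCallSketch and callStatic are made up for the example. The findCause function follows the same logic as the one in the listing above.

```scala
import java.lang.reflect.{InvocationTargetException, Method, Modifier, UndeclaredThrowableException}
import scala.annotation.tailrec

object ReflectiveCallSketch {
  // Peel off reflection wrappers until the real cause is reached.
  @tailrec
  def findCause(t: Throwable): Throwable = t match {
    case e: UndeclaredThrowableException =>
      if (e.getCause != null) findCause(e.getCause) else e
    case e: InvocationTargetException =>
      if (e.getCause != null) findCause(e.getCause) else e
    case e => e
  }

  // Look up a static method by name, check the modifier, and invoke it.
  // Integer.parseInt is a stand-in for the application's main method.
  def callStatic(input: String): Either[Throwable, Int] = {
    val method: Method =
      classOf[java.lang.Integer].getMethod("parseInt", classOf[String])
    require(Modifier.isStatic(method.getModifiers), "target method must be static")
    try {
      // The receiver is null because the method is static.
      Right(method.invoke(null, input).asInstanceOf[java.lang.Integer].intValue)
    } catch {
      // Reflection wraps the target's exception; unwrap it before reporting.
      case t: Throwable => Left(findCause(t))
    }
  }

  def main(args: Array[String]): Unit = {
    println(callStatic("42"))
    println(callStatic("not a number").left.map(_.getClass.getSimpleName))
  }
}
```

For the invalid input, the caller sees the NumberFormatException thrown by the target rather than the InvocationTargetException added by the reflection layer, which is the reason runMain unwraps before matching on SparkUserAppException.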