
Three Ways to Submit a Spark Job

While learning Spark, the reference material describes three main ways of submitting a Spark job:

Method 1:

   Submit the job from the command line with the spark-submit tool that ships with Spark. The official site and most references use this approach. A sample submit command:
./spark-submit --class com.learn.spark.SimpleApp --master yarn --deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 3 ../spark-demo.jar
The meaning of each argument is not explained here; please refer to the official documentation.


Method 2:

   Submit the job programmatically through the Java API. This approach needs no command line at all: you can simply click Run in IDEA on the main class that contains the job. Spark supports it through an API whose single entry point is SparkLauncher. It is very convenient (imagine a job that has to be run repeatedly by someone who cannot write Linux shell scripts; submitting through the Java API solves that, and the code can also be integrated with Spring so the application runs inside Tomcat). The official example: http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html

Following the official example, there are two ways to submit through the Java API:

        The first option is to call the startApplication method on a SparkLauncher instance. Even when every setting is correct, a naive call tends to fail: startApplication has LauncherServer start a process that talks to the cluster, and this appears to be asynchronous, so the main thread may finish before that process has even come up, and the submission fails. The fix is either to let the main thread sleep for a while after calling new SparkLauncher().startApplication() (a minimal sketch of that workaround follows), or, better, to block until the application reaches a final state, as in the full example further below.
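A minimal sketch of the sleep-based workaround (the class name is made up here and the sleep duration is an arbitrary guess; the CountDownLatch version below is more robust):

package com.learn.spark;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;

public class LauncherSleepSketch {
    public static void main(String[] args) throws IOException, InterruptedException {
        HashMap<String, String> env = new HashMap<>();
        env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");

        SparkAppHandle handle = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .startApplication();

        // Arbitrary wait so the JVM does not exit before the launcher has actually submitted the job.
        Thread.sleep(60_000);
        System.out.println("Final state seen by the launcher: " + handle.getState());
    }
}

The more robust version, which blocks on a CountDownLatch until the application reaches a final state: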

package com.learn.spark;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;

public class LanuncherAppV {
    public static void main(String[] args) throws IOException, InterruptedException {

        HashMap<String, String> env = new HashMap<>();
        // These two environment variables must be set
        env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");
        // Optional
        //env.put("YARN_CONF_DIR", "");

        CountDownLatch countDownLatch = new CountDownLatch(1);
        // Calling setJavaHome() alone does not help: the "JAVA_HOME is not set" error persists,
        // which is why JAVA_HOME is passed through the environment map above.
        SparkAppHandle handle = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setConf("spark.app.id", "11222")
                .setConf("spark.driver.memory", "2g")
                .setConf("spark.akka.frameSize", "200")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.instances", "32")
                .setConf("spark.executor.cores", "3")
                .setConf("spark.default.parallelism", "10")
                .setConf("spark.driver.allowMultipleContexts", "true")
                .setVerbose(true)
                .startApplication(new SparkAppHandle.Listener() {
                    // Watches the application state; isFinal() returns true once the job has ended,
                    // no matter why it ended, and false otherwise.
                    @Override
                    public void stateChanged(SparkAppHandle sparkAppHandle) {
                        if (sparkAppHandle.getState().isFinal()) {
                            countDownLatch.countDown();
                        }
                        System.out.println("state:" + sparkAppHandle.getState().toString());
                    }

                    @Override
                    public void infoChanged(SparkAppHandle sparkAppHandle) {
                        System.out.println("Info:" + sparkAppHandle.getState().toString());
                    }
                });

        System.out.println("The task is executing, please wait ....");
        // Block the main thread until the application reaches a final state
        countDownLatch.await();
        System.out.println("The task is finished!");
    }
}

 Note: with deploy mode cluster, anything the job writes to standard output will not be visible on the submitting machine; results need to be written to HDFS instead. In client mode the output can be seen directly.
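A hedged illustration of that point (SimpleApp's real logic is not shown in this post, so the class name and the HDFS paths below are invented for the example): instead of printing, the job can persist its result to HDFS, where it is visible regardless of deploy mode.

package com.learn.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SimpleAppHdfsOutput {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SimpleAppHdfsOutput");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Hypothetical input path on HDFS.
            JavaRDD<String> lines = sc.textFile("hdfs:///tmp/spark-demo/input.txt");
            JavaRDD<String> upper = lines.map(String::toUpperCase);
            // In cluster mode, System.out.println in the driver is not visible on the submitting
            // machine, so write the result to HDFS instead (hypothetical output path).
            upper.saveAsTextFile("hdfs:///tmp/spark-demo/output");
        }
    }
}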

The second option is to obtain a Process through SparkLauncher.launch() and then call process.waitFor() to wait for it to finish. The drawback is that you have to manage the output of the process yourself, which is more work; the advantage is that everything is under your control, and the captured output is as detailed as what spark-submit prints on the command line. The implementation:

package com.learn.spark;

import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;

public class LauncherApp {

    public static void main(String[] args) throws IOException, InterruptedException {

        HashMap<String, String> env = new HashMap<>();
        // These two environment variables must be set
        env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");
        // Optional
        //env.put("YARN_CONF_DIR", "");

        SparkLauncher launcher = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setConf("spark.app.id", "11222")
                .setConf("spark.driver.memory", "2g")
                .setConf("spark.akka.frameSize", "200")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.instances", "32")
                .setConf("spark.executor.cores", "3")
                .setConf("spark.default.parallelism", "10")
                .setConf("spark.driver.allowMultipleContexts", "true")
                .setVerbose(true);

        Process process = launcher.launch();

        // Drain stdout and stderr of the spawned spark-submit process on separate threads,
        // otherwise the process can block once the pipe buffers fill up.
        InputStreamReaderRunnable inputStreamReaderRunnable =
                new InputStreamReaderRunnable(process.getInputStream(), "input");
        Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
        inputThread.start();

        InputStreamReaderRunnable errorStreamReaderRunnable =
                new InputStreamReaderRunnable(process.getErrorStream(), "error");
        Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
        errorThread.start();

        System.out.println("Waiting for finish...");
        int exitCode = process.waitFor();
        System.out.println("Finished! Exit code:" + exitCode);
    }
}

The custom InputStreamReaderRunnable class used above is implemented as follows:

package com.learn.spark;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

// Reads an InputStream line by line on its own thread and echoes it to stdout,
// so the launched spark-submit process never blocks on a full output pipe.
public class InputStreamReaderRunnable implements Runnable {

    private BufferedReader reader;

    private String name;

    public InputStreamReaderRunnable(InputStream is, String name) {
        this.reader = new BufferedReader(new InputStreamReader(is));
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println("InputStream " + name + ":");
        try {
            String line = reader.readLine();
            while (line != null) {
                System.out.println(line);
                line = reader.readLine();
            }
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

The third way is to submit through the YARN REST API (not commonly used, but introduced here as well):

POST request URL: http://<rm http address:port>/ws/v1/cluster/apps

The parameters carried in the request body are listed below; a hedged submission sketch follows the list:

application-id (string): The application id
application-name (string): The application name
queue (string): The name of the queue to which the application should be submitted
priority (int): The priority of the application
am-container-spec (object): The application master container launch context, described below
unmanaged-AM (boolean): Whether the application uses an unmanaged application master
max-app-attempts (int): The maximum number of attempts for this application
resource (object): The resources the application master requires, described below
application-type (string): The application type (MapReduce, Pig, Hive, etc.)
keep-containers-across-application-attempts (boolean): Whether YARN should keep the containers used by this application instead of destroying them
application-tags (object): List of application tags; see the request examples on how to specify the tags
log-aggregation-context (object): Represents all of the information needed by the NodeManager to handle the logs for this application
attempt-failures-validity-interval (long): Attempt failures that happen outside the validity interval are not counted towards the failure count
reservation-id (string): Represents the unique id of the corresponding reserved resource allocation in the scheduler
am-black-listing-requests (object): Contains blacklisting information such as "enable/disable AM blacklisting" and "disable failure threshold"
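A hedged sketch of what such a submission could look like from Java. The ResourceManager address, the application-id value (which must first be obtained from POST http://<rm http address:port>/ws/v1/cluster/apps/new-application), and the AM launch command are all placeholders, and the exact body fields should be checked against the YARN REST API documentation for your Hadoop version; building a working AM launch command for a Spark application is non-trivial and out of scope here.

package com.learn.spark;

import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class YarnRestSubmitSketch {
    public static void main(String[] args) throws IOException {
        // Placeholder ResourceManager address.
        URL url = new URL("http://rm-host:8088/ws/v1/cluster/apps");

        // A pared-down request body using fields from the list above; the application-id value
        // and the AM launch command are placeholders.
        String body = "{"
                + "\"application-id\":\"application_1526888970321_0001\","
                + "\"application-name\":\"spark-demo\","
                + "\"application-type\":\"SPARK\","
                + "\"max-app-attempts\":2,"
                + "\"resource\":{\"memory\":2048,\"vCores\":1},"
                + "\"am-container-spec\":{"
                +     "\"commands\":{\"command\":\"<AM launch command for the Spark application>\"}"
                + "}"
                + "}";

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body.getBytes(StandardCharsets.UTF_8));
        }

        // YARN answers 202 Accepted when the submission is accepted.
        System.out.println("Response code: " + conn.getResponseCode());
    }
}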