1. 程式人生 > >Spark-利用SparkLauncher 類以JAVA API 程式設計的方式提交spark job

Spark-利用SparkLauncher 類以JAVA API 程式設計的方式提交spark job

一.環境說明和使用軟體的版本說明:

hadoop-version:hadoop-2.9.0.tar.gz 

spark-version:spark-2.2.0-bin-hadoop2.7.tgz

java-version:jdk1.8.0_151

叢集環境:單機偽分散式環境。

二.適用背景

 在學習Spark過程中,資料中介紹的提交Spark Job的方式主要有兩種(我所知道的):第一種是通過命令列的方式提交Job,使用spark 自帶的spark-submit工具提交,官網和大多數參考資料都是已這種方式提交的,提交命令示例如下:

./spark-submit --class com.learn.spark.SimpleApp --master yarn --deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 3  ../spark-demo.jar

引數含義就不解釋了,請參考官網資料。

第二種提交方式是已JAVA API程式設計的方式提交,這種方式不需要使用命令列,直接可以在IDEA中點選Run 執行包含Job的Main類就行,Spark 提供了以SparkLanuncher 作為唯一入口的API來實現。這種方式很方便(試想如果某個任務需要重複執行,但是又不會寫linux 指令碼怎麼搞?我想到的是以JAV API的方式提交Job, 還可以和Spring整合,讓應用在tomcat中執行),官網的示例:http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html

三.文章的目地

官網已有demo和API的情況下寫這篇文章的目地:官網給出的demo 放在本機跑不了。出現的現象是程式結束了,什麼輸出都沒有或者輸出JAVA_HOME is not set,雖然我呼叫方法設定了,然而沒啥用,因此把我搜索和加上在自己思考後能夠執行的demo記錄下來。

四.相關demo

   根據官網的示例這裡有兩種方式:

第一種是呼叫SparkLanuncher例項的startApplication方法,但是這種方式在所有配置都正確的情況下使用執行都會失敗的,原因是startApplication方法會呼叫LauncherServer啟動一個程序與叢集互動,這個操作貌似是非同步的,所以可能結果是main主執行緒結束了這個程序都沒有起起來,導致執行失敗。解決辦法是呼叫new SparkLanuncher

().startApplication後需要讓主執行緒休眠一定的時間後者是使用下面的例子:

package com.learn.spark;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;

public class LanuncherAppV {
    public static void main(String[] args) throws IOException, InterruptedException {


        HashMap env = new HashMap();
        //這兩個屬性必須設定
        env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");
        //可以不設定
        //env.put("YARN_CONF_DIR","");
        CountDownLatch countDownLatch = new CountDownLatch(1);
        //這裡呼叫setJavaHome()方法後,JAVA_HOME is not set 錯誤依然存在
        SparkAppHandle handle = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setConf("spark.app.id", "11222")
                .setConf("spark.driver.memory", "2g")
                .setConf("spark.akka.frameSize", "200")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.instances", "32")
                .setConf("spark.executor.cores", "3")
                .setConf("spark.default.parallelism", "10")
                .setConf("spark.driver.allowMultipleContexts", "true")
                .setVerbose(true).startApplication(new SparkAppHandle.Listener() {
                    //這裡監聽任務狀態,當任務結束時(不管是什麼原因結束),isFinal()方法會返回true,否則返回false
                    @Override
                    public void stateChanged(SparkAppHandle sparkAppHandle) {
                        if (sparkAppHandle.getState().isFinal()) {
                            countDownLatch.countDown();
                        }
                        System.out.println("state:" + sparkAppHandle.getState().toString());
                    }
                    
                    
                    @Override
                    public void infoChanged(SparkAppHandle sparkAppHandle) {
                        System.out.println("Info:" + sparkAppHandle.getState().toString());
                    }
                });
        System.out.println("The task is executing, please wait ....");
        //執行緒等待任務結束
        countDownLatch.await();
        System.out.println("The task is finished!");


    }
}
注意:如果部署模式是cluster,但是程式碼中有標準輸出的話將看不到,需要把結果寫到HDFS中,如果是client模式則可以看到輸出。

第二種方式是:通過SparkLanuncher.lanunch()方法獲取一個程序,然後呼叫程序的process.waitFor()方法等待執行緒返回結果,但是使用這種方式需要自己管理執行過程中的輸出資訊,比較麻煩,好處是一切都在掌握之中,即獲取的輸出資訊和通過命令提交的方式一樣,很詳細,實現如下:

package com.learn.spark;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;

public class LauncherApp {

    public static void main(String[] args) throws IOException, InterruptedException {
        
        HashMap env = new HashMap();
        //這兩個屬性必須設定
        env.put("HADOOP_CONF_DIR","/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME","/usr/local/java/jdk1.8.0_151");
        //env.put("YARN_CONF_DIR","");

        SparkLauncher handle = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setConf("spark.app.id", "11222")
                .setConf("spark.driver.memory", "2g")
                .setConf("spark.akka.frameSize", "200")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.instances", "32")
                .setConf("spark.executor.cores", "3")
                .setConf("spark.default.parallelism", "10")
                .setConf("spark.driver.allowMultipleContexts","true")
                .setVerbose(true);


         Process process =handle.launch();
        InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(), "input");
        Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
        inputThread.start();

        InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(), "error");
        Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
        errorThread.start();

        System.out.println("Waiting for finish...");
        int exitCode = process.waitFor();
        System.out.println("Finished! Exit code:" + exitCode);

    }
}

    使用的自定義InputStreamReaderRunnable類實現如下:
package com.learn.spark;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class InputStreamReaderRunnable implements Runnable {

    private BufferedReader reader;

    private String name;

    public InputStreamReaderRunnable(InputStream is, String name) {
        this.reader = new BufferedReader(new InputStreamReader(is));
        this.name = name;
    }

    public void run() {
        System.out.println("InputStream " + name + ":");
        try {
            String line = reader.readLine();
            while (line != null) {
                System.out.println(line);
                line = reader.readLine();
            }
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
五.總結

   官網給出的例子在某種配置下肯定是可以執行的,後續需要繼續探索。