
Submitting and monitoring jobs with the Spark REST API

Scenario

Sometimes we want to build our own web platform to launch, monitor, and manage Spark jobs. Spark provides a RESTful API for monitoring jobs, but the documentation says little about how to submit a job from an external client.

I. Submitting a job

We call the RESTful API from a Java backend. The submission request has the following format:

curl -X POST http://spark-cluster-ip:6066/v1/submissions/create --header "Content-Type:application/json;charset=UTF-8" --data '{
  "action" : "CreateSubmissionRequest",
  "appArgs" : [ "args1, args2,..." ], 
  "appResource" : "file:/myfilepath/spark-job-1.0.jar", 
  "clientSparkVersion" : "2.1.0",
  "environmentVariables" : {
    "SPARK_ENV_LOADED" : "1"
  },
  "mainClass" : "com.mycompany.MyJob",
  "sparkProperties" : {
    "spark.jars" : "file:/myfilepath/spark-job-1.0.jar",
    "spark.driver.supervise" : "false",
    "spark.app.name" : "MyJob",
    "spark.eventLog.enabled": "true",
    "spark.submit.deployMode" : "cluster",
    "spark.master" : "spark://spark-cluster-ip:6066"
  }
}'
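
If the request is accepted, the REST gateway replies with a CreateSubmissionResponse; it should look roughly like the following (the submissionId is what we use later to query the job's status):

{
  "action" : "CreateSubmissionResponse",
  "message" : "Driver successfully submitted as driver-20151008145126-0000",
  "serverSparkVersion" : "2.1.0",
  "submissionId" : "driver-20151008145126-0000",
  "success" : true
}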

To submit this request from our backend it has to be serialized to JSON, so first we build a JavaBean:

package com.dci.log.logsearch;

import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.HashMap;
import java.util.List;
import java.util.Map;


@JsonInclude(JsonInclude.Include.NON_NULL)
class JobSubmitRequest {

    private Action action;

    private String appResource;

    private List<String> appArgs;

    private String clientSparkVersion;

    private String mainClass;

    private Map<String, String> environmentVariables;

    private SparkProperties sparkProperties;

    public Action getAction() {
        return action;
    }

    public void setAction(Action action) {
        this.action = action;
    }

    public String getAppResource() {
        return appResource;
    }

    public void setAppResource(String appResource) {
        this.appResource = appResource;
    }

    public List<String> getAppArgs() {
        return appArgs;
    }

    public void setAppArgs(List<String> appArgs) {
        this.appArgs = appArgs;
    }

    public String getClientSparkVersion() {
        return clientSparkVersion;
    }

    public void setClientSparkVersion(String clientSparkVersion) {
        this.clientSparkVersion = clientSparkVersion;
    }

    public String getMainClass() {
        return mainClass;
    }

    public void setMainClass(String mainClass) {
        this.mainClass = mainClass;
    }

    public Map<String, String> getEnvironmentVariables() {
        return environmentVariables;
    }

    public void setEnvironmentVariables(Map<String, String> environmentVariables) {
        this.environmentVariables = environmentVariables;
    }

    public SparkProperties getSparkProperties() {
        return sparkProperties;
    }

    public void setSparkProperties(SparkProperties sparkProperties) {
        this.sparkProperties = sparkProperties;
    }

    @JsonInclude(JsonInclude.Include.NON_NULL)
    static class SparkProperties {

        @JsonProperty(value = "spark.jars")
        private String jars;

        @JsonProperty(value = "spark.app.name")
        private String appName;

        @JsonProperty(value = "spark.master")
        private String master;

        private Map<String,String> otherProperties = new HashMap<>();

        public String getJars() {
            return jars;
        }

        public void setJars(String jars) {
            this.jars = jars;
        }

        public String getAppName() {
            return appName;
        }

        public void setAppName(String appName) {
            this.appName = appName;
        }

        public String getMaster() {
            return master;
        }

        public void setMaster(String master) {
            this.master = master;
        }

        public void setOtherProperties(Map<String, String> otherProperties) {
            this.otherProperties = otherProperties;
        }

        // Adds a single extra property; exposed at the top level of sparkProperties via @JsonAnyGetter
        void addOtherProperty(String key, String value) {
            this.otherProperties.put(key, value);
        }

        @JsonAnyGetter
        Map<String,String> getOtherProperties() {
            return this.otherProperties;
        }

    }
}


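The Action type used for the action field is not shown above. A minimal sketch, assuming Jackson's default enum handling (the constant is serialized by its literal name, so the JSON ends up containing "CreateSubmissionRequest"):

// Serialized by Jackson as the constant's literal name, e.g. "CreateSubmissionRequest"
enum Action {
    CreateSubmissionRequest
}
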
Then we populate the request and submit it:

final JobSubmitRequest jobSubmitRequest = new JobSubmitRequest();
jobSubmitRequest.setAction(Action.CreateSubmissionRequest);
List<String> appArgs = new ArrayList<>();
appArgs.add(starttime);
appArgs.add(endtime);
appArgs.add(servicemaxnum);
appArgs.add(key);
jobSubmitRequest.setAppArgs(appArgs);
jobSubmitRequest.setAppResource(jarpath);
jobSubmitRequest.setClientSparkVersion("2.3.1");
jobSubmitRequest.setMainClass(mainclass);
Map<String, String> environmentVariables = new HashMap<>();
environmentVariables.put("SPARK_ENV_LOADED", "1");
jobSubmitRequest.setEnvironmentVariables(environmentVariables);
JobSubmitRequest.SparkProperties sparkProperties = new JobSubmitRequest.SparkProperties();
sparkProperties.setJars(jarpath);
sparkProperties.setAppName("SubmitScalaJobToSpark");
sparkProperties.addOtherProperty("spark.submit.deployMode", "cluster");
sparkProperties.setMaster("spark://" + master);
jobSubmitRequest.setSparkProperties(sparkProperties);

// POST the serialized request to the master's REST gateway (the 6066 endpoint)
HttpClient client = HttpClients.createDefault();
final String url = "http://" + master + "/v1/submissions/create";
final HttpPost post = new HttpPost(url);
post.setHeader(HTTP.CONTENT_TYPE, "application/json;charset=UTF-8");

try {
    final String message = MapperWrapper.MAPPER.writeValueAsString(jobSubmitRequest);
    post.setEntity(new StringEntity(message, "UTF-8"));
    final String stringResponse = client.execute(post, new BasicResponseHandler());
    if (stringResponse != null) {
        SparkResponse response = MapperWrapper.MAPPER.readValue(stringResponse, SparkResponse.class);
        return response.getSubmissionId();
    } else {
        return "FAILED";
    }
} catch (Exception e) {
    System.out.println(e);
    return "FAILED";
}
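
The snippet also relies on two small helpers, MapperWrapper and SparkResponse, which are not shown above. Minimal sketches, with the response field names assumed from the CreateSubmissionResponse JSON, could look like this:

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.ObjectMapper;

// Holds a single shared Jackson ObjectMapper for serializing requests and parsing responses
class MapperWrapper {
    static final ObjectMapper MAPPER = new ObjectMapper();
}

// Maps the fields we care about from the gateway's JSON response; anything else is ignored
@JsonIgnoreProperties(ignoreUnknown = true)
class SparkResponse {

    private String submissionId;

    private boolean success;

    public String getSubmissionId() {
        return submissionId;
    }

    public void setSubmissionId(String submissionId) {
        this.submissionId = submissionId;
    }

    public boolean isSuccess() {
        return success;
    }

    public void setSuccess(boolean success) {
        this.success = success;
    }
}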

II. Monitoring job status

Monitoring the job status is straightforward: just query the status endpoint with the submission ID returned by the create call.

curl http://spark-cluster-ip:6066/v1/submissions/status/driver-20151008145126-0000
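
The same check can be made from Java with the HttpClient used above; a minimal sketch (the variables master, submissionId, and client are assumed to be the ones from the submission code):

// Query the status of a previously submitted driver and return the raw JSON response.
// The response includes fields such as driverState (e.g. RUNNING, FINISHED, FAILED).
final String statusUrl = "http://" + master + "/v1/submissions/status/" + submissionId;
final HttpGet get = new HttpGet(statusUrl);
try {
    return client.execute(get, new BasicResponseHandler());
} catch (Exception e) {
    System.out.println(e);
    return "FAILED";
}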

III. References: