1. 程式人生 > >基於springboot建立Spark應用的submit服務

基於springboot建立Spark應用的submit服務

目錄



背景

一直很好奇web後臺如何啟動Spark應用程式,查詢Api後發現可以使用org.apache.spark.launcher.SparkLauncher來做到這一點。我想得動手測試一下,而且要做的體面一些,所以搞個簡易的web工程吧,順便學習熟悉一下使用springboot框架。在這裡將整個折騰的過程記錄下來,新手上路,有任何搞錯的地方,或者走了彎路,還請大家不吝指出,幫我進步

準備工作

1. 搭建hadoop叢集,我這邊用的是兩臺主機的分散式叢集
2. 安裝Spark,測試能執行spark-submit即可,然後配置好HistoryServer
3. 安裝Mysql,建立一個Spark應用資訊表,只有mainClass和jarPath兩個欄位
4. 熟悉Springboot框架的基本使用

主要流程

我設想主要有三個html頁面

1. 查詢已經開發好的spark應用(應用資訊提前入到資料庫裡)
2. 設定執行引數後提交(引數包括mainclass、jar包路徑、driver記憶體、executor記憶體等)
3. 顯示應用執行結果

效果截圖

1. 查詢Spark應用,點選應用進入submit頁面

2. 執行引數設定

3. 提交應用程式,正在執行中

4. 執行結束後跳轉,檢視執行結果。點選Tracking URL會跳轉到Yarn的Application管理
頁面,還能檢視Spark應用的job資訊。


主要程式碼

  1. 搭建一個springboot專案,配置依賴DevTools

    + web + thymeleaf + mysql + mybatis
    DevTools模組使Spring Boot應用支援熱部署,提高開發者的開發效率,修改後無需手動重啟Spring Boot應用。可以先不配,需要用的時候再說。

  2. Spark應用資訊表,只有三個欄位:mainClass是應用程式的main方法,jarPath是jar包存放路徑,note是應用說明
    在這裡插入圖片描述

  3. 實體類
    這裡只用到兩個實體類:Spark應用資訊AppInfo和Spark應用執行引數SparkAppPara

    public class AppInfo {
    	String mainClass;//應用程式的mainClass
    String jarPath;//應用程式jar包的存放位置,可以是本地或HDFS String note;//應用說明 //省略getter和setter }
    public class SparkAppPara {
    	String mainClass;
    	String jarPath;
    	String master;//可以是Yarn或StandAlone
    	String deployMode;//可以是Cluster或Client
    	String driverMemory ;//driver記憶體
    	String executorMemory;//executor記憶體
    	String executorInstances;//executor個數
    	String executorCores;//executor核數
    	String defaultParallelism;//引數spark.default.parallelism的值
    	//省略getter和setter
    }
    
  4. Controller
    (1)訪問應用資訊頁面

    @RequestMapping("/appInfo")
    public String appInfo(){
    	return "appInfo";
    }			
    

    (2)查詢Spark應用資訊

    @RequestMapping("/getAllAppInfo")
    @ResponseBody
    public String getAllAppInfo(){
        return sparkAppInfoService.getAllAppInfo();
    }			
    

    (3)點選某個應用,跳轉到提交頁面

    @RequestMapping("/submitApp")
    public ModelAndView submitApp(String mainClass,String jarPath){
    	ModelAndView mav = new ModelAndView();
    	mav.setViewName("submitApp");
    	mav.addObject("mainClass",mainClass);
    	mav.addObject("jarPath",jarPath);
    	return mav;
    }		
    

    這裡我希望跳轉之後,自動填寫mainClass和jarPath,我的做法是把這倆引數通過後臺轉給新頁面。由於頁面不是jsp,所以不能用el表示式獲取model值。需要靠Thymeleaf的語法th:xxx=${…}來獲取渲染資料。

    <div class="icon">
    	<label class="cd-label" for="mainClass">mainClass</label>
    	<input class="mainClass" type="text" name="mainClass" id="mainClass" th:value=${mainClass}>
    </div> 
    
    <div class="icon">
    	<label class="cd-label" for="jarPath">jarPath</label>
    	<input class="jarPath" type="text" name="jarPath" id="jarPath" th:value=${jarPath}>
    </div> 
    

    (4)提交任務

    @RequestMapping(value = "/submit")
    @ResponseBody
    public String Submit(@RequestBody SparkAppPara sparkAppPara) throws IOException, InterruptedException {
       return submitService.submitApp(sparkAppPara);
    }			
    

    (5)執行完後跳轉到結果頁面
    在這裡我希望拿到執行結果json之後,跳轉到結果頁面展示。我的做法是在Ajax請求成功後帶引數跳轉頁面,我覺得肯定有更好的辦法,在此拋磚引玉。

    success: function(data)
    {
        window.location.href=host+'/result?resultJson='+ encodeURIComponent(data);
    }
    

    因為url請求裡不能有大小括號等特殊字元,所以請求之前需要使用encodeURIComponent方法進行編碼。

    @RequestMapping("/result")
    public ModelAndView toResult(String resultJson){
       ModelAndView mav = new ModelAndView();
       mav.setViewName("result");
       mav.addObject("resultJson",resultJson);
       return mav;
    }		
    

    關於在結果頁面的JS程式碼裡獲取resultJson:
    第(3)步中,Thymeleaf直接把model值渲染到html標籤中。而在結果頁面中,我需要先拿到resultJson,進行一些處理後再渲染。在JS程式碼裡,我們可以像下面這樣來獲取resultJson。

    <script th:inline="javascript">
        var resultJson = JSON.parse([[${resultJson}]]);
    
        $("#trackingUrl").attr("href",yarnAppUrl+resultJson.id);
        $("#applicationId").html(resultJson.id);
        $("#applicationName").html(resultJson.name);
    	//次要程式碼省略
    </script>
    

    這裡需要注意的是,這部分JS程式碼只能內嵌在html頁面中,外聯JS中不會生效

  5. Service和Mapper
    (1)獲取Spark應用資訊的Service和Mapper

    @Service
    public class SparkAppInfoService {
        @Autowired
        private AppInfoMapper appInfo;
    
        public String getAllAppInfo(){
            List<AppInfo> list = appInfo.getAllAppInfo();
            return JSONObject.toJSONString(list);
        }
    }	
    
    @Component
    public interface AppInfoMapper {
        @Select("SELECT * FROM appinfo")
        @Results({
                @Result(property = "mainClass",  column = "mainclass"),
                @Result(property = "jarPath", column = "jarpath"),
                @Result(property = "note", column = "note")
        })
        List<AppInfo> getAllAppInfo();
    }
    

    (2)提交Spark應用的Service
    提交spark應用的API不止一種,我用的是org.apache.spark.launcher.SparkLauncher

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-launcher_2.12</artifactId>
        <version>2.4.0</version>
    </dependency>		
    
    @Service
    public class SparkSubmitService {
    
        public String submitApp(SparkAppPara sparkAppPara) throws IOException, InterruptedException {
            HashMap env = new HashMap();
            //這兩個屬性必須設定
            env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/hadoop");
            env.put("JAVA_HOME", "/usr/lib/jdk/jdk1.8.0_191/");
            CountDownLatch countDownLatch = new CountDownLatch(1);
            SparkAppHandle handle = new SparkLauncher(env)
               .setSparkHome("/usr/local/spark/")
                .setAppResource(sparkAppPara.getJarPath())
                .setMainClass(sparkAppPara.getMainClass())
                .setMaster(sparkAppPara.getMaster())
                .setDeployMode(sparkAppPara.getDeployMode())
                .setConf("spark.driver.memory", sparkAppPara.getDriverMemory()+"g")
                .setConf("spark.executor.memory", sparkAppPara.getExecutorMemory()+"g")
                .setConf("spark.executor.instances", sparkAppPara.getExecutorInstances())
                .setConf("spark.executor.cores", sparkAppPara.getExecutorCores())
                .setConf("spark.default.parallelism", sparkAppPara.getDefaultParallelism())
                .setVerbose(true).startApplication(new SparkAppHandle.Listener() {
                   @Override
                    public void stateChanged(SparkAppHandle sparkAppHandle) {
                        if (sparkAppHandle.getState().isFinal()) {
                            countDownLatch.countDown();
                        }
                        System.out.println("state:" + sparkAppHandle.getState().toString());
                    }
    
                    @Override
                    public void infoChanged(SparkAppHandle sparkAppHandle) {
                        System.out.println("Info:" + sparkAppHandle.getState().toString());
                    }
                });
            System.out.println("The task is executing, please wait ....");
            //執行緒等待任務結束
            countDownLatch.await();
            System.out.println("The task is finished!");
    		//通過Spark原生的監測api獲取執行結果資訊
            String restUrl = "http://master:18080/api/v1/applications/"+handle.getAppId();
            String resultJson = RestUtil.httpGet(restUrl,null);
    
            return resultJson;
        }
    }
    
  6. Http請求工具
    我們使用這個工具,傳送rest請求,就可以獲取Spark應用執行結果的json資訊(我覺得有一個前提是需要配置好History Server服務並啟動)。

    public class RestUtil {
        public static String httpGet(String urlStr, List<String> urlParam) throws IOException, InterruptedException {
            // 例項一個URL資源
            URL url = new URL(urlStr);	
            HttpURLConnection connet = null;
            int i = 0;
            while(connet==null || connet.getResponseCode() != 200 ){
                connet = (HttpURLConnection) url.openConnection();
                connet.setRequestMethod("GET");
                connet.setRequestProperty("Charset", "UTF-8");
                connet.setRequestProperty("Content-Type", "application/json");
                connet.setConnectTimeout(15000);// 連線超時 單位毫秒
                connet.setReadTimeout(15000);// 讀取超時 單位毫秒
                i++;
                if (i==50)break;
                Thread.sleep(500);
            }
            //將返回的值存入到String中
            BufferedReader brd = new BufferedReader(new InputStreamReader(connet.getInputStream(),"UTF-8"));
            StringBuilder  sb  = new StringBuilder();
            String line;
            while((line = brd.readLine()) != null){
                sb.append(line);
            }
            brd.close();
            connet.disconnect();
            return sb.toString();
        }
    }
    

外部引用

專案裡引用的第三方模板和外掛如下,如有侵權請聯絡我刪除

  1. 應用查詢頁面——https://www.lanrenzhijia.com/others/6564.html
  2. 任務提交頁面——https://www.lanrenzhijia.com/jquery/3981.html
  3. ajax非同步請求等待特效——http://www.jq22.com/jquery-info15050

參考資料

https://blog.csdn.net/sparkexpert/article/details/51045762
https://blog.csdn.net/u011244682/article/details/79170134
http://spark.apache.org/docs/latest/monitoring.html