1. 程式人生 > >java提交spark任務到yarn平臺

java提交spark任務到yarn平臺

一、背景

    採用spark的方式處理,所以需要將spark的功能整合到程式碼,採用yarn客戶端的方式管理spark任務。不需要將cdh的一些配置檔案放到resource路徑下,只需要配置一些配置即可,非常方便

二、任務管理架構

三、介面

1、任務提交

1.  /**

2.   * 提交任務到yarn叢集

3.   * 

4.   * @param conditions

5.   *            yarn叢集,sparkhdfs具體資訊,引數等

6.   * @return appid

7.   */

8.  public String submitSpark(YarnSubmitConditions conditions) {  

9.      logger.info("初始化spark on yarn引數");  

10.

11. // 初始化yarn客戶端

12.     logger.info("初始化spark on yarn客戶端");  

13.     List<String> args = Lists.newArrayList("--jar", conditions.getApplicationJar(), "--class",  

14.             conditions.getMainClass());  

15. if (conditions.getOtherArgs() != null && conditions.getOtherArgs().size() > 

0) {  

16. for (String s : conditions.getOtherArgs()) {  

17.             args.add("--arg");  

18.             args.add(org.apache.commons.lang.StringUtils.join(new String[] { s }, ","));  

19.         }  

20.     }  

21.

22. // identify that you will be using Spark as YARN mode

23.     System.setProperty("SPARK_YARN_MODE"

"true");  

24.     SparkConf sparkConf = new SparkConf();  

25. if (org.apache.commons.lang.StringUtils.isNotEmpty(conditions.getJobName())) {  

26.         sparkConf.setAppName(conditions.getJobName());  

27.     }  

28.

29.     sparkConf.set("spark.yarn.jars", conditions.getSparkYarnJars());  

30. if (conditions.getAdditionalJars() != null && conditions.getAdditionalJars().length > 0) {  

31.         sparkConf.set("spark.jars", org.apache.commons.lang.StringUtils.join(conditions.getAdditionalJars(), ","));  

32.     }  

33.

34. if (conditions.getFiles() != null && conditions.getFiles().length > 0) {  

35.         sparkConf.set("spark.files", org.apache.commons.lang.StringUtils.join(conditions.getFiles(), ","));  

36.     }  

37. for (Map.Entry e : conditions.getSparkProperties().entrySet()) {  

38.         sparkConf.set(e.getKey().toString(), e.getValue().toString());  

39.     }  

40.

41. // 新增這個引數,不然spark會一直請求0.0.0.0:8030,一直重試

42.     sparkConf.set("yarn.resourcemanager.hostname", conditions.getYarnResourcemanagerAddress().split(":")[0]);  

43. // 設定為true,不刪除快取的jar包,因為現在提交yarn任務是使用的程式碼配置,沒有配置檔案,刪除快取的jar包有問題,

44.     sparkConf.set("spark.yarn.preserve.staging.files""true");  

45.

46. // 初始化 yarn的配置

47.     Configuration cf = new Configuration();  

48.     String os = System.getProperty("os.name");  

49. boolean cross_platform = false;  

50. if (os.contains("Windows")) {  

51.         cross_platform = true;  

52.     }  

53.     cf.setBoolean("mapreduce.app-submission.cross-platform", cross_platform);// 配置使用跨平臺提交任務

54. // 設定yarn資源,不然會使用localhost:8032

55.     cf.set("yarn.resourcemanager.address", conditions.getYarnResourcemanagerAddress());  

56. // 設定namenode的地址,不然jar包會分發,非常噁心

57.     cf.set("fs.defaultFS", conditions.getSparkFsDefaultFS());  

58.

59.     ClientArguments cArgs = new ClientArguments(args.toArray(new String[args.size()]));  

60.     Client client = new Client(cArgs, cf, sparkConf);  

61.     logger.info("提交任務,任務名稱:" + conditions.getJobName());  

62.

63. try {  

64.

65.         ApplicationId appId = client.submitApplication();  

66.

67. return appId.toString();  

68.

69.     } catch (Exception e) {  

70.         logger.error("提交spark任務失敗", e);  

71. returnnull;  

72.     } finally {  

73. if (client != null) {  

74.             client.stop();  

75.         }  

76.     }  

77. }  

2、任務進度獲取

1.  /**

2.   * 停止spark任務

3.   * 

4.   * @param yarnResourcemanagerAddress

5.   *            yarn資源管理器地址,例如:master:8032,檢視yarn叢集獲取具體地址

6.   * @param appIdStr

7.   *            需要取消的任務id

8.   */

9.  publicvoid killJob(String yarnResourcemanagerAddress, String appIdStr) {  

10.     logger.info("取消spark任務,任務id" + appIdStr);  

11. // 初始化 yarn的配置

12.     Configuration cf = new Configuration();  

13.     String os = System.getProperty("os.name");  

14. boolean cross_platform = false;  

15. if (os.contains("Windows")) {  

16.         cross_platform = true;  

17.     }  

18.     cf.setBoolean("mapreduce.app-submission.cross-platform", cross_platform);// 配置使用跨平臺提交任務

19. // 設定yarn資源,不然會使用localhost:8032

20.     cf.set("yarn.resourcemanager.address", yarnResourcemanagerAddress);  

21.

22. // 建立yarn的客戶端,此類中有殺死任務的方法

23.     YarnClient yarnClient = YarnClient.createYarnClient();  

24. // 初始化yarn的客戶端

25.     yarnClient.init(cf);  

26. // yarn客戶端啟動

27.     yarnClient.start();  

28. try {  

29. // 根據應用id,殺死應用

30.         yarnClient.killApplication(getAppId(appIdStr));  

31.     } catch (Exception e) {  

32.         logger.error("取消spark任務失敗", e);  

33.     }  

34. // 關閉yarn客戶端

35.     yarnClient.stop();  

36.

37. }  

3、任務取消

1.  /**

2.   * 獲取spark任務狀態

3.   * 

4.   * 

5.   * @param yarnResourcemanagerAddress

6.   *            yarn資源管理器地址,例如:master:8032,檢視yarn叢集獲取具體地址

7.   * @param appIdStr

8.   *            需要取消的任務id

9.   */

10. public SparkTaskState getStatus(String yarnResourcemanagerAddress, String appIdStr) {  

11.     logger.info("獲取任務狀態啟動,任務id" + appIdStr);  

12. // 初始化 yarn的配置

13.     Configuration cf = new Configuration();  

14.     String os = System.getProperty("os.name");  

15. boolean cross_platform = false;  

16. if (os.contains("Windows")) {  

17.         cross_platform = true;  

18.     }  

19.     cf.setBoolean("mapreduce.app-submission.cross-platform", cross_platform);// 配置使用跨平臺提交任務

20. // 設定yarn資源,不然會使用localhost:8032

21.     cf.set("yarn.resourcemanager.address", yarnResourcemanagerAddress);  

22.     logger.info("獲取任務狀態,任務id:" + appIdStr);  

23.

24.     SparkTaskState taskState = new SparkTaskState();  

25. // 設定任務id

26.     taskState.setAppId(appIdStr);  

27.     YarnClient yarnClient = YarnClient.createYarnClient();  

28. // 初始化yarn的客戶端

29.     yarnClient.init(cf);  

30. // yarn客戶端啟動

31.     yarnClient.start();  

32.     ApplicationReport report = null;  

33. try {  

34.         report = yarnClient.getApplicationReport(getAppId(appIdStr));  

35.     } catch (Exception e) {  

36.         logger.error("獲取spark任務狀態失敗");  

37.     }  

38.

39. if(report != null){  

40.         YarnApplicationState state = report.getYarnApplicationState();  

41.         taskState.setState(state.name());  

42. // 任務執行進度

43. float progress = report.getProgress();  

44.         taskState.setProgress(progress);  

45. // 最終狀態

46.         FinalApplicationStatus status = report.getFinalApplicationStatus();  

47.         taskState.setFinalStatus(status.name());  

48.     }else{  

49.         taskState.setState(ConstParam.SPARK_FAILED);  

50.         taskState.setProgress(0.0f);  

51.         taskState.setFinalStatus(ConstParam.SPARK_FAILED);  

52.     }  

53.

54. // 關閉yarn客戶端

55.     yarnClient.stop();  

56.     logger.info("獲取任務狀態結束,任務狀態:" + JSON.toJSONString(taskState));  

57. return taskState;  

58. }  

四、yarn引數調節

1、可分配給容器的實體記憶體數量,一個nodemanage分配的記憶體,如果機器記憶體是128g,儘量分配2/3

yarn.nodemanager.resource.memory-mb80g

2、可以為容器分配的虛擬 CPU 核心的數量。該引數在 CDH 4.4 以前版本中無效。一個nodemanage分配的核數。如果機器是64和,儘量分配2/3.

yarn.nodemanager.resource.cpu-vcores40

3Java 程序堆疊記憶體的最大大小(以位元組為單位)。已傳遞到 Java -Xmx

ResourceManager Java 堆疊大小(位元組)

ResourceManager Default Group 

 B千位元組兆位元組吉位元組