
Monitoring Spark Job Execution via the Spark REST Service


1. The REST Service

  To make it easier for users to monitor their jobs, Spark has shipped a REST service since version 1.4: by requesting a URL, users can retrieve an application's running state.

  The Spark REST API returns JSON, so developers can easily build visual Spark monitoring tools on top of it. The API covers both running applications and the history server, and every request URL is rooted at /api/v1. For the history server, for example, information is available at http://***:18080/api/v1 (the port is configurable); for a running Spark application, at https://***/api/v1.

  Main uses: through the REST service it is easy to monitor job duration, stages, and so on, and, combined with a time-series database, to monitor every job on the cluster.
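As a sketch of the two entry points, a small helper (the hostnames and ports below are placeholders, not from any real cluster) can build the per-application endpoint for either the history server or a live driver UI:

```python
def app_api_url(host, port, app_id=None):
    """Build a Spark REST API URL for the given host/port.

    Without app_id, returns the applications listing endpoint;
    with app_id, returns that application's detail endpoint.
    """
    base = "http://%s:%d/api/v1/applications" % (host, port)
    return base if app_id is None else base + "/" + app_id

# History server (default port 18080) vs. a running application's UI (default 4040)
print(app_api_url("history-host", 18080))
print(app_api_url("driver-host", 4040, "application_1502706935975_233268"))
```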

2. Example Code (Python)

  

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
Created by zhangy on Aug 25, 2017
'''
import datetime
import json
import os
import time
import urllib2


if __name__ == "__main__":
    # List running YARN applications and extract their application IDs
    command = "yarn application -list | grep Noce | awk -F'\\t' '{print $1}'"
    val = os.popen(command).read()
    appids = val.split("\n")
    for pid in appids:
        if pid == "":
            continue
        # Query the Spark REST API for this application's metadata
        url = "http://th04-znwg-sgi620-001:18088/api/v1/applications/" + pid
        req = urllib2.Request(url)
        res_data = urllib2.urlopen(req)
        res = res_data.read()
        jo = json.loads(res)
        dict1 = jo["attempts"][0]
        st = dict1["startTime"]
        GMT_FORMAT = "%Y-%m-%dT%H:%M:%S.%fGMT"
        sti = datetime.datetime.strptime(st, GMT_FORMAT)
        # startTime is reported in GMT; add 8 hours for the local (CST) offset
        startTime = time.mktime(sti.timetuple()) + 8 * 60 * 60
        nowTime = long(time.time())
        sub = nowTime - startTime
        # Kill any application that has been running for more than 4 hours
        if sub > 4 * 60 * 60:
            killCommand = "yarn application -kill " + pid
            res = os.popen(killCommand).read()
            cc = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(float(nowTime)))
            f = open("/home/noce1/run_noce/his/monitor/" + pid + ".txt", "a")
            f.write(cc + " : pid " + pid + " ran " + str(sub) + " seconds\n")
            f.write(res + "\n")
            f.close()

  In this test example, only the running time of each Spark job is monitored: if a job exceeds the expected duration (4 hours), it is killed and its resources are released.
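The timeout check in the script above can also be made timezone-independent by interpreting the API's GMT timestamps as UTC directly, rather than using mktime plus a fixed 8-hour offset. A minimal sketch (the function names here are mine, not Spark's):

```python
import calendar
import datetime

# Timestamp format used by the Spark REST API, e.g. "2017-09-04T01:29:53.986GMT"
GMT_FORMAT = "%Y-%m-%dT%H:%M:%S.%fGMT"

def to_epoch_utc(ts):
    """Parse a REST-API GMT timestamp into a UTC epoch value (seconds, float)."""
    dt = datetime.datetime.strptime(ts, GMT_FORMAT)
    return calendar.timegm(dt.timetuple()) + dt.microsecond / 1e6

def should_kill(start_ts, now_epoch, limit_seconds=4 * 3600):
    """True if the application has been running longer than limit_seconds."""
    return now_epoch - to_epoch_utc(start_ts) > limit_seconds
```

calendar.timegm treats the parsed struct_time as UTC, so the result does not depend on the timezone of the machine running the monitor.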

 Result:

For example:
http://132.12*****:18088/api/v1/applications/application_1502706935975_233268/

Response (JSON):
{
  "id" : "application_1502706935975_233268",
  "name" : "FRT3_73",
  "attempts" : [ {
    "startTime" : "2017-09-04T01:29:53.986GMT",
    "endTime" : "2017-09-04T01:31:52.955GMT",
    "sparkUser" : "noce1",
    "completed" : true
  } ]
}
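The attempt record above can be parsed directly with the standard library; a short sketch that computes the attempt's duration from the startTime/endTime fields shown:

```python
import calendar
import datetime
import json

# Sample response, copied from the result above
sample = '''{
  "id" : "application_1502706935975_233268",
  "name" : "FRT3_73",
  "attempts" : [ {
    "startTime" : "2017-09-04T01:29:53.986GMT",
    "endTime" : "2017-09-04T01:31:52.955GMT",
    "sparkUser" : "noce1",
    "completed" : true
  } ]
}'''

GMT_FORMAT = "%Y-%m-%dT%H:%M:%S.%fGMT"

def parse_gmt(ts):
    """Convert a REST-API GMT timestamp to a UTC epoch value (seconds)."""
    dt = datetime.datetime.strptime(ts, GMT_FORMAT)
    return calendar.timegm(dt.timetuple()) + dt.microsecond / 1e6

app = json.loads(sample)
attempt = app["attempts"][0]
duration = parse_gmt(attempt["endTime"]) - parse_gmt(attempt["startTime"])
print("%s ran %.3f seconds" % (app["id"], duration))
# prints: application_1502706935975_233268 ran 118.969 seconds
```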

3. Other Official Endpoints (version 2.1.0, http:**** :18080/api/v1/)

Endpoint Meaning
/applications A list of all applications.
    ?status=[completed|running] list only applications in the chosen state.
    ?minDate=[date] earliest start date/time to list.
    ?maxDate=[date] latest start date/time to list.
    ?minEndDate=[date] earliest end date/time to list.
    ?maxEndDate=[date] latest end date/time to list.
    ?limit=[limit] limits the number of applications listed.
    Examples:
    ?minDate=2015-02-10
    ?minDate=2015-02-03T16:42:40.000GMT
    ?maxDate=2015-02-11T20:41:30.000GMT
    ?minEndDate=2015-02-12
    ?minEndDate=2015-02-12T09:15:10.000GMT
    ?maxEndDate=2015-02-14T16:30:45.000GMT
    ?limit=10
/applications/[app-id]/jobs A list of all jobs for a given application.
    ?status=[running|succeeded|failed|unknown] list only jobs in the specific state.
/applications/[app-id]/jobs/[job-id] Details for the given job.
/applications/[app-id]/stages A list of all stages for a given application.
/applications/[app-id]/stages/[stage-id] A list of all attempts for the given stage.
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id] Details for the given stage attempt.
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary Summary metrics of all tasks in the given stage attempt.
    ?quantiles summarize the metrics with the given quantiles. Example: ?quantiles=0.01,0.5,0.99
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList A list of all tasks for the given stage attempt.
    ?offset=[offset]&length=[len] list tasks in the given range.
    ?sortBy=[runtime|-runtime] sort the tasks. Example: ?offset=10&length=50&sortBy=runtime
/applications/[app-id]/executors A list of all active executors for the given application.
/applications/[app-id]/allexecutors A list of all (active and dead) executors for the given application.
/applications/[app-id]/storage/rdd A list of stored RDDs for the given application.
/applications/[app-id]/storage/rdd/[rdd-id] Details for the storage status of a given RDD.
/applications/[base-app-id]/logs Download the event logs for all attempts of the given application as files within a zip file.
/applications/[base-app-id]/[attempt-id]/logs Download the event logs for a specific application attempt as a zip file.
/applications/[app-id]/streaming/statistics Statistics for the streaming context.
/applications/[app-id]/streaming/receivers A list of all streaming receivers.
/applications/[app-id]/streaming/receivers/[stream-id] Details of the given receiver.
/applications/[app-id]/streaming/batches A list of all retained batches.
/applications/[app-id]/streaming/batches/[batch-id] Details of the given batch.
/applications/[app-id]/streaming/batches/[batch-id]/operations A list of all output operations of the given batch.
/applications/[app-id]/streaming/batches/[batch-id]/operations/[outputOp-id] Details of the given operation and given batch.
/applications/[app-id]/environment Environment details of the given application.
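The listing filters above combine as ordinary query parameters. A Python 3 sketch (the host and port are placeholders) that builds a filtered /applications URL:

```python
from urllib.parse import urlencode

def applications_url(host, port, **filters):
    """Build an /applications listing URL with optional filter parameters
    such as status, minDate, maxDate, minEndDate, maxEndDate, and limit."""
    base = "http://%s:%d/api/v1/applications" % (host, port)
    if filters:
        # Sort for a deterministic parameter order
        base += "?" + urlencode(sorted(filters.items()))
    return base

print(applications_url("history-host", 18080, status="completed", limit=10))
# prints: http://history-host:18080/api/v1/applications?limit=10&status=completed
```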
