Initial version of jobs.py.
author František Dvořák <valtri@civ.zcu.cz>
Tue, 3 Mar 2015 16:05:24 +0000 (17:05 +0100)
committer František Dvořák <valtri@civ.zcu.cz>
Tue, 3 Mar 2015 16:05:24 +0000 (17:05 +0100)
jobs.py [new file with mode: 0755]

diff --git a/jobs.py b/jobs.py
new file mode 100755 (executable)
index 0000000..3e83c99
--- /dev/null
+++ b/jobs.py
@@ -0,0 +1,164 @@
+#! /usr/bin/python2
+
+import pycurl, json
+from io import BytesIO
+
+import re
+import sys
+import getopt
+import socket
+
+base_jhs_url=''   # Job History Server base URL; set by -j/--jhs-url, otherwise derived from host
+base_rm_url=''   # Resource Manager base URL; set by -r/--rm-url, otherwise derived from host
+debug=0   # set to 1 by -d/--debug; get_rest() then pretty-prints every JSON response
+ssl=0   # set to 1 by -s/--ssl; the default URLs then use https and a different port pair
+details=1   # gates the per-node breakdown in the report; no command-line option changes it
+host=socket.getfqdn()   # host used to build the default URLs; overridden by -b/--base
+# One curl handle, shared by every get_rest() call and closed at the end of the script.
+c = pycurl.Curl()
+c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_GSSNEGOTIATE)   # use GSS-Negotiate HTTP authentication
+c.setopt(pycurl.USERPWD, ":")   # empty user:password - presumably so the existing Kerberos ticket is used; confirm
+
+def get_rest(base_url, url):
+       """GET base_url + url through the shared curl handle; return parsed JSON.
+       On a non-200 status: print body and status, close the handle and the
+       buffer, raise Exception naming the URL and status.  Dumps JSON if debug."""
+       b = BytesIO()
+       c.setopt(pycurl.URL, base_url + url)
+       c.setopt(pycurl.WRITEFUNCTION, b.write)
+       c.perform()
+       s = b.getvalue().decode('utf-8')
+       status = c.getinfo(c.RESPONSE_CODE)
+       if status != 200:
+               print s
+               print 'Status: %d' % status
+               c.close()
+               b.close()
+               raise Exception('GET %s%s failed with HTTP status %d' % (base_url, url, status))
+
+       j = json.loads(s)
+       if debug:
+               print json.dumps(j, indent=4)
+       return j
+
+try:   # -b, -j and -r take a value (':' / '=' suffix); the other options are plain flags
+       opts, args = getopt.getopt(sys.argv[1:], 'hb:dj:r:s', ['help', 'base=', 'debug', 'jhs-url=', 'rm-url=', 'ssl'])
+except getopt.GetoptError:
+       print 'Args error'
+       sys.exit(2)   # unknown option or missing option argument
+for opt, arg in opts:
+       if opt in ('-h', '--help'):
+               print('jobs.py [-h|--help] [-b|--base HOST] [-d|--debug] [-j|--jhs-url URL] [-r|--rm-url URL] [-s|--ssl]')
+               sys.exit(0)
+       elif opt in ('-b', '--base'):
+               host = arg   # host name used to build the default service URLs
+       elif opt in ('-d', '--debug'):
+               debug=1
+       elif opt in ('-j', '--jhs-url'):
+               base_jhs_url = arg   # explicit Job History Server base URL
+       elif opt in ('-r', '--rm-url'):
+               base_rm_url = arg   # explicit Resource Manager base URL
+       elif opt in ('-s', '--ssl'):
+               ssl=1
+       else:   # defensive: reached only if an option is declared above without a handler
+               print 'Args error'
+               sys.exit(2)
+# Default endpoints are derived from host; -s/--ssl switches both the
+# scheme and the port (presumably the stock Hadoop web ports - confirm).
+default_rm_url = ("https://" + host + ":8090") if ssl else ("http://" + host + ":8088")
+default_jhs_url = ("https://" + host + ":19890") if ssl else ("http://" + host + ":19888")
+
+# URLs given with -r / -j take precedence over the derived defaults.
+base_rm_url = base_rm_url or default_rm_url
+base_jhs_url = base_jhs_url or default_jhs_url
+
+# Echo the two listing endpoints before querying them.
+jhs_url = base_jhs_url + '/ws/v1/history/mapreduce/jobs'
+rm_url = base_rm_url + '/ws/v1/cluster/apps'
+print '# ' + jhs_url
+print '# ' + rm_url
+
+# j1: job listing from the history server; j2: application listing from
+# the resource manager.  Both are the decoded JSON returned by get_rest().
+j1 = get_rest(base_jhs_url, '/ws/v1/history/mapreduce/jobs')
+j2 = get_rest(base_rm_url, '/ws/v1/cluster/apps')
+
+class Job:
+       # One job as seen by both services: the History Server record (jhs),
+       # the Resource Manager record (rm), and the status/user taken from
+       # whichever service reported them.
+       status = user = jhs = rm = None
+
+# Job table keyed by the id with its 'job_' prefix removed, so that the
+# same key can be produced from the Resource Manager ids as well.
+jobs = {}
+
+regJob = re.compile('^job_')
+jhs_jobs = j1["jobs"]["job"] if j1["jobs"] else []
+for j in jhs_jobs:
+       id = regJob.sub('', j['id'])
+       job = jobs.get(id)
+       if job is None:
+               job = jobs[id] = Job()
+       job.jhs = j
+       job.status = j['state']
+       job.user = j['user']
+
+# Fold the Resource Manager applications into the same table: removing
+# the 'application_' prefix gives the key format used for the JHS jobs.
+regApp = re.compile('^application_')
+rm_apps = j2["apps"]["app"] if j2["apps"] else []
+for j in rm_apps:
+       id = regApp.sub('', j['id'])
+       job = jobs.get(id)
+       if job is None:
+               job = jobs[id] = Job()
+       job.rm = j
+       # Status and user already set from the history server are kept;
+       # the Resource Manager values are only a fallback.
+       job.status = job.status or j['finalStatus']
+       job.user = job.user or j['user']
+
+# Report: one section per job, with per-node elapsed times when details is set.
+regHost = re.compile(':\d+')
+for counter, (id, job) in enumerate(jobs.iteritems(), 1):
+       print 'job %s (%d):' % (id, counter)
+       print '  status: %s' % job.status
+       print '  user: %s' % job.user
+       if job.jhs:
+               submit = job.jhs['submitTime']
+               start = job.jhs['startTime']
+               finish = job.jhs['finishTime']
+               print '  submit: %d, start: %d, finish: %d' % (submit, start, finish)
+               print '  elapsed: %.3f s' % ((finish - start) / 1000.0)
+               print '  finished: %.3f s' % ((finish - submit) / 1000.0)
+
+               if details:
+                       # One request for the task list, then one per task for its attempts.
+                       nodes = dict()
+                       t = get_rest(base_jhs_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks' % id)
+                       tasks = t['tasks']['task'] if t['tasks'] else []
+                       aggregate = 0
+                       for task in tasks:
+                               aggregate += task['elapsedTime']
+                               a = get_rest(base_jhs_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks/%s/attempts' % (id, task['id']))
+                               attempts = a['taskAttempts']['taskAttempt'] if a['taskAttempts'] else []
+                               for attempt in attempts:
+                                       # Drop the ':port' part so attempts group by host.
+                                       nodeHost = regHost.sub('', attempt['nodeHttpAddress'])
+                                       nodes[nodeHost] = nodes.get(nodeHost, 0) + attempt['elapsedTime']
+
+                       aggregate = 0
+                       for node, elapsed in nodes.iteritems():
+                               print '  node %s: %d' % (node, elapsed)
+                               aggregate += elapsed
+                       print '  ==> aggregated %d' % aggregate
+
+       if job.rm:
+               print '  MB x s: %d' % job.rm['memorySeconds']
+               print '  CPU x s: %d' % job.rm['vcoreSeconds']
+
+       print
+
+c.close()
+