#! /usr/bin/python2
# -*- coding: utf-8 -*-
#
# jobs.py -- content of the patch "Initial version of jobs.py."
# (commit 021d98b263b967af801faa1538c83270aabc2022,
#  František Dvořák, Tue, 3 Mar 2015 17:05:24 +0100).
"""Print a per-job summary assembled from two REST services.

The script fetches the job list from a job history service (JHS,
``/ws/v1/history/mapreduce/jobs``) and the application list from a resource
manager (RM, ``/ws/v1/cluster/apps``), merges both by the numeric part of the
id and prints status, user, timing and resource usage for every job.

Usage:
    jobs.py [-h|--help] [-b|--base HOST] [-d|--debug]
            [-j|--jhs-url URL] [-r|--rm-url URL] [-s|--ssl]

    -b HOST   host used to build the default service URLs
              (default: this machine's FQDN)
    -d        dump every decoded JSON response
    -j URL    base URL of the history service
    -r URL    base URL of the resource manager
    -s        build the default URLs with https (ports 8090 / 19890)
              instead of http (ports 8088 / 19888)
"""

import pycurl
import json
from io import BytesIO

import re
import sys
import getopt
import socket

# REST resources, relative to the respective base URL.
JHS_JOBS_PATH = '/ws/v1/history/mapreduce/jobs'
RM_APPS_PATH = '/ws/v1/cluster/apps'

# Prefixes stripped from ids so that JHS jobs and RM applications share a key.
REG_JOB = re.compile('^job_')
REG_APP = re.compile('^application_')
# Strips every ':<digits>' group (the port) from a node address.
REG_HOST = re.compile(r':\d+')

base_jhs_url = ''
base_rm_url = ''
debug = 0
ssl = 0
details = 1
host = socket.getfqdn()

# A single curl handle is reused for all requests.
# NOTE(review): GSS-Negotiate with the empty ":" user/password presumably
# makes libcurl use the caller's existing (Kerberos) credentials -- confirm.
c = pycurl.Curl()
c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_GSSNEGOTIATE)
c.setopt(pycurl.USERPWD, ":")


def get_rest(base_url, url):
    """Fetch ``base_url + url`` with the shared curl handle and decode JSON.

    The response body is decoded as UTF-8.  When the HTTP status is not 200
    the body and the status are printed and an ``Exception`` naming the
    request is raised.  With ``debug`` set, the decoded document is
    pretty-printed before it is returned.

    The shared handle is not closed here; the caller owns its lifetime.
    """
    buf = BytesIO()
    c.setopt(pycurl.URL, base_url + url)
    c.setopt(pycurl.WRITEFUNCTION, buf.write)
    c.perform()
    body = buf.getvalue().decode('utf-8')
    buf.close()

    status = c.getinfo(c.RESPONSE_CODE)
    if status != 200:
        print(body)
        print('Status: %d' % status)
        raise Exception('GET %s%s failed with HTTP status %d'
                        % (base_url, url, status))

    data = json.loads(body)
    if debug:
        print(json.dumps(data, indent=4))
    return data


class Job(object):
    """Merged view of one job: raw JHS/RM records plus status and user."""

    def __init__(self):
        self.status = None  # JHS 'state', else RM 'finalStatus'
        self.user = None    # JHS 'user', else RM 'user'
        self.jhs = None     # raw job record from the history service
        self.rm = None      # raw application record from the resource manager


def _collect_jobs(history, cluster):
    """Merge the JHS job list and the RM application list by bare id.

    JHS values always set status and user; RM values only fill in what is
    still missing.
    """
    merged = dict()

    # The list container is falsy (e.g. null) when the service has no entries.
    if history["jobs"]:
        for record in history["jobs"]["job"]:
            job = merged.setdefault(REG_JOB.sub('', record['id']), Job())
            job.jhs = record
            job.status = record['state']
            job.user = record['user']

    if cluster["apps"]:
        for record in cluster["apps"]["app"]:
            job = merged.setdefault(REG_APP.sub('', record['id']), Job())
            job.rm = record
            if not job.status:
                job.status = record['finalStatus']
            if not job.user:
                job.user = record['user']

    return merged


def _node_elapsed(job_id):
    """Return {node host: summed 'elapsedTime' of all task attempts on it}.

    Issues one request for the task list of the job and one request per task
    for its attempts.
    """
    nodes = dict()
    tasks = get_rest(base_jhs_url,
                     '%s/job_%s/tasks' % (JHS_JOBS_PATH, job_id))
    if not tasks['tasks']:
        return nodes

    for task in tasks['tasks']['task']:
        attempts = get_rest(
            base_jhs_url,
            '%s/job_%s/tasks/%s/attempts' % (JHS_JOBS_PATH, job_id, task['id']))
        if not attempts['taskAttempts']:
            continue
        for attempt in attempts['taskAttempts']['taskAttempt']:
            node_host = REG_HOST.sub('', attempt['nodeHttpAddress'])
            nodes[node_host] = nodes.get(node_host, 0) + attempt['elapsedTime']
    return nodes


def _print_job(counter, job_id, job):
    """Print the report section of one job, followed by an empty line."""
    print('job %s (%d):' % (job_id, counter))
    print(' status: %s' % job.status)
    print(' user: %s' % job.user)

    if job.jhs:
        submit = job.jhs['submitTime']
        start = job.jhs['startTime']
        finish = job.jhs['finishTime']
        print(' submit: %d, start: %d, finish: %d' % (submit, start, finish))
        # Timestamps are divided by 1000 to report seconds.
        print(' elapsed: %.3f s' % ((finish - start) / 1000.0))
        print(' finished: %.3f s' % ((finish - submit) / 1000.0))

        # NOTE(review): the per-node breakdown queries the history service,
        # so it is done only for jobs that have a JHS record; the nesting of
        # the original code was lost with its indentation -- confirm.
        if details:
            total = 0
            for node, elapsed in sorted(_node_elapsed(job_id).items()):
                print(' node %s: %d' % (node, elapsed))
                total += elapsed
            print(' ==> aggregated %d' % total)

    if job.rm:
        print(' MB x s: %d' % job.rm['memorySeconds'])
        print(' CPU x s: %d' % job.rm['vcoreSeconds'])

    print('')


# --- command line -----------------------------------------------------------

try:
    # Each long option is listed exactly once: a duplicated entry makes
    # getopt reject abbreviations (e.g. --ba) as "not a unique prefix".
    opts, args = getopt.getopt(
        sys.argv[1:], 'hb:dj:r:s',
        ['help', 'base=', 'debug', 'jhs-url=', 'rm-url=', 'ssl'])
except getopt.GetoptError:
    print('Args error')
    sys.exit(2)

for opt, arg in opts:
    if opt in ('-h', '--help'):
        print('jobs.py [-h|--help] [-b|--base HOST] [-d|--debug] [-j|--jhs-url URL] [-r|--rm-url URL] [-s|--ssl]')
        sys.exit(0)
    elif opt in ('-b', '--base'):
        host = arg
    elif opt in ('-d', '--debug'):
        debug = 1
    elif opt in ('-j', '--jhs-url'):
        base_jhs_url = arg
    elif opt in ('-r', '--rm-url'):
        base_rm_url = arg
    elif opt in ('-s', '--ssl'):
        ssl = 1
    else:
        print('Args error')
        sys.exit(2)

# --- service URLs -----------------------------------------------------------

if ssl:
    default_rm_url = "https://" + host + ":8090"
    default_jhs_url = "https://" + host + ":19890"
else:
    default_rm_url = "http://" + host + ":8088"
    default_jhs_url = "http://" + host + ":19888"

# Explicit -j / -r values win over the host-derived defaults.
if not base_rm_url:
    base_rm_url = default_rm_url
if not base_jhs_url:
    base_jhs_url = default_jhs_url

jhs_url = base_jhs_url + JHS_JOBS_PATH
rm_url = base_rm_url + RM_APPS_PATH
print('# ' + jhs_url)
print('# ' + rm_url)

# --- report -----------------------------------------------------------------

try:
    j1 = get_rest(base_jhs_url, JHS_JOBS_PATH)
    j2 = get_rest(base_rm_url, RM_APPS_PATH)
    jobs = _collect_jobs(j1, j2)

    # Sorted by id so that repeated runs print the jobs in the same order.
    for counter, job_id in enumerate(sorted(jobs), 1):
        _print_job(counter, job_id, jobs[job_id])
finally:
    # Release the shared handle on success and on any failure alike.
    c.close()