From 7aec0a590c2dc6d314115853fbc106cb4f541931 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?= Date: Wed, 4 Mar 2015 15:31:08 +0100 Subject: [PATCH] Features: config file, single job query. --- jobs.py | 185 ++++++++++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 129 insertions(+), 56 deletions(-) diff --git a/jobs.py b/jobs.py index 3e83c99..120b195 100755 --- a/jobs.py +++ b/jobs.py @@ -8,17 +8,27 @@ import sys import getopt import socket -base_jhs_url='' -base_rm_url='' +class Job: + status = None + user = None + mapred = None + yarn = None + +base_mapred_url='' +base_yarn_url='' debug=0 ssl=0 details=1 host=socket.getfqdn() +id = None + +jobs = dict() c = pycurl.Curl() c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_GSSNEGOTIATE) c.setopt(pycurl.USERPWD, ":") + def get_rest(base_url, url): b = BytesIO() c.setopt(pycurl.URL, base_url + url) @@ -41,83 +51,146 @@ def get_rest(base_url, url): return j + +def gen_url(base_url, port_nossl, port_ssl): + if ssl: + schema = 'https://' + port = port_ssl + else: + schema = 'http://' + port = port_nossl + + if not base_url: + base_url = host + if not '://' in base_url: + base_url = schema + base_url + if not re.match(r'.*:\d+$', base_url): + base_url = base_url + ':%d' % port + + return base_url + + try: - opts, args = getopt.getopt(sys.argv[1:], 'hb:dj:r:s', ['help', 'base=', 'debug', 'base=', 'jhs-url=', 'rm-url=', 'ssl']) + opts, args = getopt.getopt(sys.argv[1:], 'hb:c:dj:m:y:s', ['help', 'base=', 'config=', 'debug', 'jobs=', 'mapred=', 'yarn=', 'ssl']) except getopt.GetoptError: print 'Args error' sys.exit(2) for opt, arg in opts: if opt in ('-h', '--help'): - print('jobs.py [-h|--help] [-b|--base] [-d|--debug] [-j|--jhs-url URL] [-r|--rm-url URL] [-s|--ssl]') + print('jobs.py [-h|--help] [-b|--base] [-c|--config] [-d|--debug] [-j|--jobs] [-m|--mapred URL] [-y|--yarn URL] [-s|--ssl]') sys.exit(0) elif opt in ('-b', '--base'): host = arg + elif opt in ('-c', '--config'): + f = open(arg, 'r') + for line in f: + cfg=line.rstrip().split('=') + if cfg[0] == 'base': + host = cfg[1] + if cfg[0] == 'mapred': + base_mapred_url = cfg[1] + print cfg + elif cfg[0] == 'yarn': + base_yarn_url = cfg[1] + print cfg + elif cfg[0] == 'ssl': + ssl = cfg[1] elif opt in ('-d', '--debug'): debug=1 - elif opt in ('-j', '--jhs-url'): - base_jhs_url = arg - elif opt in ('-r', '--rm-url'): - base_rm_url = arg + elif opt in ('-j', '--jobs'): + id = arg + elif opt in ('-m', '--mapred'): + base_mapred_url = arg + elif opt in ('-y', '--yarn'): + base_yarn_url = arg elif opt in ('-s', '--ssl'): ssl=1 else: print 'Args error' sys.exit(2) -if ssl: - default_rm_url = "https://" + host + ":8090" - default_jhs_url = "https://" + host + ":19890" -else: - default_rm_url = "http://" + host + ":8088" - default_jhs_url = "http://" + host + ":19888" - -if not base_rm_url: - base_rm_url = default_rm_url -if not base_jhs_url: - base_jhs_url = default_jhs_url - -jhs_url = base_jhs_url + '/ws/v1/history/mapreduce/jobs' -rm_url = base_rm_url + '/ws/v1/cluster/apps' -print '# ' + jhs_url -print '# ' + rm_url - -j1 = get_rest(base_jhs_url, '/ws/v1/history/mapreduce/jobs') -j2 = get_rest(base_rm_url, '/ws/v1/cluster/apps') - -class Job: - status = None - user = None - jhs = None - rm = None -jobs = dict() +base_yarn_url = gen_url(base_yarn_url, 8088, 8090) +base_mapred_url = gen_url(base_mapred_url, 19888, 19890) regJob = re.compile('^job_') -if j1["jobs"]: - for j in j1["jobs"]["job"]: - id = regJob.sub('', j['id']) +regApp = re.compile('^application_') +if id: + if regJob.match(id): id = regJob.sub('', id) + if regApp.match(id): id = regApp.sub('', id) + + mapred_url = base_mapred_url + '/ws/v1/history/mapreduce/jobs/job_%s' % id + yarn_url = base_yarn_url + '/ws/v1/cluster/apps/application_%s' % id + print '# ' + mapred_url + print '# ' + yarn_url + + try: + j1 = get_rest(mapred_url, '') + except Exception: + j1 = None + + try: + j2 = get_rest(yarn_url, '') + except Exception: + j2 = None + + if j1 and j1['job']: if id not in jobs: job = Job() jobs[id] = job else: job = jobs[id] - job.jhs = j; - job.status = job.jhs['state'] - job.user = job.jhs['user'] + job.mapred = j1['job']; + job.status = job.mapred['state'] + job.user = job.mapred['user'] -regApp = re.compile('^application_') -if j2["apps"]: - for j in j2["apps"]["app"]: - id = regApp.sub('', j['id']) + if j2 and j2['app']: if id not in jobs: job = Job() jobs[id] = job else: job = jobs[id] - job.rm = j; + job.yarn = j2['app']; if not job.status: - job.status = job.rm['finalStatus'] + job.status = job.yarn['finalStatus'] if not job.user: - job.user = job.rm['user'] + job.user = job.yarn['user'] + + print jobs +else: + mapred_url = base_mapred_url + '/ws/v1/history/mapreduce/jobs' + yarn_url = base_yarn_url + '/ws/v1/cluster/apps' + print '# ' + mapred_url + print '# ' + yarn_url + + j1 = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs') + j2 = get_rest(base_yarn_url, '/ws/v1/cluster/apps') + + if j1["jobs"]: + for j in j1["jobs"]["job"]: + id = regJob.sub('', j['id']) + if id not in jobs: + job = Job() + jobs[id] = job + else: + job = jobs[id] + job.mapred = j; + job.status = job.mapred['state'] + job.user = job.mapred['user'] + + if j2["apps"]: + for j in j2["apps"]["app"]: + id = regApp.sub('', j['id']) + if id not in jobs: + job = Job() + jobs[id] = job + else: + job = jobs[id] + job.yarn = j; + if not job.status: + job.status = job.yarn['finalStatus'] + if not job.user: + job.user = job.yarn['user'] + regHost = re.compile(':\d+') counter=1 @@ -126,20 +199,20 @@ for id, job in jobs.iteritems(): counter += 1 print ' status: %s' % job.status print ' user: %s' % job.user - if job.jhs: - print ' submit: %d, start: %d, finish: %d' % (job.jhs['submitTime'], job.jhs['startTime'], job.jhs['finishTime']) - print ' elapsed: %.3f s' % ((job.jhs['finishTime'] - job.jhs['startTime']) / 1000.0) - print ' finished: %.3f s' % ((job.jhs['finishTime'] - job.jhs['submitTime']) / 1000.0) + if job.mapred: + print ' submit: %d, start: %d, finish: %d' % (job.mapred['submitTime'], job.mapred['startTime'], job.mapred['finishTime']) + print ' elapsed: %.3f s' % ((job.mapred['finishTime'] - job.mapred['startTime']) / 1000.0) + print ' finished: %.3f s' % ((job.mapred['finishTime'] - job.mapred['submitTime']) / 1000.0) if details: nodes = dict() - t = get_rest(base_jhs_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks' % id) + t = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks' % id) if t['tasks']: aggregate=0 for task in t['tasks']['task']: # # print 'taskid: %s, elapsed: %d' % (task['id'], task['elapsedTime']) aggregate += task['elapsedTime'] - a = get_rest(base_jhs_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks/%s/attempts' % (id, task['id'])) + a = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks/%s/attempts' % (id, task['id'])) if a['taskAttempts']: for attempt in a['taskAttempts']['taskAttempt']: nodeHost = regHost.sub('', attempt['nodeHttpAddress']) @@ -154,9 +227,9 @@ for id, job in jobs.iteritems(): aggregate += elapsed print ' ==> aggregated %d' % aggregate - if job.rm: - print ' MB x s: %d' % job.rm['memorySeconds'] - print ' CPU x s: %d' % job.rm['vcoreSeconds'] + if job.yarn: + print ' MB x s: %d' % job.yarn['memorySeconds'] + print ' CPU x s: %d' % job.yarn['vcoreSeconds'] print -- 1.8.2.3