Features: config file, single job query.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Wed, 4 Mar 2015 14:31:08 +0000 (15:31 +0100)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Wed, 4 Mar 2015 14:31:08 +0000 (15:31 +0100)
jobs.py

diff --git a/jobs.py b/jobs.py
index 3e83c99..120b195 100755 (executable)
--- a/jobs.py
+++ b/jobs.py
@@ -8,17 +8,27 @@ import sys
 import getopt
 import socket
 
-base_jhs_url=''
-base_rm_url=''
+class Job:
+       status = None
+       user = None
+       mapred = None
+       yarn = None
+
+base_mapred_url=''
+base_yarn_url=''
 debug=0
 ssl=0
 details=1
 host=socket.getfqdn()
+id = None
+
+jobs = dict()
 
 c = pycurl.Curl()
 c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_GSSNEGOTIATE)
 c.setopt(pycurl.USERPWD, ":")
 
+
 def get_rest(base_url, url):
        b = BytesIO()
        c.setopt(pycurl.URL, base_url + url)
@@ -41,83 +51,146 @@ def get_rest(base_url, url):
 
        return j
 
+
+def gen_url(base_url, port_nossl, port_ssl):
+       if ssl:
+               schema = 'https://'
+               port = port_ssl
+       else:
+               schema = 'http://'
+               port = port_nossl
+
+       if not base_url:
+               base_url = host
+       if not '://' in base_url:
+               base_url = schema + base_url
+       if not re.match(r'.*:\d+$', base_url):
+               base_url = base_url + ':%d' % port
+
+       return base_url
+
+
 try:
-       opts, args = getopt.getopt(sys.argv[1:], 'hb:dj:r:s', ['help', 'base=', 'debug', 'base=', 'jhs-url=', 'rm-url=', 'ssl'])
+       opts, args = getopt.getopt(sys.argv[1:], 'hb:c:dj:m:y:s', ['help', 'base=', 'config=', 'debug', 'jobs=', 'mapred=', 'yarn=', 'ssl'])
 except getopt.GetoptError:
        print 'Args error'
        sys.exit(2)
 for opt, arg in opts:
        if opt in ('-h', '--help'):
-               print('jobs.py [-h|--help] [-b|--base] [-d|--debug] [-j|--jhs-url URL] [-r|--rm-url URL] [-s|--ssl]')
+               print('jobs.py [-h|--help] [-b|--base] [-c|--config] [-d|--debug] [-j|--jobs] [-m|--mapred URL] [-y|--yarn URL] [-s|--ssl]')
                sys.exit(0)
        elif opt in ('-b', '--base'):
                host = arg
+       elif opt in ('-c', '--config'):
+               f = open(arg, 'r')
+               for line in f:
+                       cfg=line.rstrip().split('=')
+                       if cfg[0] == 'base':
+                               host = cfg[1]
+                       if cfg[0] == 'mapred':
+                               base_mapred_url = cfg[1]
+                               print cfg
+                       elif cfg[0] == 'yarn':
+                               base_yarn_url = cfg[1]
+                               print cfg
+                       elif cfg[0] == 'ssl':
+                               ssl = cfg[1]
        elif opt in ('-d', '--debug'):
                debug=1
-       elif opt in ('-j', '--jhs-url'):
-               base_jhs_url = arg
-       elif opt in ('-r', '--rm-url'):
-               base_rm_url = arg
+       elif opt in ('-j', '--jobs'):
+               id = arg
+       elif opt in ('-m', '--mapred'):
+               base_mapred_url = arg
+       elif opt in ('-y', '--yarn'):
+               base_yarn_url = arg
        elif opt in ('-s', '--ssl'):
                ssl=1
        else:
                print 'Args error'
                sys.exit(2)
-if ssl:
-       default_rm_url = "https://" + host + ":8090"
-       default_jhs_url = "https://" + host + ":19890"
-else:
-       default_rm_url = "http://" + host + ":8088"
-       default_jhs_url = "http://" + host + ":19888"
-       
-if not base_rm_url:
-       base_rm_url = default_rm_url
-if not base_jhs_url:
-       base_jhs_url = default_jhs_url
-
-jhs_url = base_jhs_url + '/ws/v1/history/mapreduce/jobs'
-rm_url = base_rm_url + '/ws/v1/cluster/apps'
-print '# ' + jhs_url
-print '# ' + rm_url
-
-j1 = get_rest(base_jhs_url, '/ws/v1/history/mapreduce/jobs')
-j2 = get_rest(base_rm_url, '/ws/v1/cluster/apps')
-
-class Job:
-       status = None
-       user = None
-       jhs = None
-       rm = None
 
-jobs = dict()
+base_yarn_url = gen_url(base_yarn_url, 8088, 8090)
+base_mapred_url = gen_url(base_mapred_url, 19888, 19890)
 
 regJob = re.compile('^job_')
-if j1["jobs"]:
-       for j in j1["jobs"]["job"]:
-               id = regJob.sub('', j['id'])
+regApp = re.compile('^application_')
+if id:
+       if regJob.match(id): id = regJob.sub('', id)
+       if regApp.match(id): id = regApp.sub('', id)
+
+       mapred_url = base_mapred_url + '/ws/v1/history/mapreduce/jobs/job_%s' % id
+       yarn_url = base_yarn_url + '/ws/v1/cluster/apps/application_%s' % id
+       print '# ' + mapred_url
+       print '# ' + yarn_url
+
+       try:
+               j1 = get_rest(mapred_url, '')
+       except Exception:
+               j1 = None
+
+       try:
+               j2 = get_rest(yarn_url, '')
+       except Exception:
+               j2 = None
+
+       if j1 and j1['job']:
                if id not in jobs:
                        job  = Job()
                        jobs[id] = job
                else:
                        job = jobs[id]
-               job.jhs = j;
-               job.status = job.jhs['state']
-               job.user = job.jhs['user']
+               job.mapred = j1['job'];
+               job.status = job.mapred['state']
+               job.user = job.mapred['user']
 
-regApp = re.compile('^application_')
-if j2["apps"]:
-       for j in j2["apps"]["app"]:
-               id = regApp.sub('', j['id'])
+       if j2 and j2['app']:
                if id not in jobs:
                        job  = Job()
                        jobs[id] = job
                else:
                        job = jobs[id]
-               job.rm = j;
+               job.yarn = j2['app'];
                if not job.status:
-                       job.status = job.rm['finalStatus']
+                       job.status = job.yarn['finalStatus']
                if not job.user:
-                       job.user = job.rm['user']
+                       job.user = job.yarn['user']
+
+       print jobs
+else:
+       mapred_url = base_mapred_url + '/ws/v1/history/mapreduce/jobs'
+       yarn_url = base_yarn_url + '/ws/v1/cluster/apps'
+       print '# ' + mapred_url
+       print '# ' + yarn_url
+
+       j1 = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs')
+       j2 = get_rest(base_yarn_url, '/ws/v1/cluster/apps')
+
+       if j1["jobs"]:
+               for j in j1["jobs"]["job"]:
+                       id = regJob.sub('', j['id'])
+                       if id not in jobs:
+                               job  = Job()
+                               jobs[id] = job
+                       else:
+                               job = jobs[id]
+                       job.mapred = j;
+                       job.status = job.mapred['state']
+                       job.user = job.mapred['user']
+
+       if j2["apps"]:
+               for j in j2["apps"]["app"]:
+                       id = regApp.sub('', j['id'])
+                       if id not in jobs:
+                               job  = Job()
+                               jobs[id] = job
+                       else:
+                               job = jobs[id]
+                       job.yarn = j;
+                       if not job.status:
+                               job.status = job.yarn['finalStatus']
+                       if not job.user:
+                               job.user = job.yarn['user']
+
 
 regHost = re.compile(':\d+')
 counter=1
@@ -126,20 +199,20 @@ for id, job in jobs.iteritems():
        counter += 1
        print '  status: %s' % job.status
        print '  user: %s' % job.user
-       if job.jhs:
-               print '  submit: %d, start: %d, finish: %d' % (job.jhs['submitTime'], job.jhs['startTime'], job.jhs['finishTime'])
-               print '  elapsed: %.3f s' % ((job.jhs['finishTime'] - job.jhs['startTime']) / 1000.0)
-               print '  finished: %.3f s' % ((job.jhs['finishTime'] - job.jhs['submitTime']) / 1000.0)
+       if job.mapred:
+               print '  submit: %d, start: %d, finish: %d' % (job.mapred['submitTime'], job.mapred['startTime'], job.mapred['finishTime'])
+               print '  elapsed: %.3f s' % ((job.mapred['finishTime'] - job.mapred['startTime']) / 1000.0)
+               print '  finished: %.3f s' % ((job.mapred['finishTime'] - job.mapred['submitTime']) / 1000.0)
 
                if details:
                        nodes = dict()
-                       t = get_rest(base_jhs_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks' % id)
+                       t = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks' % id)
                        if t['tasks']:
                                aggregate=0
                                for task in t['tasks']['task']:
 #      #                               print 'taskid: %s, elapsed: %d' % (task['id'], task['elapsedTime'])
                                        aggregate += task['elapsedTime']
-                                       a = get_rest(base_jhs_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks/%s/attempts' % (id, task['id']))
+                                       a = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks/%s/attempts' % (id, task['id']))
                                        if a['taskAttempts']:
                                                for attempt in a['taskAttempts']['taskAttempt']:
                                                        nodeHost = regHost.sub('', attempt['nodeHttpAddress'])
@@ -154,9 +227,9 @@ for id, job in jobs.iteritems():
                                aggregate += elapsed
                        print '  ==> aggregated %d' % aggregate
 
-       if job.rm:
-               print '  MB x s: %d' % job.rm['memorySeconds']
-               print '  CPU x s: %d' % job.rm['vcoreSeconds']
+       if job.yarn:
+               print '  MB x s: %d' % job.yarn['memorySeconds']
+               print '  CPU x s: %d' % job.yarn['vcoreSeconds']
 
        print