Fine tunging of verbosity.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Wed, 4 Mar 2015 20:37:52 +0000 (21:37 +0100)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Wed, 4 Mar 2015 20:37:52 +0000 (21:37 +0100)
jobs.py

diff --git a/jobs.py b/jobs.py
index 00b3988..5859749 100755 (executable)
--- a/jobs.py
+++ b/jobs.py
@@ -60,7 +60,9 @@ def get_rest(base_url, url):
 
        j = json.loads(s)
 
-       if debug:
+       if debug >= 2:
+               print '# %s%s' % (base_url, url)
+       if debug >= 3:
                print json.dumps(j, indent=4)
 
        return j
@@ -99,7 +101,7 @@ def dbstr(s):
 
 
 try:
-       opts, args = getopt.getopt(sys.argv[1:], 'hb:c:dj:m:y:s', ['help', 'base=', 'config=', 'db', 'dbhost=', 'dbname=', 'dbuser=', 'dbpassword=', 'debug', 'jobid=', 'mapred=', 'yarn=', 'ssl'])
+       opts, args = getopt.getopt(sys.argv[1:], 'hb:c:d:j:m:y:s', ['help', 'base=', 'config=', 'db', 'dbhost=', 'dbname=', 'dbuser=', 'dbpassword=', 'debug=', 'jobid=', 'mapred=', 'yarn=', 'ssl'])
 except getopt.GetoptError:
        print 'Args error'
        sys.exit(2)
@@ -110,7 +112,7 @@ OPTIONS are:\n\
   -h, --help ........ help message\n\
   -b, --base ........ default hostname for YARN ans MapReduce\n\
   -c, --config ...... config file\n\
-  -d, --debug ....... debug output\n\
+  -d, --debug LEVEL . debug output (1=progress, 2=more, 3=json dumps)\n\
   --db .............. enable database\n\
   --dbhost\n\
   --dbname\n\
@@ -140,6 +142,8 @@ OPTIONS are:\n\
                                dbuser = cfg[1]
                        elif cfg[0] == 'dbpassword':
                                dbpassword = cfg[1]
+                       elif cfg[0] == 'debug':
+                               debug = int(cfg[1])
                        elif cfg[0] == 'mapred':
                                base_mapred_url = cfg[1]
                                print cfg
@@ -149,7 +153,7 @@ OPTIONS are:\n\
                        elif cfg[0] == 'ssl':
                                ssl = cfg[1]
        elif opt in ('-d', '--debug'):
-               debug=1
+               debug = int(arg)
        elif opt in ('--db'):
                db = 1
        elif opt in ('--dbhost'):
@@ -175,6 +179,10 @@ OPTIONS are:\n\
 base_yarn_url = gen_url(base_yarn_url, 8088, 8090)
 base_mapred_url = gen_url(base_mapred_url, 19888, 19890)
 
+if debug >= 1:
+       print '[MR] URL: ' + base_mapred_url
+       print '[YARN] URL: ' + base_yarn_url
+
 regJob = re.compile('^job_')
 regApp = re.compile('^application_')
 if id:
@@ -183,8 +191,6 @@ if id:
 
        mapred_url = base_mapred_url + '/ws/v1/history/mapreduce/jobs/job_%s' % id
        yarn_url = base_yarn_url + '/ws/v1/cluster/apps/application_%s' % id
-       print '# ' + mapred_url
-       print '# ' + yarn_url
 
        try:
                j1 = get_rest(mapred_url, '')
@@ -196,6 +202,7 @@ if id:
        except Exception:
                j2 = None
 
+       counter = 0
        if j1 and j1['job']:
                if id not in jobs:
                        job  = Job()
@@ -203,7 +210,10 @@ if id:
                else:
                        job = jobs[id]
                job.mapred = j1['job'];
+               counter += 1
+       if debug >= 1: print '[MR] %d jobs' % counter
 
+       counter = 0
        if j2 and j2['app']:
                if id not in jobs:
                        job  = Job()
@@ -211,17 +221,16 @@ if id:
                else:
                        job = jobs[id]
                job.yarn = j2['app'];
-
-       print jobs
+               counter += 1
+       if debug >= 1: print '[YARN] %d jobs' % counter
 else:
        mapred_url = base_mapred_url + '/ws/v1/history/mapreduce/jobs'
        yarn_url = base_yarn_url + '/ws/v1/cluster/apps'
-       print '# ' + mapred_url
-       print '# ' + yarn_url
 
        j1 = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs')
        j2 = get_rest(base_yarn_url, '/ws/v1/cluster/apps')
 
+       counter = 0
        if j1["jobs"]:
                for j in j1["jobs"]["job"]:
                        id = regJob.sub('', j['id'])
@@ -231,7 +240,10 @@ else:
                        else:
                                job = jobs[id]
                        job.mapred = j;
+                       counter += 1
+       if debug >= 1: print '[MR] %d jobs' % counter
 
+       counter = 0
        if j2["apps"]:
                for j in j2["apps"]["app"]:
                        id = regApp.sub('', j['id'])
@@ -241,6 +253,8 @@ else:
                        else:
                                job = jobs[id]
                        job.yarn = j;
+                       counter += 1
+       if debug >= 1: print '[YARN] %d jobs' % counter
 
 if db:
        db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname)
@@ -263,10 +277,10 @@ for id, job in jobs.iteritems():
                        job.user = job.yarn['user']
                if not job.start:
                        job.start = job.yarn['startedTime']
-                       if debug: print '[MR] missing start time of %s completed from YARN (%d)' % (id, job.start)
+                       if debug >= 1: print '[MR] missing start time of %s completed from YARN (%d)' % (id, job.start)
                if not job.finish:
                        job.finish = job.yarn['finishedTime']
-                       if debug: print '[MR] missing finish time of %s completed from YARN (%d)' % (id, job.finish)
+                       if debug >= 1: print '[MR] missing finish time of %s completed from YARN (%d)' % (id, job.finish)
 
        print 'job %s (%d):' % (id, counter)
        counter += 1
@@ -315,31 +329,31 @@ for id, job in jobs.iteritems():
                data = st.fetchone()
                if data:
                        if data[1] == job.user and data[2] == job.status and data[4] == job.start and data[5] == job.finish:
-                               if debug: print '[db] job %s found' % id
+                               if debug >= 2: print '[db] job %s found' % id
                        else:
                                st.execute("UPDATE job SET user='%s', status='%s', start=%s, finish=%s WHERE id='%s'" %(job.user, job.status, dbint(job.start), dbint(job.finish), id))
-                               if debug: print '[db] job %s updated' % id
+                               if debug >= 2: print '[db] job %s updated' % id
 
                else:
                        st.execute("INSERT INTO job (id, user, status, start, finish) VALUES('%s', '%s', '%s', %s, %s)" %(id, job.user, job.status, dbint(job.start), dbint(job.finish)))
-                       if debug: print '[db] job %s inserted' % id
+                       if debug >= 2: print '[db] job %s inserted' % id
                if job.mapred:
                        if data and data[3] == job.mapred['submitTime'] and data[8] == job.mapred['mapsTotal'] and data[9] == job.mapred['reducesTotal']:
-                               if debug: print '[db] job %s mapred is actual' % id
+                               if debug >= 2: print '[db] job %s mapred is actual' % id
                        else:
                                st.execute("UPDATE job SET submit=%d, map=%s, reduce=%s WHERE id='%s'" %(job.mapred['submitTime'], dbint(job.mapred['mapsTotal']), dbint(job.mapred['reducesTotal']), id))
-                               if debug: print '[db] job %s mapred updated' % id
+                               if debug >= 2: print '[db] job %s mapred updated' % id
                if job.yarn:
                        if data and data[6] == job.yarn['memorySeconds'] and data[7] == job.yarn['vcoreSeconds']:
-                               if debug: print '[db] job %s yarn is actual' % id
+                               if debug >= 2: print '[db] job %s yarn is actual' % id
                        else:
                                st.execute("UPDATE job SET memory_seconds=%d, cpu_seconds=%d WHERE id='%s'" %(job.yarn['memorySeconds'], job.yarn['vcoreSeconds'], id))
-                               if debug: print '[db] job %s yarn updated' % id
+                               if debug >= 2: print '[db] job %s yarn updated' % id
                if nodes:
                        st.execute("DELETE FROM node WHERE jobid='%s'" % id)
                        for nodename, node in nodes.iteritems():
                                st.execute("INSERT INTO node (jobid, host, elapsed, map, reduce) VALUES ('%s', '%s', %d, %d, %d)" % (id, nodename, node.elapsed, node.map, node.reduce))
-                       if debug: print '[db] job %s nodes updated' % id
+                       if debug >= 2: print '[db] job %s nodes updated' % id
                db.commit()
 
        print