Tune verbosity (print only errors by default).
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Fri, 6 Mar 2015 14:26:22 +0000 (15:26 +0100)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Fri, 6 Mar 2015 14:26:22 +0000 (15:26 +0100)
jobs.py

diff --git a/jobs.py b/jobs.py
index b1f52fb..df2856b 100755 (executable)
--- a/jobs.py
+++ b/jobs.py
@@ -67,9 +67,9 @@ def get_rest(base_url, url):
 
        j = json.loads(s)
 
-       if debug >= 2:
-               print '# %s%s' % (base_url, url)
        if debug >= 3:
+               print '# %s%s' % (base_url, url)
+       if debug >= 4:
                print json.dumps(j, indent=4)
 
        return j
@@ -105,7 +105,7 @@ OPTIONS are:\n\
   -h, --help ........ help message\n\
   -b, --base ........ default hostname for YARN ans MapReduce\n\
   -c, --config ...... config file\n\
-  -d, --debug LEVEL . debug output (1=progress, 2=more, 3=json dumps)\n\
+  -d, --debug LEVEL . debug output (2=progress, 3=trace, 4=json dumps)\n\
   --db .............. enable database\n\
   --dbhost\n\
   --dbname\n\
@@ -139,10 +139,8 @@ OPTIONS are:\n\
                                debug = int(cfg[1])
                        elif cfg[0] == 'mapred':
                                base_mapred_url = cfg[1]
-                               print cfg
                        elif cfg[0] == 'yarn':
                                base_yarn_url = cfg[1]
-                               print cfg
                        elif cfg[0] == 'ssl':
                                ssl = cfg[1]
        elif opt in ('-d', '--debug'):
@@ -174,7 +172,7 @@ OPTIONS are:\n\
 base_yarn_url = gen_url(base_yarn_url, 8088, 8090)
 base_mapred_url = gen_url(base_mapred_url, 19888, 19890)
 
-if debug >= 1:
+if debug >= 2:
        print '[MR] URL: ' + base_mapred_url
        print '[YARN] URL: ' + base_yarn_url
 
@@ -207,7 +205,7 @@ if id:
                        job = jobs[id]
                job.mapred = j1['job'];
                counter += 1
-       if debug >= 1: print '[MR] %d jobs' % counter
+       if debug >= 2: print '[MR] %d jobs' % counter
 
        counter = 0
        if j2 and j2['app']:
@@ -218,7 +216,7 @@ if id:
                        job = jobs[id]
                job.yarn = j2['app'];
                counter += 1
-       if debug >= 1: print '[YARN] %d jobs' % counter
+       if debug >= 2: print '[YARN] %d jobs' % counter
 else:
        mapred_url = base_mapred_url + '/ws/v1/history/mapreduce/jobs' + query
        yarn_url = base_yarn_url + '/ws/v1/cluster/apps' + query
@@ -237,7 +235,7 @@ else:
                                job = jobs[id]
                        job.mapred = j;
                        counter += 1
-       if debug >= 1: print '[MR] %d jobs' % counter
+       if debug >= 2: print '[MR] %d jobs' % counter
 
        counter = 0
        if j2["apps"]:
@@ -250,7 +248,7 @@ else:
                                job = jobs[id]
                        job.yarn = j;
                        counter += 1
-       if debug >= 1: print '[YARN] %d jobs' % counter
+       if debug >= 2: print '[YARN] %d jobs' % counter
 
 if db:
        db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname)
@@ -291,23 +289,26 @@ for id, job in jobs.iteritems():
                        job.queue = job.yarn['queue']
                if not job.start:
                        job.start = job.yarn['startedTime']
-                       if debug >= 1: print '[MR] missing start time of %s completed from YARN (%d)' % (id, job.start)
+                       if debug >= 2: print '[MR] missing start time of %s completed from YARN (%d)' % (id, job.start)
                if not job.finish:
                        job.finish = job.yarn['finishedTime']
-                       if debug >= 1: print '[MR] missing finish time of %s completed from YARN (%d)' % (id, job.finish)
+                       if debug >= 2: print '[MR] missing finish time of %s completed from YARN (%d)' % (id, job.finish)
+
+       if debug >= 1:
+               print 'job %s (%d):' % (id, counter)
+               print '  name: %s' % job.name
+               print '  status: %s' % job.status
+               print '  user: %s' % job.user
+               print '  queue: %s' % job.queue
 
-       print 'job %s (%d):' % (id, counter)
        counter += 1
-       print '  name: %s' % job.name
-       print '  status: %s' % job.status
-       print '  user: %s' % job.user
-       print '  queue: %s' % job.queue
-       if job.mapred:
+
+       if job.mapred and debug >= 1:
                print '  submit: %d, start: %d, finish: %d' % (job.mapred['submitTime'], job.mapred['startTime'], job.mapred['finishTime'])
                print '  elapsed: %.3f s' % ((job.mapred['finishTime'] - job.mapred['startTime']) / 1000.0)
                print '  finished: %.3f s' % ((job.mapred['finishTime'] - job.mapred['submitTime']) / 1000.0)
 
-       if job.yarn:
+       if job.yarn and debug >= 1:
                print '  MB x s: %d' % job.yarn['memorySeconds']
                print '  CPU x s: %d' % job.yarn['vcoreSeconds']
 
@@ -329,29 +330,29 @@ for id, job in jobs.iteritems():
                data = st.fetchone()
                if data:
                        if data[NAME] == job.name and data[USER] == job.user and data[STATUS] == job.status and data[QUEUE] == job.queue and data[START] == job.start and data[FINISH] == job.finish:
-                               if debug >= 2: print '[db] job %s found' % id
+                               if debug >= 3: print '[db] job %s found' % id
                        else:
                                st.execute("UPDATE jobs SET name=%s, user=%s, status=%s, queue=%s, start=%s, finish=%s WHERE id=%s", (job.name, job.user, job.status, job.queue, job.start, job.finish, id))
-                               if debug >= 2: print '[db] job %s updated' % id
+                               if debug >= 3: print '[db] job %s updated' % id
                                changed = 1
 
                else:
                        st.execute("INSERT INTO jobs (id, name, user, status, queue, start, finish) VALUES(%s, %s, %s, %s, %s, %s, %s)", (id, job.name, job.user, job.status, job.queue, job.start, job.finish))
-                       if debug >= 2: print '[db] job %s inserted' % id
+                       if debug >= 3: print '[db] job %s inserted' % id
                        changed = 1
                if job.mapred:
                        if data and data[SUBMIT] == job.mapred['submitTime'] and data[MAP] == job.mapred['mapsTotal'] and data[REDUCE] == job.mapred['reducesTotal']:
-                               if debug >= 2: print '[db] job %s mapred is actual' % id
+                               if debug >= 3: print '[db] job %s mapred is actual' % id
                        else:
                                st.execute("UPDATE jobs SET submit=%s, map=%s, reduce=%s WHERE id=%s", (job.mapred['submitTime'], job.mapred['mapsTotal'], job.mapred['reducesTotal'], id))
-                               if debug >= 2: print '[db] job %s mapred updated' % id
+                               if debug >= 3: print '[db] job %s mapred updated' % id
                                changed = 1
                if job.yarn:
                        if data and data[MEMORY] == job.yarn['memorySeconds'] and data[CPU] == job.yarn['vcoreSeconds']:
-                               if debug >= 2: print '[db] job %s yarn is actual' % id
+                               if debug >= 3: print '[db] job %s yarn is actual' % id
                        else:
                                st.execute("UPDATE jobs SET memory_seconds=%s, cpu_seconds=%s WHERE id=%s", (job.yarn['memorySeconds'], job.yarn['vcoreSeconds'], id))
-                               if debug >= 2: print '[db] job %s yarn updated' % id
+                               if debug >= 3: print '[db] job %s yarn updated' % id
                                changed = 1
 
                # check for details in DB
@@ -394,10 +395,11 @@ for id, job in jobs.iteritems():
 
                aggregate=0
                for nodename, jobnode in jobnodes.iteritems():
-                       print '  node %s: %d' % (nodename, jobnode.elapsed)
+                       if debug >= 1: print '  node %s: %d' % (nodename, jobnode.elapsed)
                        aggregate += jobnode.elapsed
-               print '  subjobs: %d' % len(subjobs)
-               print '  ==> aggregated %d' % aggregate
+               if debug >= 1:
+                       print '  subjobs: %d' % len(subjobs)
+                       print '  ==> aggregated %d' % aggregate
 
        if jobnodes and db:
                st.execute("DELETE FROM jobnodes WHERE jobid=%s", id)
@@ -410,13 +412,13 @@ for id, job in jobs.iteritems():
                                node_hosts[nodename] = node
                                node_ids[node.id] = node
                        st.execute("INSERT INTO jobnodes (jobid, nodeid, elapsed, map, reduce) VALUES (%s, %s, %s, %s, %s)", (id, node_hosts[nodename].id, jobnode.elapsed, jobnode.map, jobnode.reduce))
-               if debug >= 2: print '[db] job %s nodes updated' % id
+               if debug >= 3: print '[db] job %s nodes updated' % id
 
                st.execute("DELETE FROM subjobs WHERE jobid=%s", id)
                for subjob in subjobs:
                        nodename = subjob['nodeHttpAddress']
                        st.execute('INSERT INTO subjobs (id, jobid, nodeid, state, type, start, finish) VALUES (%s, %s, %s, %s, %s, %s, %s)', (subjob['id'], id, node_hosts[nodename].id, subjob['state'], subjob['type'], subjob['startTime'], subjob['finishTime']))
-               if debug >= 2: print '[db] job %s subjobs updated' % id
+               if debug >= 3: print '[db] job %s subjobs updated' % id
 
                # better to update timestamp again explicitly on the end of the transaction
                st.execute('UPDATE jobs SET changed=NOW() WHERE id=%s', id);
@@ -424,7 +426,7 @@ for id, job in jobs.iteritems():
        if db:
                db.commit()
 
-       print
+       if debug >= 1: print
 
 if db:
        db.close()