From c484e3cf1d55afb12b9d9db3fb4245ebe2406cfa Mon Sep 17 00:00:00 2001 From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?= Date: Fri, 6 Mar 2015 15:26:22 +0100 Subject: [PATCH] Tune verbosity (print only errors by default). --- jobs.py | 66 +++++++++++++++++++++++++++++++++-------------------------------- 1 file changed, 34 insertions(+), 32 deletions(-) diff --git a/jobs.py b/jobs.py index b1f52fb..df2856b 100755 --- a/jobs.py +++ b/jobs.py @@ -67,9 +67,9 @@ def get_rest(base_url, url): j = json.loads(s) - if debug >= 2: - print '# %s%s' % (base_url, url) if debug >= 3: + print '# %s%s' % (base_url, url) + if debug >= 4: print json.dumps(j, indent=4) return j @@ -105,7 +105,7 @@ OPTIONS are:\n\ -h, --help ........ help message\n\ -b, --base ........ default hostname for YARN ans MapReduce\n\ -c, --config ...... config file\n\ - -d, --debug LEVEL . debug output (1=progress, 2=more, 3=json dumps)\n\ + -d, --debug LEVEL . debug output (2=progress, 3=trace, 4=json dumps)\n\ --db .............. enable database\n\ --dbhost\n\ --dbname\n\ @@ -139,10 +139,8 @@ OPTIONS are:\n\ debug = int(cfg[1]) elif cfg[0] == 'mapred': base_mapred_url = cfg[1] - print cfg elif cfg[0] == 'yarn': base_yarn_url = cfg[1] - print cfg elif cfg[0] == 'ssl': ssl = cfg[1] elif opt in ('-d', '--debug'): @@ -174,7 +172,7 @@ OPTIONS are:\n\ base_yarn_url = gen_url(base_yarn_url, 8088, 8090) base_mapred_url = gen_url(base_mapred_url, 19888, 19890) -if debug >= 1: +if debug >= 2: print '[MR] URL: ' + base_mapred_url print '[YARN] URL: ' + base_yarn_url @@ -207,7 +205,7 @@ if id: job = jobs[id] job.mapred = j1['job']; counter += 1 - if debug >= 1: print '[MR] %d jobs' % counter + if debug >= 2: print '[MR] %d jobs' % counter counter = 0 if j2 and j2['app']: @@ -218,7 +216,7 @@ if id: job = jobs[id] job.yarn = j2['app']; counter += 1 - if debug >= 1: print '[YARN] %d jobs' % counter + if debug >= 2: print '[YARN] %d jobs' % counter else: mapred_url = base_mapred_url + '/ws/v1/history/mapreduce/jobs' + query yarn_url = base_yarn_url + '/ws/v1/cluster/apps' + query @@ -237,7 +235,7 @@ else: job = jobs[id] job.mapred = j; counter += 1 - if debug >= 1: print '[MR] %d jobs' % counter + if debug >= 2: print '[MR] %d jobs' % counter counter = 0 if j2["apps"]: @@ -250,7 +248,7 @@ else: job = jobs[id] job.yarn = j; counter += 1 - if debug >= 1: print '[YARN] %d jobs' % counter + if debug >= 2: print '[YARN] %d jobs' % counter if db: db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname) @@ -291,23 +289,26 @@ for id, job in jobs.iteritems(): job.queue = job.yarn['queue'] if not job.start: job.start = job.yarn['startedTime'] - if debug >= 1: print '[MR] missing start time of %s completed from YARN (%d)' % (id, job.start) + if debug >= 2: print '[MR] missing start time of %s completed from YARN (%d)' % (id, job.start) if not job.finish: job.finish = job.yarn['finishedTime'] - if debug >= 1: print '[MR] missing finish time of %s completed from YARN (%d)' % (id, job.finish) + if debug >= 2: print '[MR] missing finish time of %s completed from YARN (%d)' % (id, job.finish) + + if debug >= 1: + print 'job %s (%d):' % (id, counter) + print ' name: %s' % job.name + print ' status: %s' % job.status + print ' user: %s' % job.user + print ' queue: %s' % job.queue - print 'job %s (%d):' % (id, counter) counter += 1 - print ' name: %s' % job.name - print ' status: %s' % job.status - print ' user: %s' % job.user - print ' queue: %s' % job.queue - if job.mapred: + + if job.mapred and debug >= 1: print ' submit: %d, start: %d, finish: %d' % (job.mapred['submitTime'], job.mapred['startTime'], job.mapred['finishTime']) print ' elapsed: %.3f s' % ((job.mapred['finishTime'] - job.mapred['startTime']) / 1000.0) print ' finished: %.3f s' % ((job.mapred['finishTime'] - job.mapred['submitTime']) / 1000.0) - if job.yarn: + if job.yarn and debug >= 1: print ' MB x s: %d' % job.yarn['memorySeconds'] print ' CPU x s: %d' % job.yarn['vcoreSeconds'] @@ -329,29 +330,29 @@ for id, job in jobs.iteritems(): data = st.fetchone() if data: if data[NAME] == job.name and data[USER] == job.user and data[STATUS] == job.status and data[QUEUE] == job.queue and data[START] == job.start and data[FINISH] == job.finish: - if debug >= 2: print '[db] job %s found' % id + if debug >= 3: print '[db] job %s found' % id else: st.execute("UPDATE jobs SET name=%s, user=%s, status=%s, queue=%s, start=%s, finish=%s WHERE id=%s", (job.name, job.user, job.status, job.queue, job.start, job.finish, id)) - if debug >= 2: print '[db] job %s updated' % id + if debug >= 3: print '[db] job %s updated' % id changed = 1 else: st.execute("INSERT INTO jobs (id, name, user, status, queue, start, finish) VALUES(%s, %s, %s, %s, %s, %s, %s)", (id, job.name, job.user, job.status, job.queue, job.start, job.finish)) - if debug >= 2: print '[db] job %s inserted' % id + if debug >= 3: print '[db] job %s inserted' % id changed = 1 if job.mapred: if data and data[SUBMIT] == job.mapred['submitTime'] and data[MAP] == job.mapred['mapsTotal'] and data[REDUCE] == job.mapred['reducesTotal']: - if debug >= 2: print '[db] job %s mapred is actual' % id + if debug >= 3: print '[db] job %s mapred is actual' % id else: st.execute("UPDATE jobs SET submit=%s, map=%s, reduce=%s WHERE id=%s", (job.mapred['submitTime'], job.mapred['mapsTotal'], job.mapred['reducesTotal'], id)) - if debug >= 2: print '[db] job %s mapred updated' % id + if debug >= 3: print '[db] job %s mapred updated' % id changed = 1 if job.yarn: if data and data[MEMORY] == job.yarn['memorySeconds'] and data[CPU] == job.yarn['vcoreSeconds']: - if debug >= 2: print '[db] job %s yarn is actual' % id + if debug >= 3: print '[db] job %s yarn is actual' % id else: st.execute("UPDATE jobs SET memory_seconds=%s, cpu_seconds=%s WHERE id=%s", (job.yarn['memorySeconds'], job.yarn['vcoreSeconds'], id)) - if debug >= 2: print '[db] job %s yarn updated' % id + if debug >= 3: print '[db] job %s yarn updated' % id changed = 1 # check for details in DB @@ -394,10 +395,11 @@ for id, job in jobs.iteritems(): aggregate=0 for nodename, jobnode in jobnodes.iteritems(): - print ' node %s: %d' % (nodename, jobnode.elapsed) + if debug >= 1: print ' node %s: %d' % (nodename, jobnode.elapsed) aggregate += jobnode.elapsed - print ' subjobs: %d' % len(subjobs) - print ' ==> aggregated %d' % aggregate + if debug >= 1: + print ' subjobs: %d' % len(subjobs) + print ' ==> aggregated %d' % aggregate if jobnodes and db: st.execute("DELETE FROM jobnodes WHERE jobid=%s", id) @@ -410,13 +412,13 @@ for id, job in jobs.iteritems(): node_hosts[nodename] = node node_ids[node.id] = node st.execute("INSERT INTO jobnodes (jobid, nodeid, elapsed, map, reduce) VALUES (%s, %s, %s, %s, %s)", (id, node_hosts[nodename].id, jobnode.elapsed, jobnode.map, jobnode.reduce)) - if debug >= 2: print '[db] job %s nodes updated' % id + if debug >= 3: print '[db] job %s nodes updated' % id st.execute("DELETE FROM subjobs WHERE jobid=%s", id) for subjob in subjobs: nodename = subjob['nodeHttpAddress'] st.execute('INSERT INTO subjobs (id, jobid, nodeid, state, type, start, finish) VALUES (%s, %s, %s, %s, %s, %s, %s)', (subjob['id'], id, node_hosts[nodename].id, subjob['state'], subjob['type'], subjob['startTime'], subjob['finishTime'])) - if debug >= 2: print '[db] job %s subjobs updated' % id + if debug >= 3: print '[db] job %s subjobs updated' % id # better to update timestamp again explicitly on the end of the transaction st.execute('UPDATE jobs SET changed=NOW() WHERE id=%s', id); @@ -424,7 +426,7 @@ for id, job in jobs.iteritems(): if db: db.commit() - print + if debug >= 1: print if db: db.close() -- 1.8.2.3