From: František Dvořák Date: Wed, 4 Mar 2015 20:37:52 +0000 (+0100) Subject: Fine tunging of verbosity. X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=3e0a74db61a1bf0cec200c65ffe55d2ceb07a0fe;p=hadoop-jobstat.git Fine tunging of verbosity. --- diff --git a/jobs.py b/jobs.py index 00b3988..5859749 100755 --- a/jobs.py +++ b/jobs.py @@ -60,7 +60,9 @@ def get_rest(base_url, url): j = json.loads(s) - if debug: + if debug >= 2: + print '# %s%s' % (base_url, url) + if debug >= 3: print json.dumps(j, indent=4) return j @@ -99,7 +101,7 @@ def dbstr(s): try: - opts, args = getopt.getopt(sys.argv[1:], 'hb:c:dj:m:y:s', ['help', 'base=', 'config=', 'db', 'dbhost=', 'dbname=', 'dbuser=', 'dbpassword=', 'debug', 'jobid=', 'mapred=', 'yarn=', 'ssl']) + opts, args = getopt.getopt(sys.argv[1:], 'hb:c:d:j:m:y:s', ['help', 'base=', 'config=', 'db', 'dbhost=', 'dbname=', 'dbuser=', 'dbpassword=', 'debug=', 'jobid=', 'mapred=', 'yarn=', 'ssl']) except getopt.GetoptError: print 'Args error' sys.exit(2) @@ -110,7 +112,7 @@ OPTIONS are:\n\ -h, --help ........ help message\n\ -b, --base ........ default hostname for YARN ans MapReduce\n\ -c, --config ...... config file\n\ - -d, --debug ....... debug output\n\ + -d, --debug LEVEL . debug output (1=progress, 2=more, 3=json dumps)\n\ --db .............. enable database\n\ --dbhost\n\ --dbname\n\ @@ -140,6 +142,8 @@ OPTIONS are:\n\ dbuser = cfg[1] elif cfg[0] == 'dbpassword': dbpassword = cfg[1] + elif cfg[0] == 'debug': + debug = int(cfg[1]) elif cfg[0] == 'mapred': base_mapred_url = cfg[1] print cfg @@ -149,7 +153,7 @@ OPTIONS are:\n\ elif cfg[0] == 'ssl': ssl = cfg[1] elif opt in ('-d', '--debug'): - debug=1 + debug = int(arg) elif opt in ('--db'): db = 1 elif opt in ('--dbhost'): @@ -175,6 +179,10 @@ OPTIONS are:\n\ base_yarn_url = gen_url(base_yarn_url, 8088, 8090) base_mapred_url = gen_url(base_mapred_url, 19888, 19890) +if debug >= 1: + print '[MR] URL: ' + base_mapred_url + print '[YARN] URL: ' + base_yarn_url + regJob = re.compile('^job_') regApp = re.compile('^application_') if id: @@ -183,8 +191,6 @@ if id: mapred_url = base_mapred_url + '/ws/v1/history/mapreduce/jobs/job_%s' % id yarn_url = base_yarn_url + '/ws/v1/cluster/apps/application_%s' % id - print '# ' + mapred_url - print '# ' + yarn_url try: j1 = get_rest(mapred_url, '') @@ -196,6 +202,7 @@ if id: except Exception: j2 = None + counter = 0 if j1 and j1['job']: if id not in jobs: job = Job() @@ -203,7 +210,10 @@ if id: else: job = jobs[id] job.mapred = j1['job']; + counter += 1 + if debug >= 1: print '[MR] %d jobs' % counter + counter = 0 if j2 and j2['app']: if id not in jobs: job = Job() @@ -211,17 +221,16 @@ if id: else: job = jobs[id] job.yarn = j2['app']; - - print jobs + counter += 1 + if debug >= 1: print '[YARN] %d jobs' % counter else: mapred_url = base_mapred_url + '/ws/v1/history/mapreduce/jobs' yarn_url = base_yarn_url + '/ws/v1/cluster/apps' - print '# ' + mapred_url - print '# ' + yarn_url j1 = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs') j2 = get_rest(base_yarn_url, '/ws/v1/cluster/apps') + counter = 0 if j1["jobs"]: for j in j1["jobs"]["job"]: id = regJob.sub('', j['id']) @@ -231,7 +240,10 @@ else: else: job = jobs[id] job.mapred = j; + counter += 1 + if debug >= 1: print '[MR] %d jobs' % counter + counter = 0 if j2["apps"]: for j in j2["apps"]["app"]: id = regApp.sub('', j['id']) @@ -241,6 +253,8 @@ else: else: job = jobs[id] job.yarn = j; + counter += 1 + if debug >= 1: print '[YARN] %d jobs' % counter if db: db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname) @@ -263,10 +277,10 @@ for id, job in jobs.iteritems(): job.user = job.yarn['user'] if not job.start: job.start = job.yarn['startedTime'] - if debug: print '[MR] missing start time of %s completed from YARN (%d)' % (id, job.start) + if debug >= 1: print '[MR] missing start time of %s completed from YARN (%d)' % (id, job.start) if not job.finish: job.finish = job.yarn['finishedTime'] - if debug: print '[MR] missing finish time of %s completed from YARN (%d)' % (id, job.finish) + if debug >= 1: print '[MR] missing finish time of %s completed from YARN (%d)' % (id, job.finish) print 'job %s (%d):' % (id, counter) counter += 1 @@ -315,31 +329,31 @@ for id, job in jobs.iteritems(): data = st.fetchone() if data: if data[1] == job.user and data[2] == job.status and data[4] == job.start and data[5] == job.finish: - if debug: print '[db] job %s found' % id + if debug >= 2: print '[db] job %s found' % id else: st.execute("UPDATE job SET user='%s', status='%s', start=%s, finish=%s WHERE id='%s'" %(job.user, job.status, dbint(job.start), dbint(job.finish), id)) - if debug: print '[db] job %s updated' % id + if debug >= 2: print '[db] job %s updated' % id else: st.execute("INSERT INTO job (id, user, status, start, finish) VALUES('%s', '%s', '%s', %s, %s)" %(id, job.user, job.status, dbint(job.start), dbint(job.finish))) - if debug: print '[db] job %s inserted' % id + if debug >= 2: print '[db] job %s inserted' % id if job.mapred: if data and data[3] == job.mapred['submitTime'] and data[8] == job.mapred['mapsTotal'] and data[9] == job.mapred['reducesTotal']: - if debug: print '[db] job %s mapred is actual' % id + if debug >= 2: print '[db] job %s mapred is actual' % id else: st.execute("UPDATE job SET submit=%d, map=%s, reduce=%s WHERE id='%s'" %(job.mapred['submitTime'], dbint(job.mapred['mapsTotal']), dbint(job.mapred['reducesTotal']), id)) - if debug: print '[db] job %s mapred updated' % id + if debug >= 2: print '[db] job %s mapred updated' % id if job.yarn: if data and data[6] == job.yarn['memorySeconds'] and data[7] == job.yarn['vcoreSeconds']: - if debug: print '[db] job %s yarn is actual' % id + if debug >= 2: print '[db] job %s yarn is actual' % id else: st.execute("UPDATE job SET memory_seconds=%d, cpu_seconds=%d WHERE id='%s'" %(job.yarn['memorySeconds'], job.yarn['vcoreSeconds'], id)) - if debug: print '[db] job %s yarn updated' % id + if debug >= 2: print '[db] job %s yarn updated' % id if nodes: st.execute("DELETE FROM node WHERE jobid='%s'" % id) for nodename, node in nodes.iteritems(): st.execute("INSERT INTO node (jobid, host, elapsed, map, reduce) VALUES ('%s', '%s', %d, %d, %d)" % (id, nodename, node.elapsed, node.map, node.reduce)) - if debug: print '[db] job %s nodes updated' % id + if debug >= 2: print '[db] job %s nodes updated' % id db.commit() print