j = json.loads(s)
- if debug >= 2:
- print '# %s%s' % (base_url, url)
if debug >= 3:
+ print '# %s%s' % (base_url, url)
+ if debug >= 4:
print json.dumps(j, indent=4)
return j
-h, --help ........ help message\n\
-b, --base ........ default hostname for YARN ans MapReduce\n\
-c, --config ...... config file\n\
- -d, --debug LEVEL . debug output (1=progress, 2=more, 3=json dumps)\n\
+ -d, --debug LEVEL . debug output (2=progress, 3=trace, 4=json dumps)\n\
--db .............. enable database\n\
--dbhost\n\
--dbname\n\
debug = int(cfg[1])
elif cfg[0] == 'mapred':
base_mapred_url = cfg[1]
- print cfg
elif cfg[0] == 'yarn':
base_yarn_url = cfg[1]
- print cfg
elif cfg[0] == 'ssl':
ssl = cfg[1]
elif opt in ('-d', '--debug'):
base_yarn_url = gen_url(base_yarn_url, 8088, 8090)
base_mapred_url = gen_url(base_mapred_url, 19888, 19890)
-if debug >= 1:
+if debug >= 2:
print '[MR] URL: ' + base_mapred_url
print '[YARN] URL: ' + base_yarn_url
job = jobs[id]
job.mapred = j1['job'];
counter += 1
- if debug >= 1: print '[MR] %d jobs' % counter
+ if debug >= 2: print '[MR] %d jobs' % counter
counter = 0
if j2 and j2['app']:
job = jobs[id]
job.yarn = j2['app'];
counter += 1
- if debug >= 1: print '[YARN] %d jobs' % counter
+ if debug >= 2: print '[YARN] %d jobs' % counter
else:
mapred_url = base_mapred_url + '/ws/v1/history/mapreduce/jobs' + query
yarn_url = base_yarn_url + '/ws/v1/cluster/apps' + query
job = jobs[id]
job.mapred = j;
counter += 1
- if debug >= 1: print '[MR] %d jobs' % counter
+ if debug >= 2: print '[MR] %d jobs' % counter
counter = 0
if j2["apps"]:
job = jobs[id]
job.yarn = j;
counter += 1
- if debug >= 1: print '[YARN] %d jobs' % counter
+ if debug >= 2: print '[YARN] %d jobs' % counter
if db:
db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname)
job.queue = job.yarn['queue']
if not job.start:
job.start = job.yarn['startedTime']
- if debug >= 1: print '[MR] missing start time of %s completed from YARN (%d)' % (id, job.start)
+ if debug >= 2: print '[MR] missing start time of %s completed from YARN (%d)' % (id, job.start)
if not job.finish:
job.finish = job.yarn['finishedTime']
- if debug >= 1: print '[MR] missing finish time of %s completed from YARN (%d)' % (id, job.finish)
+ if debug >= 2: print '[MR] missing finish time of %s completed from YARN (%d)' % (id, job.finish)
+
+ if debug >= 1:
+ print 'job %s (%d):' % (id, counter)
+ print ' name: %s' % job.name
+ print ' status: %s' % job.status
+ print ' user: %s' % job.user
+ print ' queue: %s' % job.queue
- print 'job %s (%d):' % (id, counter)
counter += 1
- print ' name: %s' % job.name
- print ' status: %s' % job.status
- print ' user: %s' % job.user
- print ' queue: %s' % job.queue
- if job.mapred:
+
+ if job.mapred and debug >= 1:
print ' submit: %d, start: %d, finish: %d' % (job.mapred['submitTime'], job.mapred['startTime'], job.mapred['finishTime'])
print ' elapsed: %.3f s' % ((job.mapred['finishTime'] - job.mapred['startTime']) / 1000.0)
print ' finished: %.3f s' % ((job.mapred['finishTime'] - job.mapred['submitTime']) / 1000.0)
- if job.yarn:
+ if job.yarn and debug >= 1:
print ' MB x s: %d' % job.yarn['memorySeconds']
print ' CPU x s: %d' % job.yarn['vcoreSeconds']
data = st.fetchone()
if data:
if data[NAME] == job.name and data[USER] == job.user and data[STATUS] == job.status and data[QUEUE] == job.queue and data[START] == job.start and data[FINISH] == job.finish:
- if debug >= 2: print '[db] job %s found' % id
+ if debug >= 3: print '[db] job %s found' % id
else:
st.execute("UPDATE jobs SET name=%s, user=%s, status=%s, queue=%s, start=%s, finish=%s WHERE id=%s", (job.name, job.user, job.status, job.queue, job.start, job.finish, id))
- if debug >= 2: print '[db] job %s updated' % id
+ if debug >= 3: print '[db] job %s updated' % id
changed = 1
else:
st.execute("INSERT INTO jobs (id, name, user, status, queue, start, finish) VALUES(%s, %s, %s, %s, %s, %s, %s)", (id, job.name, job.user, job.status, job.queue, job.start, job.finish))
- if debug >= 2: print '[db] job %s inserted' % id
+ if debug >= 3: print '[db] job %s inserted' % id
changed = 1
if job.mapred:
if data and data[SUBMIT] == job.mapred['submitTime'] and data[MAP] == job.mapred['mapsTotal'] and data[REDUCE] == job.mapred['reducesTotal']:
- if debug >= 2: print '[db] job %s mapred is actual' % id
+ if debug >= 3: print '[db] job %s mapred is actual' % id
else:
st.execute("UPDATE jobs SET submit=%s, map=%s, reduce=%s WHERE id=%s", (job.mapred['submitTime'], job.mapred['mapsTotal'], job.mapred['reducesTotal'], id))
- if debug >= 2: print '[db] job %s mapred updated' % id
+ if debug >= 3: print '[db] job %s mapred updated' % id
changed = 1
if job.yarn:
if data and data[MEMORY] == job.yarn['memorySeconds'] and data[CPU] == job.yarn['vcoreSeconds']:
- if debug >= 2: print '[db] job %s yarn is actual' % id
+ if debug >= 3: print '[db] job %s yarn is actual' % id
else:
st.execute("UPDATE jobs SET memory_seconds=%s, cpu_seconds=%s WHERE id=%s", (job.yarn['memorySeconds'], job.yarn['vcoreSeconds'], id))
- if debug >= 2: print '[db] job %s yarn updated' % id
+ if debug >= 3: print '[db] job %s yarn updated' % id
changed = 1
# check for details in DB
aggregate=0
for nodename, jobnode in jobnodes.iteritems():
- print ' node %s: %d' % (nodename, jobnode.elapsed)
+ if debug >= 1: print ' node %s: %d' % (nodename, jobnode.elapsed)
aggregate += jobnode.elapsed
- print ' subjobs: %d' % len(subjobs)
- print ' ==> aggregated %d' % aggregate
+ if debug >= 1:
+ print ' subjobs: %d' % len(subjobs)
+ print ' ==> aggregated %d' % aggregate
if jobnodes and db:
st.execute("DELETE FROM jobnodes WHERE jobid=%s", id)
node_hosts[nodename] = node
node_ids[node.id] = node
st.execute("INSERT INTO jobnodes (jobid, nodeid, elapsed, map, reduce) VALUES (%s, %s, %s, %s, %s)", (id, node_hosts[nodename].id, jobnode.elapsed, jobnode.map, jobnode.reduce))
- if debug >= 2: print '[db] job %s nodes updated' % id
+ if debug >= 3: print '[db] job %s nodes updated' % id
st.execute("DELETE FROM subjobs WHERE jobid=%s", id)
for subjob in subjobs:
nodename = subjob['nodeHttpAddress']
st.execute('INSERT INTO subjobs (id, jobid, nodeid, state, type, start, finish) VALUES (%s, %s, %s, %s, %s, %s, %s)', (subjob['id'], id, node_hosts[nodename].id, subjob['state'], subjob['type'], subjob['startTime'], subjob['finishTime']))
- if debug >= 2: print '[db] job %s subjobs updated' % id
+ if debug >= 3: print '[db] job %s subjobs updated' % id
# better to update timestamp again explicitly on the end of the transaction
st.execute('UPDATE jobs SET changed=NOW() WHERE id=%s', id);
if db:
db.commit()
- print
+ if debug >= 1: print
if db:
db.close()