From: František Dvořák Date: Wed, 4 Mar 2015 20:57:27 +0000 (+0100) Subject: Get node details (mapred tasks and attempts) only when job is updated in DB or when... X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=94212a38610aa15c617b54073ed1eac888301cae;p=hadoop-jobstat.git Get node details (mapred tasks and attempts) only when job is updated in DB or when printing. --- diff --git a/jobs.py b/jobs.py index 5859749..e0c9c4c 100755 --- a/jobs.py +++ b/jobs.py @@ -32,7 +32,6 @@ dbuser = 'hadoop' dbpassword = '' debug=0 ssl=0 -details=1 host=socket.getfqdn() id = None @@ -291,40 +290,12 @@ for id, job in jobs.iteritems(): print ' elapsed: %.3f s' % ((job.mapred['finishTime'] - job.mapred['startTime']) / 1000.0) print ' finished: %.3f s' % ((job.mapred['finishTime'] - job.mapred['submitTime']) / 1000.0) - if details: - nodes = dict() - t = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks' % id) - if t['tasks']: - aggregate=0 - for task in t['tasks']['task']: -# # print 'taskid: %s, elapsed: %d' % (task['id'], task['elapsedTime']) - aggregate += task['elapsedTime'] - a = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks/%s/attempts' % (id, task['id'])) - if a['taskAttempts']: - for attempt in a['taskAttempts']['taskAttempt']: - nodeHost = regHost.sub('', attempt['nodeHttpAddress']) - if nodeHost not in nodes: - nodes[nodeHost] = Node() - nodes[nodeHost].elapsed += attempt['elapsedTime'] - if attempt['type'] == 'MAP': - nodes[nodeHost].map += 1 - elif attempt['type'] == 'REDUCE': - nodes[nodeHost].reduce += 1 - else: - raise Exception('unknown type %s' % attempt['type']) -# # print 'tasks elapsed: %d' % aggregate - - aggregate=0 - for nodename, node in nodes.iteritems(): - print ' node %s: %d' % (nodename, node.elapsed) - aggregate += node.elapsed - print ' ==> aggregated %d' % aggregate - if job.yarn: print ' MB x s: %d' % job.yarn['memorySeconds'] print ' CPU x s: %d' % job.yarn['vcoreSeconds'] if db: + changed = 0 st.execute("SELECT id, user, status, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM job WHERE id='%s'" % id) data = st.fetchone() if data: @@ -333,27 +304,64 @@ for id, job in jobs.iteritems(): else: st.execute("UPDATE job SET user='%s', status='%s', start=%s, finish=%s WHERE id='%s'" %(job.user, job.status, dbint(job.start), dbint(job.finish), id)) if debug >= 2: print '[db] job %s updated' % id + changed = 1 else: st.execute("INSERT INTO job (id, user, status, start, finish) VALUES('%s', '%s', '%s', %s, %s)" %(id, job.user, job.status, dbint(job.start), dbint(job.finish))) if debug >= 2: print '[db] job %s inserted' % id + changed = 1 if job.mapred: if data and data[3] == job.mapred['submitTime'] and data[8] == job.mapred['mapsTotal'] and data[9] == job.mapred['reducesTotal']: if debug >= 2: print '[db] job %s mapred is actual' % id else: st.execute("UPDATE job SET submit=%d, map=%s, reduce=%s WHERE id='%s'" %(job.mapred['submitTime'], dbint(job.mapred['mapsTotal']), dbint(job.mapred['reducesTotal']), id)) if debug >= 2: print '[db] job %s mapred updated' % id + changed = 1 if job.yarn: if data and data[6] == job.yarn['memorySeconds'] and data[7] == job.yarn['vcoreSeconds']: if debug >= 2: print '[db] job %s yarn is actual' % id else: st.execute("UPDATE job SET memory_seconds=%d, cpu_seconds=%d WHERE id='%s'" %(job.yarn['memorySeconds'], job.yarn['vcoreSeconds'], id)) if debug >= 2: print '[db] job %s yarn updated' % id - if nodes: - st.execute("DELETE FROM node WHERE jobid='%s'" % id) - for nodename, node in nodes.iteritems(): - st.execute("INSERT INTO node (jobid, host, elapsed, map, reduce) VALUES ('%s', '%s', %d, %d, %d)" % (id, nodename, node.elapsed, node.map, node.reduce)) - if debug >= 2: print '[db] job %s nodes updated' % id + changed = 1 + + # get details (intensive!), if new job or any other difference + nodes = dict() + if job.mapred and (not db or changed): + t = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks' % id) + if t['tasks']: + aggregate=0 + for task in t['tasks']['task']: +# print 'taskid: %s, elapsed: %d' % (task['id'], task['elapsedTime']) + aggregate += task['elapsedTime'] + a = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks/%s/attempts' % (id, task['id'])) + if a['taskAttempts']: + for attempt in a['taskAttempts']['taskAttempt']: + nodeHost = regHost.sub('', attempt['nodeHttpAddress']) + if nodeHost not in nodes: + nodes[nodeHost] = Node() + nodes[nodeHost].elapsed += attempt['elapsedTime'] + if attempt['type'] == 'MAP': + nodes[nodeHost].map += 1 + elif attempt['type'] == 'REDUCE': + nodes[nodeHost].reduce += 1 + else: + raise Exception('unknown type %s' % attempt['type']) +# print 'tasks elapsed: %d' % aggregate + + aggregate=0 + for nodename, node in nodes.iteritems(): + print ' node %s: %d' % (nodename, node.elapsed) + aggregate += node.elapsed + print ' ==> aggregated %d' % aggregate + + if nodes and db: + st.execute("DELETE FROM node WHERE jobid='%s'" % id) + for nodename, node in nodes.iteritems(): + st.execute("INSERT INTO node (jobid, host, elapsed, map, reduce) VALUES ('%s', '%s', %d, %d, %d)" % (id, nodename, node.elapsed, node.map, node.reduce)) + if debug >= 2: print '[db] job %s nodes updated' % id + + if db: db.commit() print