Get node details (mapred tasks and attempts) only when job is updated in DB or when...
author František Dvořák <valtri@civ.zcu.cz>
Wed, 4 Mar 2015 20:57:27 +0000 (21:57 +0100)
committer František Dvořák <valtri@civ.zcu.cz>
Wed, 4 Mar 2015 20:57:27 +0000 (21:57 +0100)
jobs.py

diff --git a/jobs.py b/jobs.py
index 5859749..e0c9c4c 100755 (executable)
--- a/jobs.py
+++ b/jobs.py
@@ -32,7 +32,6 @@ dbuser = 'hadoop'
 dbpassword = ''
 debug=0
 ssl=0
-details=1
 host=socket.getfqdn()
 id = None
 
@@ -291,40 +290,12 @@ for id, job in jobs.iteritems():
                print '  elapsed: %.3f s' % ((job.mapred['finishTime'] - job.mapred['startTime']) / 1000.0)
                print '  finished: %.3f s' % ((job.mapred['finishTime'] - job.mapred['submitTime']) / 1000.0)
 
-               if details:
-                       nodes = dict()
-                       t = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks' % id)
-                       if t['tasks']:
-                               aggregate=0
-                               for task in t['tasks']['task']:
-#      #                               print 'taskid: %s, elapsed: %d' % (task['id'], task['elapsedTime'])
-                                       aggregate += task['elapsedTime']
-                                       a = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks/%s/attempts' % (id, task['id']))
-                                       if a['taskAttempts']:
-                                               for attempt in a['taskAttempts']['taskAttempt']:
-                                                       nodeHost = regHost.sub('', attempt['nodeHttpAddress'])
-                                                       if nodeHost not in nodes:
-                                                               nodes[nodeHost] = Node()
-                                                       nodes[nodeHost].elapsed += attempt['elapsedTime']
-                                                       if attempt['type'] == 'MAP':
-                                                               nodes[nodeHost].map += 1
-                                                       elif attempt['type'] == 'REDUCE':
-                                                               nodes[nodeHost].reduce += 1
-                                                       else:
-                                                               raise Exception('unknown type %s' %  attempt['type'])
-#      #                       print 'tasks elapsed: %d' % aggregate
-
-                       aggregate=0
-                       for nodename, node in nodes.iteritems():
-                               print '  node %s: %d' % (nodename, node.elapsed)
-                               aggregate += node.elapsed
-                       print '  ==> aggregated %d' % aggregate
-
        if job.yarn:
                print '  MB x s: %d' % job.yarn['memorySeconds']
                print '  CPU x s: %d' % job.yarn['vcoreSeconds']
 
        if db:
+               changed = 0
                st.execute("SELECT id, user, status, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM job WHERE id='%s'" % id)
                data = st.fetchone()
                if data:
@@ -333,27 +304,64 @@ for id, job in jobs.iteritems():
                        else:
                                st.execute("UPDATE job SET user='%s', status='%s', start=%s, finish=%s WHERE id='%s'" %(job.user, job.status, dbint(job.start), dbint(job.finish), id))
                                if debug >= 2: print '[db] job %s updated' % id
+                               changed = 1
 
                else:
                        st.execute("INSERT INTO job (id, user, status, start, finish) VALUES('%s', '%s', '%s', %s, %s)" %(id, job.user, job.status, dbint(job.start), dbint(job.finish)))
                        if debug >= 2: print '[db] job %s inserted' % id
+                       changed = 1
                if job.mapred:
                        if data and data[3] == job.mapred['submitTime'] and data[8] == job.mapred['mapsTotal'] and data[9] == job.mapred['reducesTotal']:
                                if debug >= 2: print '[db] job %s mapred is actual' % id
                        else:
                                st.execute("UPDATE job SET submit=%d, map=%s, reduce=%s WHERE id='%s'" %(job.mapred['submitTime'], dbint(job.mapred['mapsTotal']), dbint(job.mapred['reducesTotal']), id))
                                if debug >= 2: print '[db] job %s mapred updated' % id
+                               changed = 1
                if job.yarn:
                        if data and data[6] == job.yarn['memorySeconds'] and data[7] == job.yarn['vcoreSeconds']:
                                if debug >= 2: print '[db] job %s yarn is actual' % id
                        else:
                                st.execute("UPDATE job SET memory_seconds=%d, cpu_seconds=%d WHERE id='%s'" %(job.yarn['memorySeconds'], job.yarn['vcoreSeconds'], id))
                                if debug >= 2: print '[db] job %s yarn updated' % id
-               if nodes:
-                       st.execute("DELETE FROM node WHERE jobid='%s'" % id)
-                       for nodename, node in nodes.iteritems():
-                               st.execute("INSERT INTO node (jobid, host, elapsed, map, reduce) VALUES ('%s', '%s', %d, %d, %d)" % (id, nodename, node.elapsed, node.map, node.reduce))
-                       if debug >= 2: print '[db] job %s nodes updated' % id
+                               changed = 1
+
+       # get details (intensive!), if new job or any other difference
+       nodes = dict()
+       if job.mapred and (not db or changed):
+               t = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks' % id)
+               if t['tasks']:
+                       aggregate=0
+                       for task in t['tasks']['task']:
+#                              print 'taskid: %s, elapsed: %d' % (task['id'], task['elapsedTime'])
+                               aggregate += task['elapsedTime']
+                               a = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks/%s/attempts' % (id, task['id']))
+                               if a['taskAttempts']:
+                                       for attempt in a['taskAttempts']['taskAttempt']:
+                                               nodeHost = regHost.sub('', attempt['nodeHttpAddress'])
+                                               if nodeHost not in nodes:
+                                                       nodes[nodeHost] = Node()
+                                               nodes[nodeHost].elapsed += attempt['elapsedTime']
+                                               if attempt['type'] == 'MAP':
+                                                       nodes[nodeHost].map += 1
+                                               elif attempt['type'] == 'REDUCE':
+                                                       nodes[nodeHost].reduce += 1
+                                               else:
+                                                       raise Exception('unknown type %s' %  attempt['type'])
+#                      print 'tasks elapsed: %d' % aggregate
+
+               aggregate=0
+               for nodename, node in nodes.iteritems():
+                       print '  node %s: %d' % (nodename, node.elapsed)
+                       aggregate += node.elapsed
+               print '  ==> aggregated %d' % aggregate
+
+       if nodes and db:
+               st.execute("DELETE FROM node WHERE jobid='%s'" % id)
+               for nodename, node in nodes.iteritems():
+                       st.execute("INSERT INTO node (jobid, host, elapsed, map, reduce) VALUES ('%s', '%s', %d, %d, %d)" % (id, nodename, node.elapsed, node.map, node.reduce))
+               if debug >= 2: print '[db] job %s nodes updated' % id
+
+       if db:
                db.commit()
 
        print