dbpassword = ''
debug=0
ssl=0
-details=1
host=socket.getfqdn()
id = None
print ' elapsed: %.3f s' % ((job.mapred['finishTime'] - job.mapred['startTime']) / 1000.0)
print ' finished: %.3f s' % ((job.mapred['finishTime'] - job.mapred['submitTime']) / 1000.0)
- if details:
- nodes = dict()
- t = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks' % id)
- if t['tasks']:
- aggregate=0
- for task in t['tasks']['task']:
-# # print 'taskid: %s, elapsed: %d' % (task['id'], task['elapsedTime'])
- aggregate += task['elapsedTime']
- a = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks/%s/attempts' % (id, task['id']))
- if a['taskAttempts']:
- for attempt in a['taskAttempts']['taskAttempt']:
- nodeHost = regHost.sub('', attempt['nodeHttpAddress'])
- if nodeHost not in nodes:
- nodes[nodeHost] = Node()
- nodes[nodeHost].elapsed += attempt['elapsedTime']
- if attempt['type'] == 'MAP':
- nodes[nodeHost].map += 1
- elif attempt['type'] == 'REDUCE':
- nodes[nodeHost].reduce += 1
- else:
- raise Exception('unknown type %s' % attempt['type'])
-# # print 'tasks elapsed: %d' % aggregate
-
- aggregate=0
- for nodename, node in nodes.iteritems():
- print ' node %s: %d' % (nodename, node.elapsed)
- aggregate += node.elapsed
- print ' ==> aggregated %d' % aggregate
-
if job.yarn:
print ' MB x s: %d' % job.yarn['memorySeconds']
print ' CPU x s: %d' % job.yarn['vcoreSeconds']
if db:
+ changed = 0
st.execute("SELECT id, user, status, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM job WHERE id='%s'" % id)
data = st.fetchone()
if data:
else:
st.execute("UPDATE job SET user='%s', status='%s', start=%s, finish=%s WHERE id='%s'" %(job.user, job.status, dbint(job.start), dbint(job.finish), id))
if debug >= 2: print '[db] job %s updated' % id
+ changed = 1
else:
st.execute("INSERT INTO job (id, user, status, start, finish) VALUES('%s', '%s', '%s', %s, %s)" %(id, job.user, job.status, dbint(job.start), dbint(job.finish)))
if debug >= 2: print '[db] job %s inserted' % id
+ changed = 1
if job.mapred:
if data and data[3] == job.mapred['submitTime'] and data[8] == job.mapred['mapsTotal'] and data[9] == job.mapred['reducesTotal']:
if debug >= 2: print '[db] job %s mapred is actual' % id
else:
st.execute("UPDATE job SET submit=%d, map=%s, reduce=%s WHERE id='%s'" %(job.mapred['submitTime'], dbint(job.mapred['mapsTotal']), dbint(job.mapred['reducesTotal']), id))
if debug >= 2: print '[db] job %s mapred updated' % id
+ changed = 1
if job.yarn:
if data and data[6] == job.yarn['memorySeconds'] and data[7] == job.yarn['vcoreSeconds']:
if debug >= 2: print '[db] job %s yarn is actual' % id
else:
st.execute("UPDATE job SET memory_seconds=%d, cpu_seconds=%d WHERE id='%s'" %(job.yarn['memorySeconds'], job.yarn['vcoreSeconds'], id))
if debug >= 2: print '[db] job %s yarn updated' % id
- if nodes:
- st.execute("DELETE FROM node WHERE jobid='%s'" % id)
- for nodename, node in nodes.iteritems():
- st.execute("INSERT INTO node (jobid, host, elapsed, map, reduce) VALUES ('%s', '%s', %d, %d, %d)" % (id, nodename, node.elapsed, node.map, node.reduce))
- if debug >= 2: print '[db] job %s nodes updated' % id
+ changed = 1
+
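+ # fetch per-node details (expensive: one extra REST request per task); done only when no database is used, or when the job is new or any of its stored fields changed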
+ nodes = dict()
+ if job.mapred and (not db or changed):
+ t = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks' % id)
+ if t['tasks']:
+ aggregate=0
+ for task in t['tasks']['task']:
+# print 'taskid: %s, elapsed: %d' % (task['id'], task['elapsedTime'])
+ aggregate += task['elapsedTime']
+ a = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks/%s/attempts' % (id, task['id']))
+ if a['taskAttempts']:
+ for attempt in a['taskAttempts']['taskAttempt']:
+ nodeHost = regHost.sub('', attempt['nodeHttpAddress'])
+ if nodeHost not in nodes:
+ nodes[nodeHost] = Node()
+ nodes[nodeHost].elapsed += attempt['elapsedTime']
+ if attempt['type'] == 'MAP':
+ nodes[nodeHost].map += 1
+ elif attempt['type'] == 'REDUCE':
+ nodes[nodeHost].reduce += 1
+ else:
+ raise Exception('unknown type %s' % attempt['type'])
+# print 'tasks elapsed: %d' % aggregate
+
+ aggregate=0
+ for nodename, node in nodes.iteritems():
+ print ' node %s: %d' % (nodename, node.elapsed)
+ aggregate += node.elapsed
+ print ' ==> aggregated %d' % aggregate
+
+ if nodes and db:
+ st.execute("DELETE FROM node WHERE jobid='%s'" % id)
+ for nodename, node in nodes.iteritems():
+ st.execute("INSERT INTO node (jobid, host, elapsed, map, reduce) VALUES ('%s', '%s', %d, %d, %d)" % (id, nodename, node.elapsed, node.map, node.reduce))
+ if debug >= 2: print '[db] job %s nodes updated' % id
+
+ if db:
db.commit()
print