# Anchored patterns for the 'job_', 'application_' and 'attempt_' ID prefixes;
# each is used further down with .sub('', ...) to strip that prefix from an ID.
regJob = re.compile('^job_')
regApp = re.compile('^application_')
+regAtt = re.compile('^attempt_')
# Normalize the ID by dropping a leading 'job_' or 'application_' prefix.
# NOTE(review): indentation is not preserved in this excerpt, so how much of
# what follows is nested under `if id:` cannot be confirmed from here.
if id:
if regJob.match(id): id = regJob.sub('', id)
if regApp.match(id): id = regApp.sub('', id)
# check for details in DB
st.execute('SELECT * FROM jobnodes WHERE jobid=%s', id)
data = st.fetchone()
# A jobnodes row exists: additionally require a subjobs row. `data` is reused,
# so from here on it reflects the subjobs lookup.
+ if data:
+ st.execute('SELECT * FROM subjobs WHERE jobid=%s', id)
+ data = st.fetchone()
# No row found by the last lookup: flag the job so its details are fetched
# below (the fetch is conditioned on `not db or changed`).
if not data:
changed = 1
# get details (intensive!), if new job or any other difference
# jobnodes: node host name -> JobNode, one entry per host seen in the attempts.
jobnodes = dict()
# subjobs: the task-attempt records as returned by the REST call, kept so they
# can be written to the subjobs table later.
+ subjobs = list()
if job.mapred and (not db or changed):
# Task list of this job from the MapReduce history REST endpoint.
t = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks' % id)
if t['tasks']:
# NOTE(review): `task` is not bound in the visible lines; presumably a loop
# over the returned tasks is elided from this excerpt - confirm.
a = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks/%s/attempts' % (id, task['id']))
if a['taskAttempts']:
for attempt in a['taskAttempts']['taskAttempt']:
# Keep attempt IDs without the 'attempt_' prefix.
+ if regAtt.match(attempt['id']): attempt['id'] = regAtt.sub('', attempt['id'])
+
# Host key for this attempt: nodeHttpAddress with whatever regHost matches
# removed (regHost is defined outside this excerpt).
nodeHost = regHost.sub('', attempt['nodeHttpAddress'])
if nodeHost not in jobnodes:
jobnodes[nodeHost] = JobNode()
# NOTE(review): the branch(es) this `else` pairs with are not visible here;
# the message shows it rejects an unrecognized attempt['type'].
else:
raise Exception('unknown type %s' % attempt['type'])
# print 'tasks elapsed: %d' % aggregate
+ subjobs.append(attempt)
# Report the elapsed value of every node and their sum, plus the number of
# collected attempts.
aggregate=0
for nodename, jobnode in jobnodes.iteritems():
print ' node %s: %d' % (nodename, jobnode.elapsed)
aggregate += jobnode.elapsed
+ print ' subjobs: %d' % len(subjobs)
print ' ==> aggregated %d' % aggregate
# Store the per-node figures (elapsed, map, reduce) in the jobnodes table; only
# done when nodes were collected and a DB is in use.
if jobnodes and db:
# NOTE(review): `node` and `nodename` are not bound in the visible lines; the
# statements that set them appear to be elided from this excerpt - confirm.
node_ids[node.id] = node
st.execute("INSERT INTO jobnodes (jobid, nodeid, elapsed, map, reduce) VALUES (%s, %s, %s, %s, %s)", (id, node_hosts[nodename].id, jobnode.elapsed, jobnode.map, jobnode.reduce))
if debug >= 2: print '[db] job %s nodes updated' % id
- # better to update timestamp exlicitly on the end of the transaction
+
+ st.execute("DELETE FROM subjobs WHERE jobid=%s", id)
+ for subjob in subjobs:
+ st.execute('INSERT INTO subjobs (id, jobid, nodeid, state, type, start, finish) VALUES (%s, %s, %s, %s, %s, %s, %s)', (subjob['id'], id, node_hosts[nodename].id, subjob['state'], subjob['type'], subjob['startTime'], subjob['finishTime']))
+ if debug >= 2: print '[db] job %s subjobs updated' % id
+
+ # better to update timestamp again explicitly at the end of the transaction
st.execute('UPDATE jobs SET changed=NOW() WHERE id=%s', id);
if db: