From c6607f361de8fbe26a82fd34b8a8191656a8d02d Mon Sep 17 00:00:00 2001 From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?= Date: Thu, 5 Mar 2015 13:14:37 +0100 Subject: [PATCH] Add more detailed subjobs table. Each MapReduce task attempt of a job is stored as one subjobs row (attempt id, job id, id of the node the attempt ran on, state, type, start and finish time). --- create.sql | 14 ++++++++++++++ jobs.py | 17 ++++++++++++++++- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/create.sql b/create.sql index 7ac4456..a1f9bcc 100644 --- a/create.sql +++ b/create.sql @@ -18,6 +18,20 @@ CREATE TABLE jobs ( ); +CREATE TABLE subjobs ( + id CHAR(80) PRIMARY KEY, + jobid CHAR(80), + nodeid INTEGER, + state CHAR(20), + type CHAR(20), + start BIGINT, + finish BIGINT, + + INDEX(id), + INDEX(jobid) +); + + CREATE TABLE jobnodes ( jobid CHAR(80) NOT NULL, nodeid INTEGER, diff --git a/jobs.py b/jobs.py index b49549d..cfeb74f 100755 --- a/jobs.py +++ b/jobs.py @@ -176,6 +176,7 @@ if debug >= 1: regJob = re.compile('^job_') regApp = re.compile('^application_') +regAtt = re.compile('^attempt_') if id: if regJob.match(id): id = regJob.sub('', id) if regApp.match(id): id = regApp.sub('', id) @@ -347,11 +348,15 @@ for id, job in jobs.iteritems(): # check for details in DB st.execute('SELECT * FROM jobnodes WHERE jobid=%s', id) data = st.fetchone() + if data: + st.execute('SELECT * FROM subjobs WHERE jobid=%s', id) + data = st.fetchone() if not data: changed = 1 # get details (intensive!), if new job or any other difference jobnodes = dict() + subjobs = list() if job.mapred and (not db or changed): t = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks' % id) if t['tasks']: @@ -362,6 +367,8 @@ for id, job in jobs.iteritems(): a = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks/%s/attempts' % (id, task['id'])) if a['taskAttempts']: for attempt in a['taskAttempts']['taskAttempt']: + if regAtt.match(attempt['id']): attempt['id'] = regAtt.sub('', attempt['id']) + nodeHost = regHost.sub('', attempt['nodeHttpAddress']) if nodeHost not in jobnodes: jobnodes[nodeHost] = JobNode() @@ -373,11
+380,13 @@ for id, job in jobs.iteritems(): else: raise Exception('unknown type %s' % attempt['type']) # print 'tasks elapsed: %d' % aggregate + subjobs.append(attempt) aggregate=0 for nodename, jobnode in jobnodes.iteritems(): print ' node %s: %d' % (nodename, jobnode.elapsed) aggregate += jobnode.elapsed + print ' subjobs: %d' % len(subjobs) print ' ==> aggregated %d' % aggregate if jobnodes and db: @@ -392,7 +401,13 @@ for id, job in jobs.iteritems(): node_ids[node.id] = node st.execute("INSERT INTO jobnodes (jobid, nodeid, elapsed, map, reduce) VALUES (%s, %s, %s, %s, %s)", (id, node_hosts[nodename].id, jobnode.elapsed, jobnode.map, jobnode.reduce)) if debug >= 2: print '[db] job %s nodes updated' % id - # better to update timestamp exlicitly on the end of the transaction + + st.execute("DELETE FROM subjobs WHERE jobid=%s", id) + for subjob in subjobs: + st.execute('INSERT INTO subjobs (id, jobid, nodeid, state, type, start, finish) VALUES (%s, %s, %s, %s, %s, %s, %s)', (subjob['id'], id, node_hosts[regHost.sub('', subjob['nodeHttpAddress'])].id, subjob['state'], subjob['type'], subjob['startTime'], subjob['finishTime'])) + if debug >= 2: print '[db] job %s subjobs updated' % id + + # better to update timestamp again explicitly at the end of the transaction st.execute('UPDATE jobs SET changed=NOW() WHERE id=%s', id); if db: -- 1.8.2.3