Add more detailed subjobs table.
author František Dvořák <valtri@civ.zcu.cz>
Thu, 5 Mar 2015 12:14:37 +0000 (13:14 +0100)
committer František Dvořák <valtri@civ.zcu.cz>
Thu, 5 Mar 2015 12:14:37 +0000 (13:14 +0100)
create.sql
jobs.py

diff --git a/create.sql b/create.sql
index 7ac4456..a1f9bcc 100644 (file)
--- a/create.sql
+++ b/create.sql
@@ -18,6 +18,20 @@ CREATE TABLE jobs (
 );
 
 
+CREATE TABLE subjobs (  -- task attempts of a job, inserted by jobs.py
+       id CHAR(80) PRIMARY KEY,  -- attempt id with the 'attempt_' prefix stripped
+       jobid CHAR(80),  -- owning job id, same value as stored in jobnodes.jobid
+       nodeid INTEGER,  -- id of the node the attempt is attributed to
+       state CHAR(20),  -- attempt 'state' field from the history server REST reply
+       type CHAR(20),  -- attempt 'type' field from the history server REST reply
+       start BIGINT,  -- attempt 'startTime' (presumably epoch milliseconds, TODO confirm)
+       finish BIGINT,  -- attempt 'finishTime' (same unit as start)
+
+       INDEX(id),  -- NOTE(review): looks redundant, PRIMARY KEY already indexes id
+       INDEX(jobid)  -- jobs.py selects and deletes rows by jobid
+);
+
+
 CREATE TABLE jobnodes (
        jobid CHAR(80) NOT NULL,
        nodeid INTEGER,
diff --git a/jobs.py b/jobs.py
index b49549d..cfeb74f 100755 (executable)
--- a/jobs.py
+++ b/jobs.py
@@ -176,6 +176,7 @@ if debug >= 1:
 
 regJob = re.compile('^job_')
 regApp = re.compile('^application_')
+regAtt = re.compile('^attempt_')
 if id:
        if regJob.match(id): id = regJob.sub('', id)
        if regApp.match(id): id = regApp.sub('', id)
@@ -347,11 +348,15 @@ for id, job in jobs.iteritems():
                # check for details in DB
                st.execute('SELECT * FROM jobnodes WHERE jobid=%s', id)
                data = st.fetchone()
+               if data:
+                       st.execute('SELECT * FROM subjobs WHERE jobid=%s', id)
+                       data = st.fetchone()
                if not data:
                        changed = 1
 
        # get details (intensive!), if new job or any other difference
        jobnodes = dict()
+       subjobs = list()
        if job.mapred and (not db or changed):
                t = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks' % id)
                if t['tasks']:
@@ -362,6 +367,8 @@ for id, job in jobs.iteritems():
                                a = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks/%s/attempts' % (id, task['id']))
                                if a['taskAttempts']:
                                        for attempt in a['taskAttempts']['taskAttempt']:
+                                               if regAtt.match(attempt['id']): attempt['id'] = regAtt.sub('', attempt['id'])
+
                                                nodeHost = regHost.sub('', attempt['nodeHttpAddress'])
                                                if nodeHost not in jobnodes:
                                                        jobnodes[nodeHost] = JobNode()
@@ -373,11 +380,13 @@ for id, job in jobs.iteritems():
                                                else:
                                                        raise Exception('unknown type %s' %  attempt['type'])
 #                      print 'tasks elapsed: %d' % aggregate
+                                               subjobs.append(attempt)  # inside the attempt loop: record every attempt, not only the last one of each task
 
                aggregate=0
                for nodename, jobnode in jobnodes.iteritems():
                        print '  node %s: %d' % (nodename, jobnode.elapsed)
                        aggregate += jobnode.elapsed
+               print '  subjobs: %d' % len(subjobs)
                print '  ==> aggregated %d' % aggregate
 
        if jobnodes and db:
@@ -392,7 +401,13 @@ for id, job in jobs.iteritems():
                                node_ids[node.id] = node
                        st.execute("INSERT INTO jobnodes (jobid, nodeid, elapsed, map, reduce) VALUES (%s, %s, %s, %s, %s)", (id, node_hosts[nodename].id, jobnode.elapsed, jobnode.map, jobnode.reduce))
                if debug >= 2: print '[db] job %s nodes updated' % id
-               # better to update timestamp exlicitly on the end of the transaction
+
+               st.execute("DELETE FROM subjobs WHERE jobid=%s", id)
+               for subjob in subjobs:
+                       st.execute('INSERT INTO subjobs (id, jobid, nodeid, state, type, start, finish) VALUES (%s, %s, %s, %s, %s, %s, %s)', (subjob['id'], id, node_hosts[nodename].id, subjob['state'], subjob['type'], subjob['startTime'], subjob['finishTime']))
+               if debug >= 2: print '[db] job %s subjobs updated' % id
+
+               # better to update timestamp again explicitly on the end of the transaction
                st.execute('UPDATE jobs SET changed=NOW() WHERE id=%s', id);
 
        if db: