Move node hostnames to separated table.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Thu, 5 Mar 2015 11:42:14 +0000 (12:42 +0100)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Thu, 5 Mar 2015 11:42:14 +0000 (12:42 +0100)
create.sql
jobs.py

index 5510c47..7ac4456 100644 (file)
@@ -1,4 +1,4 @@
-CREATE TABLE job (
+CREATE TABLE jobs (
        id CHAR(80) PRIMARY KEY,
        name CHAR(128),
        user CHAR(20),
@@ -18,9 +18,9 @@ CREATE TABLE job (
 );
 
 
-CREATE TABLE node (
+CREATE TABLE jobnodes (
        jobid CHAR(80) NOT NULL,
-       host VARCHAR(256),
+       nodeid INTEGER,
        elapsed INTEGER,
        map INTEGER,
        reduce INTEGER,
@@ -28,14 +28,21 @@ CREATE TABLE node (
        INDEX (jobid)
 );
 
+CREATE TABLE nodes (
+       id INTEGER PRIMARY KEY AUTO_INCREMENT,
+       host VARCHAR(256) UNIQUE,
+
+       INDEX(host)
+);
+
 DELIMITER //
 
-CREATE TRIGGER bi_measure BEFORE INSERT ON job
+CREATE TRIGGER bi_measure BEFORE INSERT ON jobs
 FOR EACH ROW BEGIN
   SET NEW.changed = NOW();
 END; //
 
-CREATE TRIGGER bu_measure BEFORE UPDATE ON job
+CREATE TRIGGER bu_measure BEFORE UPDATE ON jobs
 FOR EACH ROW BEGIN
   SET NEW.changed = NOW();
 END; //
diff --git a/jobs.py b/jobs.py
index 725c7ed..c77a3f7 100755 (executable)
--- a/jobs.py
+++ b/jobs.py
@@ -19,11 +19,14 @@ class Job:
        start = None
        finish = None
 
-class Node:
+class JobNode:
        elapsed = 0
        map = 0
        reduce = 0
 
+class Node:
+       host = None
+
 base_mapred_url=''
 base_yarn_url=''
 db = None
@@ -36,6 +39,8 @@ ssl=0
 host=socket.getfqdn()
 id = None
 
+node_hosts = dict()
+node_ids = dict()
 jobs = dict()
 
 c = pycurl.Curl()
@@ -246,6 +251,18 @@ if db:
        db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname)
        st = db.cursor()
 
+       data = st.execute('SELECT id, host FROM nodes')
+       while 1:
+               data = st.fetchone()
+               if data:
+                       node = Node()
+                       node.id = data[0]
+                       node.host = data[1]
+                       node_ids[node.id] = node
+                       node_hosts[node.host] = node
+               else:
+                       break
+
 regHost = re.compile(':\d+')
 counter=1
 for id, job in jobs.iteritems():
@@ -298,37 +315,37 @@ for id, job in jobs.iteritems():
                CPU=8
                MAP=9
                REDUCE=10
-               st.execute("SELECT id, name, user, status, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM job WHERE id=%s", id)
+               st.execute("SELECT id, name, user, status, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM jobs WHERE id=%s", id)
                data = st.fetchone()
                if data:
                        if data[NAME] == job.name and data[USER] == job.user and data[STATUS] == job.status and data[START] == job.start and data[FINISH] == job.finish:
                                if debug >= 2: print '[db] job %s found' % id
                        else:
-                               st.execute("UPDATE job SET name=%s, user=%s, status=%s, start=%s, finish=%s WHERE id=%s", (job.name, job.user, job.status, job.start, job.finish, id))
+                               st.execute("UPDATE jobs SET name=%s, user=%s, status=%s, start=%s, finish=%s WHERE id=%s", (job.name, job.user, job.status, job.start, job.finish, id))
                                if debug >= 2: print '[db] job %s updated' % id
                                changed = 1
 
                else:
-                       st.execute("INSERT INTO job (id, name, user, status, start, finish) VALUES(%s, %s, %s, %s, %s, %s)", (id, job.name, job.user, job.status, job.start, job.finish))
+                       st.execute("INSERT INTO jobs (id, name, user, status, start, finish) VALUES(%s, %s, %s, %s, %s, %s)", (id, job.name, job.user, job.status, job.start, job.finish))
                        if debug >= 2: print '[db] job %s inserted' % id
                        changed = 1
                if job.mapred:
                        if data and data[SUBMIT] == job.mapred['submitTime'] and data[MAP] == job.mapred['mapsTotal'] and data[REDUCE] == job.mapred['reducesTotal']:
                                if debug >= 2: print '[db] job %s mapred is actual' % id
                        else:
-                               st.execute("UPDATE job SET submit=%s, map=%s, reduce=%s WHERE id=%s", (job.mapred['submitTime'], job.mapred['mapsTotal'], job.mapred['reducesTotal'], id))
+                               st.execute("UPDATE jobs SET submit=%s, map=%s, reduce=%s WHERE id=%s", (job.mapred['submitTime'], job.mapred['mapsTotal'], job.mapred['reducesTotal'], id))
                                if debug >= 2: print '[db] job %s mapred updated' % id
                                changed = 1
                if job.yarn:
                        if data and data[MEMORY] == job.yarn['memorySeconds'] and data[CPU] == job.yarn['vcoreSeconds']:
                                if debug >= 2: print '[db] job %s yarn is actual' % id
                        else:
-                               st.execute("UPDATE job SET memory_seconds=%s, cpu_seconds=%s WHERE id=%s", (job.yarn['memorySeconds'], job.yarn['vcoreSeconds'], id))
+                               st.execute("UPDATE jobs SET memory_seconds=%s, cpu_seconds=%s WHERE id=%s", (job.yarn['memorySeconds'], job.yarn['vcoreSeconds'], id))
                                if debug >= 2: print '[db] job %s yarn updated' % id
                                changed = 1
 
        # get details (intensive!), if new job or any other difference
-       nodes = dict()
+       jobnodes = dict()
        if job.mapred and (not db or changed):
                t = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks' % id)
                if t['tasks']:
@@ -340,30 +357,37 @@ for id, job in jobs.iteritems():
                                if a['taskAttempts']:
                                        for attempt in a['taskAttempts']['taskAttempt']:
                                                nodeHost = regHost.sub('', attempt['nodeHttpAddress'])
-                                               if nodeHost not in nodes:
-                                                       nodes[nodeHost] = Node()
-                                               nodes[nodeHost].elapsed += attempt['elapsedTime']
+                                               if nodeHost not in jobnodes:
+                                                       jobnodes[nodeHost] = JobNode()
+                                               jobnodes[nodeHost].elapsed += attempt['elapsedTime']
                                                if attempt['type'] == 'MAP':
-                                                       nodes[nodeHost].map += 1
+                                                       jobnodes[nodeHost].map += 1
                                                elif attempt['type'] == 'REDUCE':
-                                                       nodes[nodeHost].reduce += 1
+                                                       jobnodes[nodeHost].reduce += 1
                                                else:
                                                        raise Exception('unknown type %s' %  attempt['type'])
 #                      print 'tasks elapsed: %d' % aggregate
 
                aggregate=0
-               for nodename, node in nodes.iteritems():
-                       print '  node %s: %d' % (nodename, node.elapsed)
-                       aggregate += node.elapsed
+               for nodename, jobnode in jobnodes.iteritems():
+                       print '  node %s: %d' % (nodename, jobnode.elapsed)
+                       aggregate += jobnode.elapsed
                print '  ==> aggregated %d' % aggregate
 
-       if nodes and db:
-               st.execute("DELETE FROM node WHERE jobid=%s", id)
-               for nodename, node in nodes.iteritems():
-                       st.execute("INSERT INTO node (jobid, host, elapsed, map, reduce) VALUES (%s, %s, %s, %s, %s)", (id, nodename, node.elapsed, node.map, node.reduce))
+       if jobnodes and db:
+               st.execute("DELETE FROM jobnodes WHERE jobid=%s", id)
+               for nodename, jobnode in jobnodes.iteritems():
+                       if not nodename in node_hosts.keys():
+                               st.execute('INSERT INTO nodes (host) VALUES (%s)', nodename)
+                               node = Node()
+                               node.id = db.insert_id()
+                               node.host = nodename
+                               node_hosts[nodename] = node
+                               node_ids[node.id] = node
+                       st.execute("INSERT INTO jobnodes (jobid, nodeid, elapsed, map, reduce) VALUES (%s, %s, %s, %s, %s)", (id, node_hosts[nodename].id, jobnode.elapsed, jobnode.map, jobnode.reduce))
                if debug >= 2: print '[db] job %s nodes updated' % id
                # better to update timestamp exlicitly on the end of the transaction
-               st.execute('UPDATE job SET changed=NOW() WHERE id=%s', id);
+               st.execute('UPDATE jobs SET changed=NOW() WHERE id=%s', id);
 
        if db:
                db.commit()