From: František Dvořák Date: Thu, 5 Mar 2015 11:42:14 +0000 (+0100) Subject: Move node hostnames to separated table. X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=ef970b1addd24d0038377440d67e9ab132f0cc8f;p=hadoop-jobstat.git Move node hostnames to separated table. --- diff --git a/create.sql b/create.sql index 5510c47..7ac4456 100644 --- a/create.sql +++ b/create.sql @@ -1,4 +1,4 @@ -CREATE TABLE job ( +CREATE TABLE jobs ( id CHAR(80) PRIMARY KEY, name CHAR(128), user CHAR(20), @@ -18,9 +18,9 @@ CREATE TABLE job ( ); -CREATE TABLE node ( +CREATE TABLE jobnodes ( jobid CHAR(80) NOT NULL, - host VARCHAR(256), + nodeid INTEGER, elapsed INTEGER, map INTEGER, reduce INTEGER, @@ -28,14 +28,21 @@ CREATE TABLE node ( INDEX (jobid) ); +CREATE TABLE nodes ( + id INTEGER PRIMARY KEY AUTO_INCREMENT, + host VARCHAR(256) UNIQUE, + + INDEX(host) +); + DELIMITER // -CREATE TRIGGER bi_measure BEFORE INSERT ON job +CREATE TRIGGER bi_measure BEFORE INSERT ON jobs FOR EACH ROW BEGIN SET NEW.changed = NOW(); END; // -CREATE TRIGGER bu_measure BEFORE UPDATE ON job +CREATE TRIGGER bu_measure BEFORE UPDATE ON jobs FOR EACH ROW BEGIN SET NEW.changed = NOW(); END; // diff --git a/jobs.py b/jobs.py index 725c7ed..c77a3f7 100755 --- a/jobs.py +++ b/jobs.py @@ -19,11 +19,14 @@ class Job: start = None finish = None -class Node: +class JobNode: elapsed = 0 map = 0 reduce = 0 +class Node: + host = None + base_mapred_url='' base_yarn_url='' db = None @@ -36,6 +39,8 @@ ssl=0 host=socket.getfqdn() id = None +node_hosts = dict() +node_ids = dict() jobs = dict() c = pycurl.Curl() @@ -246,6 +251,18 @@ if db: db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname) st = db.cursor() + data = st.execute('SELECT id, host FROM nodes') + while 1: + data = st.fetchone() + if data: + node = Node() + node.id = data[0] + node.host = data[1] + node_ids[node.id] = node + node_hosts[node.host] = node + else: + break + regHost = re.compile(':\d+') counter=1 for id, job in jobs.iteritems(): @@ -298,37 +315,37 @@ for id, job in jobs.iteritems(): CPU=8 MAP=9 REDUCE=10 - st.execute("SELECT id, name, user, status, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM job WHERE id=%s", id) + st.execute("SELECT id, name, user, status, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM jobs WHERE id=%s", id) data = st.fetchone() if data: if data[NAME] == job.name and data[USER] == job.user and data[STATUS] == job.status and data[START] == job.start and data[FINISH] == job.finish: if debug >= 2: print '[db] job %s found' % id else: - st.execute("UPDATE job SET name=%s, user=%s, status=%s, start=%s, finish=%s WHERE id=%s", (job.name, job.user, job.status, job.start, job.finish, id)) + st.execute("UPDATE jobs SET name=%s, user=%s, status=%s, start=%s, finish=%s WHERE id=%s", (job.name, job.user, job.status, job.start, job.finish, id)) if debug >= 2: print '[db] job %s updated' % id changed = 1 else: - st.execute("INSERT INTO job (id, name, user, status, start, finish) VALUES(%s, %s, %s, %s, %s, %s)", (id, job.name, job.user, job.status, job.start, job.finish)) + st.execute("INSERT INTO jobs (id, name, user, status, start, finish) VALUES(%s, %s, %s, %s, %s, %s)", (id, job.name, job.user, job.status, job.start, job.finish)) if debug >= 2: print '[db] job %s inserted' % id changed = 1 if job.mapred: if data and data[SUBMIT] == job.mapred['submitTime'] and data[MAP] == job.mapred['mapsTotal'] and data[REDUCE] == job.mapred['reducesTotal']: if debug >= 2: print '[db] job %s mapred is actual' % id else: - st.execute("UPDATE job SET submit=%s, map=%s, reduce=%s WHERE id=%s", (job.mapred['submitTime'], job.mapred['mapsTotal'], job.mapred['reducesTotal'], id)) + st.execute("UPDATE jobs SET submit=%s, map=%s, reduce=%s WHERE id=%s", (job.mapred['submitTime'], job.mapred['mapsTotal'], job.mapred['reducesTotal'], id)) if debug >= 2: print '[db] job %s mapred updated' % id changed = 1 if job.yarn: if data and data[MEMORY] == job.yarn['memorySeconds'] and data[CPU] == job.yarn['vcoreSeconds']: if debug >= 2: print '[db] job %s yarn is actual' % id else: - st.execute("UPDATE job SET memory_seconds=%s, cpu_seconds=%s WHERE id=%s", (job.yarn['memorySeconds'], job.yarn['vcoreSeconds'], id)) + st.execute("UPDATE jobs SET memory_seconds=%s, cpu_seconds=%s WHERE id=%s", (job.yarn['memorySeconds'], job.yarn['vcoreSeconds'], id)) if debug >= 2: print '[db] job %s yarn updated' % id changed = 1 # get details (intensive!), if new job or any other difference - nodes = dict() + jobnodes = dict() if job.mapred and (not db or changed): t = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks' % id) if t['tasks']: @@ -340,30 +357,37 @@ for id, job in jobs.iteritems(): if a['taskAttempts']: for attempt in a['taskAttempts']['taskAttempt']: nodeHost = regHost.sub('', attempt['nodeHttpAddress']) - if nodeHost not in nodes: - nodes[nodeHost] = Node() - nodes[nodeHost].elapsed += attempt['elapsedTime'] + if nodeHost not in jobnodes: + jobnodes[nodeHost] = JobNode() + jobnodes[nodeHost].elapsed += attempt['elapsedTime'] if attempt['type'] == 'MAP': - nodes[nodeHost].map += 1 + jobnodes[nodeHost].map += 1 elif attempt['type'] == 'REDUCE': - nodes[nodeHost].reduce += 1 + jobnodes[nodeHost].reduce += 1 else: raise Exception('unknown type %s' % attempt['type']) # print 'tasks elapsed: %d' % aggregate aggregate=0 - for nodename, node in nodes.iteritems(): - print ' node %s: %d' % (nodename, node.elapsed) - aggregate += node.elapsed + for nodename, jobnode in jobnodes.iteritems(): + print ' node %s: %d' % (nodename, jobnode.elapsed) + aggregate += jobnode.elapsed print ' ==> aggregated %d' % aggregate - if nodes and db: - st.execute("DELETE FROM node WHERE jobid=%s", id) - for nodename, node in nodes.iteritems(): - st.execute("INSERT INTO node (jobid, host, elapsed, map, reduce) VALUES (%s, %s, %s, %s, %s)", (id, nodename, node.elapsed, node.map, node.reduce)) + if jobnodes and db: + st.execute("DELETE FROM jobnodes WHERE jobid=%s", id) + for nodename, jobnode in jobnodes.iteritems(): + if not nodename in node_hosts.keys(): + st.execute('INSERT INTO nodes (host) VALUES (%s)', nodename) + node = Node() + node.id = db.insert_id() + node.host = nodename + node_hosts[nodename] = node + node_ids[node.id] = node + st.execute("INSERT INTO jobnodes (jobid, nodeid, elapsed, map, reduce) VALUES (%s, %s, %s, %s, %s)", (id, node_hosts[nodename].id, jobnode.elapsed, jobnode.map, jobnode.reduce)) if debug >= 2: print '[db] job %s nodes updated' % id # better to update timestamp exlicitly on the end of the transaction - st.execute('UPDATE job SET changed=NOW() WHERE id=%s', id); + st.execute('UPDATE jobs SET changed=NOW() WHERE id=%s', id); if db: db.commit()