start = None
finish = None
-class Node:
+class JobNode:
elapsed = 0
map = 0
reduce = 0
+class Node:
+ host = None
+
base_mapred_url=''
base_yarn_url=''
db = None
host=socket.getfqdn()
id = None
+node_hosts = dict()
+node_ids = dict()
jobs = dict()
c = pycurl.Curl()
db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname)
st = db.cursor()
+ data = st.execute('SELECT id, host FROM nodes')
+ while 1:
+ data = st.fetchone()
+ if data:
+ node = Node()
+ node.id = data[0]
+ node.host = data[1]
+ node_ids[node.id] = node
+ node_hosts[node.host] = node
+ else:
+ break
+
regHost = re.compile(':\d+')
counter=1
for id, job in jobs.iteritems():
CPU=8
MAP=9
REDUCE=10
- st.execute("SELECT id, name, user, status, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM job WHERE id=%s", id)
+ st.execute("SELECT id, name, user, status, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM jobs WHERE id=%s", id)
data = st.fetchone()
if data:
if data[NAME] == job.name and data[USER] == job.user and data[STATUS] == job.status and data[START] == job.start and data[FINISH] == job.finish:
if debug >= 2: print '[db] job %s found' % id
else:
- st.execute("UPDATE job SET name=%s, user=%s, status=%s, start=%s, finish=%s WHERE id=%s", (job.name, job.user, job.status, job.start, job.finish, id))
+ st.execute("UPDATE jobs SET name=%s, user=%s, status=%s, start=%s, finish=%s WHERE id=%s", (job.name, job.user, job.status, job.start, job.finish, id))
if debug >= 2: print '[db] job %s updated' % id
changed = 1
else:
- st.execute("INSERT INTO job (id, name, user, status, start, finish) VALUES(%s, %s, %s, %s, %s, %s)", (id, job.name, job.user, job.status, job.start, job.finish))
+ st.execute("INSERT INTO jobs (id, name, user, status, start, finish) VALUES(%s, %s, %s, %s, %s, %s)", (id, job.name, job.user, job.status, job.start, job.finish))
if debug >= 2: print '[db] job %s inserted' % id
changed = 1
if job.mapred:
if data and data[SUBMIT] == job.mapred['submitTime'] and data[MAP] == job.mapred['mapsTotal'] and data[REDUCE] == job.mapred['reducesTotal']:
if debug >= 2: print '[db] job %s mapred is actual' % id
else:
- st.execute("UPDATE job SET submit=%s, map=%s, reduce=%s WHERE id=%s", (job.mapred['submitTime'], job.mapred['mapsTotal'], job.mapred['reducesTotal'], id))
+ st.execute("UPDATE jobs SET submit=%s, map=%s, reduce=%s WHERE id=%s", (job.mapred['submitTime'], job.mapred['mapsTotal'], job.mapred['reducesTotal'], id))
if debug >= 2: print '[db] job %s mapred updated' % id
changed = 1
if job.yarn:
if data and data[MEMORY] == job.yarn['memorySeconds'] and data[CPU] == job.yarn['vcoreSeconds']:
if debug >= 2: print '[db] job %s yarn is actual' % id
else:
- st.execute("UPDATE job SET memory_seconds=%s, cpu_seconds=%s WHERE id=%s", (job.yarn['memorySeconds'], job.yarn['vcoreSeconds'], id))
+ st.execute("UPDATE jobs SET memory_seconds=%s, cpu_seconds=%s WHERE id=%s", (job.yarn['memorySeconds'], job.yarn['vcoreSeconds'], id))
if debug >= 2: print '[db] job %s yarn updated' % id
changed = 1
# get details (intensive!), if new job or any other difference
- nodes = dict()
+ jobnodes = dict()
if job.mapred and (not db or changed):
t = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks' % id)
if t['tasks']:
if a['taskAttempts']:
for attempt in a['taskAttempts']['taskAttempt']:
nodeHost = regHost.sub('', attempt['nodeHttpAddress'])
- if nodeHost not in nodes:
- nodes[nodeHost] = Node()
- nodes[nodeHost].elapsed += attempt['elapsedTime']
+ if nodeHost not in jobnodes:
+ jobnodes[nodeHost] = JobNode()
+ jobnodes[nodeHost].elapsed += attempt['elapsedTime']
if attempt['type'] == 'MAP':
- nodes[nodeHost].map += 1
+ jobnodes[nodeHost].map += 1
elif attempt['type'] == 'REDUCE':
- nodes[nodeHost].reduce += 1
+ jobnodes[nodeHost].reduce += 1
else:
raise Exception('unknown type %s' % attempt['type'])
# print 'tasks elapsed: %d' % aggregate
aggregate=0
- for nodename, node in nodes.iteritems():
- print ' node %s: %d' % (nodename, node.elapsed)
- aggregate += node.elapsed
+ for nodename, jobnode in jobnodes.iteritems():
+ print ' node %s: %d' % (nodename, jobnode.elapsed)
+ aggregate += jobnode.elapsed
print ' ==> aggregated %d' % aggregate
- if nodes and db:
- st.execute("DELETE FROM node WHERE jobid=%s", id)
- for nodename, node in nodes.iteritems():
- st.execute("INSERT INTO node (jobid, host, elapsed, map, reduce) VALUES (%s, %s, %s, %s, %s)", (id, nodename, node.elapsed, node.map, node.reduce))
+ if jobnodes and db:
+ st.execute("DELETE FROM jobnodes WHERE jobid=%s", id)
+ for nodename, jobnode in jobnodes.iteritems():
+ if not nodename in node_hosts.keys():
+ st.execute('INSERT INTO nodes (host) VALUES (%s)', nodename)
+ node = Node()
+ node.id = db.insert_id()
+ node.host = nodename
+ node_hosts[nodename] = node
+ node_ids[node.id] = node
+ st.execute("INSERT INTO jobnodes (jobid, nodeid, elapsed, map, reduce) VALUES (%s, %s, %s, %s, %s)", (id, node_hosts[nodename].id, jobnode.elapsed, jobnode.map, jobnode.reduce))
if debug >= 2: print '[db] job %s nodes updated' % id
# better to update timestamp exlicitly on the end of the transaction
- st.execute('UPDATE job SET changed=NOW() WHERE id=%s', id);
+ st.execute('UPDATE jobs SET changed=NOW() WHERE id=%s', id);
if db:
db.commit()