From 8d452d204b9db195fbcd98e321a1fc60abef72ac Mon Sep 17 00:00:00 2001 From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?= Date: Fri, 6 Mar 2015 22:50:03 +0100 Subject: [PATCH] Always give mapred precedence over old data or yarn. --- jobs.py | 70 +++++++++++++++++++++++++++++++++++++++-------------------------- 1 file changed, 42 insertions(+), 28 deletions(-) diff --git a/jobs.py b/jobs.py index 08bfcb3..9018e53 100755 --- a/jobs.py +++ b/jobs.py @@ -270,10 +270,11 @@ if db: break regHost = re.compile(':\d+') -counter=1 +counter=0 for id, job in jobs.iteritems(): - # merge MapRed and YARN + counter += 1 + if job.mapred: job.name = job.mapred['name'] job.status = job.mapred['state'] @@ -281,6 +282,37 @@ for id, job in jobs.iteritems(): job.queue = job.mapred['queue'] job.start = job.mapred['startTime'] job.finish = job.mapred['finishTime'] + + if db: + changed = 0 + ID=0 + NAME=1 + USER=2 + STATUS=3 + QUEUE=4 + SUBMIT=5 + START=6 + FINISH=7 + MEMORY=8 + CPU=9 + MAP=10 + REDUCE=11 + st.execute("SELECT id, name, user, status, queue, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM jobs WHERE id=%s", id) + data = st.fetchone() + if data: + if not job.name: + job.name = data[NAME] + if not job.status or job.status == 'UNDEFINED': + job.status = data[STATUS] + if not job.user: + job.user = data[USER] + if not job.queue: + job.queue = data[QUEUE] + if not job.start: + job.start = data[START] + if not job.finish: + job.finish = data[FINISH] + if job.yarn: if not job.name: job.name = job.yarn['name'] @@ -304,33 +336,16 @@ for id, job in jobs.iteritems(): print ' user: %s' % job.user print ' queue: %s' % job.queue - counter += 1 - - if job.mapred and debug >= 1: - print ' submit: %d, start: %d, finish: %d' % (job.mapred['submitTime'], job.mapred['startTime'], job.mapred['finishTime']) - print ' elapsed: %.3f s' % ((job.mapred['finishTime'] - job.mapred['startTime']) / 1000.0) - print ' finished: %.3f s' % ((job.mapred['finishTime'] - 
job.mapred['submitTime']) / 1000.0) + if job.mapred: + print ' submit: %d, start: %d, finish: %d' % (job.mapred['submitTime'], job.mapred['startTime'], job.mapred['finishTime']) + print ' elapsed: %.3f s' % ((job.mapred['finishTime'] - job.mapred['startTime']) / 1000.0) + print ' finished: %.3f s' % ((job.mapred['finishTime'] - job.mapred['submitTime']) / 1000.0) - if job.yarn and 'memorySeconds' in job.yarn.keys() and debug >= 1: - print ' MB x s: %d' % job.yarn['memorySeconds'] - print ' CPU x s: %d' % job.yarn['vcoreSeconds'] + if job.yarn and 'memorySeconds' in job.yarn.keys(): + print ' MB x s: %d' % job.yarn['memorySeconds'] + print ' CPU x s: %d' % job.yarn['vcoreSeconds'] if db: - changed = 0 - ID=0 - NAME=1 - USER=2 - STATUS=3 - QUEUE=4 - SUBMIT=5 - START=6 - FINISH=7 - MEMORY=8 - CPU=9 - MAP=10 - REDUCE=11 - st.execute("SELECT id, name, user, status, queue, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM jobs WHERE id=%s", id) - data = st.fetchone() if data: if data[NAME] == job.name and data[USER] == job.user and data[STATUS] == job.status and data[QUEUE] == job.queue and data[START] == job.start and data[FINISH] == job.finish: if debug >= 3: print '[db] job %s found' % id @@ -338,7 +353,6 @@ for id, job in jobs.iteritems(): st.execute("UPDATE jobs SET name=%s, user=%s, status=%s, queue=%s, start=%s, finish=%s WHERE id=%s", (job.name, job.user, job.status, job.queue, job.start, job.finish, id)) if debug >= 3: print '[db] job %s updated' % id changed = 1 - else: st.execute("INSERT INTO jobs (id, name, user, status, queue, start, finish) VALUES(%s, %s, %s, %s, %s, %s, %s)", (id, job.name, job.user, job.status, job.queue, job.start, job.finish)) if debug >= 3: print '[db] job %s inserted' % id @@ -358,7 +372,7 @@ for id, job in jobs.iteritems(): if debug >= 3: print '[db] job %s yarn updated' % id changed = 1 - # check for details in DB + # check for details in DB, set changed flag if missing st.execute('SELECT * FROM jobnodes WHERE 
jobid=%s', id) data = st.fetchone() if data: -- 1.8.2.3