Always give mapred precedence over old data and yarn.
author František Dvořák <valtri@civ.zcu.cz>
Fri, 6 Mar 2015 21:50:03 +0000 (22:50 +0100)
committer František Dvořák <valtri@civ.zcu.cz>
Fri, 6 Mar 2015 21:50:03 +0000 (22:50 +0100)
jobs.py

diff --git a/jobs.py b/jobs.py
index 08bfcb3..9018e53 100755 (executable)
--- a/jobs.py
+++ b/jobs.py
@@ -270,10 +270,11 @@ if db:
                        break
 
 regHost = re.compile(':\d+')
-counter=1
+counter=0
 for id, job in jobs.iteritems():
 
-       # merge MapRed and YARN
+       counter += 1
+
        if job.mapred:
                job.name = job.mapred['name']
                job.status = job.mapred['state']
@@ -281,6 +282,37 @@ for id, job in jobs.iteritems():
                job.queue = job.mapred['queue']
                job.start = job.mapred['startTime']
                job.finish = job.mapred['finishTime']
+
+       if db:
+               changed = 0
+               ID=0
+               NAME=1
+               USER=2
+               STATUS=3
+               QUEUE=4
+               SUBMIT=5
+               START=6
+               FINISH=7
+               MEMORY=8
+               CPU=9
+               MAP=10
+               REDUCE=11
+               st.execute("SELECT id, name, user, status, queue, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM jobs WHERE id=%s", id)
+               data = st.fetchone()
+               if data:
+                       if not job.name:
+                               job.name = data[NAME]
+                       if not job.status or job.status == 'UNDEFINED':
+                               job.status = data[STATUS]
+                       if not job.user:
+                               job.user = data[USER]
+                       if not job.queue:
+                               job.queue = data[QUEUE]
+                       if not job.start:
+                               job.start = data[START]
+                       if not job.finish:
+                               job.finish = data[FINISH]
+
        if job.yarn:
                if not job.name:
                        job.name = job.yarn['name']
@@ -304,33 +336,16 @@ for id, job in jobs.iteritems():
                print '  user: %s' % job.user
                print '  queue: %s' % job.queue
 
-       counter += 1
-
-       if job.mapred and debug >= 1:
-               print '  submit: %d, start: %d, finish: %d' % (job.mapred['submitTime'], job.mapred['startTime'], job.mapred['finishTime'])
-               print '  elapsed: %.3f s' % ((job.mapred['finishTime'] - job.mapred['startTime']) / 1000.0)
-               print '  finished: %.3f s' % ((job.mapred['finishTime'] - job.mapred['submitTime']) / 1000.0)
+               if job.mapred:
+                       print '  submit: %d, start: %d, finish: %d' % (job.mapred['submitTime'], job.mapred['startTime'], job.mapred['finishTime'])
+                       print '  elapsed: %.3f s' % ((job.mapred['finishTime'] - job.mapred['startTime']) / 1000.0)
+                       print '  finished: %.3f s' % ((job.mapred['finishTime'] - job.mapred['submitTime']) / 1000.0)
 
-       if job.yarn and 'memorySeconds' in job.yarn.keys() and debug >= 1:
-               print '  MB x s: %d' % job.yarn['memorySeconds']
-               print '  CPU x s: %d' % job.yarn['vcoreSeconds']
+               if job.yarn and 'memorySeconds' in job.yarn.keys():
+                       print '  MB x s: %d' % job.yarn['memorySeconds']
+                       print '  CPU x s: %d' % job.yarn['vcoreSeconds']
 
        if db:
-               changed = 0
-               ID=0
-               NAME=1
-               USER=2
-               STATUS=3
-               QUEUE=4
-               SUBMIT=5
-               START=6
-               FINISH=7
-               MEMORY=8
-               CPU=9
-               MAP=10
-               REDUCE=11
-               st.execute("SELECT id, name, user, status, queue, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM jobs WHERE id=%s", id)
-               data = st.fetchone()
                if data:
                        if data[NAME] == job.name and data[USER] == job.user and data[STATUS] == job.status and data[QUEUE] == job.queue and data[START] == job.start and data[FINISH] == job.finish:
                                if debug >= 3: print '[db] job %s found' % id
@@ -338,7 +353,6 @@ for id, job in jobs.iteritems():
                                st.execute("UPDATE jobs SET name=%s, user=%s, status=%s, queue=%s, start=%s, finish=%s WHERE id=%s", (job.name, job.user, job.status, job.queue, job.start, job.finish, id))
                                if debug >= 3: print '[db] job %s updated' % id
                                changed = 1
-
                else:
                        st.execute("INSERT INTO jobs (id, name, user, status, queue, start, finish) VALUES(%s, %s, %s, %s, %s, %s, %s)", (id, job.name, job.user, job.status, job.queue, job.start, job.finish))
                        if debug >= 3: print '[db] job %s inserted' % id
@@ -358,7 +372,7 @@ for id, job in jobs.iteritems():
                                if debug >= 3: print '[db] job %s yarn updated' % id
                                changed = 1
 
-               # check for details in DB
+               # check for details in DB, set changed flag if missing
                st.execute('SELECT * FROM jobnodes WHERE jobid=%s', id)
                data = st.fetchone()
                if data: