break
regHost = re.compile(':\d+')
-counter=1
+counter=0
for id, job in jobs.iteritems():
- # merge MapRed and YARN
+ counter += 1
+
if job.mapred:
job.name = job.mapred['name']
job.status = job.mapred['state']
job.queue = job.mapred['queue']
job.start = job.mapred['startTime']
job.finish = job.mapred['finishTime']
+
+ if db:
+ changed = 0
+ ID=0
+ NAME=1
+ USER=2
+ STATUS=3
+ QUEUE=4
+ SUBMIT=5
+ START=6
+ FINISH=7
+ MEMORY=8
+ CPU=9
+ MAP=10
+ REDUCE=11
+ st.execute("SELECT id, name, user, status, queue, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM jobs WHERE id=%s", id)
+ data = st.fetchone()
+ if data:
+ if not job.name:
+ job.name = data[NAME]
+ if not job.status or job.status == 'UNDEFINED':
+ job.status = data[STATUS]
+ if not job.user:
+ job.user = data[USER]
+ if not job.queue:
+ job.queue = data[QUEUE]
+ if not job.start:
+ job.start = data[START]
+ if not job.finish:
+ job.finish = data[FINISH]
+
if job.yarn:
if not job.name:
job.name = job.yarn['name']
print ' user: %s' % job.user
print ' queue: %s' % job.queue
- counter += 1
-
- if job.mapred and debug >= 1:
- print ' submit: %d, start: %d, finish: %d' % (job.mapred['submitTime'], job.mapred['startTime'], job.mapred['finishTime'])
- print ' elapsed: %.3f s' % ((job.mapred['finishTime'] - job.mapred['startTime']) / 1000.0)
- print ' finished: %.3f s' % ((job.mapred['finishTime'] - job.mapred['submitTime']) / 1000.0)
+ if job.mapred:
+ print ' submit: %d, start: %d, finish: %d' % (job.mapred['submitTime'], job.mapred['startTime'], job.mapred['finishTime'])
+ print ' elapsed: %.3f s' % ((job.mapred['finishTime'] - job.mapred['startTime']) / 1000.0)
+ print ' finished: %.3f s' % ((job.mapred['finishTime'] - job.mapred['submitTime']) / 1000.0)
- if job.yarn and 'memorySeconds' in job.yarn.keys() and debug >= 1:
- print ' MB x s: %d' % job.yarn['memorySeconds']
- print ' CPU x s: %d' % job.yarn['vcoreSeconds']
+ if job.yarn and 'memorySeconds' in job.yarn.keys():
+ print ' MB x s: %d' % job.yarn['memorySeconds']
+ print ' CPU x s: %d' % job.yarn['vcoreSeconds']
if db:
- changed = 0
- ID=0
- NAME=1
- USER=2
- STATUS=3
- QUEUE=4
- SUBMIT=5
- START=6
- FINISH=7
- MEMORY=8
- CPU=9
- MAP=10
- REDUCE=11
- st.execute("SELECT id, name, user, status, queue, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM jobs WHERE id=%s", id)
- data = st.fetchone()
if data:
if data[NAME] == job.name and data[USER] == job.user and data[STATUS] == job.status and data[QUEUE] == job.queue and data[START] == job.start and data[FINISH] == job.finish:
if debug >= 3: print '[db] job %s found' % id
st.execute("UPDATE jobs SET name=%s, user=%s, status=%s, queue=%s, start=%s, finish=%s WHERE id=%s", (job.name, job.user, job.status, job.queue, job.start, job.finish, id))
if debug >= 3: print '[db] job %s updated' % id
changed = 1
-
else:
st.execute("INSERT INTO jobs (id, name, user, status, queue, start, finish) VALUES(%s, %s, %s, %s, %s, %s, %s)", (id, job.name, job.user, job.status, job.queue, job.start, job.finish))
if debug >= 3: print '[db] job %s inserted' % id
if debug >= 3: print '[db] job %s yarn updated' % id
changed = 1
- # check for details in DB
+ # check for details in DB, set changed flag if missing
st.execute('SELECT * FROM jobnodes WHERE jobid=%s', id)
data = st.fetchone()
if data: