import MySQLdb
class Job:
+ name = None
status = None
user = None
mapred = None
# merge MapRed and YARN
if job.mapred:
+ job.name = job.mapred['name']
job.status = job.mapred['state']
job.user = job.mapred['user']
job.start = job.mapred['startTime']
job.finish = job.mapred['finishTime']
if job.yarn:
+ if not job.name:
+ job.name = job.yarn['name']
if not job.status:
job.status = job.yarn['finalStatus']
if not job.user:
print 'job %s (%d):' % (id, counter)
counter += 1
+ print ' name: %s' % job.name
print ' status: %s' % job.status
print ' user: %s' % job.user
if job.mapred:
if db:
changed = 0
- st.execute("SELECT id, user, status, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM job WHERE id='%s'" % id)
+ ID=0
+ NAME=1
+ USER=2
+ STATUS=3
+ SUBMIT=4
+ START=5
+ FINISH=6
+ MEMORY=7
+ CPU=8
+ MAP=9
+ REDUCE=10
+ st.execute("SELECT id, name, user, status, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM job WHERE id='%s'", id)
data = st.fetchone()
if data:
- if data[1] == job.user and data[2] == job.status and data[4] == job.start and data[5] == job.finish:
+ if data[NAME] == job.name and data[USER] == job.user and data[STATUS] == job.status and data[START] == job.start and data[FINISH] == job.finish:
if debug >= 2: print '[db] job %s found' % id
else:
- st.execute("UPDATE job SET user='%s', status='%s', start=%s, finish=%s WHERE id='%s'" %(job.user, job.status, dbint(job.start), dbint(job.finish), id))
+ st.execute("UPDATE job SET name=%s, user=%s, status=%s, start=%s, finish=%s WHERE id=%s", (job.name, job.user, job.status, dbint(job.start), dbint(job.finish), id))
if debug >= 2: print '[db] job %s updated' % id
changed = 1
if debug >= 2: print '[db] job %s inserted' % id
changed = 1
if job.mapred:
- if data and data[3] == job.mapred['submitTime'] and data[8] == job.mapred['mapsTotal'] and data[9] == job.mapred['reducesTotal']:
+ if data and data[SUBMIT] == job.mapred['submitTime'] and data[MAP] == job.mapred['mapsTotal'] and data[REDUCE] == job.mapred['reducesTotal']:
if debug >= 2: print '[db] job %s mapred is actual' % id
else:
st.execute("UPDATE job SET submit=%d, map=%s, reduce=%s WHERE id='%s'" %(job.mapred['submitTime'], dbint(job.mapred['mapsTotal']), dbint(job.mapred['reducesTotal']), id))
if debug >= 2: print '[db] job %s mapred updated' % id
changed = 1
if job.yarn:
- if data and data[6] == job.yarn['memorySeconds'] and data[7] == job.yarn['vcoreSeconds']:
+ if data and data[MEMORY] == job.yarn['memorySeconds'] and data[CPU] == job.yarn['vcoreSeconds']:
if debug >= 2: print '[db] job %s yarn is actual' % id
else:
st.execute("UPDATE job SET memory_seconds=%d, cpu_seconds=%d WHERE id='%s'" %(job.yarn['memorySeconds'], job.yarn['vcoreSeconds'], id))