dbpassword = ''
statfile = 'stat2pbs.txt'
-cluster_hostname ='hador.ics.muni.cz'
-realm = 'META'
+CLUSTER_HOSTNAME ='hador.ics.muni.cz'
+NODE_MEM=4
+NODE_VMEM=64
+REALM = 'META'
+
db = None
st = None
+st_sub = None
def gen_string(job):
jobnodes = dict()
+ pbs = list()
- st_local = db.cursor(MySQLdb.cursors.DictCursor)
-
- st_local.execute('SELECT j.*, n.host AS host FROM jobnodes j, nodes n WHERE j.nodeid = n.id AND j.jobid=%s', job['id'])
+ st_sub.execute('SELECT j.*, n.host AS host FROM jobnodes j, nodes n WHERE j.nodeid = n.id AND j.jobid=%s', job['id'])
while 1:
- jobnode = st_local.fetchone()
+ jobnode = st_sub.fetchone()
if not jobnode:
break
jobnodes[jobnode['host']] = jobnode
- st_local.close()
-
log_time = time.strftime("%d/%m/%Y %H:%M:%S", time.gmtime())
- if job['status'] == 'RUNNING':
- status = 'R'
- elif job['status'] in ('SUCCEEDED', 'KILLED', 'FINISHED'):
- status = 'C'
- elif job['status'] in ('SUBMITTED', 'ACCEPTED'):
- status = 'Q'
- elif job['status'] in ('NEW', 'NEW_SAVING'):
- status = 'T'
- elif job['status'] in ('FAILED', 'UNDEFINED'):
+ if job['status'] in ('SUCCEEDED', 'KILLED', 'FINISHED', 'FAILED'):
status = 'E'
+ elif job['status'] in ('ACCEPTED', 'NEW', 'NEW_SAVING', 'RUNNING', 'SUBMITTED', 'UNDEFINED'):
+ status = 'U'
else:
print 'Unknown status %s' % job['status']
sys.exit(1)
- if job['status'] == 'SUCCEEDED':
- pbs_exit_status=' exit_status=0'
- elif job['status'] in ('NEW', 'NEW_SAVING', 'SUBMITTED', 'ACCEPTED'):
- pbs_exit_status=''
- else:
- pbs_exit_status=' exit_status=-1'
+ pbs.append('%s;%s;%s.%s;user=%s' % (log_time, status, job['id'], CLUSTER_HOSTNAME, job['user']))
+
+ pbs.append('group=hadoop')
- pbs_name = ''
if job['name']:
#name = re.sub("[^a-z0-9A-Z\./\"'()]", '_', job['name'])
name = re.sub("[ :]", '_', job['name'])
- pbs_name=' jobname=%s' % name
+ pbs.append('jobname=%s' % name)
+
+ pbs.append('queue=%s' % job['queue'])
+
+ if job['status'] in ('SUCCEEDED', 'FINISHED'):
+ pbs_exit_status='exit_status=0'
+ elif job['status'] in ('ACCEPTED', 'NEW', 'NEW_SAVING', 'RUNNING', 'SUBMITTED', 'UNDEFINED'):
+ pbs_exit_status=None
+ else:
+ pbs_exit_status='exit_status=-1'
+ pbs_name = ''
pbs_submit = ''
pbs_qtime = ''
pbs_etime = ''
if job['submit']:
- pbs_submit = ' ctime=%d' % (int(job['submit']) / 1000)
- pbs_qtime = ' qtime=%d' % (int(job['submit']) / 1000)
- pbs_etime = ' etime=%d' % (int(job['submit']) / 1000)
+ t = int(job['submit']) / 1000
+ pbs.append('ctime=%d' % t)
+ pbs.append('qtime=%d' % t)
+ pbs.append('etime=%d' % t)
+
+ pbs.append('start=%d' % (int(job['start']) / 1000))
+
+ pbs.append('end=%d' % (int(job['finish']) / 1000))
+
+ pbs.append('owner=%s@%s' % (job['user'], REALM))
- pbs_exec_host = ''
if jobnodes:
joblist = jobnodes.keys()
joblist.sort()
exec_host=str.join("+", joblist)
- pbs_exec_host = ' exec_host=%s' % exec_host
+ pbs.append('exec_host=%s' % exec_host)
+
+ for i in range(len(joblist)):
+ joblist[i] = 'host=%s:ppn=1:mem=%dGB:vmem=%dGB' % (joblist[i], NODE_MEM, NODE_VMEM)
+ sched_nodespec = '+'.join(joblist)
+ pbs.append('sched_nodespec=%s' % sched_nodespec)
+
+ if pbs_exit_status:
+ pbs.append(pbs_exit_status)
- pbs_cput=''
if job['cpu_seconds']:
cput = time.strftime('%H:%M:%S', time.gmtime(job['cpu_seconds']))
- pbs_cput = ' resources_used.cput=%s' % cput
+ pbs.append('resources_used.cput=%s' % cput)
+
+ if jobnodes:
+ pbs.append('resource_used.mem=%dGB' % (len(joblist) * NODE_MEM))
+ pbs.append('resource_used.vmem=%dGB' % (len(joblist) * NODE_VMEM))
- pbs_walltime = ''
if job['finish']:
walltime = time.strftime('%H:%M:%S', time.gmtime((long(job['finish']) - long(job['start'])) / 1000))
- pbs_walltime = ' resources_used.walltime=%s' % walltime
+ pbs.append('resources_used.walltime=%s' % walltime)
- return '%s;%s;%s.%s;user=%s group=hadoop%s queue=%s%s%s%s start=%d end=%d owner=%s@%s%s%s%s%s' % (log_time, status, job['id'], cluster_hostname, job['user'], pbs_name, job['queue'], pbs_submit, pbs_qtime, pbs_etime, int(job['start']) / 1000, int(job['finish']) / 1000, job['user'], realm, pbs_exec_host, pbs_exit_status, pbs_cput, pbs_walltime)
+ return ' '.join(pbs)
try:
db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname)
st = db.cursor(MySQLdb.cursors.DictCursor)
+st_sub = db.cursor(MySQLdb.cursors.DictCursor)
+
# beware of actually running second - rather get only >5 seconds older changes
if ts: