From: František Dvořák
Date: Sun, 8 Mar 2015 22:16:53 +0000 (+0100)
Subject: PBS generator: reshuffle, add sched_nodespec and memory.
X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=87fbe8b9973521447a4207f083ddc09ce515c2b7;p=hadoop-jobstat.git

PBS generator: reshuffle, add sched_nodespec and memory.
---

diff --git a/stat2pbs.py b/stat2pbs.py
index b071f27..c6cb412 100755
--- a/stat2pbs.py
+++ b/stat2pbs.py
@@ -12,80 +12,97 @@ dbuser = 'bookkeeping'
 dbpassword = ''
 statfile = 'stat2pbs.txt'
 
-cluster_hostname ='hador.ics.muni.cz'
-realm = 'META'
+CLUSTER_HOSTNAME ='hador.ics.muni.cz'
+NODE_MEM=4
+NODE_VMEM=64
+REALM = 'META'
+
 db = None
 st = None
+st_sub = None
 
 def gen_string(job):
 	jobnodes = dict()
+	pbs = list()
 
-	st_local = db.cursor(MySQLdb.cursors.DictCursor)
-
-	st_local.execute('SELECT j.*, n.host AS host FROM jobnodes j, nodes n WHERE j.nodeid = n.id AND j.jobid=%s', job['id'])
+	st_sub.execute('SELECT j.*, n.host AS host FROM jobnodes j, nodes n WHERE j.nodeid = n.id AND j.jobid=%s', job['id'])
 	while 1:
-		jobnode = st_local.fetchone()
+		jobnode = st_sub.fetchone()
 		if not jobnode:
 			break
 		jobnodes[jobnode['host']] = jobnode
-	st_local.close()
-
 
 	log_time = time.strftime("%d/%m/%Y %H:%M:%S", time.gmtime())
 
-	if job['status'] == 'RUNNING':
-		status = 'R'
-	elif job['status'] in ('SUCCEEDED', 'KILLED', 'FINISHED'):
-		status = 'C'
-	elif job['status'] in ('SUBMITTED', 'ACCEPTED'):
-		status = 'Q'
-	elif job['status'] in ('NEW', 'NEW_SAVING'):
-		status = 'T'
-	elif job['status'] in ('FAILED', 'UNDEFINED'):
+	if job['status'] in ('SUCCEEDED', 'KILLED', 'FINISHED', 'FAILED'):
 		status = 'E'
+	elif job['status'] in ('ACCEPTED', 'NEW', 'NEW_SAVING', 'RUNNING', 'SUBMITTED', 'UNDEFINED'):
+		status = 'U'
 	else:
 		print 'Unknown status %s' % job['status']
 		sys.exit(1)
 
-	if job['status'] == 'SUCCEEDED':
-		pbs_exit_status=' exit_status=0'
-	elif job['status'] in ('NEW', 'NEW_SAVING', 'SUBMITTED', 'ACCEPTED'):
-		pbs_exit_status=''
-	else:
-		pbs_exit_status=' exit_status=-1'
+	pbs.append('%s;%s;%s.%s;user=%s' % (log_time, status, job['id'], CLUSTER_HOSTNAME, job['user']))
+
+	pbs.append('group=hadoop')
 
-	pbs_name = ''
 	if job['name']:
 		#name = re.sub("[^a-z0-9A-Z\./\"'()]", '_', job['name'])
 		name = re.sub("[ :]", '_', job['name'])
-		pbs_name=' jobname=%s' % name
+		pbs.append('jobname=%s' % name)
+
+	pbs.append('queue=%s' % job['queue'])
+
+	if job['status'] in ('SUCCEEDED', 'FINISHED'):
+		pbs_exit_status='exit_status=0'
+	elif job['status'] in ('ACCEPTED', 'NEW', 'NEW_SAVING', 'RUNNING', 'SUBMITTED', 'UNDEFINED'):
+		pbs_exit_status=None
+	else:
+		pbs_exit_status='exit_status=-1'
+
 	pbs_name = ''
 	pbs_submit = ''
 	pbs_qtime = ''
 	pbs_etime = ''
 	if job['submit']:
-		pbs_submit = ' ctime=%d' % (int(job['submit']) / 1000)
-		pbs_qtime = ' qtime=%d' % (int(job['submit']) / 1000)
-		pbs_etime = ' etime=%d' % (int(job['submit']) / 1000)
+		t = int(job['submit']) / 1000
+		pbs.append('ctime=%d' % t)
+		pbs.append('qtime=%d' % t)
+		pbs.append('etime=%d' % t)
+
+	pbs.append('start=%d' % (int(job['start']) / 1000))
+
+	pbs.append('end=%d' % (int(job['finish']) / 1000))
+
+	pbs.append('owner=%s@%s' % (job['user'], REALM))
 
-	pbs_exec_host = ''
 	if jobnodes:
 		joblist = jobnodes.keys()
 		joblist.sort()
 		exec_host=str.join("+", joblist)
-		pbs_exec_host = ' exec_host=%s' % exec_host
+		pbs.append('exec_host=%s' % exec_host)
+
+		for i in range(len(joblist)):
+			joblist[i] = 'host=%s:ppn=1:mem=%dGB:vmem=%dGB' % (joblist[i], NODE_MEM, NODE_VMEM)
+		sched_nodespec = '+'.join(joblist)
+		pbs.append('sched_nodespec=%s' % sched_nodespec)
+
+	if pbs_exit_status:
+		pbs.append(pbs_exit_status)
 
-	pbs_cput=''
 	if job['cpu_seconds']:
 		cput = time.strftime('%H:%M:%S', time.gmtime(job['cpu_seconds']))
-		pbs_cput = ' resources_used.cput=%s' % cput
+		pbs.append('resources_used.cput=%s' % cput)
+
+	if jobnodes:
+		pbs.append('resource_used.mem=%dGB' % (len(joblist) * NODE_MEM))
+		pbs.append('resource_used.vmem=%dGB' % (len(joblist) * NODE_VMEM))
 
-	pbs_walltime = ''
 	if job['finish']:
 		walltime = time.strftime('%H:%M:%S', time.gmtime((long(job['finish']) - long(job['start'])) / 1000))
-		pbs_walltime = ' resources_used.walltime=%s' % walltime
+		pbs.append('resources_used.walltime=%s' % walltime)
 
-	return '%s;%s;%s.%s;user=%s group=hadoop%s queue=%s%s%s%s start=%d end=%d owner=%s@%s%s%s%s%s' % (log_time, status, job['id'], cluster_hostname, job['user'], pbs_name, job['queue'], pbs_submit, pbs_qtime, pbs_etime, int(job['start']) / 1000, int(job['finish']) / 1000, job['user'], realm, pbs_exec_host, pbs_exit_status, pbs_cput, pbs_walltime)
+	return ' '.join(pbs)
 
 
 try:
@@ -149,6 +166,8 @@ print 'Timestamp: %s' % ts
 db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname)
 st = db.cursor(MySQLdb.cursors.DictCursor)
 
+st_sub = db.cursor(MySQLdb.cursors.DictCursor)
+
 # beware of actually running second - rather get only >5 seconds older changes
 if ts:
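For illustration, a record assembled by the reworked gen_string() for a two-node SUCCEEDED job would come out roughly as the following single log line; the hostname suffix and the GB figures follow from CLUSTER_HOSTNAME, NODE_MEM and NODE_VMEM above, while the job id, user, node names and timestamps here are hypothetical:

08/03/2015 22:16:53;E;job_1425000000000_0042.hador.ics.muni.cz;user=jdoe group=hadoop jobname=example_job queue=default ctime=1425000000 qtime=1425000000 etime=1425000000 start=1425000010 end=1425000100 owner=jdoe@META exec_host=node1.ics.muni.cz+node2.ics.muni.cz sched_nodespec=host=node1.ics.muni.cz:ppn=1:mem=4GB:vmem=64GB+host=node2.ics.muni.cz:ppn=1:mem=4GB:vmem=64GB exit_status=0 resources_used.cput=00:05:00 resource_used.mem=8GB resource_used.vmem=128GB resources_used.walltime=00:01:30

Note that the mem/vmem figures are static per-node estimates (number of nodes times NODE_MEM/NODE_VMEM), not measured usage, and that exit_status is appended only for completed jobs: 0 for SUCCEEDED/FINISHED, -1 for FAILED/KILLED, and nothing for the 'U' states, where pbs_exit_status stays None.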