PBS generator: reshuffle, add sched_nodespec and memory.
author: František Dvořák <valtri@civ.zcu.cz>
Sun, 8 Mar 2015 22:16:53 +0000 (23:16 +0100)
committer: František Dvořák <valtri@civ.zcu.cz>
Sun, 8 Mar 2015 22:16:53 +0000 (23:16 +0100)
stat2pbs.py

index b071f27..c6cb412 100755 (executable)
@@ -12,80 +12,97 @@ dbuser = 'bookkeeping'
 dbpassword = ''
 statfile = 'stat2pbs.txt'
 
-cluster_hostname ='hador.ics.muni.cz'
-realm = 'META'
+CLUSTER_HOSTNAME ='hador.ics.muni.cz'
+NODE_MEM=4
+NODE_VMEM=64
+REALM = 'META'
+
 
 db = None
 st = None
+st_sub = None
 
 def gen_string(job):
        jobnodes = dict()
+       pbs = list()
 
-       st_local = db.cursor(MySQLdb.cursors.DictCursor)
-
-       st_local.execute('SELECT j.*, n.host AS host FROM jobnodes j, nodes n WHERE j.nodeid = n.id AND j.jobid=%s', job['id'])
+       st_sub.execute('SELECT j.*, n.host AS host FROM jobnodes j, nodes n WHERE j.nodeid = n.id AND j.jobid=%s', job['id'])
        while 1:
-               jobnode = st_local.fetchone()
+               jobnode = st_sub.fetchone()
                if not jobnode:
                        break
                jobnodes[jobnode['host']] = jobnode
 
-       st_local.close()
-
        log_time = time.strftime("%d/%m/%Y %H:%M:%S", time.gmtime())
-       if job['status'] == 'RUNNING':
-               status = 'R'
-       elif job['status'] in ('SUCCEEDED', 'KILLED', 'FINISHED'):
-               status = 'C'
-       elif job['status'] in ('SUBMITTED', 'ACCEPTED'):
-               status = 'Q'
-       elif job['status'] in ('NEW', 'NEW_SAVING'):
-               status = 'T'
-       elif job['status'] in ('FAILED', 'UNDEFINED'):
+       if job['status'] in ('SUCCEEDED', 'KILLED', 'FINISHED', 'FAILED'):
                status = 'E'
+       elif job['status'] in ('ACCEPTED', 'NEW', 'NEW_SAVING', 'RUNNING', 'SUBMITTED', 'UNDEFINED'):
+               status = 'U'
        else:
                print 'Unknown status %s' % job['status']
                sys.exit(1)
 
-       if job['status'] == 'SUCCEEDED':
-               pbs_exit_status=' exit_status=0'
-       elif job['status'] in ('NEW', 'NEW_SAVING', 'SUBMITTED', 'ACCEPTED'):
-               pbs_exit_status=''
-       else:
-               pbs_exit_status=' exit_status=-1'
+       pbs.append('%s;%s;%s.%s;user=%s' % (log_time, status, job['id'], CLUSTER_HOSTNAME, job['user']))
+
+       pbs.append('group=hadoop')
 
-       pbs_name = ''
        if job['name']:
                #name = re.sub("[^a-z0-9A-Z\./\"'()]", '_', job['name'])
                name = re.sub("[ :]", '_', job['name'])
-               pbs_name=' jobname=%s' % name
+               pbs.append('jobname=%s' % name)
+
+       pbs.append('queue=%s' % job['queue'])
+
+       if job['status'] in ('SUCCEEDED', 'FINISHED'):
+               pbs_exit_status='exit_status=0'
+       elif job['status'] in ('ACCEPTED', 'NEW', 'NEW_SAVING', 'RUNNING', 'SUBMITTED', 'UNDEFINED'):
+               pbs_exit_status=None
+       else:
+               pbs_exit_status='exit_status=-1'
 
+       pbs_name = ''
        pbs_submit = ''
        pbs_qtime = ''
        pbs_etime = ''
        if job['submit']:
-               pbs_submit = ' ctime=%d' % (int(job['submit']) / 1000)
-               pbs_qtime = ' qtime=%d' % (int(job['submit']) / 1000)
-               pbs_etime = ' etime=%d' % (int(job['submit']) / 1000)
+               t = int(job['submit']) / 1000
+               pbs.append('ctime=%d' % t)
+               pbs.append('qtime=%d' % t)
+               pbs.append('etime=%d' % t)
+
+       pbs.append('start=%d' % (int(job['start']) / 1000))
+
+       pbs.append('end=%d' % (int(job['finish']) / 1000))
+
+       pbs.append('owner=%s@%s' % (job['user'], REALM))
 
-       pbs_exec_host = ''
        if jobnodes:
                joblist = jobnodes.keys()
                joblist.sort()
                exec_host=str.join("+", joblist)
-               pbs_exec_host = ' exec_host=%s' % exec_host
+               pbs.append('exec_host=%s' % exec_host)
+
+               for i in range(len(joblist)):
+                       joblist[i] = 'host=%s:ppn=1:mem=%dGB:vmem=%dGB' % (joblist[i], NODE_MEM, NODE_VMEM)
+               sched_nodespec = '+'.join(joblist)
+               pbs.append('sched_nodespec=%s' % sched_nodespec)
+
+       if pbs_exit_status:
+               pbs.append(pbs_exit_status)
 
-       pbs_cput=''
        if job['cpu_seconds']:
                cput = time.strftime('%H:%M:%S', time.gmtime(job['cpu_seconds']))
-               pbs_cput = ' resources_used.cput=%s' % cput
+               pbs.append('resources_used.cput=%s' % cput)
+
+       if jobnodes:
+               pbs.append('resources_used.mem=%dGB' % (len(joblist) * NODE_MEM))
+               pbs.append('resources_used.vmem=%dGB' % (len(joblist) * NODE_VMEM))
 
-       pbs_walltime = ''
        if job['finish']:
                walltime = time.strftime('%H:%M:%S', time.gmtime((long(job['finish']) - long(job['start'])) / 1000))
-               pbs_walltime = ' resources_used.walltime=%s' % walltime
+               pbs.append('resources_used.walltime=%s' % walltime)
 
-       return '%s;%s;%s.%s;user=%s group=hadoop%s queue=%s%s%s%s start=%d end=%d owner=%s@%s%s%s%s%s' % (log_time, status, job['id'], cluster_hostname, job['user'], pbs_name, job['queue'], pbs_submit, pbs_qtime, pbs_etime, int(job['start']) / 1000, int(job['finish']) / 1000, job['user'], realm, pbs_exec_host, pbs_exit_status, pbs_cput, pbs_walltime)
+       return ' '.join(pbs)
 
 
 try:
@@ -149,6 +166,8 @@ print 'Timestamp: %s' % ts
 
 db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname)
 st = db.cursor(MySQLdb.cursors.DictCursor)
+st_sub = db.cursor(MySQLdb.cursors.DictCursor)
+
 
 # beware of actually running second - rather get only >5 seconds older changes
 if ts: