The first real PBS logs.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Thu, 5 Mar 2015 16:33:37 +0000 (17:33 +0100)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Thu, 5 Mar 2015 16:33:37 +0000 (17:33 +0100)
stat2pbs.py

index 7986ffb..70b994c 100755 (executable)
@@ -1,7 +1,9 @@
 #! /usr/bin/python2
 
 import getopt
+import re
 import sys
+import time
 import MySQLdb
 
 dbhost = 'localhost'
@@ -10,6 +12,76 @@ dbuser = 'hadoop'
 dbpassword = ''
 statfile = 'stat2pbs.txt'
 
+cluster_hostname ='hador.ics.muni.cz'
+realm = 'META'
+
+db = None
+st = None
+
+def gen_string(job):
+       jobnodes = dict()
+
+       st_local = db.cursor(MySQLdb.cursors.DictCursor)
+
+       st_local.execute('SELECT j.*, n.host AS host FROM jobnodes j, nodes n WHERE j.nodeid = n.id AND j.jobid=%s', job['id'])
+       while 1:
+               jobnode = st_local.fetchone()
+               if not jobnode:
+                       break
+               jobnodes[jobnode['host']] = jobnode
+
+       st_local.close()
+
+       log_time = time.strftime("%d/%m/%Y %H:%M:%S", time.gmtime())
+       if job['status'] == 'RUNNING':
+               status = 'R'
+       elif job['status'] in ('SUCCEEDED', 'KILLED'):
+               status = 'C'
+       elif job['status'] in ('SUBMITTED', 'ACCEPTED'):
+               status = 'Q'
+       elif job['status'] in ('NEW', 'NEW_SAVING'):
+               status = 'T'
+       elif job['status'] in ('FAILED', 'UNDEFINED'):
+               status = 'E'
+
+       if job['status'] == 'SUCCEEDED':
+               pbs_exit_status=' exit_status=0'
+       elif job['status'] in ('NEW', 'NEW_SAVING', 'SUBMITTED', 'ACCEPTED'):
+               pbs_exit_status=''
+       else:
+               pbs_exit_status=' exit_status=-1'
+
+       pbs_name = ''
+       if job['name']:
+               #name = re.sub("[^a-z0-9A-Z\./\"'()]", '_', job['name'])
+               name = re.sub("[ :]", '_', job['name'])
+               pbs_name=' jobname=%s' % name
+
+       pbs_submit = ''
+       pbs_qtime = ''
+       pbs_etime = ''
+       if job['submit']:
+               pbs_submit = ' ctime=%d' % (int(job['submit']) / 1000)
+               pbs_qtime = ' qtime=%d' % (int(job['submit']) / 1000)
+               pbs_etime = ' etime=%d' % (int(job['submit']) / 1000)
+
+       pbs_exec_host = ''
+       if jobnodes:
+               joblist = jobnodes.keys()
+               joblist.sort()
+               exec_host=str.join("+", joblist)
+               pbs_exec_host = ' exec_host=%s' % exec_host
+
+       pbs_cput=''
+       if job['cpu_seconds']:
+               cput = time.strftime('%H:%M:%S', time.gmtime(job['cpu_seconds']))
+               pbs_cput = ' resources_used.cput=%s' % cput
+
+       walltime = time.strftime('%H:%M:%S', time.gmtime((long(job['finish']) - long(job['start'])) / 1000))
+
+       return '%s;%s;%s.%s;user=%s group=hadoop%s queue=%s%s%s%s start=%d end=%d owner=%s@%s%s%s%s resources_used.walltime=%s' % (log_time, status, job['id'], cluster_hostname, job['user'], pbs_name, job['queue'], pbs_submit, pbs_qtime, pbs_etime, int(job['start']) / 1000, int(job['finish']) / 1000, job['user'], realm, pbs_exec_host, pbs_exit_status, pbs_cput, walltime)
+
+
 try:
        opts, args = getopt.getopt(sys.argv[1:], 'hc:s:', ['help', 'config=', 'dbhost=', 'dbname=', 'dbuser=', 'dbpassword=', 'statfile=' ])
 except getopt.GetoptError:
@@ -70,20 +142,19 @@ print 'Status file: %s' % statfile
 print 'Timestamp: %s' % ts
 
 db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname)
-st = db.cursor()
+st = db.cursor(MySQLdb.cursors.DictCursor)
 
 # beware of actually running second - rather get only >5 seconds older changes
 if ts:
-       st.execute('SELECT id, name, user, changed FROM job WHERE changed > %s AND TIMESTAMPDIFF(SECOND,changed,NOW()) > 5 ORDER BY CHANGED', ts)
+       st.execute('SELECT * FROM jobs WHERE changed > %s AND TIMESTAMPDIFF(SECOND,changed,NOW()) > 5 ORDER BY CHANGED', ts)
 else:
-       st.execute('SELECT id, name, user, changed FROM job WHERE TIMESTAMPDIFF(SECOND,changed,NOW()) > 5 ORDER BY changed')
+       st.execute('SELECT * FROM jobs WHERE TIMESTAMPDIFF(SECOND,changed,NOW()) > 5 ORDER BY changed')
 while 1:
-       data = st.fetchone()
-       if data:
-               print '%s, %s, %s' % (data[0], data[2], data[3])
-               ts = data[3]
-       else:
+       job = st.fetchone()
+       if not job:
                break
+       ts = job['changed']
+       print gen_string(job)
 
 db.close()