#! /usr/bin/python2
import getopt
+import re
import sys
+import time
import MySQLdb
dbhost = 'localhost'
dbpassword = ''
statfile = 'stat2pbs.txt'
+cluster_hostname ='hador.ics.muni.cz'
+realm = 'META'
+
+db = None
+st = None
+
+def gen_string(job):
+ jobnodes = dict()
+
+ st_local = db.cursor(MySQLdb.cursors.DictCursor)
+
+ st_local.execute('SELECT j.*, n.host AS host FROM jobnodes j, nodes n WHERE j.nodeid = n.id AND j.jobid=%s', job['id'])
+ while 1:
+ jobnode = st_local.fetchone()
+ if not jobnode:
+ break
+ jobnodes[jobnode['host']] = jobnode
+
+ st_local.close()
+
+ log_time = time.strftime("%d/%m/%Y %H:%M:%S", time.gmtime())
+ if job['status'] == 'RUNNING':
+ status = 'R'
+ elif job['status'] in ('SUCCEEDED', 'KILLED'):
+ status = 'C'
+ elif job['status'] in ('SUBMITTED', 'ACCEPTED'):
+ status = 'Q'
+ elif job['status'] in ('NEW', 'NEW_SAVING'):
+ status = 'T'
+ elif job['status'] in ('FAILED', 'UNDEFINED'):
+ status = 'E'
+
+ if job['status'] == 'SUCCEEDED':
+ pbs_exit_status=' exit_status=0'
+ elif job['status'] in ('NEW', 'NEW_SAVING', 'SUBMITTED', 'ACCEPTED'):
+ pbs_exit_status=''
+ else:
+ pbs_exit_status=' exit_status=-1'
+
+ pbs_name = ''
+ if job['name']:
+ #name = re.sub("[^a-z0-9A-Z\./\"'()]", '_', job['name'])
+ name = re.sub("[ :]", '_', job['name'])
+ pbs_name=' jobname=%s' % name
+
+ pbs_submit = ''
+ pbs_qtime = ''
+ pbs_etime = ''
+ if job['submit']:
+ pbs_submit = ' ctime=%d' % (int(job['submit']) / 1000)
+ pbs_qtime = ' qtime=%d' % (int(job['submit']) / 1000)
+ pbs_etime = ' etime=%d' % (int(job['submit']) / 1000)
+
+ pbs_exec_host = ''
+ if jobnodes:
+ joblist = jobnodes.keys()
+ joblist.sort()
+ exec_host=str.join("+", joblist)
+ pbs_exec_host = ' exec_host=%s' % exec_host
+
+ pbs_cput=''
+ if job['cpu_seconds']:
+ cput = time.strftime('%H:%M:%S', time.gmtime(job['cpu_seconds']))
+ pbs_cput = ' resources_used.cput=%s' % cput
+
+ walltime = time.strftime('%H:%M:%S', time.gmtime((long(job['finish']) - long(job['start'])) / 1000))
+
+ return '%s;%s;%s.%s;user=%s group=hadoop%s queue=%s%s%s%s start=%d end=%d owner=%s@%s%s%s%s resources_used.walltime=%s' % (log_time, status, job['id'], cluster_hostname, job['user'], pbs_name, job['queue'], pbs_submit, pbs_qtime, pbs_etime, int(job['start']) / 1000, int(job['finish']) / 1000, job['user'], realm, pbs_exec_host, pbs_exit_status, pbs_cput, walltime)
+
+
try:
opts, args = getopt.getopt(sys.argv[1:], 'hc:s:', ['help', 'config=', 'dbhost=', 'dbname=', 'dbuser=', 'dbpassword=', 'statfile=' ])
except getopt.GetoptError:
print 'Timestamp: %s' % ts
db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname)
-st = db.cursor()
+st = db.cursor(MySQLdb.cursors.DictCursor)
# beware of actually running second - rather get only >5 seconds older changes
if ts:
- st.execute('SELECT id, name, user, changed FROM job WHERE changed > %s AND TIMESTAMPDIFF(SECOND,changed,NOW()) > 5 ORDER BY CHANGED', ts)
+ st.execute('SELECT * FROM jobs WHERE changed > %s AND TIMESTAMPDIFF(SECOND,changed,NOW()) > 5 ORDER BY CHANGED', ts)
else:
- st.execute('SELECT id, name, user, changed FROM job WHERE TIMESTAMPDIFF(SECOND,changed,NOW()) > 5 ORDER BY changed')
+ st.execute('SELECT * FROM jobs WHERE TIMESTAMPDIFF(SECOND,changed,NOW()) > 5 ORDER BY changed')
while 1:
- data = st.fetchone()
- if data:
- print '%s, %s, %s' % (data[0], data[2], data[3])
- ts = data[3]
- else:
+ job = st.fetchone()
+ if not job:
break
+ ts = job['changed']
+ print gen_string(job)
db.close()