From 9fc4bb325913c01484ed6a189ee5c358c5bb958c Mon Sep 17 00:00:00 2001
From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?=
Date: Thu, 5 Mar 2015 17:33:37 +0100
Subject: [PATCH] The first real PBS logs.

---
Note: the line structure of this patch was reconstructed with 4 spaces per
indentation level. If stat2pbs.py indents differently, apply it with
"git apply --ignore-whitespace".

 stat2pbs.py | 104 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 96 insertions(+), 8 deletions(-)

diff --git a/stat2pbs.py b/stat2pbs.py
index 7986ffb..70b994c 100755
--- a/stat2pbs.py
+++ b/stat2pbs.py
@@ -1,7 +1,9 @@
 #! /usr/bin/python2
 
 import getopt
+import re
 import sys
+import time
 import MySQLdb
 
 dbhost = 'localhost'
@@ -10,6 +12,93 @@ dbuser = 'hadoop'
 dbpassword = ''
 statfile = 'stat2pbs.txt'
 
+cluster_hostname = 'hador.ics.muni.cz'
+realm = 'META'
+
+db = None
+st = None
+
+# Job status in the database -> PBS accounting record type.
+PBS_RECORD_TYPES = {
+    'RUNNING': 'R',
+    'SUCCEEDED': 'C',
+    'KILLED': 'C',
+    'SUBMITTED': 'Q',
+    'ACCEPTED': 'Q',
+    'NEW': 'T',
+    'NEW_SAVING': 'T',
+    'FAILED': 'E',
+    'UNDEFINED': 'E',
+}
+
+
+def format_duration(seconds):
+    """Format a number of seconds as HH:MM:SS.
+
+    Hours are not wrapped at 24 (unlike time.strftime('%H:%M:%S')), so
+    durations of a day or more stay correct. Negative values become 0.
+    """
+    seconds = max(0, int(seconds))
+    return '%02d:%02d:%02d' % (seconds // 3600, seconds // 60 % 60, seconds % 60)
+
+
+def gen_string(job):
+    """Return one PBS accounting log line for a row of the jobs table.
+
+    job is a dict-like row (DictCursor); the keys read here are id, status,
+    name, user, queue, submit, start, finish and cpu_seconds. The hosts the
+    job ran on are looked up in jobnodes/nodes through the global db.
+    """
+    st_local = db.cursor(MySQLdb.cursors.DictCursor)
+    try:
+        st_local.execute('SELECT j.*, n.host AS host FROM jobnodes j, nodes n WHERE j.nodeid = n.id AND j.jobid=%s', (job['id'],))
+        hosts = sorted(set(row['host'] for row in st_local.fetchall()))
+    finally:
+        st_local.close()
+
+    # PBS accounting records are stamped MM/DD/YYYY.
+    log_time = time.strftime("%m/%d/%Y %H:%M:%S", time.gmtime())
+    # A status missing from the table is reported as an error record
+    # instead of leaving the record type undefined.
+    status = PBS_RECORD_TYPES.get(job['status'], 'E')
+
+    if job['status'] == 'SUCCEEDED':
+        pbs_exit_status = ' exit_status=0'
+    elif job['status'] in ('NEW', 'NEW_SAVING', 'SUBMITTED', 'ACCEPTED'):
+        pbs_exit_status = ''
+    else:
+        pbs_exit_status = ' exit_status=-1'
+
+    pbs_name = ''
+    if job['name']:
+        pbs_name = ' jobname=%s' % re.sub("[ :]", '_', job['name'])
+
+    # ctime, qtime and etime all use the submit value / 1000 (presumably ms -> s).
+    pbs_submit = ''
+    pbs_qtime = ''
+    pbs_etime = ''
+    if job['submit']:
+        submit = int(job['submit']) // 1000
+        pbs_submit = ' ctime=%d' % submit
+        pbs_qtime = ' qtime=%d' % submit
+        pbs_etime = ' etime=%d' % submit
+
+    pbs_exec_host = ''
+    if hosts:
+        pbs_exec_host = ' exec_host=%s' % '+'.join(hosts)
+
+    pbs_cput = ''
+    if job['cpu_seconds']:
+        pbs_cput = ' resources_used.cput=%s' % format_duration(job['cpu_seconds'])
+
+    # start/finish may be unset for jobs that did not run or finish yet.
+    start_ms = long(job['start'] or 0)
+    finish_ms = long(job['finish'] or 0)
+    walltime = format_duration((finish_ms - start_ms) // 1000)
+
+    return '%s;%s;%s.%s;user=%s group=hadoop%s queue=%s%s%s%s start=%d end=%d owner=%s@%s%s%s%s resources_used.walltime=%s' % (log_time, status, job['id'], cluster_hostname, job['user'], pbs_name, job['queue'], pbs_submit, pbs_qtime, pbs_etime, start_ms // 1000, finish_ms // 1000, job['user'], realm, pbs_exec_host, pbs_exit_status, pbs_cput, walltime)
+
+
 try:
     opts, args = getopt.getopt(sys.argv[1:], 'hc:s:', ['help', 'config=', 'dbhost=', 'dbname=', 'dbuser=', 'dbpassword=', 'statfile=' ])
 except getopt.GetoptError:
@@ -70,20 +159,19 @@ print 'Status file: %s' % statfile
 print 'Timestamp: %s' % ts
 
 db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname)
-st = db.cursor()
+st = db.cursor(MySQLdb.cursors.DictCursor)
 
 # beware of actually running second - rather get only >5 seconds older changes
 if ts:
-    st.execute('SELECT id, name, user, changed FROM job WHERE changed > %s AND TIMESTAMPDIFF(SECOND,changed,NOW()) > 5 ORDER BY CHANGED', ts)
+    st.execute('SELECT * FROM jobs WHERE changed > %s AND TIMESTAMPDIFF(SECOND,changed,NOW()) > 5 ORDER BY CHANGED', (ts,))
 else:
-    st.execute('SELECT id, name, user, changed FROM job WHERE TIMESTAMPDIFF(SECOND,changed,NOW()) > 5 ORDER BY changed')
+    st.execute('SELECT * FROM jobs WHERE TIMESTAMPDIFF(SECOND,changed,NOW()) > 5 ORDER BY changed')
 
 while 1:
-    data = st.fetchone()
-    if data:
-        print '%s, %s, %s' % (data[0], data[2], data[3])
-        ts = data[3]
-    else:
+    job = st.fetchone()
+    if not job:
         break
+    ts = job['changed']
+    print gen_string(job)
 
 db.close()
-- 
1.8.2.3