From: František Dvořák Date: Thu, 5 Mar 2015 23:34:39 +0000 (+0100) Subject: Usage statistics of each node mined from the subjobs (map/reduce tasks). X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=5f63713b7e1429677261381d066acb56f28c5eb2;p=hadoop-jobstat.git Usage statistics of each node mined from the subjobs (map/reduce tasks). --- diff --git a/chroust.py b/chroust.py new file mode 100755 index 0000000..ffaa02b --- /dev/null +++ b/chroust.py @@ -0,0 +1,153 @@ +#! /usr/bin/python2 + +# +# Generate data of usage on each node. +# +# Maximal theoretical usage of the node in each interval is the limit number of +# containers on that node. +# + +import getopt +import re +import sys +import datetime +import MySQLdb + +dbhost = 'localhost' +dbname = 'hadoop' +dbuser = 'hadoop' +dbpassword = '' +debug = 0 +interval = 24 * 3600 + +db = None +st = None +nodes = dict() + +def strptime(val): + epoch = long(val) / 1000 + frag = long(val) % 1000 + + date = datetime.datetime.utcfromtimestamp(epoch) + return date.replace(microsecond=1000*frag) + + +def strp(val): + return strptime(val).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + + +try: + opts, args = getopt.getopt(sys.argv[1:], 'hc:di:', ['help', 'config=', 'dbhost=', 'dbname=', 'dbuser=', 'dbpassword=', 'debug', 'interval=' ]) +except getopt.GetoptError: + print 'Args error' + sys.exit(2) +for opt, arg in opts: + if opt in ('-h', '--help'): + print "chroust.py [OPTIONS]\n\ +OPTIONS are:\n\ + -h, --help ............ help message\n\ + -c, --config .......... config file\n\ + --dbhost\n\ + --dbname\n\ + --dbuser\n\ + -d, --debug\n\ + --dbpassword\n\ + -i, --interval TIME ... time interval" + sys.exit(0) + elif opt in ('-c', '--config'): + f = open(arg, 'r') + for line in f: + cfg=line.rstrip().split('=') + if cfg[0] == 'dbhost': + dbhost = cfg[1] + elif cfg[0] == 'dbname': + dbname = cfg[1] + elif cfg[0] == 'dbuser': + dbuser = cfg[1] + elif cfg[0] == 'dbpassword': + dbpassword = cfg[1] + f.close() + elif opt in ('--dbhost'): + dbhost = arg + elif opt in ('--dbname'): + dbname = arg + elif opt in ('--dbuser'): + dbuser = arg + elif opt in ('--dbpassword'): + dbpassword = arg + elif opt in ('-d', '--debug'): + debug = int(arg) + elif opt in ('-i', '--interval'): + interval = long(arg) + else: + print 'Args error' + sys.exit(2) + +db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname) +st = db.cursor(MySQLdb.cursors.DictCursor) + +data = st.execute('SELECT id, host FROM nodes') +while 1: + data = st.fetchone() + if not data: + break + nodes[data['id']] = data['host'] + + +st.execute('SELECT MIN(start) AS min, MAX(finish) AS max FROM subjobs') +data = st.fetchone() +min = data['min'] +max = data['max'] + +if debug: + print 'Min: %s' % strp(min) + print 'Max: %s' % strp(max) + print 'Interval: %s s' % interval + +min = 1000 * interval * (min / 1000 / interval) +max = 1000 * interval * ((max / 1000 + interval - 1) / interval) + +if debug: + print 'Range: %s ... %s' % (strp(min), strp(max)) + +usages = list() +usages.append('time') +usages.append('mapred') +for nodeid in sorted(nodes.keys()): + usages.append(nodes[nodeid]) +print "\t".join(usages) + +i = min +while i <= max: + start = i - interval * 500 + end = i + interval * 500 + st.execute('SELECT * FROM subjobs WHERE finish >= %s AND start <= %s', (start, end)) + #print '%s: %d subjobs' % (strp(i), st.rowcount) + subjobs = st.rowcount + nodes_usage = dict() + while 1: + data = st.fetchone() + if not data: + break + len = 1000 * interval + if data['start'] and data['start'] > start: + len -= (data['start'] - start) + if data['finish'] and data['finish'] < end: + len -= (end - data['finish']) + usage = float(len) / float(1000.0 * interval) + #print '%d: %s, %0.2f' % (i, data['jobid'], usage) + if data['nodeid'] not in nodes_usage.keys(): + nodes_usage[data['nodeid']] = 0.0 + nodes_usage[data['nodeid']] += usage; + + usages = list() + for nodeid in sorted(nodes.keys()): + if nodeid in nodes_usage.keys(): + usages.append("%s" % nodes_usage[nodeid]) + else: + usages.append("0") + print '%s\t%d\t%s' % (strp(i), subjobs, "\t".join(usages)) + + i += 1000 * interval + +db.close()