Usage statistics of each node mined from the subjobs (map/reduce tasks).
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Thu, 5 Mar 2015 23:34:39 +0000 (00:34 +0100)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Thu, 5 Mar 2015 23:34:39 +0000 (00:34 +0100)
chroust.py [new file with mode: 0755]

diff --git a/chroust.py b/chroust.py
new file mode 100755 (executable)
index 0000000..ffaa02b
--- /dev/null
@@ -0,0 +1,153 @@
+#! /usr/bin/python2
+
+#
+# Generate data of usage on each node.
+#
+# Maximal theoretical usage of the node in each interval is the limit number of
+# containers on that node.
+#
+
+import getopt
+import re
+import sys
+import datetime
+import MySQLdb
+
+dbhost = 'localhost'
+dbname = 'hadoop'
+dbuser = 'hadoop'
+dbpassword = ''
+debug = 0
+interval = 24 * 3600
+
+db = None
+st = None
+nodes = dict()
+
+def strptime(val):
+       epoch = long(val) / 1000
+       frag = long(val) % 1000
+
+       date = datetime.datetime.utcfromtimestamp(epoch)
+       return date.replace(microsecond=1000*frag)
+
+
+def strp(val):
+       return strptime(val).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
+
+
+try:
+       opts, args = getopt.getopt(sys.argv[1:], 'hc:di:', ['help', 'config=', 'dbhost=', 'dbname=', 'dbuser=', 'dbpassword=', 'debug', 'interval=' ])
+except getopt.GetoptError:
+       print 'Args error'
+       sys.exit(2)
+for opt, arg in opts:
+       if opt in ('-h', '--help'):
+               print "chroust.py [OPTIONS]\n\
+OPTIONS are:\n\
+  -h, --help ............ help message\n\
+  -c, --config .......... config file\n\
+  --dbhost\n\
+  --dbname\n\
+  --dbuser\n\
+  -d, --debug\n\
+  --dbpassword\n\
+  -i, --interval TIME ... time interval"
+               sys.exit(0)
+       elif opt in ('-c', '--config'):
+               f = open(arg, 'r')
+               for line in f:
+                       cfg=line.rstrip().split('=')
+                       if cfg[0] == 'dbhost':
+                               dbhost = cfg[1]
+                       elif cfg[0] == 'dbname':
+                               dbname = cfg[1]
+                       elif cfg[0] == 'dbuser':
+                               dbuser = cfg[1]
+                       elif cfg[0] == 'dbpassword':
+                               dbpassword = cfg[1]
+               f.close()
+       elif opt in ('--dbhost'):
+               dbhost = arg
+       elif opt in ('--dbname'):
+               dbname = arg
+       elif opt in ('--dbuser'):
+               dbuser = arg
+       elif opt in ('--dbpassword'):
+               dbpassword = arg
+       elif opt in ('-d', '--debug'):
+               debug = int(arg)
+       elif opt in ('-i', '--interval'):
+               interval = long(arg)
+       else:
+               print 'Args error'
+               sys.exit(2)
+
+db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname)
+st = db.cursor(MySQLdb.cursors.DictCursor)
+
+data = st.execute('SELECT id, host FROM nodes')
+while 1:
+       data = st.fetchone()
+       if not data:
+               break
+       nodes[data['id']] = data['host']
+
+
+st.execute('SELECT MIN(start) AS min, MAX(finish) AS max FROM subjobs')
+data = st.fetchone()
+min = data['min']
+max = data['max']
+
+if debug:
+       print 'Min: %s' % strp(min)
+       print 'Max: %s' % strp(max)
+       print 'Interval: %s s' % interval
+
+min = 1000 * interval * (min / 1000 / interval)
+max = 1000 * interval * ((max / 1000 + interval - 1) / interval)
+
+if debug:
+       print 'Range: %s ... %s' % (strp(min), strp(max))
+
+usages = list()
+usages.append('time')
+usages.append('mapred')
+for nodeid in sorted(nodes.keys()):
+       usages.append(nodes[nodeid])
+print "\t".join(usages)
+
+i = min
+while i <= max:
+       start = i - interval * 500
+       end = i + interval * 500
+       st.execute('SELECT * FROM subjobs WHERE finish >= %s AND start <= %s', (start, end))
+       #print '%s: %d subjobs' % (strp(i), st.rowcount)
+       subjobs = st.rowcount
+       nodes_usage = dict()
+       while 1:
+               data = st.fetchone()
+               if not data:
+                       break
+               len = 1000 * interval
+               if data['start'] and data['start'] > start:
+                       len -= (data['start'] - start)
+               if data['finish'] and data['finish'] < end:
+                       len -= (end - data['finish'])
+               usage = float(len) / float(1000.0 * interval)
+               #print '%d: %s, %0.2f' % (i, data['jobid'], usage)
+               if data['nodeid'] not in nodes_usage.keys():
+                       nodes_usage[data['nodeid']] = 0.0
+               nodes_usage[data['nodeid']] += usage;
+
+       usages = list()
+       for nodeid in sorted(nodes.keys()):
+               if nodeid in nodes_usage.keys():
+                       usages.append("%s" % nodes_usage[nodeid])
+               else:
+                       usages.append("0")
+       print '%s\t%d\t%s' % (strp(i), subjobs, "\t".join(usages))
+
+       i += 1000 * interval
+
+db.close()