Optimized version of analyzing nodes.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Wed, 6 Jan 2016 22:32:25 +0000 (23:32 +0100)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Wed, 6 Jan 2016 22:32:25 +0000 (23:32 +0100)
HOWTO.sh
chroust-opt.py [new file with mode: 0755]

index 8d63966..51b71c9 100755 (executable)
--- a/HOWTO.sh
+++ b/HOWTO.sh
@@ -1,6 +1,7 @@
 # !/bin/sh
 #scp -p root@hador-c1:~/STAT-nodes/nodes.dat .
-./chroust.py -i $((6*3600)) -c hador.txt > nodes.dat
+#./chroust.py -i $((6*3600)) -c hador.txt > nodes.dat
+./chroust-opt.py -i $((6*3600)) -c hador.txt > nodes.dat
 ./nodes.dem
 ./nodes-sum.dem
 
diff --git a/chroust-opt.py b/chroust-opt.py
new file mode 100755 (executable)
index 0000000..0bfefa7
--- /dev/null
@@ -0,0 +1,228 @@
+#! /usr/bin/python2
+
+#
+# Generate data of usage on each node.
+#
+# Maximal theoretical usage of the node in each interval is the limit number of
+# containers on that node.
+#
+
+import getopt
+import re
+import sys
+import datetime
+import MySQLdb
+import MySQLdb.cursors
+
+dbhost = 'localhost'
+dbname = 'bookkeeping'
+dbuser = 'bookkeeping'
+dbpassword = ''
+debug = 0
+interval = 24 * 3600
+min = None
+max = None
+header = 1
+
+db = None
+st = None
+nodes = dict()
+
+def strptime(val):
+       epoch = long(val) / 1000
+       frag = long(val) % 1000
+
+       date = datetime.datetime.utcfromtimestamp(epoch)
+       return date.replace(microsecond=1000*frag)
+
+
+def strp(val):
+       return strptime(val).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
+
+
+def time_range(start, finish, imax = None):
+       istart = int(float(start) / 1000.0 / interval)
+       ifinish = imax
+       if finish == -1:
+               # TODO: whole interval
+               ifinish = istart + 1
+       elif finish < 0:
+               assert(finish < 0)
+       elif finish is not None and finish != 0:
+               ifinish = int((float(finish) / 1000.0 + interval - 1) / interval)
+
+       start = 1000 * interval * istart;
+       finish = 1000 * interval * ifinish;
+
+       return [istart, ifinish, start, finish]
+
+
+def time_range2(start, finish, imax = None):
+       istart = int(float(start) / 1000.0 / interval)
+       ifinish = imax
+       if finish == -1:
+               # TODO: whole interval
+               ifinish = istart
+       elif finish < 0:
+               assert(finish < 0)
+       elif finish is not None and finish != 0:
+               ifinish = int(float(finish) / 1000.0 / interval)
+
+       start = 1000 * interval * istart;
+       finish = 1000 * interval * (ifinish + 1);
+
+       return [istart, ifinish, start, finish]
+
+
+try:
+       opts, args = getopt.getopt(sys.argv[1:], 'hc:di:n:x:H', ['help', 'config=', 'dbhost=', 'dbname=', 'dbuser=', 'dbpassword=', 'debug', 'interval=', 'min=', 'max=', 'no-header' ])
+except getopt.GetoptError:
+       print 'Args error'
+       sys.exit(2)
+for opt, arg in opts:
+       if opt in ['-h', '--help']:
+               print "chroust.py [OPTIONS]\n\
+OPTIONS are:\n\
+  -h, --help ............ help message\n\
+  -c, --config .......... config file\n\
+  --dbhost\n\
+  --dbname\n\
+  --dbuser\n\
+  -d, --debug\n\
+  --dbpassword\n\
+  -i, --interval TIME ... time interval [s]\n\
+  -n, --min ............. start time [s] (default: autodetect)\n\
+  -x, --max ............. end time [s] (default: autodetect)\n\
+  -H, --no-header ....... don't print the header"
+               sys.exit(0)
+       elif opt in ['-c', '--config']:
+               f = open(arg, 'r')
+               for line in f:
+                       cfg=line.rstrip().split('=')
+                       if cfg[0] == 'dbhost':
+                               dbhost = cfg[1]
+                       elif cfg[0] == 'dbname':
+                               dbname = cfg[1]
+                       elif cfg[0] == 'dbuser':
+                               dbuser = cfg[1]
+                       elif cfg[0] == 'dbpassword':
+                               dbpassword = cfg[1]
+               f.close()
+       elif opt in ['--dbhost']:
+               dbhost = arg
+       elif opt in ['--dbname']:
+               dbname = arg
+       elif opt in ['--dbuser']:
+               dbuser = arg
+       elif opt in ['--dbpassword']:
+               dbpassword = arg
+       elif opt in ['-d', '--debug']:
+               debug = 1
+       elif opt in ['-i', '--interval']:
+               interval = long(arg)
+       elif opt in ['-n', '--min']:
+               min = long(arg) * 1000
+       elif opt in ['-x', '--max']:
+               max = long(arg) * 1000
+       elif opt in ['-H', '--no-header']:
+               header = 0
+       else:
+               print 'Args error'
+               sys.exit(2)
+
+db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname)
+st = db.cursor(MySQLdb.cursors.SSDictCursor)
+
+data = st.execute('SELECT id, host FROM nodes')
+while 1:
+       data = st.fetchone()
+       if not data:
+               break
+       nodes[data['id']] = data['host']
+st.close()
+
+
+if not min or not max:
+       st = db.cursor(MySQLdb.cursors.SSDictCursor)
+       st.execute('SELECT MIN(start) AS min, MAX(finish) AS max FROM subjobs')
+       data = st.fetchone()
+       st.close()
+if not min:
+       min = data['min']
+if not max:
+       max = data['max']
+
+if debug:
+       print 'Min: %s (%s)' % (min, strp(min))
+       print 'Max: %s (%s)' % (max, strp(max))
+       print 'Interval: %s s' % interval
+
+imin, imax, min, max = time_range(min, max)
+
+if debug:
+       print 'Range: %s ... %s (%d intervals)' % (strp(min), strp(max), imax - imin)
+
+if header:
+       usages = list()
+       usages.append('time')
+       usages.append('mapred')
+       for nodeid in sorted(nodes.keys()):
+               usages.append(nodes[nodeid])
+       print "\t".join(usages)
+
+nodes_usage = dict()
+nodes_subjobs = dict()
+for nodeid in sorted(nodes.keys()):
+       nodes_usage[nodeid] = list(0 for i in range(0, imax - imin))
+       nodes_subjobs[nodeid] = list(0 for i in range(0, imax - imin))
+
+# SELECT * FROM subjobs WHERE (finish >= %s OR finish = 0 OR finish IS NULL) AND start <= %s', (start, end)
+st = db.cursor(MySQLdb.cursors.SSDictCursor)
+st.execute('SELECT * FROM subjobs ORDER BY start')
+data = st.fetchone()
+while data is not None:
+       if data['finish'] == -1:
+               data = st.fetchone()
+               continue
+       ## if to define unknown subjobs as 30 seconds long
+       #if data['finish'] < 0:
+       #       data['finish'] = data['start'] + 30 * 1000;
+
+       istart, ifinish, start, finish = time_range2(data['start'], data['finish'], imax)
+
+       for i in range(istart, ifinish + 1):
+               coverage = 1000.0 * interval
+               if i == istart:
+                       coverage -= (data['start'] - start)
+               if i == ifinish:
+                       if finish is None or finish == 0:
+                               # running
+                               pass
+                       elif finish < 0:
+                               # failed, unknown finish
+                               pass
+                       else:
+                               coverage -= (finish - data['finish'])
+               assert(coverage > 0.0)
+               usage = float(coverage) / 1000.0 / interval
+
+               #if data['id'] in ['1425399410099_2575_m_000001_0', '1425399410099_2575_m_000000_0', '1425399410099_2575_r_000000_0']:
+               #       print '##### int: %d, cov: %s, index: %s..%s, data: %s..%s, range: %s..%s' % (i, coverage, istart, ifinish, data['start'], data['finish'], start, finish)
+               #       print '##### int: %d, cutstart: %s, cutend: %s' % (i, data['start'] - start, finish - data['finish'])
+
+               nodes_usage[data['nodeid']][i - imin] += usage;
+               nodes_subjobs[data['nodeid']][i - imin] += 1;
+
+       data = st.fetchone()
+
+st.close()
+db.close()
+
+for i in range(0, imax - imin):
+       usages = []
+       subjobs = 0
+       for nodeid in sorted(nodes.keys()):
+               usages.append("%s" % nodes_usage[nodeid][i])
+               subjobs += nodes_subjobs[nodeid][i]
+       #print '#%s' % (int(i * 1000.0 * interval + min))
+       print '%s\t%d\t%s' % (strp(i * 1000 * interval + min), subjobs, "\t".join(usages))