--- /dev/null
+#! /usr/bin/python2
+
+#
+# Generate per-node usage data.
+#
+# The theoretical maximum usage of a node in each interval is the limit on
+# the number of containers on that node.
+#
+
+import getopt
+import re
+import sys
+import datetime
+import MySQLdb
+import MySQLdb.cursors
+
+dbhost = 'localhost'
+dbname = 'bookkeeping'
+dbuser = 'bookkeeping'
+dbpassword = ''
+debug = 0
+interval = 24 * 3600
+min = None
+max = None
+header = 1
+
+db = None
+st = None
+nodes = dict()
+
+def strptime(val):
+ epoch = long(val) / 1000
+ frag = long(val) % 1000
+
+ date = datetime.datetime.utcfromtimestamp(epoch)
+ return date.replace(microsecond=1000*frag)
+
+
+def strp(val):
+ return strptime(val).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
+
+
+def time_range(start, finish, imax = None):
+ istart = int(float(start) / 1000.0 / interval)
+ ifinish = imax
+ if finish == -1:
+ # TODO: whole interval
+ ifinish = istart + 1
+ elif finish < 0:
+ assert(finish < 0)
+ elif finish is not None and finish != 0:
+ ifinish = int((float(finish) / 1000.0 + interval - 1) / interval)
+
+ start = 1000 * interval * istart;
+ finish = 1000 * interval * ifinish;
+
+ return [istart, ifinish, start, finish]
+
+
+def time_range2(start, finish, imax = None):
+ istart = int(float(start) / 1000.0 / interval)
+ ifinish = imax
+ if finish == -1:
+ # TODO: whole interval
+ ifinish = istart
+ elif finish < 0:
+ assert(finish < 0)
+ elif finish is not None and finish != 0:
+ ifinish = int(float(finish) / 1000.0 / interval)
+
+ start = 1000 * interval * istart;
+ finish = 1000 * interval * (ifinish + 1);
+
+ return [istart, ifinish, start, finish]
+
+
+try:
+ opts, args = getopt.getopt(sys.argv[1:], 'hc:di:n:x:H', ['help', 'config=', 'dbhost=', 'dbname=', 'dbuser=', 'dbpassword=', 'debug', 'interval=', 'min=', 'max=', 'no-header' ])
+except getopt.GetoptError:
+ print 'Args error'
+ sys.exit(2)
+for opt, arg in opts:
+ if opt in ['-h', '--help']:
+ print "chroust.py [OPTIONS]\n\
+OPTIONS are:\n\
+ -h, --help ............ help message\n\
+ -c, --config .......... config file\n\
+ --dbhost\n\
+ --dbname\n\
+ --dbuser\n\
+ -d, --debug\n\
+ --dbpassword\n\
+ -i, --interval TIME ... time interval [s]\n\
+ -n, --min ............. start time [s] (default: autodetect)\n\
+ -x, --max ............. end time [s] (default: autodetect)\n\
+ -H, --no-header ....... don't print the header"
+ sys.exit(0)
+ elif opt in ['-c', '--config']:
+ f = open(arg, 'r')
+ for line in f:
+ cfg=line.rstrip().split('=')
+ if cfg[0] == 'dbhost':
+ dbhost = cfg[1]
+ elif cfg[0] == 'dbname':
+ dbname = cfg[1]
+ elif cfg[0] == 'dbuser':
+ dbuser = cfg[1]
+ elif cfg[0] == 'dbpassword':
+ dbpassword = cfg[1]
+ f.close()
+ elif opt in ['--dbhost']:
+ dbhost = arg
+ elif opt in ['--dbname']:
+ dbname = arg
+ elif opt in ['--dbuser']:
+ dbuser = arg
+ elif opt in ['--dbpassword']:
+ dbpassword = arg
+ elif opt in ['-d', '--debug']:
+ debug = 1
+ elif opt in ['-i', '--interval']:
+ interval = long(arg)
+ elif opt in ['-n', '--min']:
+ min = long(arg) * 1000
+ elif opt in ['-x', '--max']:
+ max = long(arg) * 1000
+ elif opt in ['-H', '--no-header']:
+ header = 0
+ else:
+ print 'Args error'
+ sys.exit(2)
+
+db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname)
+st = db.cursor(MySQLdb.cursors.SSDictCursor)
+
+data = st.execute('SELECT id, host FROM nodes')
+while 1:
+ data = st.fetchone()
+ if not data:
+ break
+ nodes[data['id']] = data['host']
+st.close()
+
+
+if not min or not max:
+ st = db.cursor(MySQLdb.cursors.SSDictCursor)
+ st.execute('SELECT MIN(start) AS min, MAX(finish) AS max FROM subjobs')
+ data = st.fetchone()
+ st.close()
+if not min:
+ min = data['min']
+if not max:
+ max = data['max']
+
+if debug:
+ print 'Min: %s (%s)' % (min, strp(min))
+ print 'Max: %s (%s)' % (max, strp(max))
+ print 'Interval: %s s' % interval
+
+imin, imax, min, max = time_range(min, max)
+
+if debug:
+ print 'Range: %s ... %s (%d intervals)' % (strp(min), strp(max), imax - imin)
+
+if header:
+ usages = list()
+ usages.append('time')
+ usages.append('mapred')
+ for nodeid in sorted(nodes.keys()):
+ usages.append(nodes[nodeid])
+ print "\t".join(usages)
+
+nodes_usage = dict()
+nodes_subjobs = dict()
+for nodeid in sorted(nodes.keys()):
+ nodes_usage[nodeid] = list(0 for i in range(0, imax - imin))
+ nodes_subjobs[nodeid] = list(0 for i in range(0, imax - imin))
+
+# SELECT * FROM subjobs WHERE (finish >= %s OR finish = 0 OR finish IS NULL) AND start <= %s', (start, end)
+st = db.cursor(MySQLdb.cursors.SSDictCursor)
+st.execute('SELECT * FROM subjobs ORDER BY start')
+data = st.fetchone()
+while data is not None:
+ if data['finish'] == -1:
+ data = st.fetchone()
+ continue
+ ## if to define unknown subjobs as 30 seconds long
+ #if data['finish'] < 0:
+ # data['finish'] = data['start'] + 30 * 1000;
+
+ istart, ifinish, start, finish = time_range2(data['start'], data['finish'], imax)
+
+ for i in range(istart, ifinish + 1):
+ coverage = 1000.0 * interval
+ if i == istart:
+ coverage -= (data['start'] - start)
+ if i == ifinish:
+ if finish is None or finish == 0:
+ # running
+ pass
+ elif finish < 0:
+ # failed, unknown finish
+ pass
+ else:
+ coverage -= (finish - data['finish'])
+ assert(coverage > 0.0)
+ usage = float(coverage) / 1000.0 / interval
+
+ #if data['id'] in ['1425399410099_2575_m_000001_0', '1425399410099_2575_m_000000_0', '1425399410099_2575_r_000000_0']:
+ # print '##### int: %d, cov: %s, index: %s..%s, data: %s..%s, range: %s..%s' % (i, coverage, istart, ifinish, data['start'], data['finish'], start, finish)
+ # print '##### int: %d, cutstart: %s, cutend: %s' % (i, data['start'] - start, finish - data['finish'])
+
+ nodes_usage[data['nodeid']][i - imin] += usage;
+ nodes_subjobs[data['nodeid']][i - imin] += 1;
+
+ data = st.fetchone()
+
+st.close()
+db.close()
+
+for i in range(0, imax - imin):
+ usages = []
+ subjobs = 0
+ for nodeid in sorted(nodes.keys()):
+ usages.append("%s" % nodes_usage[nodeid][i])
+ subjobs += nodes_subjobs[nodeid][i]
+ #print '#%s' % (int(i * 1000.0 * interval + min))
+ print '%s\t%d\t%s' % (strp(i * 1000 * interval + min), subjobs, "\t".join(usages))