From: František Dvořák Date: Wed, 4 Mar 2015 16:40:16 +0000 (+0100) Subject: Support database. X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=e726ab1a6d49065e6ccc805fa17d22c5d2f871af;p=hadoop-jobstat.git Support database. --- diff --git a/jobs.py b/jobs.py index 120b195..359d629 100755 --- a/jobs.py +++ b/jobs.py @@ -8,14 +8,28 @@ import sys import getopt import socket +import MySQLdb + class Job: status = None user = None mapred = None yarn = None + start = None + finish = None + +class Node: + elapsed = 0 + map = 0 + reduce = 0 base_mapred_url='' base_yarn_url='' +db = None +dbhost = 'localhost' +dbname = 'hadoop' +dbuser = 'hadoop' +dbpassword = '' debug=0 ssl=0 details=1 @@ -70,14 +84,43 @@ def gen_url(base_url, port_nossl, port_ssl): return base_url +def dbint(s): + if s is None: + return 'NULL' + else: + return s + + +def dbstr(s): + if s is None: + return 'NULL' + else: + return "'" + s + "'" + + try: - opts, args = getopt.getopt(sys.argv[1:], 'hb:c:dj:m:y:s', ['help', 'base=', 'config=', 'debug', 'jobs=', 'mapred=', 'yarn=', 'ssl']) + opts, args = getopt.getopt(sys.argv[1:], 'hb:c:dj:m:y:s', ['help', 'base=', 'config=', 'db', 'dbhost=', 'dbname=', 'dbuser=', 'dbpassword=', 'debug', 'jobid=', 'mapred=', 'yarn=', 'ssl']) except getopt.GetoptError: print 'Args error' sys.exit(2) for opt, arg in opts: if opt in ('-h', '--help'): - print('jobs.py [-h|--help] [-b|--base] [-c|--config] [-d|--debug] [-j|--jobs] [-m|--mapred URL] [-y|--yarn URL] [-s|--ssl]') + print "jobs.py [OPTIONS]\n\ +OPTIONS are:\n\ + -h, --help ........ help message\n\ + -b, --base ........ default hostname for YARN ans MapReduce\n\ + -c, --config ...... config file\n\ + -d, --debug ....... debug output\n\ + --db .............. enable database\n\ + --dbhost\n\ + --dbname\n\ + --dbuser\n\ + --dbpassword\n\ + -j, --jobid ....... single job query istead of list all\n\ + -m, --mapred URL .. MapReduce Job History server\n\ + -y, --yarn URL .... YARN Resource Manager\n\ + -s, --ssl ......... enable default SSL schema and ports" + sys.exit(0) elif opt in ('-b', '--base'): host = arg @@ -87,7 +130,17 @@ for opt, arg in opts: cfg=line.rstrip().split('=') if cfg[0] == 'base': host = cfg[1] - if cfg[0] == 'mapred': + elif cfg[0] == 'dbhost': + dbhost = cfg[1] + elif cfg[0] == 'db': + db = 1 + elif cfg[0] == 'dbname': + dbname = cfg[1] + elif cfg[0] == 'dbuser': + dbuser = cfg[1] + elif cfg[0] == 'dbpassword': + dbpassword = cfg[1] + elif cfg[0] == 'mapred': base_mapred_url = cfg[1] print cfg elif cfg[0] == 'yarn': @@ -97,7 +150,17 @@ for opt, arg in opts: ssl = cfg[1] elif opt in ('-d', '--debug'): debug=1 - elif opt in ('-j', '--jobs'): + elif opt in ('--db'): + db = 1 + elif opt in ('--dbhost'): + dbhost = arg + elif opt in ('--dbname'): + dbname = arg + elif opt in ('--dbuser'): + dbuser = arg + elif opt in ('--dbpassword'): + dbpassword = arg + elif opt in ('-j', '--jobid'): id = arg elif opt in ('-m', '--mapred'): base_mapred_url = arg @@ -140,8 +203,6 @@ if id: else: job = jobs[id] job.mapred = j1['job']; - job.status = job.mapred['state'] - job.user = job.mapred['user'] if j2 and j2['app']: if id not in jobs: @@ -150,10 +211,6 @@ if id: else: job = jobs[id] job.yarn = j2['app']; - if not job.status: - job.status = job.yarn['finalStatus'] - if not job.user: - job.user = job.yarn['user'] print jobs else: @@ -174,8 +231,6 @@ else: else: job = jobs[id] job.mapred = j; - job.status = job.mapred['state'] - job.user = job.mapred['user'] if j2["apps"]: for j in j2["apps"]["app"]: @@ -186,15 +241,33 @@ else: else: job = jobs[id] job.yarn = j; - if not job.status: - job.status = job.yarn['finalStatus'] - if not job.user: - job.user = job.yarn['user'] +if db: + db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname) + st = db.cursor() regHost = re.compile(':\d+') counter=1 for id, job in jobs.iteritems(): + + # merge MapRed and YARN + if job.mapred: + job.status = job.mapred['state'] + job.user = job.mapred['user'] + job.start = job.mapred['startTime'] + job.finish = job.mapred['finishTime'] + if job.yarn: + if not job.status: + job.status = job.yarn['finalStatus'] + if not job.user: + job.user = job.yarn['user'] + if not job.start: + job.start = job.yarn['startedTime'] + if debug: print '[MR] missing start time of %s completed from YARN (%d)' % (id, job.start) + if not job.finish: + job.finish = job.yarn['finishedTime'] + if debug: print '[MR] missing finish time of %s completed from YARN (%d)' % (id, job.finish) + print 'job %s (%d):' % (id, counter) counter += 1 print ' status: %s' % job.status @@ -217,21 +290,61 @@ for id, job in jobs.iteritems(): for attempt in a['taskAttempts']['taskAttempt']: nodeHost = regHost.sub('', attempt['nodeHttpAddress']) if nodeHost not in nodes: - nodes[nodeHost] = 0 - nodes[nodeHost] += attempt['elapsedTime'] + nodes[nodeHost] = Node() + nodes[nodeHost].elapsed += attempt['elapsedTime'] + if attempt['type'] == 'MAP': + nodes[nodeHost].map += 1 + elif attempt['type'] == 'REDUCE': + nodes[nodeHost].reduce += 1 + else: + raise Exception('unknown type %s' % attempt['type']) # # print 'tasks elapsed: %d' % aggregate aggregate=0 - for node, elapsed in nodes.iteritems(): - print ' node %s: %d' % (node, elapsed) - aggregate += elapsed + for nodename, node in nodes.iteritems(): + print ' node %s: %d' % (nodename, node.elapsed) + aggregate += node.elapsed print ' ==> aggregated %d' % aggregate if job.yarn: print ' MB x s: %d' % job.yarn['memorySeconds'] print ' CPU x s: %d' % job.yarn['vcoreSeconds'] + if db: + st.execute("SELECT id, user, status, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM job WHERE id='%s'" % id) + data = st.fetchone() + if data: + if data[1] == job.user and data[2] == job.status and data[4] == job.start and data[5] == job.finish: + if debug: print '[db] job %s found' % id + else: + st.execute("UPDATE job SET user='%s', status='%s', start=%s, finish=%s WHERE id='%s'" %(job.user, job.status, dbint(job.start), dbint(job.finish), id)) + if debug: print '[db] job %s updated' % id + + else: + st.execute("INSERT INTO job (id, user, status, start, finish) VALUES('%s', %s, %s)" %(id, job.user, job.status, dbint(job.start), dbint(job.finish))) + if debug: print '[db] job %s inserted' % id + if job.mapred: + if data and data[3] == job.mapred['submitTime'] and data[8] == job.mapred['mapsTotal'] and data[9] == job.mapred['reducesTotal']: + if debug: print '[db] job %s mapred is actual' % id + else: + st.execute("UPDATE job SET submit=%d, map=%s, reduce=%s WHERE id='%s'" %(job.mapred['submitTime'], dbint(job.mapred['mapsTotal']), dbint(job.mapred['reducesTotal']), id)) + if debug: print '[db] job %s mapred updated' % id + if job.yarn: + if data and data[6] == job.yarn['memorySeconds'] and data[7] == job.yarn['vcoreSeconds']: + if debug: print '[db] job %s yarn is actual' % id + else: + st.execute("UPDATE job SET memory_seconds=%d, cpu_seconds=%d WHERE id='%s'" %(job.yarn['memorySeconds'], job.yarn['vcoreSeconds'], id)) + if debug: print '[db] job %s yarn updated' % id + if nodes: + st.execute("DELETE FROM node WHERE jobid='%s'" % id) + for nodename, node in nodes.iteritems(): + st.execute("INSERT INTO node (jobid, host, elapsed, map, reduce) VALUES ('%s', '%s', %d, %d, %d)" % (id, nodename, node.elapsed, node.map, node.reduce)) + if debug: print '[db] job %s nodes updated' % id + db.commit() + print -c.close() +if db: + db.close() +c.close()