import getopt
import socket
+import MySQLdb
+
class Job:
status = None
user = None
mapred = None
yarn = None
+ start = None
+ finish = None
+
+class Node:
+ elapsed = 0
+ map = 0
+ reduce = 0
base_mapred_url=''
base_yarn_url=''
+db = None
+dbhost = 'localhost'
+dbname = 'hadoop'
+dbuser = 'hadoop'
+dbpassword = ''
debug=0
ssl=0
details=1
return base_url
+def dbint(s):
+ if s is None:
+ return 'NULL'
+ else:
+ return s
+
+
+def dbstr(s):
+ if s is None:
+ return 'NULL'
+ else:
+ return "'" + s + "'"
+
+
try:
- opts, args = getopt.getopt(sys.argv[1:], 'hb:c:dj:m:y:s', ['help', 'base=', 'config=', 'debug', 'jobs=', 'mapred=', 'yarn=', 'ssl'])
+ opts, args = getopt.getopt(sys.argv[1:], 'hb:c:dj:m:y:s', ['help', 'base=', 'config=', 'db', 'dbhost=', 'dbname=', 'dbuser=', 'dbpassword=', 'debug', 'jobid=', 'mapred=', 'yarn=', 'ssl'])
except getopt.GetoptError:
print 'Args error'
sys.exit(2)
for opt, arg in opts:
if opt in ('-h', '--help'):
- print('jobs.py [-h|--help] [-b|--base] [-c|--config] [-d|--debug] [-j|--jobs] [-m|--mapred URL] [-y|--yarn URL] [-s|--ssl]')
+ print "jobs.py [OPTIONS]\n\
+OPTIONS are:\n\
+ -h, --help ........ help message\n\
+ -b, --base ........ default hostname for YARN ans MapReduce\n\
+ -c, --config ...... config file\n\
+ -d, --debug ....... debug output\n\
+ --db .............. enable database\n\
+ --dbhost\n\
+ --dbname\n\
+ --dbuser\n\
+ --dbpassword\n\
+ -j, --jobid ....... single job query istead of list all\n\
+ -m, --mapred URL .. MapReduce Job History server\n\
+ -y, --yarn URL .... YARN Resource Manager\n\
+ -s, --ssl ......... enable default SSL schema and ports"
+
sys.exit(0)
elif opt in ('-b', '--base'):
host = arg
cfg=line.rstrip().split('=')
if cfg[0] == 'base':
host = cfg[1]
- if cfg[0] == 'mapred':
+ elif cfg[0] == 'dbhost':
+ dbhost = cfg[1]
+ elif cfg[0] == 'db':
+ db = 1
+ elif cfg[0] == 'dbname':
+ dbname = cfg[1]
+ elif cfg[0] == 'dbuser':
+ dbuser = cfg[1]
+ elif cfg[0] == 'dbpassword':
+ dbpassword = cfg[1]
+ elif cfg[0] == 'mapred':
base_mapred_url = cfg[1]
print cfg
elif cfg[0] == 'yarn':
ssl = cfg[1]
elif opt in ('-d', '--debug'):
debug=1
- elif opt in ('-j', '--jobs'):
+ elif opt in ('--db'):
+ db = 1
+ elif opt in ('--dbhost'):
+ dbhost = arg
+ elif opt in ('--dbname'):
+ dbname = arg
+ elif opt in ('--dbuser'):
+ dbuser = arg
+ elif opt in ('--dbpassword'):
+ dbpassword = arg
+ elif opt in ('-j', '--jobid'):
id = arg
elif opt in ('-m', '--mapred'):
base_mapred_url = arg
else:
job = jobs[id]
job.mapred = j1['job'];
- job.status = job.mapred['state']
- job.user = job.mapred['user']
if j2 and j2['app']:
if id not in jobs:
else:
job = jobs[id]
job.yarn = j2['app'];
- if not job.status:
- job.status = job.yarn['finalStatus']
- if not job.user:
- job.user = job.yarn['user']
print jobs
else:
else:
job = jobs[id]
job.mapred = j;
- job.status = job.mapred['state']
- job.user = job.mapred['user']
if j2["apps"]:
for j in j2["apps"]["app"]:
else:
job = jobs[id]
job.yarn = j;
- if not job.status:
- job.status = job.yarn['finalStatus']
- if not job.user:
- job.user = job.yarn['user']
+if db:
+ db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname)
+ st = db.cursor()
regHost = re.compile(':\d+')
counter=1
for id, job in jobs.iteritems():
+
+ # merge MapRed and YARN
+ if job.mapred:
+ job.status = job.mapred['state']
+ job.user = job.mapred['user']
+ job.start = job.mapred['startTime']
+ job.finish = job.mapred['finishTime']
+ if job.yarn:
+ if not job.status:
+ job.status = job.yarn['finalStatus']
+ if not job.user:
+ job.user = job.yarn['user']
+ if not job.start:
+ job.start = job.yarn['startedTime']
+ if debug: print '[MR] missing start time of %s completed from YARN (%d)' % (id, job.start)
+ if not job.finish:
+ job.finish = job.yarn['finishedTime']
+ if debug: print '[MR] missing finish time of %s completed from YARN (%d)' % (id, job.finish)
+
print 'job %s (%d):' % (id, counter)
counter += 1
print ' status: %s' % job.status
for attempt in a['taskAttempts']['taskAttempt']:
nodeHost = regHost.sub('', attempt['nodeHttpAddress'])
if nodeHost not in nodes:
- nodes[nodeHost] = 0
- nodes[nodeHost] += attempt['elapsedTime']
+ nodes[nodeHost] = Node()
+ nodes[nodeHost].elapsed += attempt['elapsedTime']
+ if attempt['type'] == 'MAP':
+ nodes[nodeHost].map += 1
+ elif attempt['type'] == 'REDUCE':
+ nodes[nodeHost].reduce += 1
+ else:
+ raise Exception('unknown type %s' % attempt['type'])
# # print 'tasks elapsed: %d' % aggregate
aggregate=0
- for node, elapsed in nodes.iteritems():
- print ' node %s: %d' % (node, elapsed)
- aggregate += elapsed
+ for nodename, node in nodes.iteritems():
+ print ' node %s: %d' % (nodename, node.elapsed)
+ aggregate += node.elapsed
print ' ==> aggregated %d' % aggregate
if job.yarn:
print ' MB x s: %d' % job.yarn['memorySeconds']
print ' CPU x s: %d' % job.yarn['vcoreSeconds']
+ if db:
+ st.execute("SELECT id, user, status, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM job WHERE id='%s'" % id)
+ data = st.fetchone()
+ if data:
+ if data[1] == job.user and data[2] == job.status and data[4] == job.start and data[5] == job.finish:
+ if debug: print '[db] job %s found' % id
+ else:
+ st.execute("UPDATE job SET user='%s', status='%s', start=%s, finish=%s WHERE id='%s'" %(job.user, job.status, dbint(job.start), dbint(job.finish), id))
+ if debug: print '[db] job %s updated' % id
+
+ else:
+ st.execute("INSERT INTO job (id, user, status, start, finish) VALUES('%s', %s, %s)" %(id, job.user, job.status, dbint(job.start), dbint(job.finish)))
+ if debug: print '[db] job %s inserted' % id
+ if job.mapred:
+ if data and data[3] == job.mapred['submitTime'] and data[8] == job.mapred['mapsTotal'] and data[9] == job.mapred['reducesTotal']:
+ if debug: print '[db] job %s mapred is actual' % id
+ else:
+ st.execute("UPDATE job SET submit=%d, map=%s, reduce=%s WHERE id='%s'" %(job.mapred['submitTime'], dbint(job.mapred['mapsTotal']), dbint(job.mapred['reducesTotal']), id))
+ if debug: print '[db] job %s mapred updated' % id
+ if job.yarn:
+ if data and data[6] == job.yarn['memorySeconds'] and data[7] == job.yarn['vcoreSeconds']:
+ if debug: print '[db] job %s yarn is actual' % id
+ else:
+ st.execute("UPDATE job SET memory_seconds=%d, cpu_seconds=%d WHERE id='%s'" %(job.yarn['memorySeconds'], job.yarn['vcoreSeconds'], id))
+ if debug: print '[db] job %s yarn updated' % id
+ if nodes:
+ st.execute("DELETE FROM node WHERE jobid='%s'" % id)
+ for nodename, node in nodes.iteritems():
+ st.execute("INSERT INTO node (jobid, host, elapsed, map, reduce) VALUES ('%s', '%s', %d, %d, %d)" % (id, nodename, node.elapsed, node.map, node.reduce))
+ if debug: print '[db] job %s nodes updated' % id
+ db.commit()
+
print
-c.close()
+if db:
+ db.close()
+c.close()