j = json.loads(s)
- if debug:
+ if debug >= 2:
+ print '# %s%s' % (base_url, url)
+ if debug >= 3:
print json.dumps(j, indent=4)
return j
try:
- opts, args = getopt.getopt(sys.argv[1:], 'hb:c:dj:m:y:s', ['help', 'base=', 'config=', 'db', 'dbhost=', 'dbname=', 'dbuser=', 'dbpassword=', 'debug', 'jobid=', 'mapred=', 'yarn=', 'ssl'])
+ opts, args = getopt.getopt(sys.argv[1:], 'hb:c:d:j:m:y:s', ['help', 'base=', 'config=', 'db', 'dbhost=', 'dbname=', 'dbuser=', 'dbpassword=', 'debug=', 'jobid=', 'mapred=', 'yarn=', 'ssl'])
except getopt.GetoptError:
print 'Args error'
sys.exit(2)
-h, --help ........ help message\n\
-b, --base ........ default hostname for YARN ans MapReduce\n\
-c, --config ...... config file\n\
- -d, --debug ....... debug output\n\
+ -d, --debug LEVEL . debug output (1=progress, 2=more, 3=json dumps)\n\
--db .............. enable database\n\
--dbhost\n\
--dbname\n\
dbuser = cfg[1]
elif cfg[0] == 'dbpassword':
dbpassword = cfg[1]
+ elif cfg[0] == 'debug':
+ debug = int(cfg[1])
elif cfg[0] == 'mapred':
base_mapred_url = cfg[1]
print cfg
elif cfg[0] == 'ssl':
ssl = cfg[1]
elif opt in ('-d', '--debug'):
- debug=1
+ debug = int(arg)
elif opt in ('--db'):
db = 1
elif opt in ('--dbhost'):
base_yarn_url = gen_url(base_yarn_url, 8088, 8090)
base_mapred_url = gen_url(base_mapred_url, 19888, 19890)
+if debug >= 1:
+ print '[MR] URL: ' + base_mapred_url
+ print '[YARN] URL: ' + base_yarn_url
+
regJob = re.compile('^job_')
regApp = re.compile('^application_')
if id:
mapred_url = base_mapred_url + '/ws/v1/history/mapreduce/jobs/job_%s' % id
yarn_url = base_yarn_url + '/ws/v1/cluster/apps/application_%s' % id
- print '# ' + mapred_url
- print '# ' + yarn_url
try:
j1 = get_rest(mapred_url, '')
except Exception:
j2 = None
+ counter = 0
if j1 and j1['job']:
if id not in jobs:
job = Job()
else:
job = jobs[id]
job.mapred = j1['job'];
+ counter += 1
+ if debug >= 1: print '[MR] %d jobs' % counter
+ counter = 0
if j2 and j2['app']:
if id not in jobs:
job = Job()
else:
job = jobs[id]
job.yarn = j2['app'];
-
- print jobs
+ counter += 1
+ if debug >= 1: print '[YARN] %d jobs' % counter
else:
mapred_url = base_mapred_url + '/ws/v1/history/mapreduce/jobs'
yarn_url = base_yarn_url + '/ws/v1/cluster/apps'
- print '# ' + mapred_url
- print '# ' + yarn_url
j1 = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs')
j2 = get_rest(base_yarn_url, '/ws/v1/cluster/apps')
+ counter = 0
if j1["jobs"]:
for j in j1["jobs"]["job"]:
id = regJob.sub('', j['id'])
else:
job = jobs[id]
job.mapred = j;
+ counter += 1
+ if debug >= 1: print '[MR] %d jobs' % counter
+ counter = 0
if j2["apps"]:
for j in j2["apps"]["app"]:
id = regApp.sub('', j['id'])
else:
job = jobs[id]
job.yarn = j;
+ counter += 1
+ if debug >= 1: print '[YARN] %d jobs' % counter
if db:
db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname)
job.user = job.yarn['user']
if not job.start:
job.start = job.yarn['startedTime']
- if debug: print '[MR] missing start time of %s completed from YARN (%d)' % (id, job.start)
+ if debug >= 1: print '[MR] missing start time of %s completed from YARN (%d)' % (id, job.start)
if not job.finish:
job.finish = job.yarn['finishedTime']
- if debug: print '[MR] missing finish time of %s completed from YARN (%d)' % (id, job.finish)
+ if debug >= 1: print '[MR] missing finish time of %s completed from YARN (%d)' % (id, job.finish)
print 'job %s (%d):' % (id, counter)
counter += 1
data = st.fetchone()
if data:
if data[1] == job.user and data[2] == job.status and data[4] == job.start and data[5] == job.finish:
- if debug: print '[db] job %s found' % id
+ if debug >= 2: print '[db] job %s found' % id
else:
st.execute("UPDATE job SET user='%s', status='%s', start=%s, finish=%s WHERE id='%s'" %(job.user, job.status, dbint(job.start), dbint(job.finish), id))
- if debug: print '[db] job %s updated' % id
+ if debug >= 2: print '[db] job %s updated' % id
else:
st.execute("INSERT INTO job (id, user, status, start, finish) VALUES('%s', '%s', '%s', %s, %s)" %(id, job.user, job.status, dbint(job.start), dbint(job.finish)))
- if debug: print '[db] job %s inserted' % id
+ if debug >= 2: print '[db] job %s inserted' % id
if job.mapred:
if data and data[3] == job.mapred['submitTime'] and data[8] == job.mapred['mapsTotal'] and data[9] == job.mapred['reducesTotal']:
- if debug: print '[db] job %s mapred is actual' % id
+ if debug >= 2: print '[db] job %s mapred is actual' % id
else:
st.execute("UPDATE job SET submit=%d, map=%s, reduce=%s WHERE id='%s'" %(job.mapred['submitTime'], dbint(job.mapred['mapsTotal']), dbint(job.mapred['reducesTotal']), id))
- if debug: print '[db] job %s mapred updated' % id
+ if debug >= 2: print '[db] job %s mapred updated' % id
if job.yarn:
if data and data[6] == job.yarn['memorySeconds'] and data[7] == job.yarn['vcoreSeconds']:
- if debug: print '[db] job %s yarn is actual' % id
+ if debug >= 2: print '[db] job %s yarn is actual' % id
else:
st.execute("UPDATE job SET memory_seconds=%d, cpu_seconds=%d WHERE id='%s'" %(job.yarn['memorySeconds'], job.yarn['vcoreSeconds'], id))
- if debug: print '[db] job %s yarn updated' % id
+ if debug >= 2: print '[db] job %s yarn updated' % id
if nodes:
st.execute("DELETE FROM node WHERE jobid='%s'" % id)
for nodename, node in nodes.iteritems():
st.execute("INSERT INTO node (jobid, host, elapsed, map, reduce) VALUES ('%s', '%s', %d, %d, %d)" % (id, nodename, node.elapsed, node.map, node.reduce))
- if debug: print '[db] job %s nodes updated' % id
+ if debug >= 2: print '[db] job %s nodes updated' % id
db.commit()
print