import getopt
import socket
-base_jhs_url=''
-base_rm_url=''
+class Job:
+ status = None
+ user = None
+ mapred = None
+ yarn = None
+
+base_mapred_url=''
+base_yarn_url=''
debug=0
ssl=0
details=1
host=socket.getfqdn()
+id = None
+
+jobs = dict()
c = pycurl.Curl()
c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_GSSNEGOTIATE)
c.setopt(pycurl.USERPWD, ":")
+
def get_rest(base_url, url):
b = BytesIO()
c.setopt(pycurl.URL, base_url + url)
return j
+
+def gen_url(base_url, port_nossl, port_ssl):
+ if ssl:
+ schema = 'https://'
+ port = port_ssl
+ else:
+ schema = 'http://'
+ port = port_nossl
+
+ if not base_url:
+ base_url = host
+ if not '://' in base_url:
+ base_url = schema + base_url
+ if not re.match(r'.*:\d+$', base_url):
+ base_url = base_url + ':%d' % port
+
+ return base_url
+
+
try:
- opts, args = getopt.getopt(sys.argv[1:], 'hb:dj:r:s', ['help', 'base=', 'debug', 'base=', 'jhs-url=', 'rm-url=', 'ssl'])
+ opts, args = getopt.getopt(sys.argv[1:], 'hb:c:dj:m:y:s', ['help', 'base=', 'config=', 'debug', 'jobs=', 'mapred=', 'yarn=', 'ssl'])
except getopt.GetoptError:
print 'Args error'
sys.exit(2)
for opt, arg in opts:
if opt in ('-h', '--help'):
- print('jobs.py [-h|--help] [-b|--base] [-d|--debug] [-j|--jhs-url URL] [-r|--rm-url URL] [-s|--ssl]')
+ print('jobs.py [-h|--help] [-b|--base] [-c|--config] [-d|--debug] [-j|--jobs] [-m|--mapred URL] [-y|--yarn URL] [-s|--ssl]')
sys.exit(0)
elif opt in ('-b', '--base'):
host = arg
+ elif opt in ('-c', '--config'):
+ f = open(arg, 'r')
+ for line in f:
+ cfg=line.rstrip().split('=')
+ if cfg[0] == 'base':
+ host = cfg[1]
+ if cfg[0] == 'mapred':
+ base_mapred_url = cfg[1]
+ print cfg
+ elif cfg[0] == 'yarn':
+ base_yarn_url = cfg[1]
+ print cfg
+ elif cfg[0] == 'ssl':
+ ssl = cfg[1]
elif opt in ('-d', '--debug'):
debug=1
- elif opt in ('-j', '--jhs-url'):
- base_jhs_url = arg
- elif opt in ('-r', '--rm-url'):
- base_rm_url = arg
+ elif opt in ('-j', '--jobs'):
+ id = arg
+ elif opt in ('-m', '--mapred'):
+ base_mapred_url = arg
+ elif opt in ('-y', '--yarn'):
+ base_yarn_url = arg
elif opt in ('-s', '--ssl'):
ssl=1
else:
print 'Args error'
sys.exit(2)
-if ssl:
- default_rm_url = "https://" + host + ":8090"
- default_jhs_url = "https://" + host + ":19890"
-else:
- default_rm_url = "http://" + host + ":8088"
- default_jhs_url = "http://" + host + ":19888"
-
-if not base_rm_url:
- base_rm_url = default_rm_url
-if not base_jhs_url:
- base_jhs_url = default_jhs_url
-
-jhs_url = base_jhs_url + '/ws/v1/history/mapreduce/jobs'
-rm_url = base_rm_url + '/ws/v1/cluster/apps'
-print '# ' + jhs_url
-print '# ' + rm_url
-
-j1 = get_rest(base_jhs_url, '/ws/v1/history/mapreduce/jobs')
-j2 = get_rest(base_rm_url, '/ws/v1/cluster/apps')
-
-class Job:
- status = None
- user = None
- jhs = None
- rm = None
-jobs = dict()
+base_yarn_url = gen_url(base_yarn_url, 8088, 8090)
+base_mapred_url = gen_url(base_mapred_url, 19888, 19890)
regJob = re.compile('^job_')
-if j1["jobs"]:
- for j in j1["jobs"]["job"]:
- id = regJob.sub('', j['id'])
+regApp = re.compile('^application_')
+if id:
+ if regJob.match(id): id = regJob.sub('', id)
+ if regApp.match(id): id = regApp.sub('', id)
+
+ mapred_url = base_mapred_url + '/ws/v1/history/mapreduce/jobs/job_%s' % id
+ yarn_url = base_yarn_url + '/ws/v1/cluster/apps/application_%s' % id
+ print '# ' + mapred_url
+ print '# ' + yarn_url
+
+ try:
+ j1 = get_rest(mapred_url, '')
+ except Exception:
+ j1 = None
+
+ try:
+ j2 = get_rest(yarn_url, '')
+ except Exception:
+ j2 = None
+
+ if j1 and j1['job']:
if id not in jobs:
job = Job()
jobs[id] = job
else:
job = jobs[id]
- job.jhs = j;
- job.status = job.jhs['state']
- job.user = job.jhs['user']
+ job.mapred = j1['job'];
+ job.status = job.mapred['state']
+ job.user = job.mapred['user']
-regApp = re.compile('^application_')
-if j2["apps"]:
- for j in j2["apps"]["app"]:
- id = regApp.sub('', j['id'])
+ if j2 and j2['app']:
if id not in jobs:
job = Job()
jobs[id] = job
else:
job = jobs[id]
- job.rm = j;
+ job.yarn = j2['app'];
if not job.status:
- job.status = job.rm['finalStatus']
+ job.status = job.yarn['finalStatus']
if not job.user:
- job.user = job.rm['user']
+ job.user = job.yarn['user']
+
+ print jobs
+else:
+ mapred_url = base_mapred_url + '/ws/v1/history/mapreduce/jobs'
+ yarn_url = base_yarn_url + '/ws/v1/cluster/apps'
+ print '# ' + mapred_url
+ print '# ' + yarn_url
+
+ j1 = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs')
+ j2 = get_rest(base_yarn_url, '/ws/v1/cluster/apps')
+
+ if j1["jobs"]:
+ for j in j1["jobs"]["job"]:
+ id = regJob.sub('', j['id'])
+ if id not in jobs:
+ job = Job()
+ jobs[id] = job
+ else:
+ job = jobs[id]
+ job.mapred = j;
+ job.status = job.mapred['state']
+ job.user = job.mapred['user']
+
+ if j2["apps"]:
+ for j in j2["apps"]["app"]:
+ id = regApp.sub('', j['id'])
+ if id not in jobs:
+ job = Job()
+ jobs[id] = job
+ else:
+ job = jobs[id]
+ job.yarn = j;
+ if not job.status:
+ job.status = job.yarn['finalStatus']
+ if not job.user:
+ job.user = job.yarn['user']
+
regHost = re.compile(':\d+')
counter=1
counter += 1
print ' status: %s' % job.status
print ' user: %s' % job.user
- if job.jhs:
- print ' submit: %d, start: %d, finish: %d' % (job.jhs['submitTime'], job.jhs['startTime'], job.jhs['finishTime'])
- print ' elapsed: %.3f s' % ((job.jhs['finishTime'] - job.jhs['startTime']) / 1000.0)
- print ' finished: %.3f s' % ((job.jhs['finishTime'] - job.jhs['submitTime']) / 1000.0)
+ if job.mapred:
+ print ' submit: %d, start: %d, finish: %d' % (job.mapred['submitTime'], job.mapred['startTime'], job.mapred['finishTime'])
+ print ' elapsed: %.3f s' % ((job.mapred['finishTime'] - job.mapred['startTime']) / 1000.0)
+ print ' finished: %.3f s' % ((job.mapred['finishTime'] - job.mapred['submitTime']) / 1000.0)
if details:
nodes = dict()
- t = get_rest(base_jhs_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks' % id)
+ t = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks' % id)
if t['tasks']:
aggregate=0
for task in t['tasks']['task']:
# # print 'taskid: %s, elapsed: %d' % (task['id'], task['elapsedTime'])
aggregate += task['elapsedTime']
- a = get_rest(base_jhs_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks/%s/attempts' % (id, task['id']))
+ a = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/tasks/%s/attempts' % (id, task['id']))
if a['taskAttempts']:
for attempt in a['taskAttempts']['taskAttempt']:
nodeHost = regHost.sub('', attempt['nodeHttpAddress'])
aggregate += elapsed
print ' ==> aggregated %d' % aggregate
- if job.rm:
- print ' MB x s: %d' % job.rm['memorySeconds']
- print ' CPU x s: %d' % job.rm['vcoreSeconds']
+ if job.yarn:
+ print ' MB x s: %d' % job.yarn['memorySeconds']
+ print ' CPU x s: %d' % job.yarn['vcoreSeconds']
print