base_mapred_url=''
base_yarn_url=''
+base_yarn_urls=list()
db = None
dbhost = 'localhost'
dbname = 'bookkeeping'
def get_cluster_status(base_url):
try:
- j = get_rest(base_yarn_url, '/ws/v1/cluster/info')
+ j = get_rest(base_url, '/ws/v1/cluster/info')
except pycurl.error:
if c.getinfo(pycurl.OS_ERRNO) == errno.ECONNREFUSED:
j = json.loads('{"clusterInfo":{"state":"NO CONNETION"}}')
raise
if not j['clusterInfo']:
- if debug >= 2:
+ if debug >= 3:
print 'Error with YARN RM'
return None
ci = j['clusterInfo']
if not 'haState' in ci.keys():
ci['haState'] = 'NONE'
- if debug >= 2:
+ if debug >= 3:
print '[YARN] state=%s, haState=%s' % (ci['state'], ci['haState'])
if ci['state'] != 'STARTED':
return None
elif cfg[0] == 'mapred':
base_mapred_url = cfg[1]
elif cfg[0] == 'yarn':
- base_yarn_url = cfg[1]
+ base_yarn_urls.append(cfg[1])
elif cfg[0] == 'ssl':
ssl = int(cfg[1])
elif opt in ('-d', '--debug'):
elif opt in ('-m', '--mapred'):
base_mapred_url = arg
elif opt in ('-y', '--yarn'):
- base_yarn_url = arg
+ base_yarn_urls.append(arg)
elif opt in ('-s', '--ssl'):
ssl=1
elif opt in ('-q', '--query'):
c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_GSSNEGOTIATE)
c.setopt(pycurl.USERPWD, ":")
-base_yarn_url = gen_url(base_yarn_url, 8088, 8090)
+for i in range(len(base_yarn_urls)):
+ base_yarn_urls[i] = gen_url(base_yarn_urls[i], 8088, 8090)
+ if debug >= 2:
+ print '[YARN] URL %d: %s' % (i, base_yarn_urls[i])
+ j = get_cluster_status(base_yarn_urls[i])
+ if j:
+ base_yarn_url = base_yarn_urls[i]
+ break
+if not base_yarn_url:
+ print '[YARN] probem with RM'
+ sys.exit(2)
+
base_mapred_url = gen_url(base_mapred_url, 19888, 19890)
if debug >= 2:
print '[MR] URL: ' + base_mapred_url
print '[YARN] URL: ' + base_yarn_url
-j = get_cluster_status(base_yarn_url)
-if not j:
- print '[YARN] probem with RM'
- sys.exit(2)
-
regJob = re.compile('^job_')
regApp = re.compile('^application_')
regAtt = re.compile('^attempt_')