From 06ac4b761bfc282ed2c0e7f52b50a86435d9a189 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?= Date: Sun, 8 Mar 2015 13:55:34 +0100 Subject: [PATCH] Support for high availability of YARN Resource Manager. --- jobs.py | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/jobs.py b/jobs.py index 5766383..7f27d9d 100755 --- a/jobs.py +++ b/jobs.py @@ -31,6 +31,7 @@ class Node: base_mapred_url='' base_yarn_url='' +base_yarn_urls=list() db = None dbhost = 'localhost' dbname = 'bookkeeping' @@ -77,7 +78,7 @@ def get_rest(base_url, url): def get_cluster_status(base_url): try: - j = get_rest(base_yarn_url, '/ws/v1/cluster/info') + j = get_rest(base_url, '/ws/v1/cluster/info') except pycurl.error: if c.getinfo(pycurl.OS_ERRNO) == errno.ECONNREFUSED: j = json.loads('{"clusterInfo":{"state":"NO CONNETION"}}') @@ -85,14 +86,14 @@ def get_cluster_status(base_url): raise if not j['clusterInfo']: - if debug >= 2: + if debug >= 3: print 'Error with YARN RM' return None ci = j['clusterInfo'] if not 'haState' in ci.keys(): ci['haState'] = 'NONE' - if debug >= 2: + if debug >= 3: print '[YARN] state=%s, haState=%s' % (ci['state'], ci['haState']) if ci['state'] != 'STARTED': return None @@ -167,7 +168,7 @@ OPTIONS are:\n\ elif cfg[0] == 'mapred': base_mapred_url = cfg[1] elif cfg[0] == 'yarn': - base_yarn_url = cfg[1] + base_yarn_urls.append(cfg[1]) elif cfg[0] == 'ssl': ssl = int(cfg[1]) elif opt in ('-d', '--debug'): @@ -187,7 +188,7 @@ OPTIONS are:\n\ elif opt in ('-m', '--mapred'): base_mapred_url = arg elif opt in ('-y', '--yarn'): - base_yarn_url = arg + base_yarn_urls.append(arg) elif opt in ('-s', '--ssl'): ssl=1 elif opt in ('-q', '--query'): @@ -200,18 +201,24 @@ if ssl: c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_GSSNEGOTIATE) c.setopt(pycurl.USERPWD, ":") -base_yarn_url = gen_url(base_yarn_url, 8088, 8090) +for i in range(len(base_yarn_urls)): + base_yarn_urls[i] = gen_url(base_yarn_urls[i], 8088, 8090) + if debug >= 2: + print '[YARN] URL %d: %s' % (i, base_yarn_urls[i]) + j = get_cluster_status(base_yarn_urls[i]) + if j: + base_yarn_url = base_yarn_urls[i] + break +if not base_yarn_url: + print '[YARN] probem with RM' + sys.exit(2) + base_mapred_url = gen_url(base_mapred_url, 19888, 19890) if debug >= 2: print '[MR] URL: ' + base_mapred_url print '[YARN] URL: ' + base_yarn_url -j = get_cluster_status(base_yarn_url) -if not j: - print '[YARN] probem with RM' - sys.exit(2) - regJob = re.compile('^job_') regApp = re.compile('^application_') regAtt = re.compile('^attempt_') -- 1.8.2.3