Support for high availability of YARN Resource Manager.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Sun, 8 Mar 2015 12:55:34 +0000 (13:55 +0100)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Sun, 8 Mar 2015 12:55:34 +0000 (13:55 +0100)
jobs.py

diff --git a/jobs.py b/jobs.py
index 5766383..7f27d9d 100755 (executable)
--- a/jobs.py
+++ b/jobs.py
@@ -31,6 +31,7 @@ class Node:
 
 base_mapred_url=''
 base_yarn_url=''
+base_yarn_urls=list()
 db = None
 dbhost = 'localhost'
 dbname = 'bookkeeping'
@@ -77,7 +78,7 @@ def get_rest(base_url, url):
 
 def get_cluster_status(base_url):
        try:
-               j = get_rest(base_yarn_url, '/ws/v1/cluster/info')
+               j = get_rest(base_url, '/ws/v1/cluster/info')
        except pycurl.error:
                if c.getinfo(pycurl.OS_ERRNO) == errno.ECONNREFUSED:
                        j = json.loads('{"clusterInfo":{"state":"NO CONNETION"}}')
@@ -85,14 +86,14 @@ def get_cluster_status(base_url):
                        raise
 
        if not j['clusterInfo']:
-               if debug >= 2:
+               if debug >= 3:
                        print 'Error with YARN RM'
                return None
 
        ci = j['clusterInfo']
        if not 'haState' in ci.keys():
                ci['haState'] = 'NONE'
-       if debug >= 2:
+       if debug >= 3:
                print '[YARN] state=%s, haState=%s' % (ci['state'], ci['haState'])
        if ci['state'] != 'STARTED':
                return None
@@ -167,7 +168,7 @@ OPTIONS are:\n\
                        elif cfg[0] == 'mapred':
                                base_mapred_url = cfg[1]
                        elif cfg[0] == 'yarn':
-                               base_yarn_url = cfg[1]
+                               base_yarn_urls.append(cfg[1])
                        elif cfg[0] == 'ssl':
                                ssl = int(cfg[1])
        elif opt in ('-d', '--debug'):
@@ -187,7 +188,7 @@ OPTIONS are:\n\
        elif opt in ('-m', '--mapred'):
                base_mapred_url = arg
        elif opt in ('-y', '--yarn'):
-               base_yarn_url = arg
+               base_yarn_urls.append(arg)
        elif opt in ('-s', '--ssl'):
                ssl=1
        elif opt in ('-q', '--query'):
@@ -200,18 +201,24 @@ if ssl:
        c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_GSSNEGOTIATE)
        c.setopt(pycurl.USERPWD, ":")
 
-base_yarn_url = gen_url(base_yarn_url, 8088, 8090)
+for i in range(len(base_yarn_urls)):
+       base_yarn_urls[i] = gen_url(base_yarn_urls[i], 8088, 8090)
+       if debug >= 2:
+               print '[YARN] URL %d: %s' % (i, base_yarn_urls[i])
+       j = get_cluster_status(base_yarn_urls[i])
+       if j:
+               base_yarn_url = base_yarn_urls[i]
+               break
+if not base_yarn_url:
+       print '[YARN] probem with RM'
+       sys.exit(2)
+
 base_mapred_url = gen_url(base_mapred_url, 19888, 19890)
 
 if debug >= 2:
        print '[MR] URL: ' + base_mapred_url
        print '[YARN] URL: ' + base_yarn_url
 
-j = get_cluster_status(base_yarn_url)
-if not j:
-       print '[YARN] probem with RM'
-       sys.exit(2)
-
 regJob = re.compile('^job_')
 regApp = re.compile('^application_')
 regAtt = re.compile('^attempt_')