Also store job counters in the bookkeeping database.
author    František Dvořák <valtri@civ.zcu.cz>
          Mon, 9 Mar 2015 14:23:32 +0000 (15:23 +0100)
committer František Dvořák <valtri@civ.zcu.cz>
          Mon, 9 Mar 2015 14:28:08 +0000 (15:28 +0100)
create.sql
jobs.py

diff --git a/create.sql b/create.sql
index af9aef2..606e741 100644
--- a/create.sql
+++ b/create.sql
@@ -45,6 +45,27 @@ CREATE TABLE jobnodes (
        INDEX (jobid)
 );
 
+
+CREATE TABLE jobcounters (
+       jobid CHAR(80) NOT NULL,
+       counterid INTEGER,
+
+       reduce BIGINT,
+       map BIGINT,
+       total BIGINT,
+
+       INDEX(jobid),
+       INDEX(counterid)
+);
+
+
+CREATE TABLE counters (
+       id INTEGER PRIMARY KEY AUTO_INCREMENT,
+       groupName CHAR(128),
+       name CHAR(128)
+);
+
+
 CREATE TABLE nodes (
        id INTEGER PRIMARY KEY AUTO_INCREMENT,
        host VARCHAR(256),
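
The two tables added above normalize counter names: jobcounters holds the per-job reduce/map/total values keyed by counterid, and counters maps each id back to a (groupName, name) pair. A minimal query sketch, not part of this commit, for reading one job's counters back (the jobid literal is illustrative only):

SELECT c.groupName, c.name, jc.map, jc.reduce, jc.total
  FROM jobcounters jc
  JOIN counters c ON c.id = jc.counterid
 WHERE jc.jobid = '1425886023407_0001';
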
diff --git a/jobs.py b/jobs.py
index 49bc8c1..cbc57d7 100755
--- a/jobs.py
+++ b/jobs.py
@@ -29,6 +29,10 @@ class JobNode:
 class Node:
        host = None
 
+class Counter:
+       group = None
+       counter = None
+
 base_mapred_url=''
 base_yarn_url=''
 base_yarn_urls=list()
@@ -45,9 +49,10 @@ id = None
 
 node_hosts = dict()
 node_ids = dict()
+counter_list=dict()
 jobs = dict()
 
-c = pycurl.Curl()
+curl = pycurl.Curl()
 
 
 def get_rest(base_url, url):
@@ -55,16 +60,16 @@ def get_rest(base_url, url):
                print '# %s%s' % (base_url, url)
 
        b = BytesIO()
-       c.setopt(pycurl.URL, str(base_url + url))
-       #c.setopt(pycurl.WRITEDATA, b)
-       c.setopt(pycurl.WRITEFUNCTION, b.write)
-       c.perform()
+       curl.setopt(pycurl.URL, str(base_url + url))
+       #curl.setopt(pycurl.WRITEDATA, b)
+       curl.setopt(pycurl.WRITEFUNCTION, b.write)
+       curl.perform()
        s = b.getvalue().decode('utf-8')
 
-       if c.getinfo(c.RESPONSE_CODE) != 200:
+       if curl.getinfo(curl.RESPONSE_CODE) != 200:
                print s
-               print 'Status: %d' % c.getinfo(c.RESPONSE_CODE)
-               c.close()
+               print 'Status: %d' % curl.getinfo(curl.RESPONSE_CODE)
+               curl.close()
                b.close()
                raise Exception()
 
@@ -80,7 +85,7 @@ def get_cluster_status(base_url):
        try:
                j = get_rest(base_url, '/ws/v1/cluster/info')
        except pycurl.error:
-               if c.getinfo(pycurl.OS_ERRNO) == errno.ECONNREFUSED:
+               if curl.getinfo(pycurl.OS_ERRNO) == errno.ECONNREFUSED:
                        j = json.loads('{"clusterInfo":{"state":"NO CONNETION"}}')
                else:
                        raise
@@ -198,8 +203,8 @@ OPTIONS are:\n\
                sys.exit(2)
 
 if ssl:
-       c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_GSSNEGOTIATE)
-       c.setopt(pycurl.USERPWD, ":")
+       curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_GSSNEGOTIATE)
+       curl.setopt(pycurl.USERPWD, ":")
 
 for i in range(len(base_yarn_urls)):
        base_yarn_urls[i] = gen_url(base_yarn_urls[i], 8088, 8090)
@@ -241,7 +246,7 @@ if id:
        except Exception:
                j2 = None
 
-       counter = 0
+       jcounter = 0
        if j1 and j1['job']:
                if id not in jobs:
                        job  = Job()
@@ -249,10 +254,10 @@ if id:
                else:
                        job = jobs[id]
                job.mapred = j1['job'];
-               counter += 1
-       if debug >= 2: print '[MR] %d jobs' % counter
+               jcounter += 1
+       if debug >= 2: print '[MR] %d jobs' % jcounter
 
-       counter = 0
+       jcounter = 0
        if j2 and j2['app']:
                if id not in jobs:
                        job  = Job()
@@ -260,8 +265,8 @@ if id:
                else:
                        job = jobs[id]
                job.yarn = j2['app'];
-               counter += 1
-       if debug >= 2: print '[YARN] %d jobs' % counter
+               jcounter += 1
+       if debug >= 2: print '[YARN] %d jobs' % jcounter
 else:
        mapred_url = base_mapred_url + '/ws/v1/history/mapreduce/jobs' + query
        yarn_url = base_yarn_url + '/ws/v1/cluster/apps' + query
@@ -269,7 +274,7 @@ else:
        j1 = get_rest(mapred_url, '')
        j2 = get_rest(yarn_url, '')
 
-       counter = 0
+       jcounter = 0
        if j1["jobs"]:
                for j in j1["jobs"]["job"]:
                        id = regJob.sub('', j['id'])
@@ -279,10 +284,10 @@ else:
                        else:
                                job = jobs[id]
                        job.mapred = j;
-                       counter += 1
-       if debug >= 2: print '[MR] %d jobs' % counter
+                       jcounter += 1
+       if debug >= 2: print '[MR] %d jobs' % jcounter
 
-       counter = 0
+       jcounter = 0
        if j2["apps"]:
                for j in j2["apps"]["app"]:
                        id = regApp.sub('', j['id'])
@@ -292,8 +297,8 @@ else:
                        else:
                                job = jobs[id]
                        job.yarn = j;
-                       counter += 1
-       if debug >= 2: print '[YARN] %d jobs' % counter
+                       jcounter += 1
+       if debug >= 2: print '[YARN] %d jobs' % jcounter
 
 if db:
        db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname)
@@ -311,11 +316,20 @@ if db:
                else:
                        break
 
+       data = st.execute('SELECT id, groupName, name FROM counters')
+       while 1:
+               data = st.fetchone()
+               if data:
+                       counter_name = '%s/%s' % (data[1], data[2])
+                       counter_list[counter_name] = data[0]
+               else:
+                       break
+
 regHost = re.compile(':\d+')
-counter=0
+jcounter=0
 for id, job in jobs.iteritems():
 
-       counter += 1
+       jcounter += 1
 
        if job.mapred:
                job.name = job.mapred['name']
@@ -327,6 +341,7 @@ for id, job in jobs.iteritems():
 
        if db:
                changed = 0
+               changed_counters = 0
                ID=0
                NAME=1
                USER=2
@@ -372,7 +387,7 @@ for id, job in jobs.iteritems():
                        if debug >= 2: print '[MR] missing finish time of %s completed from YARN (%d)' % (id, job.finish)
 
        if debug >= 1:
-               print 'job %s (%d):' % (id, counter)
+               print 'job %s (%d):' % (id, jcounter)
                print '  name: %s' % job.name
                print '  status: %s' % job.status
                print '  user: %s' % job.user
@@ -423,6 +438,11 @@ for id, job in jobs.iteritems():
                if not data:
                        changed = 1
 
+               st.execute('SELECT * FROM jobcounters WHERE jobid=%s', id)
+               data = st.fetchone()
+               if not data:
+                       changed_counters = 1
+
        # get details (intensive!), if new job or any other difference
        jobnodes = dict()
        subjobs = list()
@@ -460,6 +480,22 @@ for id, job in jobs.iteritems():
                        print '  subjobs: %d' % len(subjobs)
                        print '  ==> aggregated %d' % aggregate
 
+       counters = list()
+       counters_print = list()
+       if job.mapred and (not db or changed_counters or changed):
+               cs = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/counters' % id)
+               if cs and 'jobCounters' in cs.keys():
+                       if 'counterGroup' in cs['jobCounters'].keys():
+                               for cg in cs['jobCounters']['counterGroup']:
+                                       for c in cg['counter']:
+                                               counter = Counter()
+                                               counter.group = cg['counterGroupName']
+                                               counter.counter = c
+                                               counters.append(counter)
+                                               counters_print.append('(%s=%d,%d,%d)' % (c['name'], c['reduceCounterValue'], c['mapCounterValue'], c['totalCounterValue']))
+               if counters_print and debug >= 1:
+                       print '  counters: ' + ''.join(counters_print)
+
        if jobnodes and db:
                st.execute("DELETE FROM jobnodes WHERE jobid=%s", id)
                for nodename, jobnode in jobnodes.iteritems():
@@ -479,9 +515,22 @@ for id, job in jobs.iteritems():
                        st.execute('INSERT INTO subjobs (id, jobid, nodeid, state, type, start, finish) VALUES (%s, %s, %s, %s, %s, %s, %s)', (subjob['id'], id, node_hosts[nodename].id, subjob['state'], subjob['type'], subjob['startTime'], subjob['finishTime']))
                if debug >= 3: print '[db] job %s subjobs updated' % id
 
-               # better to update timestamp again explicitly on the end of the transaction
+       if counters and db:
+               st.execute('DELETE FROM jobcounters WHERE jobid=%s', id)
+               for counter in counters:
+                       counter_name = '%s/%s' % (counter.group, counter.counter['name'])
+                       if not counter_name in counter_list.keys():
+                               st.execute('INSERT INTO counters (groupName, name) VALUES (%s, %s)', (counter.group, counter.counter['name']))
+                               counter_list[counter_name] = db.insert_id()
+                               if debug >= 3: print '[db] new counter %s inserted' % counter_name
+                       st.execute('INSERT INTO jobcounters (jobid, counterid, reduce, map, total) VALUES (%s, %s, %s, %s, %s)', (id, counter_list[counter_name], counter.counter['reduceCounterValue'], counter.counter['mapCounterValue'], counter.counter['totalCounterValue']))
+               if debug >= 3: print '[db] job %s counters updated' % id
+
+       # better to update timestamp again explicitly at the end of the transaction
+       if db and (jobnodes or counters):
                st.execute('UPDATE jobs SET changed=NOW() WHERE id=%s', id);
 
+
        if db:
                db.commit()
 
@@ -490,4 +539,4 @@ for id, job in jobs.iteritems():
 if db:
        db.close()
 
-c.close()
+curl.close()
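
A further usage sketch, also an assumption rather than part of the commit: because counter names live in the counters lookup table, stored values can be aggregated across all recorded jobs. The group and counter name below are common Hadoop identifiers used only as an example.

SELECT c.groupName, c.name, SUM(jc.total) AS total
  FROM jobcounters jc
  JOIN counters c ON c.id = jc.counterid
 WHERE c.name = 'HDFS_BYTES_READ'
 GROUP BY c.groupName, c.name;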