From: František Dvořák
Date: Mon, 9 Mar 2015 14:23:32 +0000 (+0100)
Subject: Also store job counters in bookkeeping.
X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=c33b7fa7c59b02c9160934a9d7c0b291e10e2768;p=hadoop-jobstat.git

Also store job counters in bookkeeping.
---

diff --git a/create.sql b/create.sql
index af9aef2..606e741 100644
--- a/create.sql
+++ b/create.sql
@@ -45,6 +45,27 @@ CREATE TABLE jobnodes (
 	INDEX (jobid)
 );
 
+
+CREATE TABLE jobcounters (
+	jobid CHAR(80) NOT NULL,
+	counterid INTEGER,
+
+	reduce BIGINT,
+	map BIGINT,
+	total BIGINT,
+
+	INDEX(jobid),
+	INDEX(counterid)
+);
+
+
+CREATE TABLE counters (
+	id INTEGER PRIMARY KEY AUTO_INCREMENT,
+	groupName CHAR(128),
+	name CHAR(128)
+);
+
+
 CREATE TABLE nodes (
 	id INTEGER PRIMARY KEY AUTO_INCREMENT,
 	host VARCHAR(256),
diff --git a/jobs.py b/jobs.py
index 49bc8c1..cbc57d7 100755
--- a/jobs.py
+++ b/jobs.py
@@ -29,6 +29,10 @@ class JobNode:
 class Node:
 	host = None
 
+class Counter:
+	group = None
+	counter = None
+
 base_mapred_url=''
 base_yarn_url=''
 base_yarn_urls=list()
@@ -45,9 +49,10 @@ id = None
 
 node_hosts = dict()
 node_ids = dict()
+counter_list=dict()
 jobs = dict()
 
-c = pycurl.Curl()
+curl = pycurl.Curl()
 
 
 def get_rest(base_url, url):
@@ -55,16 +60,16 @@
 		print '# %s%s' % (base_url, url)
 
 	b = BytesIO()
-	c.setopt(pycurl.URL, str(base_url + url))
-	#c.setopt(pycurl.WRITEDATA, b)
-	c.setopt(pycurl.WRITEFUNCTION, b.write)
-	c.perform()
+	curl.setopt(pycurl.URL, str(base_url + url))
+	#curl.setopt(pycurl.WRITEDATA, b)
+	curl.setopt(pycurl.WRITEFUNCTION, b.write)
+	curl.perform()
 
 	s = b.getvalue().decode('utf-8')
-	if c.getinfo(c.RESPONSE_CODE) != 200:
+	if curl.getinfo(curl.RESPONSE_CODE) != 200:
 		print s
-		print 'Status: %d' % c.getinfo(c.RESPONSE_CODE)
-		c.close()
+		print 'Status: %d' % curl.getinfo(curl.RESPONSE_CODE)
+		curl.close()
 		b.close()
 		raise Exception()
 
@@ -80,7 +85,7 @@ def get_cluster_status(base_url):
 	try:
 		j = get_rest(base_url, '/ws/v1/cluster/info')
 	except pycurl.error:
-		if c.getinfo(pycurl.OS_ERRNO) == errno.ECONNREFUSED:
+		if curl.getinfo(pycurl.OS_ERRNO) == errno.ECONNREFUSED:
 			j = json.loads('{"clusterInfo":{"state":"NO CONNETION"}}')
 		else:
 			raise
@@ -198,8 +203,8 @@ OPTIONS are:\n\
 	sys.exit(2)
 
 if ssl:
-	c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_GSSNEGOTIATE)
-	c.setopt(pycurl.USERPWD, ":")
+	curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_GSSNEGOTIATE)
+	curl.setopt(pycurl.USERPWD, ":")
 
 for i in range(len(base_yarn_urls)):
 	base_yarn_urls[i] = gen_url(base_yarn_urls[i], 8088, 8090)
@@ -241,7 +246,7 @@ if id:
 	except Exception:
 		j2 = None
 
-	counter = 0
+	jcounter = 0
 	if j1 and j1['job']:
 		if id not in jobs:
 			job = Job()
@@ -249,10 +254,10 @@
 		else:
 			job = jobs[id]
 		job.mapred = j1['job'];
-		counter += 1
-		if debug >= 2: print '[MR] %d jobs' % counter
+		jcounter += 1
+		if debug >= 2: print '[MR] %d jobs' % jcounter
 
-	counter = 0
+	jcounter = 0
 	if j2 and j2['app']:
 		if id not in jobs:
 			job = Job()
@@ -260,8 +265,8 @@
 		else:
 			job = jobs[id]
 		job.yarn = j2['app'];
-		counter += 1
-		if debug >= 2: print '[YARN] %d jobs' % counter
+		jcounter += 1
+		if debug >= 2: print '[YARN] %d jobs' % jcounter
 else:
 	mapred_url = base_mapred_url + '/ws/v1/history/mapreduce/jobs' + query
 	yarn_url = base_yarn_url + '/ws/v1/cluster/apps' + query
@@ -269,7 +274,7 @@ else:
 	j1 = get_rest(mapred_url, '')
 	j2 = get_rest(yarn_url, '')
 
-	counter = 0
+	jcounter = 0
 	if j1["jobs"]:
 		for j in j1["jobs"]["job"]:
 			id = regJob.sub('', j['id'])
@@ -279,10 +284,10 @@ else:
 			else:
 				job = jobs[id]
 			job.mapred = j;
-			counter += 1
-	if debug >= 2: print '[MR] %d jobs' % counter
+			jcounter += 1
+	if debug >= 2: print '[MR] %d jobs' % jcounter
 
-	counter = 0
+	jcounter = 0
 	if j2["apps"]:
 		for j in j2["apps"]["app"]:
 			id = regApp.sub('', j['id'])
@@ -292,8 +297,8 @@ else:
 			else:
 				job = jobs[id]
 			job.yarn = j;
-			counter += 1
-	if debug >= 2: print '[YARN] %d jobs' % counter
+			jcounter += 1
+	if debug >= 2: print '[YARN] %d jobs' % jcounter
 
 if db:
 	db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname)
@@ -311,11 +316,20 @@ if db:
 		else:
 			break
 
+	data = st.execute('SELECT id, groupName, name FROM counters')
+	while 1:
+		data = st.fetchone()
+		if data:
+			counter_name = '%s/%s' % (data[1], data[2])
+			counter_list[counter_name] = data[0]
+		else:
+			break
+
 regHost = re.compile(':\d+')
 
-counter=0
+jcounter=0
 for id, job in jobs.iteritems():
-	counter += 1
+	jcounter += 1
 
 	if job.mapred:
 		job.name = job.mapred['name']
@@ -327,6 +341,7 @@ for id, job in jobs.iteritems():
 
 	if db:
 		changed = 0
+		changed_counters = 0
 		ID=0
 		NAME=1
 		USER=2
@@ -372,7 +387,7 @@ for id, job in jobs.iteritems():
 			if debug >= 2: print '[MR] missing finish time of %s completed from YARN (%d)' % (id, job.finish)
 
 	if debug >= 1:
-		print 'job %s (%d):' % (id, counter)
+		print 'job %s (%d):' % (id, jcounter)
 		print ' name: %s' % job.name
 		print ' status: %s' % job.status
 		print ' user: %s' % job.user
@@ -423,6 +438,11 @@ for id, job in jobs.iteritems():
 		if not data:
 			changed = 1
 
+		st.execute('SELECT * FROM jobcounters WHERE jobid=%s', id)
+		data = st.fetchone()
+		if not data:
+			changed_counters = 1
+
 	# get details (intensive!), if new job or any other difference
 	jobnodes = dict()
 	subjobs = list()
@@ -460,6 +480,22 @@ for id, job in jobs.iteritems():
 			print ' subjobs: %d' % len(subjobs)
 			print ' ==> aggregated %d' % aggregate
 
+	counters = list()
+	counters_print = list()
+	if job.mapred and (not db or changed_counters or changed):
+		cs = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/counters' % id)
+		if cs and 'jobCounters' in cs.keys():
+			if 'counterGroup' in cs['jobCounters'].keys():
+				for cg in cs['jobCounters']['counterGroup']:
+					for c in cg['counter']:
+						counter = Counter()
+						counter.group = cg['counterGroupName']
+						counter.counter = c
+						counters.append(counter)
+						counters_print.append('(%s=%d,%d,%d)' % (c['name'], c['reduceCounterValue'], c['mapCounterValue'], c['totalCounterValue']))
+	if counters_print and debug >= 1:
+		print ' counters: ' + ''.join(counters_print)
+
 	if jobnodes and db:
 		st.execute("DELETE FROM jobnodes WHERE jobid=%s", id)
 		for nodename, jobnode in jobnodes.iteritems():
@@ -479,9 +515,22 @@ for id, job in jobs.iteritems():
 			st.execute('INSERT INTO subjobs (id, jobid, nodeid, state, type, start, finish) VALUES (%s, %s, %s, %s, %s, %s, %s)', (subjob['id'], id, node_hosts[nodename].id, subjob['state'], subjob['type'], subjob['startTime'], subjob['finishTime']))
 		if debug >= 3: print '[db] job %s subjobs updated' % id
 
-	# better to update timestamp again explicitly on the end of the transaction
+	if counters and db:
+		st.execute('DELETE FROM jobcounters WHERE jobid=%s', id)
+		for counter in counters:
+			counter_name = '%s/%s' % (counter.group, counter.counter['name'])
+			if not counter_name in counter_list.keys():
+				st.execute('INSERT INTO counters (groupName, name) VALUES (%s, %s)', (counter.group, counter.counter['name']))
+				counter_list[counter_name] = db.insert_id()
+				if debug >= 3: print '[db] new counter %s inserted' % counter_name
+			st.execute('INSERT INTO jobcounters (jobid, counterid, reduce, map, total) VALUES (%s, %s, %s, %s, %s)', (id, counter_list[counter_name], counter.counter['reduceCounterValue'], counter.counter['mapCounterValue'], counter.counter['totalCounterValue']))
+		if debug >= 3: print '[db] job %s counters updated' % id
+
+	# better to update timestamp again explicitly on the end of the transaction
+	if db and (jobnodes or counters):
 		st.execute('UPDATE jobs SET changed=NOW() WHERE id=%s', id);
+
 	if db:
 		db.commit()
 
 
@@ -490,4 +539,4 @@ for id, job in jobs.iteritems():
 if db:
 	db.close()
 
-c.close()
+curl.close()
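
With this patch applied, each job gets one row per counter in jobcounters, and counter group/name pairs are interned once in the counters lookup table. A minimal sketch of reading the stored values back, in the same Python 2 / MySQLdb style as jobs.py (the connection parameters and the job id below are placeholders, not values from this repository):

#!/usr/bin/python
# Sketch: list the counters stored for one job, assuming the schema
# from create.sql above. The connect() arguments and the job id are
# placeholder values.
import MySQLdb

db = MySQLdb.connect('dbhost', 'dbuser', 'dbpassword', 'dbname')
st = db.cursor()
# jobs.py queries the history server with 'job_%s' % id, so job ids
# are stored without the 'job_' prefix.
st.execute('SELECT c.groupName, c.name, jc.map, jc.reduce, jc.total '
           'FROM jobcounters jc JOIN counters c ON c.id = jc.counterid '
           'WHERE jc.jobid = %s', ('1425891234567_0042',))
for group, name, map_value, reduce_value, total in st.fetchall():
    print '%s/%s: map=%s, reduce=%s, total=%s' % (group, name, map_value, reduce_value, total)
db.close()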