class Node:
host = None
+class Counter:
+ group = None
+ counter = None
+
base_mapred_url=''
base_yarn_url=''
base_yarn_urls=list()
node_hosts = dict()
node_ids = dict()
+counter_list=dict()
jobs = dict()
-c = pycurl.Curl()
+curl = pycurl.Curl()
def get_rest(base_url, url):
print '# %s%s' % (base_url, url)
b = BytesIO()
- c.setopt(pycurl.URL, str(base_url + url))
- #c.setopt(pycurl.WRITEDATA, b)
- c.setopt(pycurl.WRITEFUNCTION, b.write)
- c.perform()
+ curl.setopt(pycurl.URL, str(base_url + url))
+ #curl.setopt(pycurl.WRITEDATA, b)
+ curl.setopt(pycurl.WRITEFUNCTION, b.write)
+ curl.perform()
s = b.getvalue().decode('utf-8')
- if c.getinfo(c.RESPONSE_CODE) != 200:
+ if curl.getinfo(curl.RESPONSE_CODE) != 200:
print s
- print 'Status: %d' % c.getinfo(c.RESPONSE_CODE)
- c.close()
+ print 'Status: %d' % curl.getinfo(curl.RESPONSE_CODE)
+ curl.close()
b.close()
raise Exception()
try:
j = get_rest(base_url, '/ws/v1/cluster/info')
except pycurl.error:
- if c.getinfo(pycurl.OS_ERRNO) == errno.ECONNREFUSED:
+ if curl.getinfo(pycurl.OS_ERRNO) == errno.ECONNREFUSED:
j = json.loads('{"clusterInfo":{"state":"NO CONNETION"}}')
else:
raise
sys.exit(2)
if ssl:
- c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_GSSNEGOTIATE)
- c.setopt(pycurl.USERPWD, ":")
+ curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_GSSNEGOTIATE)
+ curl.setopt(pycurl.USERPWD, ":")
for i in range(len(base_yarn_urls)):
base_yarn_urls[i] = gen_url(base_yarn_urls[i], 8088, 8090)
except Exception:
j2 = None
- counter = 0
+ jcounter = 0
if j1 and j1['job']:
if id not in jobs:
job = Job()
else:
job = jobs[id]
job.mapred = j1['job'];
- counter += 1
- if debug >= 2: print '[MR] %d jobs' % counter
+ jcounter += 1
+ if debug >= 2: print '[MR] %d jobs' % jcounter
- counter = 0
+ jcounter = 0
if j2 and j2['app']:
if id not in jobs:
job = Job()
else:
job = jobs[id]
job.yarn = j2['app'];
- counter += 1
- if debug >= 2: print '[YARN] %d jobs' % counter
+ jcounter += 1
+ if debug >= 2: print '[YARN] %d jobs' % jcounter
else:
mapred_url = base_mapred_url + '/ws/v1/history/mapreduce/jobs' + query
yarn_url = base_yarn_url + '/ws/v1/cluster/apps' + query
j1 = get_rest(mapred_url, '')
j2 = get_rest(yarn_url, '')
- counter = 0
+ jcounter = 0
if j1["jobs"]:
for j in j1["jobs"]["job"]:
id = regJob.sub('', j['id'])
else:
job = jobs[id]
job.mapred = j;
- counter += 1
- if debug >= 2: print '[MR] %d jobs' % counter
+ jcounter += 1
+ if debug >= 2: print '[MR] %d jobs' % jcounter
- counter = 0
+ jcounter = 0
if j2["apps"]:
for j in j2["apps"]["app"]:
id = regApp.sub('', j['id'])
else:
job = jobs[id]
job.yarn = j;
- counter += 1
- if debug >= 2: print '[YARN] %d jobs' % counter
+ jcounter += 1
+ if debug >= 2: print '[YARN] %d jobs' % jcounter
if db:
db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname)
else:
break
+ data = st.execute('SELECT id, groupName, name FROM counters')
+ while 1:
+ data = st.fetchone()
+ if data:
+ counter_name = '%s/%s' % (data[1], data[2])
+ counter_list[counter_name] = data[0]
+ else:
+ break
+
regHost = re.compile(':\d+')
-counter=0
+jcounter=0
for id, job in jobs.iteritems():
- counter += 1
+ jcounter += 1
if job.mapred:
job.name = job.mapred['name']
if db:
changed = 0
+ changed_counters = 0
ID=0
NAME=1
USER=2
if debug >= 2: print '[MR] missing finish time of %s completed from YARN (%d)' % (id, job.finish)
if debug >= 1:
- print 'job %s (%d):' % (id, counter)
+ print 'job %s (%d):' % (id, jcounter)
print ' name: %s' % job.name
print ' status: %s' % job.status
print ' user: %s' % job.user
if not data:
changed = 1
+ st.execute('SELECT * FROM jobcounters WHERE jobid=%s', id)
+ data = st.fetchone()
+ if not data:
+ changed_counters = 1
+
# get details (intensive!), if new job or any other difference
jobnodes = dict()
subjobs = list()
print ' subjobs: %d' % len(subjobs)
print ' ==> aggregated %d' % aggregate
+ counters = list()
+ counters_print = list()
+ if job.mapred and (not db or changed_counters or changed):
+ cs = get_rest(base_mapred_url, '/ws/v1/history/mapreduce/jobs/job_%s/counters' % id)
+ if cs and 'jobCounters' in cs.keys():
+ if 'counterGroup' in cs['jobCounters'].keys():
+ for cg in cs['jobCounters']['counterGroup']:
+ for c in cg['counter']:
+ counter = Counter()
+ counter.group = cg['counterGroupName']
+ counter.counter = c
+ counters.append(counter)
+ counters_print.append('(%s=%d,%d,%d)' % (c['name'], c['reduceCounterValue'], c['mapCounterValue'], c['totalCounterValue']))
+ if counters_print and debug >= 1:
+ print ' counters: ' + ''.join(counters_print)
+
if jobnodes and db:
st.execute("DELETE FROM jobnodes WHERE jobid=%s", id)
for nodename, jobnode in jobnodes.iteritems():
st.execute('INSERT INTO subjobs (id, jobid, nodeid, state, type, start, finish) VALUES (%s, %s, %s, %s, %s, %s, %s)', (subjob['id'], id, node_hosts[nodename].id, subjob['state'], subjob['type'], subjob['startTime'], subjob['finishTime']))
if debug >= 3: print '[db] job %s subjobs updated' % id
- # better to update timestamp again explicitly on the end of the transaction
+ if counters and db:
+ st.execute('DELETE FROM jobcounters WHERE jobid=%s', id)
+ for counter in counters:
+ counter_name = '%s/%s' % (counter.group, counter.counter['name'])
+ if not counter_name in counter_list.keys():
+ st.execute('INSERT INTO counters (groupName, name) VALUES (%s, %s)', (counter.group, counter.counter['name']))
+ counter_list[counter_name] = db.insert_id()
+ if debug >= 3: print '[db] new counter %s inserted' % counter_name
+ st.execute('INSERT INTO jobcounters (jobid, counterid, reduce, map, total) VALUES (%s, %s, %s, %s, %s)', (id, counter_list[counter_name], counter.counter['reduceCounterValue'], counter.counter['mapCounterValue'], counter.counter['totalCounterValue']))
+ if debug >= 3: print '[db] job %s counters updated' % id
+
+ # better to update timestamp again explicitly on the end of the transaction
+ if db and (jobnodes or counters):
st.execute('UPDATE jobs SET changed=NOW() WHERE id=%s', id);
+
if db:
db.commit()
if db:
db.close()
-c.close()
+curl.close()