From abf2f6a5862c29cd819cbd111da3a3fb6062faa4 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?= Date: Thu, 5 Mar 2015 13:55:20 +0100 Subject: [PATCH] Add job queue column. --- create.sql | 1 + jobs.py | 32 +++++++++++++++++++------------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/create.sql b/create.sql index a1f9bcc..be25f40 100644 --- a/create.sql +++ b/create.sql @@ -3,6 +3,7 @@ CREATE TABLE jobs ( name CHAR(128), user CHAR(20), status CHAR(20), + queue CHAR(80), submit BIGINT, start BIGINT, finish BIGINT, diff --git a/jobs.py b/jobs.py index cfeb74f..cc83ea3 100755 --- a/jobs.py +++ b/jobs.py @@ -14,10 +14,11 @@ class Job: name = None status = None user = None - mapred = None - yarn = None + queue = None start = None finish = None + mapred = None + yarn = None class JobNode: elapsed = 0 @@ -273,6 +274,7 @@ for id, job in jobs.iteritems(): job.name = job.mapred['name'] job.status = job.mapred['state'] job.user = job.mapred['user'] + job.queue = job.mapred['queue'] job.start = job.mapred['startTime'] job.finish = job.mapred['finishTime'] if job.yarn: @@ -282,6 +284,8 @@ for id, job in jobs.iteritems(): job.status = job.yarn['finalStatus'] if not job.user: job.user = job.yarn['user'] + if not job.queue: + job.queue = job.yarn['queue'] if not job.start: job.start = job.yarn['startedTime'] if debug >= 1: print '[MR] missing start time of %s completed from YARN (%d)' % (id, job.start) @@ -294,6 +298,7 @@ for id, job in jobs.iteritems(): print ' name: %s' % job.name print ' status: %s' % job.status print ' user: %s' % job.user + print ' queue: %s' % job.queue if job.mapred: print ' submit: %d, start: %d, finish: %d' % (job.mapred['submitTime'], job.mapred['startTime'], job.mapred['finishTime']) print ' elapsed: %.3f s' % ((job.mapred['finishTime'] - job.mapred['startTime']) / 1000.0) @@ -309,25 +314,26 @@ for id, job in jobs.iteritems(): NAME=1 USER=2 STATUS=3 - SUBMIT=4 - START=5 - FINISH=6 - MEMORY=7 - CPU=8 - MAP=9 - REDUCE=10 - st.execute("SELECT id, name, user, status, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM jobs WHERE id=%s", id) + QUEUE=4 + SUBMIT=5 + START=6 + FINISH=7 + MEMORY=8 + CPU=9 + MAP=10 + REDUCE=11 + st.execute("SELECT id, name, user, status, queue, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM jobs WHERE id=%s", id) data = st.fetchone() if data: - if data[NAME] == job.name and data[USER] == job.user and data[STATUS] == job.status and data[START] == job.start and data[FINISH] == job.finish: + if data[NAME] == job.name and data[USER] == job.user and data[STATUS] == job.status and data[QUEUE] == job.queue and data[START] == job.start and data[FINISH] == job.finish: if debug >= 2: print '[db] job %s found' % id else: - st.execute("UPDATE jobs SET name=%s, user=%s, status=%s, start=%s, finish=%s WHERE id=%s", (job.name, job.user, job.status, job.start, job.finish, id)) + st.execute("UPDATE jobs SET name=%s, user=%s, status=%s, queue=%s, start=%s, finish=%s WHERE id=%s", (job.name, job.user, job.status, job.queue, job.start, job.finish, id)) if debug >= 2: print '[db] job %s updated' % id changed = 1 else: - st.execute("INSERT INTO jobs (id, name, user, status, start, finish) VALUES(%s, %s, %s, %s, %s, %s)", (id, job.name, job.user, job.status, job.start, job.finish)) + st.execute("INSERT INTO jobs (id, name, user, status, queue, start, finish) VALUES(%s, %s, %s, %s, %s, %s, %s)", (id, job.name, job.user, job.status, job.queue, job.start, job.finish)) if debug >= 2: print '[db] job %s inserted' % id changed = 1 if job.mapred: -- 1.8.2.3