Add job queue column.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Thu, 5 Mar 2015 12:55:20 +0000 (13:55 +0100)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Thu, 5 Mar 2015 13:08:30 +0000 (14:08 +0100)
create.sql
jobs.py

index a1f9bcc..be25f40 100644 (file)
@@ -3,6 +3,7 @@ CREATE TABLE jobs (
        name CHAR(128),
        user CHAR(20),
        status CHAR(20),
+       queue CHAR(80),
        submit BIGINT,
        start BIGINT,
        finish BIGINT,
diff --git a/jobs.py b/jobs.py
index cfeb74f..cc83ea3 100755 (executable)
--- a/jobs.py
+++ b/jobs.py
@@ -14,10 +14,11 @@ class Job:
        name = None
        status = None
        user = None
-       mapred = None
-       yarn = None
+       queue = None
        start = None
        finish = None
+       mapred = None
+       yarn = None
 
 class JobNode:
        elapsed = 0
@@ -273,6 +274,7 @@ for id, job in jobs.iteritems():
                job.name = job.mapred['name']
                job.status = job.mapred['state']
                job.user = job.mapred['user']
+               job.queue = job.mapred['queue']
                job.start = job.mapred['startTime']
                job.finish = job.mapred['finishTime']
        if job.yarn:
@@ -282,6 +284,8 @@ for id, job in jobs.iteritems():
                        job.status = job.yarn['finalStatus']
                if not job.user:
                        job.user = job.yarn['user']
+               if not job.queue:
+                       job.queue = job.yarn['queue']
                if not job.start:
                        job.start = job.yarn['startedTime']
                        if debug >= 1: print '[MR] missing start time of %s completed from YARN (%d)' % (id, job.start)
@@ -294,6 +298,7 @@ for id, job in jobs.iteritems():
        print '  name: %s' % job.name
        print '  status: %s' % job.status
        print '  user: %s' % job.user
+       print '  queue: %s' % job.queue
        if job.mapred:
                print '  submit: %d, start: %d, finish: %d' % (job.mapred['submitTime'], job.mapred['startTime'], job.mapred['finishTime'])
                print '  elapsed: %.3f s' % ((job.mapred['finishTime'] - job.mapred['startTime']) / 1000.0)
@@ -309,25 +314,26 @@ for id, job in jobs.iteritems():
                NAME=1
                USER=2
                STATUS=3
-               SUBMIT=4
-               START=5
-               FINISH=6
-               MEMORY=7
-               CPU=8
-               MAP=9
-               REDUCE=10
-               st.execute("SELECT id, name, user, status, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM jobs WHERE id=%s", id)
+               QUEUE=4
+               SUBMIT=5
+               START=6
+               FINISH=7
+               MEMORY=8
+               CPU=9
+               MAP=10
+               REDUCE=11
+               st.execute("SELECT id, name, user, status, queue, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM jobs WHERE id=%s", id)
                data = st.fetchone()
                if data:
-                       if data[NAME] == job.name and data[USER] == job.user and data[STATUS] == job.status and data[START] == job.start and data[FINISH] == job.finish:
+                       if data[NAME] == job.name and data[USER] == job.user and data[STATUS] == job.status and data[QUEUE] == job.queue and data[START] == job.start and data[FINISH] == job.finish:
                                if debug >= 2: print '[db] job %s found' % id
                        else:
-                               st.execute("UPDATE jobs SET name=%s, user=%s, status=%s, start=%s, finish=%s WHERE id=%s", (job.name, job.user, job.status, job.start, job.finish, id))
+                               st.execute("UPDATE jobs SET name=%s, user=%s, status=%s, queue=%s, start=%s, finish=%s WHERE id=%s", (job.name, job.user, job.status, job.queue, job.start, job.finish, id))
                                if debug >= 2: print '[db] job %s updated' % id
                                changed = 1
 
                else:
-                       st.execute("INSERT INTO jobs (id, name, user, status, start, finish) VALUES(%s, %s, %s, %s, %s, %s)", (id, job.name, job.user, job.status, job.start, job.finish))
+                       st.execute("INSERT INTO jobs (id, name, user, status, queue, start, finish) VALUES(%s, %s, %s, %s, %s, %s, %s)", (id, job.name, job.user, job.status, job.queue, job.start, job.finish))
                        if debug >= 2: print '[db] job %s inserted' % id
                        changed = 1
                if job.mapred: