From 9e8e1c565bf60a8c3f0cce02cfdd3a097016d372 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?= Date: Wed, 4 Mar 2015 23:26:15 +0100 Subject: [PATCH] Use all database call properly. Add name also to new jobs. --- jobs.py | 28 +++++++--------------------- 1 file changed, 7 insertions(+), 21 deletions(-) diff --git a/jobs.py b/jobs.py index 72ec9cd..84d6a96 100755 --- a/jobs.py +++ b/jobs.py @@ -86,20 +86,6 @@ def gen_url(base_url, port_nossl, port_ssl): return base_url -def dbint(s): - if s is None: - return 'NULL' - else: - return s - - -def dbstr(s): - if s is None: - return 'NULL' - else: - return "'" + s + "'" - - try: opts, args = getopt.getopt(sys.argv[1:], 'hb:c:d:j:m:y:s', ['help', 'base=', 'config=', 'db', 'dbhost=', 'dbname=', 'dbuser=', 'dbpassword=', 'debug=', 'jobid=', 'mapred=', 'yarn=', 'ssl']) except getopt.GetoptError: @@ -312,32 +298,32 @@ for id, job in jobs.iteritems(): CPU=8 MAP=9 REDUCE=10 - st.execute("SELECT id, name, user, status, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM job WHERE id='%s'", id) + st.execute("SELECT id, name, user, status, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM job WHERE id=%s", id) data = st.fetchone() if data: if data[NAME] == job.name and data[USER] == job.user and data[STATUS] == job.status and data[START] == job.start and data[FINISH] == job.finish: if debug >= 2: print '[db] job %s found' % id else: - st.execute("UPDATE job SET name=%s, user=%s, status=%s, start=%s, finish=%s WHERE id=%s", (job.name, job.user, job.status, dbint(job.start), dbint(job.finish), id)) + st.execute("UPDATE job SET name=%s, user=%s, status=%s, start=%s, finish=%s WHERE id=%s", (job.name, job.user, job.status, job.start, job.finish, id)) if debug >= 2: print '[db] job %s updated' % id changed = 1 else: - st.execute("INSERT INTO job (id, user, status, start, finish) VALUES('%s', '%s', '%s', %s, %s)" %(id, job.user, job.status, dbint(job.start), dbint(job.finish))) + st.execute("INSERT INTO job (id, name, user, status, start, finish) VALUES(%s, %s, %s, %s, %s, %s)", (id, job.name, job.user, job.status, job.start, job.finish)) if debug >= 2: print '[db] job %s inserted' % id changed = 1 if job.mapred: if data and data[SUBMIT] == job.mapred['submitTime'] and data[MAP] == job.mapred['mapsTotal'] and data[REDUCE] == job.mapred['reducesTotal']: if debug >= 2: print '[db] job %s mapred is actual' % id else: - st.execute("UPDATE job SET submit=%d, map=%s, reduce=%s WHERE id='%s'" %(job.mapred['submitTime'], dbint(job.mapred['mapsTotal']), dbint(job.mapred['reducesTotal']), id)) + st.execute("UPDATE job SET submit=%s, map=%s, reduce=%s WHERE id=%s", (job.mapred['submitTime'], job.mapred['mapsTotal'], job.mapred['reducesTotal'], id)) if debug >= 2: print '[db] job %s mapred updated' % id changed = 1 if job.yarn: if data and data[MEMORY] == job.yarn['memorySeconds'] and data[CPU] == job.yarn['vcoreSeconds']: if debug >= 2: print '[db] job %s yarn is actual' % id else: - st.execute("UPDATE job SET memory_seconds=%d, cpu_seconds=%d WHERE id='%s'" %(job.yarn['memorySeconds'], job.yarn['vcoreSeconds'], id)) + st.execute("UPDATE job SET memory_seconds=%s, cpu_seconds=%s WHERE id=%s", (job.yarn['memorySeconds'], job.yarn['vcoreSeconds'], id)) if debug >= 2: print '[db] job %s yarn updated' % id changed = 1 @@ -372,9 +358,9 @@ for id, job in jobs.iteritems(): print ' ==> aggregated %d' % aggregate if nodes and db: - st.execute("DELETE FROM node WHERE jobid='%s'" % id) + st.execute("DELETE FROM node WHERE jobid=%s", id) for nodename, node in nodes.iteritems(): - st.execute("INSERT INTO node (jobid, host, elapsed, map, reduce) VALUES ('%s', '%s', %d, %d, %d)" % (id, nodename, node.elapsed, node.map, node.reduce)) + st.execute("INSERT INTO node (jobid, host, elapsed, map, reduce) VALUES (%s, %s, %s, %s, %s)", (id, nodename, node.elapsed, node.map, node.reduce)) if debug >= 2: print '[db] job %s nodes updated' % id if db: -- 1.8.2.3