Support database.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Wed, 4 Mar 2015 16:40:16 +0000 (17:40 +0100)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Wed, 4 Mar 2015 16:40:16 +0000 (17:40 +0100)
jobs.py

diff --git a/jobs.py b/jobs.py
index 120b195..359d629 100755 (executable)
--- a/jobs.py
+++ b/jobs.py
@@ -8,14 +8,28 @@ import sys
 import getopt
 import socket
 
+import MySQLdb
+
 class Job:
        status = None
        user = None
        mapred = None
        yarn = None
+       start = None
+       finish = None
+
+class Node:
+       elapsed = 0
+       map = 0
+       reduce = 0
 
 base_mapred_url=''
 base_yarn_url=''
+db = None
+dbhost = 'localhost'
+dbname = 'hadoop'
+dbuser = 'hadoop'
+dbpassword = ''
 debug=0
 ssl=0
 details=1
@@ -70,14 +84,43 @@ def gen_url(base_url, port_nossl, port_ssl):
        return base_url
 
 
+def dbint(s):
+       if s is None:
+               return 'NULL'
+       else:
+               return s
+
+
+def dbstr(s):
+       if s is None:
+               return 'NULL'
+       else:
+               return "'" + s + "'"
+
+
 try:
-       opts, args = getopt.getopt(sys.argv[1:], 'hb:c:dj:m:y:s', ['help', 'base=', 'config=', 'debug', 'jobs=', 'mapred=', 'yarn=', 'ssl'])
+       opts, args = getopt.getopt(sys.argv[1:], 'hb:c:dj:m:y:s', ['help', 'base=', 'config=', 'db', 'dbhost=', 'dbname=', 'dbuser=', 'dbpassword=', 'debug', 'jobid=', 'mapred=', 'yarn=', 'ssl'])
 except getopt.GetoptError:
        print 'Args error'
        sys.exit(2)
 for opt, arg in opts:
        if opt in ('-h', '--help'):
-               print('jobs.py [-h|--help] [-b|--base] [-c|--config] [-d|--debug] [-j|--jobs] [-m|--mapred URL] [-y|--yarn URL] [-s|--ssl]')
+               print "jobs.py [OPTIONS]\n\
+OPTIONS are:\n\
+  -h, --help ........ help message\n\
+  -b, --base ........ default hostname for YARN ans MapReduce\n\
+  -c, --config ...... config file\n\
+  -d, --debug ....... debug output\n\
+  --db .............. enable database\n\
+  --dbhost\n\
+  --dbname\n\
+  --dbuser\n\
+  --dbpassword\n\
+  -j, --jobid ....... single job query istead of list all\n\
+  -m, --mapred URL .. MapReduce Job History server\n\
+  -y, --yarn URL .... YARN Resource Manager\n\
+  -s, --ssl ......... enable default SSL schema and ports"
+
                sys.exit(0)
        elif opt in ('-b', '--base'):
                host = arg
@@ -87,7 +130,17 @@ for opt, arg in opts:
                        cfg=line.rstrip().split('=')
                        if cfg[0] == 'base':
                                host = cfg[1]
-                       if cfg[0] == 'mapred':
+                       elif cfg[0] == 'dbhost':
+                               dbhost = cfg[1]
+                       elif cfg[0] == 'db':
+                               db = 1
+                       elif cfg[0] == 'dbname':
+                               dbname = cfg[1]
+                       elif cfg[0] == 'dbuser':
+                               dbuser = cfg[1]
+                       elif cfg[0] == 'dbpassword':
+                               dbpassword = cfg[1]
+                       elif cfg[0] == 'mapred':
                                base_mapred_url = cfg[1]
                                print cfg
                        elif cfg[0] == 'yarn':
@@ -97,7 +150,17 @@ for opt, arg in opts:
                                ssl = cfg[1]
        elif opt in ('-d', '--debug'):
                debug=1
-       elif opt in ('-j', '--jobs'):
+       elif opt in ('--db'):
+               db = 1
+       elif opt in ('--dbhost'):
+               dbhost = arg
+       elif opt in ('--dbname'):
+               dbname = arg
+       elif opt in ('--dbuser'):
+               dbuser = arg
+       elif opt in ('--dbpassword'):
+               dbpassword = arg
+       elif opt in ('-j', '--jobid'):
                id = arg
        elif opt in ('-m', '--mapred'):
                base_mapred_url = arg
@@ -140,8 +203,6 @@ if id:
                else:
                        job = jobs[id]
                job.mapred = j1['job'];
-               job.status = job.mapred['state']
-               job.user = job.mapred['user']
 
        if j2 and j2['app']:
                if id not in jobs:
@@ -150,10 +211,6 @@ if id:
                else:
                        job = jobs[id]
                job.yarn = j2['app'];
-               if not job.status:
-                       job.status = job.yarn['finalStatus']
-               if not job.user:
-                       job.user = job.yarn['user']
 
        print jobs
 else:
@@ -174,8 +231,6 @@ else:
                        else:
                                job = jobs[id]
                        job.mapred = j;
-                       job.status = job.mapred['state']
-                       job.user = job.mapred['user']
 
        if j2["apps"]:
                for j in j2["apps"]["app"]:
@@ -186,15 +241,33 @@ else:
                        else:
                                job = jobs[id]
                        job.yarn = j;
-                       if not job.status:
-                               job.status = job.yarn['finalStatus']
-                       if not job.user:
-                               job.user = job.yarn['user']
 
+if db:
+       db = MySQLdb.connect(dbhost, dbuser, dbpassword, dbname)
+       st = db.cursor()
 
 regHost = re.compile(':\d+')
 counter=1
 for id, job in jobs.iteritems():
+
+       # merge MapRed and YARN
+       if job.mapred:
+               job.status = job.mapred['state']
+               job.user = job.mapred['user']
+               job.start = job.mapred['startTime']
+               job.finish = job.mapred['finishTime']
+       if job.yarn:
+               if not job.status:
+                       job.status = job.yarn['finalStatus']
+               if not job.user:
+                       job.user = job.yarn['user']
+               if not job.start:
+                       job.start = job.yarn['startedTime']
+                       if debug: print '[MR] missing start time of %s completed from YARN (%d)' % (id, job.start)
+               if not job.finish:
+                       job.finish = job.yarn['finishedTime']
+                       if debug: print '[MR] missing finish time of %s completed from YARN (%d)' % (id, job.finish)
+
        print 'job %s (%d):' % (id, counter)
        counter += 1
        print '  status: %s' % job.status
@@ -217,21 +290,61 @@ for id, job in jobs.iteritems():
                                                for attempt in a['taskAttempts']['taskAttempt']:
                                                        nodeHost = regHost.sub('', attempt['nodeHttpAddress'])
                                                        if nodeHost not in nodes:
-                                                               nodes[nodeHost] = 0
-                                                       nodes[nodeHost] += attempt['elapsedTime']
+                                                               nodes[nodeHost] = Node()
+                                                       nodes[nodeHost].elapsed += attempt['elapsedTime']
+                                                       if attempt['type'] == 'MAP':
+                                                               nodes[nodeHost].map += 1
+                                                       elif attempt['type'] == 'REDUCE':
+                                                               nodes[nodeHost].reduce += 1
+                                                       else:
+                                                               raise Exception('unknown type %s' %  attempt['type'])
 #      #                       print 'tasks elapsed: %d' % aggregate
 
                        aggregate=0
-                       for node, elapsed in nodes.iteritems():
-                               print '  node %s: %d' % (nodeelapsed)
-                               aggregate += elapsed
+                       for nodename, node in nodes.iteritems():
+                               print '  node %s: %d' % (nodename, node.elapsed)
+                               aggregate += node.elapsed
                        print '  ==> aggregated %d' % aggregate
 
        if job.yarn:
                print '  MB x s: %d' % job.yarn['memorySeconds']
                print '  CPU x s: %d' % job.yarn['vcoreSeconds']
 
+       if db:
+               st.execute("SELECT id, user, status, submit, start, finish, memory_seconds, cpu_seconds, map, reduce FROM job WHERE id='%s'" % id)
+               data = st.fetchone()
+               if data:
+                       if data[1] == job.user and data[2] == job.status and data[4] == job.start and data[5] == job.finish:
+                               if debug: print '[db] job %s found' % id
+                       else:
+                               st.execute("UPDATE job SET user='%s', status='%s', start=%s, finish=%s WHERE id='%s'" %(job.user, job.status, dbint(job.start), dbint(job.finish), id))
+                               if debug: print '[db] job %s updated' % id
+
+               else:
+                       st.execute("INSERT INTO job (id, user, status, start, finish) VALUES('%s', %s, %s)" %(id, job.user, job.status, dbint(job.start), dbint(job.finish)))
+                       if debug: print '[db] job %s inserted' % id
+               if job.mapred:
+                       if data and data[3] == job.mapred['submitTime'] and data[8] == job.mapred['mapsTotal'] and data[9] == job.mapred['reducesTotal']:
+                               if debug: print '[db] job %s mapred is actual' % id
+                       else:
+                               st.execute("UPDATE job SET submit=%d, map=%s, reduce=%s WHERE id='%s'" %(job.mapred['submitTime'], dbint(job.mapred['mapsTotal']), dbint(job.mapred['reducesTotal']), id))
+                               if debug: print '[db] job %s mapred updated' % id
+               if job.yarn:
+                       if data and data[6] == job.yarn['memorySeconds'] and data[7] == job.yarn['vcoreSeconds']:
+                               if debug: print '[db] job %s yarn is actual' % id
+                       else:
+                               st.execute("UPDATE job SET memory_seconds=%d, cpu_seconds=%d WHERE id='%s'" %(job.yarn['memorySeconds'], job.yarn['vcoreSeconds'], id))
+                               if debug: print '[db] job %s yarn updated' % id
+               if nodes:
+                       st.execute("DELETE FROM node WHERE jobid='%s'" % id)
+                       for nodename, node in nodes.iteritems():
+                               st.execute("INSERT INTO node (jobid, host, elapsed, map, reduce) VALUES ('%s', '%s', %d, %d, %d)" % (id, nodename, node.elapsed, node.map, node.reduce))
+                       if debug: print '[db] job %s nodes updated' % id
+               db.commit()
+
        print
 
-c.close()
+if db:
+       db.close()
 
+c.close()