Preliminary implementation for user jobs statistics.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Sun, 18 Jan 2015 00:45:27 +0000 (01:45 +0100)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Sun, 18 Jan 2015 00:45:27 +0000 (01:45 +0100)
files/accounting/create.sql
manifests/accounting.pp
templates/accounting/cron-hdfs.erb
templates/accounting/cron-jobs.erb [new file with mode: 0644]
templates/accounting/cron-quota.erb
templates/accounting/jobs.py.erb [new file with mode: 0755]

index 157fc5a..58c84c5 100644 (file)
@@ -13,9 +13,9 @@
 -- INSERT INTO hdfs (id_measure, full, disk, disk_used, block_under, block_corrupt, block_missing) VALUES (last_insert_id(), 10240, 10230, 100, 0, 0, 0);
 -- INSERT INTO hdfs (id_measure, hostname, state, full, disk, disk_used) VALUES (last_insert_id(), 'hador1', 1, 1024, 1023, 10);
 --
--- INSERT INTO measure (name, start, end) VALUES ('jobstat', '2015-01-16', '2015-01-17');
--- INSERT INTO jobstat (id_measure, user, total, fails, wait_min, wait_avg, wait_max) VALUES (last_insert_id(), 'valtri', 10, 2, 0, 50, 100);
--- INSERT INTO jobstat (id_measure, user, total, fails, wait_min, wait_avg, wait_max) VALUES (last_insert_id(), 'nemo', 0, 0, NULL, NULL, NULL);
+-- INSERT INTO measure (name, start, end) VALUES ('jobs', '2015-01-16', '2015-01-17');
+-- INSERT INTO jobs (id_measure, user, jobs, done, fail, real_wait, real_time, wait_min, wait_max) VALUES (last_insert_id(), 'valtri', 6, 10, 2, 1000, 500, 10, 100);
+-- INSERT INTO jobs (id_measure, user, jobs, done, fail, wait_min, wait_max) VALUES (last_insert_id(), 'nemo', 0, 0, 0, NULL, NULL);
 --
 -- How to read values:
 --
@@ -79,13 +79,15 @@ CREATE TABLE quota (
        INDEX(user)
 );
 
-CREATE TABLE jobstat (
+CREATE TABLE jobs (
        id_measure INTEGER NOT NULL,
        user CHAR(20) NULL,
-       total INTEGER,
-       fails INTEGER,
+       jobs INTEGER,
+       done INTEGER,
+       fail INTEGER,
+       real_wait INTEGER,
+       real_time INTEGER,
        wait_min INTEGER,
-       wait_avg INTEGER,
        wait_max INTEGER,
 
        CONSTRAINT PRIMARY KEY (id_measure, user),
@@ -95,7 +97,7 @@ CREATE TABLE jobstat (
 
 INSERT INTO statistic (name, last_seq) VALUES ('hdfs', 0);
 INSERT INTO statistic (name, last_seq) VALUES ('quota', 0);
-INSERT INTO statistic (name, last_seq) VALUES ('jobstat', 0);
+INSERT INTO statistic (name, last_seq) VALUES ('jobs', 0);
 
 DELIMITER //
 
@@ -123,4 +125,4 @@ DELIMITER ;
 CREATE VIEW view_measures AS SELECT m.* FROM measure m, statistic s WHERE s.last_id_measure = m.id_measure;
 CREATE VIEW view_hdfs AS SELECT m.seq, m.time, h.hostname, h.full, h.disk, h.disk_used, h.disk_free, h.block_under, h.block_corrupt, h.block_missing FROM hdfs h, measure m WHERE h.id_measure=m.id_measure;
 CREATE VIEW view_quota AS SELECT m.seq, m.time, q.user, q.used FROM quota q, measure m WHERE q.id_measure=m.id_measure;
-CREATE VIEW view_jobstat AS SELECT m.seq, m.time, m.start, m.end, j.user, j.total, j.fails, j.wait_min, j.wait_avg, j.wait_max FROM jobstat j, measure m WHERE j.id_measure=m.id_measure;
+CREATE VIEW view_jobs AS SELECT m.seq, m.time, m.start, m.end, j.user, j.jobs, j.done, j.fail, j.real_wait, j.real_time, j.wait_min, j.wait_max FROM jobs j, measure m WHERE j.id_measure=m.id_measure;
index f492095..cf95992 100644 (file)
 #      stage       => 'setup',
 #    }
 #    
-#    class{'site_hadoop::accountig':
+#    class{'site_hadoop::accounting':
 #      db_password => 'accpass',
 #      email       => 'mail@example.com',
-#      hdfs        => '0,30 * * *',
+#      accounting_hdfs => '0,30 * * *',
 #    }
 #    
 #    mysql::db { 'accounting':
 #
 # === Parameters
 #
+# ####`accounting_hdfs`
+# = undef
+#
+# Enable storing global HDFS disk and data statistics. The value is time in the cron format. See *man 5 crontab*.
+#
+# ####`accounting_quota`
+# = undef
+#
+# Enable storing user data statistics. The value is time in the cron format. See *man 5 crontab*.
+#
+# ####`accounting_jobs`
+# = undef
+#
+# Enable storing user jobs statistics. The value is time in the cron format. See *man 5 crontab*.
+#
 # ####`db_name`
 # = undef (system default is *accounting*)
 #
 #
 # Email address to send errors from cron.
 #
-# [*hdfs*] undef
+# ####`mapred_hostname`
+# = $::fqdn
 #
-# Enable storing global HDFS disk and data statistics. The value is time in the cron format. See *man 5 crontab*.
+# Hadoop Job History Node hostname for gathering user jobs statistics.
+#
+# ####`mapred_url`
+# = http://*mapred_hostname*:19888, https://*mapred_hostname*:19890
+#
+# HTTP REST URL of Hadoop Job History Node for gathering user jobs statistics. It is derived from *mapred_hostname* and *principal*, but it may be needed to override it anyway (different hosts due to High Availability, non-defalt port, ...).
 #
 # ####`principal`
 # = undef
 # Kerberos principal to access Hadoop.
 #
 class site_hadoop::accounting(
+  $accounting_hdfs = undef,
+  $accounting_quota = undef,
+  $accounting_jobs = undef,
   $db_name = undef,
   $db_user = undef,
   $db_password = undef,
   $email = undef,
-  $hdfs  = undef,
-  $quota  = undef,
+  $mapred_hostname = $::fqdn,
+  $mapred_url = undef,
   $principal = undef,
 ) {
+  include stdlib
+
+  $packages = ['python-pycurl']
+
   # common
+  ensure_packages($packages)
   file{'/usr/local/share/hadoop':
     ensure  => 'directory',
     owner => 'root',
@@ -103,7 +132,7 @@ class site_hadoop::accounting(
     source => 'puppet:///modules/site_hadoop/accounting/hdfs.awk',
     require => File['/usr/local/share/hadoop'],
   }
-  if $hdfs {
+  if $accounting_hdfs {
     file{'/etc/cron.d/accounting-hdfs':
       owner => 'root',
       group => 'root',
@@ -130,7 +159,7 @@ class site_hadoop::accounting(
     source => 'puppet:///modules/site_hadoop/accounting/quota.awk',
     require => File['/usr/local/share/hadoop'],
   }
-  if $quota {
+  if $accounting_quota {
     file{'/etc/cron.d/accounting-quota':
       owner => 'root',
       group => 'root',
@@ -142,4 +171,33 @@ class site_hadoop::accounting(
       ensure => 'absent',
     }
   }
+
+  # user jobs
+  if $mapred_url {
+    $_mapred_url = $mapred_url
+  } else {
+    if $principal {
+      $_mapred_url = "https://${mapred_hostname}:19890"
+    } else {
+      $_mapred_url = "http://${mapred_hostname}:19888"
+    }
+  }
+  file {'/usr/local/bin/accounting-jobs':
+    owner  => 'root',
+    group  => 'root',
+    mode   => '0755',
+    content => template('site_hadoop/accounting/jobs.py.erb'),
+  }
+  if $accounting_jobs {
+    file{'/etc/cron.d/accounting-jobs':
+      owner => 'root',
+      group => 'root',
+      mode  => '0644',
+      content => template('site_hadoop/accounting/cron-jobs.erb'),
+    }
+  } else {
+    file{'/etc/cron.d/accounting-jobs':
+      ensure => 'absent',
+    }
+  }
 }
index 88a7134..05eaca8 100644 (file)
@@ -2,4 +2,4 @@
 MAILTO='<%= @email -%>'
 
 <% end -%>
-<%= @hdfs -%>  hdfs    /usr/local/bin/accounting-hdfs
+<%= @accounting_hdfs -%>       hdfs    /usr/local/bin/accounting-hdfs
diff --git a/templates/accounting/cron-jobs.erb b/templates/accounting/cron-jobs.erb
new file mode 100644 (file)
index 0000000..b08ef5e
--- /dev/null
@@ -0,0 +1,5 @@
+<% if @email -%>
+MAILTO='<%= @email -%>'
+
+<% end -%>
+#<%= @accounting_jobs -%>      hdfs    /usr/local/bin/accounting-jobs
index ef9f4f8..f36c0ab 100644 (file)
@@ -2,4 +2,4 @@
 MAILTO='<%= @email -%>'
 
 <% end -%>
-<%= @quota -%> hdfs    /usr/local/bin/accounting-quota
+<%= @accounting_quota -%>      hdfs    /usr/local/bin/accounting-quota
diff --git a/templates/accounting/jobs.py.erb b/templates/accounting/jobs.py.erb
new file mode 100755 (executable)
index 0000000..d383310
--- /dev/null
@@ -0,0 +1,79 @@
+#! /usr/bin/python2
+
+import pycurl, json
+from io import BytesIO
+
+import time
+import calendar
+import datetime
+
+# epoch time of local date
+#now = datetime.date.today().strftime('%s')
+
+# epoch time of GMT date
+now0 = datetime.date.today()
+now = calendar.timegm(datetime.datetime(now0.year, now0.month, now0.day, 0, 0).timetuple())
+print '# ' + str(now0)
+
+begin = 1000 * (now - 24 * 3600)
+end = 1000 * now
+url = "<%= @_mapred_url -%>/ws/v1/history/mapreduce/jobs?finishedTimeBegin=" + str(begin) + "&finishedTimeEnd=" + str(end)
+print '# ' + url
+
+b = BytesIO()
+c = pycurl.Curl()
+c.setopt(pycurl.URL, url)
+#c.setopt(pycurl.WRITEDATA, b)
+c.setopt(pycurl.WRITEFUNCTION, b.write)
+c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_GSSNEGOTIATE)
+c.setopt(pycurl.USERPWD, ":")
+c.perform()
+s = b.getvalue().decode('utf-8')
+
+if c.getinfo(c.RESPONSE_CODE) != 200:
+       print s
+       print 'Status: %d' % c.getinfo(c.RESPONSE_CODE)
+       c.close()
+       b.close()
+       raise Exception()
+
+c.close()
+
+j = json.loads(s)
+#print json.dumps(j, indent=4)
+
+class User:
+       jobs = 0
+       total = 0
+       completed = 0
+       wait = 0
+       time = 0
+       wait_min = -1
+       wait_max = -1
+
+users = dict()
+if j["jobs"]:
+       for job in j["jobs"]["job"]:
+               username = job["user"]
+
+               if username not in users:
+                       users[username] = User()
+
+               user = users[username]
+               wait = job["startTime"] - job["submitTime"]
+
+               user.jobs += 1
+               user.total += job['reducesTotal'] + job['mapsTotal']
+               user.completed += job['reducesCompleted'] + job['mapsCompleted']
+               user.wait += wait
+               user.time += job["finishTime"] - job["startTime"]
+               if user.wait_min == -1 or wait < user.wait_min:
+                       user.wait_min = wait
+               if user.wait_max == -1 or wait > user.wait_max:
+                       user.wait_max = wait
+
+#              print '#[progress]', username, users[username].total, user.completed, user.wait, user.time
+
+print "INSERT INTO measure (name) VALUES ('jobs')"
+for username, user in users.iteritems():
+       print "INSERT INTO jobs (id_measure, user, jobs, done, fail, real_wait, real_time, wait_min, wait_max) VALUES (last_insert_id(), '%s', %d, %d, %d, %d, %d, %d, %d)" % (username, user.jobs, user.completed, user.total - user.completed, user.wait, user.time, user.wait_min, user.wait_max)