-- INSERT INTO hdfs (id_measure, full, disk, disk_used, block_under, block_corrupt, block_missing) VALUES (last_insert_id(), 10240, 10230, 100, 0, 0, 0);
-- INSERT INTO hdfs (id_measure, hostname, state, full, disk, disk_used) VALUES (last_insert_id(), 'hador1', 1, 1024, 1023, 10);
--
--- INSERT INTO measure (name, start, end) VALUES ('jobstat', '2015-01-16', '2015-01-17');
--- INSERT INTO jobstat (id_measure, user, total, fails, wait_min, wait_avg, wait_max) VALUES (last_insert_id(), 'valtri', 10, 2, 0, 50, 100);
--- INSERT INTO jobstat (id_measure, user, total, fails, wait_min, wait_avg, wait_max) VALUES (last_insert_id(), 'nemo', 0, 0, NULL, NULL, NULL);
+-- INSERT INTO measure (name, start, end) VALUES ('jobs', '2015-01-16', '2015-01-17');
+-- INSERT INTO jobs (id_measure, user, jobs, done, fail, real_wait, real_time, wait_min, wait_max) VALUES (last_insert_id(), 'valtri', 6, 10, 2, 1000, 500, 10, 100);
+-- INSERT INTO jobs (id_measure, user, jobs, done, fail, wait_min, wait_max) VALUES (last_insert_id(), 'nemo', 0, 0, 0, NULL, NULL);
--
-- How to read values:
--
INDEX(user)
);
-CREATE TABLE jobstat (
+CREATE TABLE jobs (
id_measure INTEGER NOT NULL,
user CHAR(20) NULL,
- total INTEGER,
- fails INTEGER,
+ jobs INTEGER,
+ done INTEGER,
+ fail INTEGER,
+ real_wait INTEGER,
+ real_time INTEGER,
wait_min INTEGER,
- wait_avg INTEGER,
wait_max INTEGER,
CONSTRAINT PRIMARY KEY (id_measure, user),
INSERT INTO statistic (name, last_seq) VALUES ('hdfs', 0);
INSERT INTO statistic (name, last_seq) VALUES ('quota', 0);
-INSERT INTO statistic (name, last_seq) VALUES ('jobstat', 0);
+INSERT INTO statistic (name, last_seq) VALUES ('jobs', 0);
DELIMITER //
CREATE VIEW view_measures AS SELECT m.* FROM measure m, statistic s WHERE s.last_id_measure = m.id_measure;
CREATE VIEW view_hdfs AS SELECT m.seq, m.time, h.hostname, h.full, h.disk, h.disk_used, h.disk_free, h.block_under, h.block_corrupt, h.block_missing FROM hdfs h, measure m WHERE h.id_measure=m.id_measure;
CREATE VIEW view_quota AS SELECT m.seq, m.time, q.user, q.used FROM quota q, measure m WHERE q.id_measure=m.id_measure;
-CREATE VIEW view_jobstat AS SELECT m.seq, m.time, m.start, m.end, j.user, j.total, j.fails, j.wait_min, j.wait_avg, j.wait_max FROM jobstat j, measure m WHERE j.id_measure=m.id_measure;
+CREATE VIEW view_jobs AS SELECT m.seq, m.time, m.start, m.end, j.user, j.jobs, j.done, j.fail, j.real_wait, j.real_time, j.wait_min, j.wait_max FROM jobs j, measure m WHERE j.id_measure=m.id_measure;
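+--
+-- Illustrative query (not part of the schema) showing how the collected job
+-- statistics can be read back through the view defined above:
+-- SELECT seq, time, user, jobs, done, fail, wait_min, wait_max FROM view_jobs ORDER BY seq, user;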
# stage => 'setup',
# }
#
-# class{'site_hadoop::accountig':
+# class{'site_hadoop::accounting':
# db_password => 'accpass',
# email => 'mail@example.com',
-# hdfs => '0,30 * * *',
+#   accounting_hdfs => '0,30 * * * *',
# }
#
# mysql::db { 'accounting':
#
# === Parameters
#
+# ####`accounting_hdfs`
+# = undef
+#
+# Enable storing global HDFS disk and data statistics. The value is a time specification in cron format. See *man 5 crontab*.
+#
+# ####`accounting_quota`
+# = undef
+#
+# Enable storing user data statistics. The value is a time specification in cron format. See *man 5 crontab*.
+#
+# ####`accounting_jobs`
+# = undef
+#
+# Enable storing user jobs statistics. The value is a time specification in cron format. See *man 5 crontab*.
+#
# ####`db_name`
# = undef (system default is *accounting*)
#
#
# Email address to send errors from cron.
#
-# [*hdfs*] undef
+# ####`mapred_hostname`
+# = $::fqdn
#
-# Enable storing global HDFS disk and data statistics. The value is time in the cron format. See *man 5 crontab*.
+# Hadoop Job History Node hostname for gathering user jobs statistics.
+#
+# ####`mapred_url`
+# = http://*mapred_hostname*:19888, https://*mapred_hostname*:19890
+#
+# HTTP REST URL of the Hadoop Job History Node for gathering user jobs statistics. It is derived from *mapred_hostname* and *principal*, but it may still need to be overridden (different hosts due to High Availability, non-default port, ...).
#
# ####`principal`
# = undef
# Kerberos principal to access Hadoop.
#
class site_hadoop::accounting(
+ $accounting_hdfs = undef,
+ $accounting_quota = undef,
+ $accounting_jobs = undef,
$db_name = undef,
$db_user = undef,
$db_password = undef,
$email = undef,
- $hdfs = undef,
- $quota = undef,
+ $mapred_hostname = $::fqdn,
+ $mapred_url = undef,
$principal = undef,
) {
+ include stdlib
+
+ $packages = ['python-pycurl']
+
# common
+ ensure_packages($packages)
file{'/usr/local/share/hadoop':
ensure => 'directory',
owner => 'root',
source => 'puppet:///modules/site_hadoop/accounting/hdfs.awk',
require => File['/usr/local/share/hadoop'],
}
- if $hdfs {
+ if $accounting_hdfs {
file{'/etc/cron.d/accounting-hdfs':
owner => 'root',
group => 'root',
source => 'puppet:///modules/site_hadoop/accounting/quota.awk',
require => File['/usr/local/share/hadoop'],
}
- if $quota {
+ if $accounting_quota {
file{'/etc/cron.d/accounting-quota':
owner => 'root',
group => 'root',
ensure => 'absent',
}
}
+
+ # user jobs
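+  # Derive the Job History Server REST URL unless mapred_url is given explicitly:
+  # HTTPS on port 19890 when Kerberos is in use (principal set), plain HTTP on port 19888 otherwise.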
+ if $mapred_url {
+ $_mapred_url = $mapred_url
+ } else {
+ if $principal {
+ $_mapred_url = "https://${mapred_hostname}:19890"
+ } else {
+ $_mapred_url = "http://${mapred_hostname}:19888"
+ }
+ }
+ file {'/usr/local/bin/accounting-jobs':
+ owner => 'root',
+ group => 'root',
+ mode => '0755',
+ content => template('site_hadoop/accounting/jobs.py.erb'),
+ }
+ if $accounting_jobs {
+ file{'/etc/cron.d/accounting-jobs':
+ owner => 'root',
+ group => 'root',
+ mode => '0644',
+ content => template('site_hadoop/accounting/cron-jobs.erb'),
+ }
+ } else {
+ file{'/etc/cron.d/accounting-jobs':
+ ensure => 'absent',
+ }
+ }
}
MAILTO='<%= @email -%>'
<% end -%>
-<%= @hdfs -%> hdfs /usr/local/bin/accounting-hdfs
+<%= @accounting_hdfs -%> hdfs /usr/local/bin/accounting-hdfs
--- /dev/null
+<% if @email -%>
+MAILTO='<%= @email -%>'
+
+<% end -%>
+#<%= @accounting_jobs -%> hdfs /usr/local/bin/accounting-jobs
MAILTO='<%= @email -%>'
<% end -%>
-<%= @quota -%> hdfs /usr/local/bin/accounting-quota
+<%= @accounting_quota -%> hdfs /usr/local/bin/accounting-quota
--- /dev/null
+#! /usr/bin/python2
+
+import pycurl, json
+from io import BytesIO
+
+import time
+import calendar
+import datetime
+
+# epoch time of local date
+#now = datetime.date.today().strftime('%s')
+
+# epoch time of GMT date
+now0 = datetime.date.today()
+now = calendar.timegm(datetime.datetime(now0.year, now0.month, now0.day, 0, 0).timetuple())
+print '# ' + str(now0)
+
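+# The history server REST API takes finishedTime filters in epoch milliseconds;
+# select jobs that finished within the previous 24 hours.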
+begin = 1000 * (now - 24 * 3600)
+end = 1000 * now
+url = "<%= @_mapred_url -%>/ws/v1/history/mapreduce/jobs?finishedTimeBegin=" + str(begin) + "&finishedTimeEnd=" + str(end)
+print '# ' + url
+
+b = BytesIO()
+c = pycurl.Curl()
+c.setopt(pycurl.URL, url)
+#c.setopt(pycurl.WRITEDATA, b)
+c.setopt(pycurl.WRITEFUNCTION, b.write)
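+# authenticate via SPNEGO/Kerberos; the empty ':' credentials let libcurl use the GSSAPI ticket cache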
+c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_GSSNEGOTIATE)
+c.setopt(pycurl.USERPWD, ":")
+c.perform()
+s = b.getvalue().decode('utf-8')
+
+status = c.getinfo(c.RESPONSE_CODE)
+if status != 200:
+    print s
+    print 'Status: %d' % status
+    c.close()
+    b.close()
+    raise Exception('history server request failed with HTTP status %d' % status)
+
+c.close()
+
+j = json.loads(s)
+#print json.dumps(j, indent=4)
+
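+# Per-user accumulator: number of jobs, total/completed map+reduce tasks,
+# and wait/run times in milliseconds (wait_min/wait_max start at -1 = unset).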
+class User:
+ jobs = 0
+ total = 0
+ completed = 0
+ wait = 0
+ time = 0
+ wait_min = -1
+ wait_max = -1
+
+users = dict()
+if j["jobs"]:
+ for job in j["jobs"]["job"]:
+ username = job["user"]
+
+ if username not in users:
+ users[username] = User()
+
+ user = users[username]
+ wait = job["startTime"] - job["submitTime"]
+
+ user.jobs += 1
+ user.total += job['reducesTotal'] + job['mapsTotal']
+ user.completed += job['reducesCompleted'] + job['mapsCompleted']
+ user.wait += wait
+ user.time += job["finishTime"] - job["startTime"]
+ if user.wait_min == -1 or wait < user.wait_min:
+ user.wait_min = wait
+ if user.wait_max == -1 or wait > user.wait_max:
+ user.wait_max = wait
+
+# print '#[progress]', username, users[username].total, user.completed, user.wait, user.time
+
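+# Emit SQL for the accounting database: one measure row, then one jobs row per
+# user (failed task count is derived as total - completed).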
+print "INSERT INTO measure (name) VALUES ('jobs')"
+for username, user in users.iteritems():
+ print "INSERT INTO jobs (id_measure, user, jobs, done, fail, real_wait, real_time, wait_min, wait_max) VALUES (last_insert_id(), '%s', %d, %d, %d, %d, %d, %d, %d)" % (username, user.jobs, user.completed, user.total - user.completed, user.wait, user.time, user.wait_min, user.wait_max)