From: František Dvořák
Date: Sun, 18 Jan 2015 00:45:27 +0000 (+0100)
Subject: Preliminary implementation for user jobs statistics.
X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=9ed84e8ac854719321b731badb5f0d8dcbc47fd6;p=meta-site_hadoop.git

Preliminary implementation for user jobs statistics.
---

diff --git a/files/accounting/create.sql b/files/accounting/create.sql
index 157fc5a..58c84c5 100644
--- a/files/accounting/create.sql
+++ b/files/accounting/create.sql
@@ -13,9 +13,9 @@
 -- INSERT INTO hdfs (id_measure, full, disk, disk_used, block_under, block_corrupt, block_missing) VALUES (last_insert_id(), 10240, 10230, 100, 0, 0, 0);
 -- INSERT INTO hdfs (id_measure, hostname, state, full, disk, disk_used) VALUES (last_insert_id(), 'hador1', 1, 1024, 1023, 10);
 --
---- INSERT INTO measure (name, start, end) VALUES ('jobstat', '2015-01-16', '2015-01-17');
---- INSERT INTO jobstat (id_measure, user, total, fails, wait_min, wait_avg, wait_max) VALUES (last_insert_id(), 'valtri', 10, 2, 0, 50, 100);
---- INSERT INTO jobstat (id_measure, user, total, fails, wait_min, wait_avg, wait_max) VALUES (last_insert_id(), 'nemo', 0, 0, NULL, NULL, NULL);
+-- INSERT INTO measure (name, start, end) VALUES ('jobs', '2015-01-16', '2015-01-17');
+-- INSERT INTO jobs (id_measure, user, jobs, done, fail, real_wait, real_time, wait_min, wait_max) VALUES (last_insert_id(), 'valtri', 6, 10, 2, 1000, 500, 10, 100);
+-- INSERT INTO jobs (id_measure, user, jobs, done, fail, wait_min, wait_max) VALUES (last_insert_id(), 'nemo', 0, 0, 0, NULL, NULL);
 --
 -- How to read values:
 --
@@ -79,13 +79,15 @@ CREATE TABLE quota (
   INDEX(user)
 );

-CREATE TABLE jobstat (
+CREATE TABLE jobs (
   id_measure INTEGER NOT NULL,
   user CHAR(20) NULL,
-  total INTEGER,
-  fails INTEGER,
+  jobs INTEGER,
+  done INTEGER,
+  fail INTEGER,
+  real_wait INTEGER,
+  real_time INTEGER,
   wait_min INTEGER,
-  wait_avg INTEGER,
   wait_max INTEGER,

   CONSTRAINT PRIMARY KEY (id_measure, user),
@@ -95,7 +97,7 @@ CREATE TABLE jobstat (

 INSERT INTO statistic (name, last_seq) VALUES ('hdfs', 0);
 INSERT INTO statistic (name, last_seq) VALUES ('quota', 0);
-INSERT INTO statistic (name, last_seq) VALUES ('jobstat', 0);
+INSERT INTO statistic (name, last_seq) VALUES ('jobs', 0);

 DELIMITER //

@@ -123,4 +125,4 @@ DELIMITER ;
 CREATE VIEW view_measures AS SELECT m.* FROM measure m, statistic s WHERE s.last_id_measure = m.id_measure;
 CREATE VIEW view_hdfs AS SELECT m.seq, m.time, h.hostname, h.full, h.disk, h.disk_used, h.disk_free, h.block_under, h.block_corrupt, h.block_missing FROM hdfs h, measure m WHERE h.id_measure=m.id_measure;
 CREATE VIEW view_quota AS SELECT m.seq, m.time, q.user, q.used FROM quota q, measure m WHERE q.id_measure=m.id_measure;
-CREATE VIEW view_jobstat AS SELECT m.seq, m.time, m.start, m.end, j.user, j.total, j.fails, j.wait_min, j.wait_avg, j.wait_max FROM jobstat j, measure m WHERE j.id_measure=m.id_measure;
+CREATE VIEW view_jobs AS SELECT m.seq, m.time, m.start, m.end, j.user, j.jobs, j.done, j.fail, j.real_wait, j.real_time, j.wait_min, j.wait_max FROM jobs j, measure m WHERE j.id_measure=m.id_measure;
diff --git a/manifests/accounting.pp b/manifests/accounting.pp
index f492095..cf95992 100644
--- a/manifests/accounting.pp
+++ b/manifests/accounting.pp
@@ -12,10 +12,10 @@
 #   stage => 'setup',
 # }
 #
-# class{'site_hadoop::accountig':
+# class{'site_hadoop::accounting':
 #   db_password => 'accpass',
 #   email => 'mail@example.com',
-#   hdfs => '0,30 * * *',
+#   accounting_hdfs => '0,30 * * * *',
 # }
 #
 # mysql::db { 'accounting':
@@ -31,6 +31,21 @@
 #
 # === Parameters
 #
+# ####`accounting_hdfs`
+# = undef
+#
+# Enable storing global HDFS disk and data statistics. The value is a schedule in cron format. See *man 5 crontab*.
+#
+# ####`accounting_quota`
+# = undef
+#
+# Enable storing per-user data statistics. The value is a schedule in cron format. See *man 5 crontab*.
+#
+# ####`accounting_jobs`
+# = undef
+#
+# Enable storing per-user job statistics. The value is a schedule in cron format. See *man 5 crontab*.
+#
 # ####`db_name`
 # = undef (system default is *accounting*)
 #
@@ -51,9 +66,15 @@
 #
 # Email address to send errors from cron.
 #
-# [*hdfs*] undef
+# ####`mapred_hostname`
+# = $::fqdn
 #
-# Enable storing global HDFS disk and data statistics. The value is time in the cron format. See *man 5 crontab*.
+# Hostname of the Hadoop Job History Node used for gathering user job statistics.
+#
+# ####`mapred_url`
+# = http://*mapred_hostname*:19888, https://*mapred_hostname*:19890
+#
+# HTTP REST URL of the Hadoop Job History Node used for gathering user job statistics. It is derived from *mapred_hostname* and *principal*, but it may still need to be overridden (different host due to High Availability, non-default port, ...).
 #
 # ####`principal`
 # = undef
@@ -61,15 +82,23 @@
 # Kerberos principal to access Hadoop.
 #
 class site_hadoop::accounting(
+  $accounting_hdfs = undef,
+  $accounting_quota = undef,
+  $accounting_jobs = undef,
   $db_name = undef,
   $db_user = undef,
   $db_password = undef,
   $email = undef,
-  $hdfs = undef,
-  $quota = undef,
+  $mapred_hostname = $::fqdn,
+  $mapred_url = undef,
   $principal = undef,
 ) {
+  include stdlib
+
+  $packages = ['python-pycurl']
+
   # common
+  ensure_packages($packages)
   file{'/usr/local/share/hadoop':
     ensure => 'directory',
     owner  => 'root',
@@ -103,7 +132,7 @@ class site_hadoop::accounting(
     source  => 'puppet:///modules/site_hadoop/accounting/hdfs.awk',
     require => File['/usr/local/share/hadoop'],
   }
-  if $hdfs {
+  if $accounting_hdfs {
     file{'/etc/cron.d/accounting-hdfs':
       owner => 'root',
       group => 'root',
@@ -130,7 +159,7 @@ class site_hadoop::accounting(
     source  => 'puppet:///modules/site_hadoop/accounting/quota.awk',
     require => File['/usr/local/share/hadoop'],
   }
-  if $quota {
+  if $accounting_quota {
     file{'/etc/cron.d/accounting-quota':
       owner => 'root',
       group => 'root',
@@ -142,4 +171,33 @@ class site_hadoop::accounting(
       ensure => 'absent',
     }
   }
+
+  # user jobs
+  if $mapred_url {
+    $_mapred_url = $mapred_url
+  } else {
+    if $principal {
+      $_mapred_url = "https://${mapred_hostname}:19890"
+    } else {
+      $_mapred_url = "http://${mapred_hostname}:19888"
+    }
+  }
+  file {'/usr/local/bin/accounting-jobs':
+    owner   => 'root',
+    group   => 'root',
+    mode    => '0755',
+    content => template('site_hadoop/accounting/jobs.py.erb'),
+  }
+  if $accounting_jobs {
+    file{'/etc/cron.d/accounting-jobs':
+      owner   => 'root',
+      group   => 'root',
+      mode    => '0644',
+      content => template('site_hadoop/accounting/cron-jobs.erb'),
+    }
+  } else {
+    file{'/etc/cron.d/accounting-jobs':
+      ensure => 'absent',
+    }
+  }
 }
diff --git a/templates/accounting/cron-hdfs.erb b/templates/accounting/cron-hdfs.erb
index 88a7134..05eaca8 100644
--- a/templates/accounting/cron-hdfs.erb
+++ b/templates/accounting/cron-hdfs.erb
@@ -2,4 +2,4 @@
 MAILTO='<%= @email -%>'

 <% end -%>
-<%= @hdfs -%>	hdfs	/usr/local/bin/accounting-hdfs
+<%= @accounting_hdfs -%>	hdfs	/usr/local/bin/accounting-hdfs
diff --git a/templates/accounting/cron-jobs.erb b/templates/accounting/cron-jobs.erb
new file mode 100644
index 0000000..b08ef5e
--- /dev/null
+++ b/templates/accounting/cron-jobs.erb
@@ -0,0 +1,5 @@
+<% if @email -%>
+MAILTO='<%= @email -%>'
+
+<% end -%>
+#<%= @accounting_jobs -%>	hdfs	/usr/local/bin/accounting-jobs
diff --git a/templates/accounting/cron-quota.erb b/templates/accounting/cron-quota.erb
index ef9f4f8..f36c0ab 100644
--- a/templates/accounting/cron-quota.erb
+++ b/templates/accounting/cron-quota.erb
@@ -2,4 +2,4 @@
 MAILTO='<%= @email -%>'

 <% end -%>
-<%= @quota -%>	hdfs	/usr/local/bin/accounting-quota
+<%= @accounting_quota -%>	hdfs	/usr/local/bin/accounting-quota
diff --git a/templates/accounting/jobs.py.erb b/templates/accounting/jobs.py.erb
new file mode 100755
index 0000000..d383310
--- /dev/null
+++ b/templates/accounting/jobs.py.erb
@@ -0,0 +1,81 @@
+#! /usr/bin/python2
+
+import pycurl, json
+from io import BytesIO
+
+import time
+import calendar
+import datetime
+
+# epoch time of local date
+#now = datetime.date.today().strftime('%s')
+
+# epoch time of GMT date
+now0 = datetime.date.today()
+now = calendar.timegm(datetime.datetime(now0.year, now0.month, now0.day, 0, 0).timetuple())
+print '# ' + str(now0)
+
+begin = 1000 * (now - 24 * 3600)
+end = 1000 * now
+url = "<%= @_mapred_url -%>/ws/v1/history/mapreduce/jobs?finishedTimeBegin=" + str(begin) + "&finishedTimeEnd=" + str(end)
+print '# ' + url
+
+b = BytesIO()
+c = pycurl.Curl()
+c.setopt(pycurl.URL, url)
+#c.setopt(pycurl.WRITEDATA, b)
+c.setopt(pycurl.WRITEFUNCTION, b.write)
+# authenticate using Kerberos (SPNEGO); pycurl needs USERPWD set for that
+c.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_GSSNEGOTIATE)
+c.setopt(pycurl.USERPWD, ":")
+c.perform()
+s = b.getvalue().decode('utf-8')
+
+if c.getinfo(c.RESPONSE_CODE) != 200:
+    print s
+    print 'Status: %d' % c.getinfo(c.RESPONSE_CODE)
+    c.close()
+    b.close()
+    raise Exception('Job History REST request failed')
+
+c.close()
+
+j = json.loads(s)
+#print json.dumps(j, indent=4)
+
+# accumulator for per-user job statistics
+class User:
+    jobs = 0
+    total = 0
+    completed = 0
+    wait = 0
+    time = 0
+    wait_min = -1
+    wait_max = -1
+
+users = dict()
+if j["jobs"]:
+    for job in j["jobs"]["job"]:
+        username = job["user"]
+
+        if username not in users:
+            users[username] = User()
+
+        user = users[username]
+        wait = job["startTime"] - job["submitTime"]
+
+        user.jobs += 1
+        user.total += job['reducesTotal'] + job['mapsTotal']
+        user.completed += job['reducesCompleted'] + job['mapsCompleted']
+        user.wait += wait
+        user.time += job["finishTime"] - job["startTime"]
+        if user.wait_min == -1 or wait < user.wait_min:
+            user.wait_min = wait
+        if user.wait_max == -1 or wait > user.wait_max:
+            user.wait_max = wait
+
+#        print '#[progress]', username, users[username].total, user.completed, user.wait, user.time
+
+print "INSERT INTO measure (name) VALUES ('jobs')"
+for username, user in users.iteritems():
+    print "INSERT INTO jobs (id_measure, user, jobs, done, fail, real_wait, real_time, wait_min, wait_max) VALUES (last_insert_id(), '%s', %d, %d, %d, %d, %d, %d, %d)" % (username, user.jobs, user.completed, user.total - user.completed, user.wait, user.time, user.wait_min, user.wait_max)
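
Reading the statistics back (a usage sketch, not part of the patch itself): the
new schema drops the wait_avg column in favour of the accumulated real_wait, so
an average wait has to be derived at query time. Using the view_jobs view
created above, the per-user figures can be read for example like this:

    -- average wait per job is real_wait / jobs; NULLIF() yields NULL instead
    -- of a division-by-zero error for users with no finished jobs
    SELECT seq, time, user, jobs, done, fail,
           real_wait / NULLIF(jobs, 0) AS wait_avg,
           wait_min, wait_max
      FROM view_jobs
     ORDER BY seq, user;

Note that done and fail count map and reduce tasks rather than whole jobs:
jobs.py.erb sums mapsCompleted + reducesCompleted into done and reports the
remaining mapsTotal + reducesTotal tasks as fail.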
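
How the generated statements fit together (a sketch, reusing the illustrative
values from the create.sql comments above): jobs.py.erb prints one measure row
followed by per-user rows that call last_insert_id(), so the whole stream has
to be executed within a single database session for the id_measure references
to resolve correctly:

    INSERT INTO measure (name) VALUES ('jobs');
    INSERT INTO jobs (id_measure, user, jobs, done, fail, real_wait, real_time, wait_min, wait_max)
        VALUES (last_insert_id(), 'valtri', 6, 10, 2, 1000, 500, 10, 100);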