--- /dev/null
+--
+-- Accounting for Hadoop
+--
+-- How to add values:
+--
+-- INSERT INTO measure (name) VALUES ('quota');
+-- INSERT INTO quota (id_measure, user, used) VALUES (last_insert_id(), 'valtri', 17);
+-- INSERT INTO quota (id_measure, user, used) VALUES (last_insert_id(), 'nemo', 1);
+--
+-- INSERT INTO hdfs (full, disk, disk_used) VALUES (1024, 1023, 10);
+-- or:
+-- INSERT INTO measure (name) VALUES ('hdfs');
+-- INSERT INTO hdfs (id_measure, full, disk, disk_used, block_under, block_corrupt, block_missing) VALUES (last_insert_id(), 10240, 10230, 100, 0, 0, 0);
+-- INSERT INTO hdfs (id_measure, hostname, state, full, disk, disk_used) VALUES (last_insert_id(), 'hador1', 1, 1024, 1023, 10);
+--
+-- INSERT INTO measure (name, start, end) VALUES ('jobstat', '2015-01-16', '2015-01-17');
+-- INSERT INTO jobstat (id_measure, user, total, fails, wait_min, wait_avg, wait_max) VALUES (last_insert_id(), 'valtri', 10, 2, 0, 50, 100);
+-- INSERT INTO jobstat (id_measure, user, total, fails, wait_min, wait_avg, wait_max) VALUES (last_insert_id(), 'nemo', 0, 0, NULL, NULL, NULL);
+--
+-- How to read values:
+--
+-- a) all history
+--
+-- SELECT * FROM view_hdfs;
+-- SELECT * FROM view_quotas;
+--
+-- b) current values
+--
+-- SELECT h.* FROM view_hdfs h, statistic s WHERE h.seq=s.last_seq AND s.name='hdfs';
+-- SELECT q.* FROM view_quotas q, statistic s WHERE q.seq=s.last_seq AND s.name='quota';
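+-- (current job statistics can be read analogously:)
+-- SELECT j.* FROM view_jobstat j, statistic s WHERE j.seq=s.last_seq AND s.name='jobstat';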
+--
+
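+-- Bookkeeping table: for each statistic it holds the id and sequence number of
+-- the most recent measure; maintained automatically by the triggers below.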
+CREATE TABLE statistic (
+ name CHAR(8) NOT NULL,
+ last_id_measure INTEGER,
+ last_seq INTEGER,
+
+ INDEX (last_id_measure),
+ INDEX (last_seq)
+);
+
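+-- One row per measurement run; seq is assigned per statistic name by the
+-- bi_measure trigger, start/end optionally delimit the covered period.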
+CREATE TABLE measure (
+ id_measure INTEGER AUTO_INCREMENT PRIMARY KEY,
+ name CHAR(8) NOT NULL,
+ seq INTEGER NOT NULL,
+ time TIMESTAMP DEFAULT NOW(),
+ start TIMESTAMP NULL DEFAULT NULL,
+ end TIMESTAMP NULL DEFAULT NULL,
+
+ INDEX (id_measure),
+ INDEX (name),
+ INDEX (seq)
+);
+
+CREATE TABLE hdfs (
+ id_measure INTEGER NOT NULL,
+ hostname CHAR(50),
+ state INTEGER,
+ full BIGINT,
+ disk BIGINT,
+ disk_used BIGINT,
+ disk_free BIGINT,
+ block_under INTEGER,
+ block_corrupt INTEGER,
+ block_missing INTEGER,
+
+ CONSTRAINT PRIMARY KEY (id_measure, hostname),
+ INDEX(id_measure),
+ INDEX(hostname)
+);
+
+CREATE TABLE quota (
+ id_measure INTEGER NOT NULL,
+ user CHAR(20) NOT NULL,
+ used BIGINT,
+
+ CONSTRAINT PRIMARY KEY (id_measure, user),
+ INDEX(id_measure),
+ INDEX(user)
+);
+
+CREATE TABLE jobstat (
+ id_measure INTEGER NOT NULL,
+ user CHAR(20) NULL,
+ total INTEGER,
+ fails INTEGER,
+ wait_min INTEGER,
+ wait_avg INTEGER,
+ wait_max INTEGER,
+
+ CONSTRAINT PRIMARY KEY (id_measure, user),
+ INDEX(id_measure),
+ INDEX(user)
+);
+
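+-- one bookkeeping row per collected statistic; sequence numbering starts at 0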
+INSERT INTO statistic (name, last_seq) VALUES ('hdfs', 0);
+INSERT INTO statistic (name, last_seq) VALUES ('quota', 0);
+INSERT INTO statistic (name, last_seq) VALUES ('jobstat', 0);
+
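+-- Every INSERT INTO measure automatically receives the next sequence number for
+-- its statistic (bi_measure), and the statistic bookkeeping row is updated to
+-- point at the newest measure (ai_measure).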
+DELIMITER //
+
+CREATE TRIGGER bi_measure BEFORE INSERT ON measure
+FOR EACH ROW BEGIN
+ SET NEW.seq=(SELECT last_seq+1 FROM statistic s WHERE NEW.name=s.name);
+END; //
+
+CREATE TRIGGER ai_measure AFTER INSERT ON measure
+FOR EACH ROW BEGIN
+ UPDATE statistic s SET s.last_seq=s.last_seq+1, s.last_id_measure=NEW.id_measure WHERE s.name=NEW.name;
+END; //
+
+-- not needed; id_measure should always be specified
+CREATE TRIGGER bi_hdfs BEFORE INSERT ON hdfs
+FOR EACH ROW BEGIN
+ IF NEW.id_measure IS NULL OR NEW.id_measure=0 THEN
+ INSERT INTO measure (name) VALUES ('hdfs');
+ SET NEW.id_measure=last_insert_id();
+ END IF;
+END; //
+
+DELIMITER ;
+
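+-- Convenience views joining the data tables with their measure metadata.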
+CREATE VIEW view_measures AS SELECT m.* FROM measure m, statistic s WHERE s.last_id_measure = m.id_measure;
+CREATE VIEW view_hdfs AS SELECT m.seq, m.time, h.hostname, h.full, h.disk, h.disk_used, h.disk_free, h.block_under, h.block_corrupt, h.block_missing FROM hdfs h, measure m WHERE h.id_measure=m.id_measure;
+CREATE VIEW view_quotas AS SELECT m.seq, m.time, q.user, q.used FROM quota q, measure m WHERE q.id_measure=m.id_measure;
+CREATE VIEW view_jobstat AS SELECT m.seq, m.time, m.start, m.end, j.user, j.total, j.fails, j.wait_min, j.wait_avg, j.wait_max FROM jobstat j, measure m WHERE j.id_measure=m.id_measure;
--- /dev/null
+#
+# Parsing output of:
+#
+# hdfs dfsadmin -report
+#
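+# Example invocation (illustrative only; the installed wrapper script may differ):
+#
+#   hdfs dfsadmin -report | awk -f accounting-hdfs.awk
+#
+# The output is a stream of SQL INSERT statements for the accounting database.
+#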
+
+# quote a string value for SQL, or return NULL if it is empty
+function dbstr(s) {
+ if (s) { return "'" s "'" }
+ else { return "NULL" }
+}
+
+# return a non-negative numeric value as-is, NULL when it is unset (negative)
+function dbi(i) {
+ if (i >= 0) { return i }
+ else { return "NULL" }
+}
+
+# forget the values of the current record before parsing the next one
+function reset() {
+ name="(none)";
+ full=-1;
+ disk=-1;
+ disk_free=-1;
+ disk_used=-1;
+ block_under=-1;
+ block_corrupt=-1;
+ block_missing=-1;
+# cache=-1;
+# cache_used=-1;
+# cache_free=-1;
+}
+
+# the report starts with a cluster-wide summary, stored under the name 'all';
+# a new measure row is opened first
+BEGIN {
+ reset();
+ name="all";
+ state=-1;
+
+ FS="[: ]+";
+ CONVFMT="%d";
+
+ print "INSERT INTO measure (name) VALUES ('hdfs');";
+}
+
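+# track which section of the report we are in (1 = live, 2 = dead, 3 = decommissioned datanodes)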
+/^Live datanodes.*/ {state=1}
+/^Dead datanodes.*/ {state=2}
+/^Decommissioned .*/ {state=3}
+
+/^Hostname:.*/ {name=$2}
+/^Name:.*/ {ip=$2}
+/^Configured Capacity:.*/ {full=$3}
+/^Present Capacity:.*/ {disk=$3}
+/^DFS Remaining:.*/ {disk_free=$3}
+/^DFS Used:.*/ {disk_used=$3}
+/^Under replicated blocks:.*/ {block_under=$4}
+/^Blocks with corrupt replicas:.*/ {block_corrupt=$5}
+/^Missing blocks:.*/ {block_missing=$3}
+#/^Configured Cache Capacity:.*/{cache=$4}
+#/^Cache Used:.*/ {cache_used=$3}
+#/^Cache Remaining:.*/ {cache_free=$3}
+
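+# a blank line ends a record (cluster summary or one datanode): emit the collected
+# values, skipping records whose reported address is in the 10.x range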
+/^$/ {
+ if (name != "(none)" && ip !~ /^10\./) {
+ print "INSERT INTO hdfs (id_measure, hostname, state, full, disk, disk_free, disk_used, block_under, block_corrupt, block_missing) VALUES (last_insert_id(), " dbstr(name) ", " dbi(state) ", " dbi(full) ", IFNULL(" dbi(disk) ", " dbi(disk_free) " + " dbi(disk_used) "), " dbi(disk_free) ", " dbi(disk_used) ", " dbi(block_under) ", " dbi(block_corrupt) ", " dbi(block_missing) ");";
+ }
+ reset()
+}
--- /dev/null
+# == Class site_hadoop::accounting
+#
+# Requires:
+# * database
+# * hdfs user and group (=hadoop)
+#
+# For example, using puppetlabs-mysql and cesnet-hadoop:
+#
+# mysql::db { 'accounting':
+# user => 'accounting',
+# password => 'accpass',
+# host => 'localhost',
+# grant => ['SELECT', 'INSERT', 'UPDATE', 'DELETE'],
+# sql => '/usr/local/share/hadoop/accounting.sql',
+# }
+#
+# Class['site_hadoop::accounting'] -> Mysql::Db['accounting']
+# Class['hadoop::nameserver::install'] -> Class['site_hadoop::accounting']
+#
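+# The class itself can be declared with a cron schedule for the HDFS
+# statistics, for example (the values below are only illustrative):
+#
+#   class { 'site_hadoop::accounting':
+#     email => 'admin@example.com',
+#     hdfs  => '0 6 * * *',
+#   }
+#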
+# === Parameters
+#
+# [*email*] undef
+#
+# Email address to which errors from the cron jobs are sent.
+#
+# [*hdfs*] undef
+#
+# Enable storing of HDFS disk and data statistics. The value is the schedule in cron format (see *man 5 crontab*).
+#
+class site_hadoop::accounting(
+ $email = undef,
+ $hdfs = undef,
+) {
+ file {'/usr/local/bin/accounting-hdfs':
+ owner => 'root',
+ group => 'root',
+ mode => '0755',
+ content => template('site_hadoop/accounting/hdfs.sh.erb'),
+ }
+
+ file{'/usr/local/share/hadoop':
+ ensure => 'directory',
+ owner => 'root',
+ group => 'root',
+ mode => '0755',
+ }
+ ->
+ file {'/usr/local/share/hadoop/accounting-hdfs.awk':
+ owner => 'root',
+ group => 'root',
+ mode => '0644',
+ source => 'puppet:///modules/site_hadoop/accounting/hdfs.awk',
+ }
+
+ file{'/usr/local/share/hadoop/accounting.sql':
+ owner => 'root',
+ group => 'root',
+ mode => '0644',
+ source => 'puppet:///modules/site_hadoop/accounting/create.sql',
+ }
+
+ $db_name = $site_hadoop::db_name
+ $db_user = $site_hadoop::db_user
+ $db_password = $site_hadoop::db_password
+ if $db_name or $db_user or $db_password {
+ file{"${site_hadoop::defaultconfdir}/hadoop-accounting":
+ owner => 'hdfs',
+ group => 'hdfs',
+ mode => '0400',
+ content => template('site_hadoop/accounting/hadoop-accounting.erb'),
+ }
+ } else {
+ file{"${site_hadoop::defaultconfdir}/hadoop-accounting":
+ ensure => 'absent',
+ }
+ }
+
+ if $hdfs {
+ file{'/etc/cron.d/accounting-hdfs':
+ owner => 'root',
+ group => 'root',
+ mode => '0644',
+ content => template('site_hadoop/accounting/cron-hdfs.erb'),
+ }
+ } else {
+ file{'/etc/cron.d/accounting-hdfs':
+ ensure => 'absent',
+ }
+ }
+}