imported statistics counting
authorAleš Křenek <ljocha@ics.muni.cz>
Fri, 10 Dec 2004 13:01:20 +0000 (13:01 +0000)
committerAleš Křenek <ljocha@ics.muni.cz>
Fri, 10 Dec 2004 13:01:20 +0000 (13:01 +0000)
org.glite.lb.server/Makefile
org.glite.lb.server/src/bkserverd.c
org.glite.lb.server/src/stats.c [new file with mode: 0644]
org.glite.lb.server/src/stats.h [new file with mode: 0644]

index ac6cd76..4489c06 100644 (file)
@@ -122,7 +122,7 @@ SERVER_OBJS:= bkserverd.o il_lbproxy.o get_events.o index.o jobstat.o jobstat_su
        lb_xml_parse_V21.o \
        lock.o openserver.o query.o userjobs.o db_store.o request.o store.o \
        stored_master.o srv_purge.o server_state.o dump.o lb_authz.o load.o \
-       notification.o il_notification.o notif_match.o
+       notification.o il_notification.o notif_match.o stats.o
 
 INDEX_OBJS:= index.o index_parse.o jobstat_supp.o lbs_db.o openserver.o \
        jobstat.o query.o lock.o get_events.o write2rgma.o index_lex.o \
index caa56f4..820f3f2 100644 (file)
@@ -95,6 +95,7 @@ static const int              one = 1;
 static int                             noAuth = 0;
 static int                             noIndex = 0;
 static int                             strict_locking = 0;
+static int count_statistics = 0;
 static int                             hardJobsLimit = 0;
 static int                             hardEventsLimit = 0;
 static int                             hardRespSizeLimit = 0;
@@ -146,13 +147,14 @@ static struct option opts[] = {
        {"notif-dur",   1, NULL,        'N'},
        {"notif-il-sock",       1, NULL,        'X'},
        {"notif-il-fprefix",    1, NULL,        'Y'},
+       {"count-statistics",    1, NULL,        'T'},
        {NULL,0,NULL,0}
 };
 
 #ifdef GLITE_LB_SERVER_WITH_WS
-static const char *get_opt_string = "a:c:k:C:V:p:w:drm:ns:l:L:N:i:S:D:X:Y:";
+static const char *get_opt_string = "a:c:k:C:V:p:w:drm:ns:l:L:N:i:S:D:X:Y:T:";
 #else
-static const char *get_opt_string = "a:c:k:C:V:p:drm:ns:l:L:N:i:S:D:X:Y:";
+static const char *get_opt_string = "a:c:k:C:V:p:drm:ns:l:L:N:i:S:D:X:Y:T:";
 #endif /* GLITE_LB_SERVER_WITH_WS */
 
 static void usage(char *me) 
@@ -185,6 +187,8 @@ static void usage(char *me)
                "\t--strict-locking=1\t lock jobs also on storing events (may be slow)\n"
                "\t--notif-il-sock\t socket to send notifications\n"
                "\t--notif-il-fprefix\t file prefix for notifications\n"
+               "\t--count-statistics=1\t count certain statistics on jobs\n"
+               "\t                  =2\t ... and allow anonymous access\n"
        ,me);
 }
 
@@ -347,6 +351,8 @@ int main(int argc, char *argv[])
                          break;
                case 'P': strict_locking = 1;
                          break;
+               case 'T': count_statistics = atoi(optarg);
+                         break;
                case '?': usage(name); return 1;
        }
 
@@ -517,6 +523,7 @@ a.sin_addr.s_addr = INADDR_ANY;
                fprintf(stderr,"%s: open database: %s (%s)\n",argv[0],et,ed);
                return 1;
        }
+       if (count_statistics) edg_wll_InitStatistics(ctx);
        edg_wll_Close(ctx);
        edg_wll_FreeContext(ctx);
 
@@ -716,6 +723,7 @@ int bk_handle_connection(int conn, struct timeval client_start, void *data)
        getpeername(conn, (struct sockaddr *)&a, &alen);
        ctx->connPool[ctx->connToUse].peerName = strdup(inet_ntoa(a.sin_addr));
        ctx->connPool[ctx->connToUse].peerPort = ntohs(a.sin_port);
+       ctx->count_statistics = count_statistics;
 
        /* not a critical operation, do not waste all SLAVE_TIMEOUT */
        h_errno = asyn_gethostbyaddr(&name, (char *)&a.sin_addr.s_addr,sizeof(a.sin_addr.s_addr), AF_INET, &dns_to);
diff --git a/org.glite.lb.server/src/stats.c b/org.glite.lb.server/src/stats.c
new file mode 100644 (file)
index 0000000..1e8f6cb
--- /dev/null
@@ -0,0 +1,427 @@
+#include <unistd.h>
+#include <sys/file.h>
+#include <sys/mman.h>
+#include <string.h>
+#include <assert.h>
+#include <stdio.h>
+
+#include "glite/lb/events.h"
+#include "glite/lb/jobstat.h"
+#include "glite/lb/context-int.h"
+
+#include "glite/wmsutils/jobid/strmd5.h"
+
+#include "stats.h"
+static int stats_inc_counter(edg_wll_Context,const edg_wll_JobStat *,edg_wll_Stats *);
+static int stats_record_duration(edg_wll_Context,const edg_wll_JobStat *,const edg_wll_JobStat *,edg_wll_Stats *);
+
+#define dprintf(x) { printf("[%d] ",getpid()); printf x; }
+
+/* XXX: should be configurable at run time */
+static struct _edg_wll_StatsArchive default_archives[] = { 
+       { 10, 60 },
+       { 60, 30 },
+       { 900, 12 },
+       { 0, 0 }
+};
+
+static edg_wll_QueryRec default_group[2] = { 
+       { EDG_WLL_QUERY_ATTR_DESTINATION, },
+       { EDG_WLL_QUERY_ATTR_UNDEF, }
+};
+
+static edg_wll_Stats default_stats[] = {
+       { STATS_COUNT, default_group, EDG_WLL_JOB_READY, 0, default_archives },
+       { STATS_COUNT, default_group, EDG_WLL_JOB_SCHEDULED, 0, default_archives },
+       { STATS_COUNT, default_group, EDG_WLL_JOB_RUNNING, 0, default_archives },
+       { STATS_COUNT, default_group, EDG_WLL_JOB_DONE, EDG_WLL_STAT_OK, default_archives },
+       { STATS_COUNT, default_group, EDG_WLL_JOB_DONE, EDG_WLL_STAT_FAILED, default_archives },
+       { STATS_DURATION, default_group, EDG_WLL_JOB_SCHEDULED, 0, default_archives },
+       { STATS_DURATION, default_group, EDG_WLL_JOB_RUNNING, 0, default_archives },
+       { STATS_UNDEF, }
+       
+};
+
+int edg_wll_InitStatistics(edg_wll_Context ctx)
+{
+       edg_wll_Stats *stats = default_stats;   /* XXX: hardcoded */
+       int     i,j;
+
+       for (i=0; stats[i].type; i++) {
+               char    fname[50],*zero;
+
+               strcpy(fname,"/tmp/lb_stats.XXXXXX");
+               stats[i].fd = mkstemp(fname);
+               if (stats[i].fd < 0) return edg_wll_SetError(ctx,errno,fname);
+               /* XXX: should be initialized from LB data */
+               stats[i].grpno = 0;
+
+               stats[i].grpsize = sizeof(struct edg_wll_stats_group)-sizeof(struct edg_wll_stats_archive);
+
+               for (j=0; stats[i].archives[j].interval; j++) {
+                       stats[i].grpsize += sizeof(struct edg_wll_stats_archive)-sizeof(struct edg_wll_stats_cell);
+                       stats[i].grpsize += stats[i].archives[j].length * sizeof(struct edg_wll_stats_cell);
+               }
+               zero = calloc(1,stats[i].grpsize);
+               write(stats[i].fd,zero,stats[i].grpsize);
+               stats[i].map = mmap(NULL,sizeof zero,PROT_READ|PROT_WRITE,MAP_SHARED,stats[i].fd,0);
+               if (stats[i].map == MAP_FAILED) return edg_wll_SetError(ctx,errno,"mmap()");
+
+               dprintf(("stats: using %s\n",fname));
+               unlink(fname);
+       }
+       return 0;
+}
+
+int edg_wll_UpdateStatistics(
+       edg_wll_Context ctx,
+       const edg_wll_JobStat *from,
+       const edg_wll_Event *e,
+       const edg_wll_JobStat *to)
+{
+       int     i;
+
+       /* XXX: should be propagated somhow via context, not hardcoded */
+       edg_wll_Stats   *stats = default_stats;
+
+       if (!ctx->count_statistics) return 0;
+
+       for (i=0; stats[i].type; i++) switch (stats[i].type) {
+               case STATS_COUNT:
+                       if (!to) continue;
+                       if (to->state == stats[i].major && (!from || from->state != to->state)) {
+                               switch (to->state) {
+                                       case EDG_WLL_JOB_DONE:
+                                               if (to->done_code != stats[i].minor) continue;
+                                               break;
+                                       default: break;
+                               }
+                               stats_inc_counter(ctx,to,stats+i);
+                       }
+                       break;
+               case STATS_DURATION:
+                       if (!to || !from) continue;
+                       if (from->state == stats[i].major && from->state != to->state) 
+                               stats_record_duration(ctx,from,to,stats+i);
+               default: break;
+       }
+       return 0;
+}
+
+static struct edg_wll_stats_archive *archive_skip(
+               const struct edg_wll_stats_archive *a,
+               int     len)
+{
+       return (struct edg_wll_stats_archive *) 
+                       (((char *) a) + sizeof(struct edg_wll_stats_archive)
+                               - sizeof(struct edg_wll_stats_cell)
+                               + len * sizeof(struct edg_wll_stats_cell)
+                       );
+}
+
+static int stats_inc_counter(edg_wll_Context ctx,const edg_wll_JobStat *jobstat,edg_wll_Stats *stats)
+{
+       int     i,j;
+       char    *sig;
+       struct edg_wll_stats_group      *g;
+       struct edg_wll_stats_archive    *a;
+       time_t  now = jobstat->stateEnterTime.tv_sec;
+
+       /* XXX: we support destination grouping only */
+       if (!jobstat->destination) return 0;
+       edg_wll_ResetError(ctx);
+
+       dprintf(("inc_counter: destination %s, stats %d\n",jobstat->destination,
+                       (int) (stats - (edg_wll_Stats *) default_stats)));
+
+       if (flock(stats->fd,LOCK_EX)) return edg_wll_SetError(ctx,errno,"flock()");
+
+       /* remap the file if someone changed its size */
+       if (stats->map->grpno != stats->grpno) {
+               int newgrpno = stats->map->grpno;
+               munmap(stats->map,(stats->grpno ? stats->grpno : 1) * stats->grpsize);
+               stats->map = mmap(NULL,newgrpno * stats->grpsize,
+                               PROT_READ|PROT_WRITE,MAP_SHARED,stats->fd,0);
+               if (stats->map == MAP_FAILED) {
+                       edg_wll_SetError(ctx,errno,"mmap()");
+                       goto cleanup;
+               }
+               assert(stats->map->grpno == newgrpno);
+               stats->grpno = newgrpno;
+       }
+
+       sig = str2md5base64(jobstat->destination);
+
+       for (i=0; i<stats->grpno; i++) {
+               g = (struct edg_wll_stats_group *) (
+                               ((char *) stats->map) + stats->grpsize * i
+                       );
+               if (!strcmp(sig,g->sig)) break;
+       }
+
+       /* not found, initialize new */
+       if (i == stats->grpno) {
+               dprintf(("group %s not found\n",sig));
+               if (stats->grpno) {
+                       char    *zero = calloc(1,stats->grpsize);
+                       munmap(stats->map,stats->grpno * stats->grpsize);
+                       lseek(stats->fd,0,SEEK_END);
+                       write(stats->fd,zero,stats->grpsize);
+                       free(zero);
+                       stats->map = mmap(NULL,stats->grpno * stats->grpsize,
+                                       PROT_READ|PROT_WRITE,MAP_SHARED,stats->fd,0);
+
+                       if (stats->map == MAP_FAILED) {
+                               edg_wll_SetError(ctx,errno,"mmap()");
+                               goto cleanup;
+                       }
+               }
+               stats->grpno++;
+               stats->map->grpno++;
+               dprintf(("allocated\n"));
+
+               g = (struct edg_wll_stats_group *) (
+                               ((char *) stats->map) + stats->grpsize * i);
+
+               /* invalidate all cells in all archives */
+               a = g->archive;
+               for (i=0; stats->archives[i].interval; i++) {
+                       for (j=0; j<stats->archives[i].length; j++) a->cells[j].cnt = -1;
+                       a = archive_skip(a,stats->archives[i].length);
+               }
+
+               strcpy(g->sig,sig);
+               g->last_update = now;
+       }
+       else dprintf(("group %s found at %d\n",sig,i));
+
+       a = g->archive;
+       for (i=0; stats->archives[i].interval; i++) {
+               time_t  pt = g->last_update;
+
+               pt -= pt % stats->archives[i].interval;
+
+               /* nothing happened longer than is history of this archive */
+               if (now-pt > stats->archives[i].interval * stats->archives[i].length) {
+                       for (j=0; j<stats->archives[i].length; j++) a->cells[j].cnt = 0;
+                       a->ptr = 0;
+               }
+               /* catch up eventually, cleaning not touched cells */
+               else for (pt += stats->archives[i].interval; pt < now;
+                               pt += stats->archives[i].interval)
+               {
+                       if (++(a->ptr) == stats->archives[i].length) a->ptr = 0;
+                       a->cells[a->ptr].cnt = 0;
+               }
+
+               /* validate an unused cell */
+               if (a->cells[a->ptr].cnt < 0) a->cells[a->ptr].cnt = 0;
+
+               /* now we can do IT */
+               a->cells[a->ptr].cnt++;
+               dprintf(("update archive %d, cell %d to %d\n",i,a->ptr,a->cells[a->ptr].cnt));
+
+               /* go to next archive */
+               a = archive_skip(a,stats->archives[i].length);
+       }
+
+       g->last_update = now;
+
+
+cleanup:
+       free(sig);
+       flock(stats->fd,LOCK_UN);
+       return edg_wll_Error(ctx,NULL,NULL);
+}
+
+static int stats_record_duration(
+               edg_wll_Context ctx,
+               const edg_wll_JobStat *from,
+               const edg_wll_JobStat *to,
+               edg_wll_Stats *stats)
+{
+       return 0;
+}
+
+int edg_wll_StateRateServer(
+       edg_wll_Context ctx,
+       const edg_wll_QueryRec  *group,
+       edg_wll_JobStatCode     major,
+       int                     minor,
+       time_t  *from, 
+       time_t  *to,
+       float   *rate,
+       int     *res_from,
+       int     *res_to
+)
+{
+       edg_wll_Stats *stats = default_stats;   /* XXX: hardcoded */
+       struct edg_wll_stats_group      *g;
+       struct edg_wll_stats_archive    *a;
+       int     i,j,matchi;
+       char    *sig = NULL;
+       time_t  afrom,ato;
+       long    match;
+
+       edg_wll_ResetError(ctx);
+
+       switch (ctx->count_statistics) {
+               case 0: return edg_wll_SetError(ctx,ENOSYS,NULL);
+               case 1: if (!ctx->noAuth) return edg_wll_SetError(ctx,EPERM,NULL);
+                       break;
+               case 2: break;
+               default: abort();
+       }
+
+       if (group[0].attr != EDG_WLL_QUERY_ATTR_DESTINATION
+               || group[1].attr != EDG_WLL_QUERY_ATTR_UNDEF)
+                       return edg_wll_SetError(ctx,ENOSYS,
+                               "the only supported grouping is by destination");
+
+       if (*from >= *to) return edg_wll_SetError(ctx,EINVAL,"from >= to");
+
+       for (;stats->type; stats++) {
+               if (stats->type != STATS_COUNT || stats->major != major) continue;
+               switch (major) {
+                       case EDG_WLL_JOB_DONE:
+                               if (stats->minor != minor) continue;
+                               break;
+                       default: break;
+               }
+               break;
+       }
+
+       if (!stats->type) return edg_wll_SetError(ctx,ENOENT,"no matching state counter");
+
+       if (flock(stats->fd,LOCK_SH)) return edg_wll_SetError(ctx,errno,"flock()");
+
+       /* XXX */
+       sig = str2md5base64(group->value.c);
+
+
+       for (i=0, g=stats->map; i<stats->grpno; i++) {
+               if (!strcmp(sig,g->sig)) break;
+               g = (struct edg_wll_stats_group *) (((char *) g) + stats->grpsize);
+       }
+
+       if (i == stats->grpno) {
+               edg_wll_SetError(ctx,ENOENT,"no matching group");
+               goto cleanup;
+       }
+
+       match = 0;
+       matchi = -1;
+       /* XXX: assumes decreasing resolution of archives */ 
+       for (j=0; stats->archives[j].interval; j++) {
+               afrom = ato = g->last_update;
+
+               ato += stats->archives[j].interval - ato % stats->archives[j].interval;
+               afrom -= afrom % stats->archives[j].interval;
+               afrom -= stats->archives[j].interval * (stats->archives[j].length-1);
+
+               /* intersection of [from,to] and [afrom,ato] */
+               if (afrom < *from) afrom = *from;
+               if (ato > *to) ato = *to;
+
+               /* find the best match */
+               if (ato-afrom > match) {
+                       match = ato - afrom;
+                       matchi = j;
+               }
+       }
+
+       dprintf(("best match: archive %d, interval %ld\n",matchi,match));
+
+       if (matchi < 0) {
+               if (*from > g->last_update) {
+                       /* special case -- we are sure that nothing arrived */
+                       *rate = 0;
+                       *res_from = *res_to = stats->archives[0].interval;
+                       goto cleanup;
+               }
+               edg_wll_SetError(ctx,ENOENT,"no data available");
+               goto cleanup;
+       }
+
+       *res_from = *res_to = stats->archives[matchi].interval;
+
+       a = g->archive;
+       for (j=0; j<matchi; j++) a = archive_skip(a,stats->archives[j].length);
+
+       i = stats->archives[matchi].interval;
+       afrom = g->last_update - g->last_update % i
+                       - (stats->archives[matchi].length-1)*i;
+
+       dprintf(("archive from %ld = %s",afrom,ctime(&afrom)));
+
+       if (afrom > *from) *from = afrom;
+       if (afrom + stats->archives[matchi].length * i < *to) *to = afrom + stats->archives[matchi].length * i;
+
+       *rate = 0;
+       match = 0;
+
+
+       for (j=0; j<stats->archives[matchi].length; j++,afrom += i) {
+               struct edg_wll_stats_cell       *c = a->cells + ((a->ptr+j+1) % stats->archives[matchi].length);
+
+               dprintf(("cell %d (abs %d): ",j,(a->ptr+j+1) % stats->archives[matchi].length));
+               if (c->cnt < 0) {
+                       dprintf(("invalid\n"));
+                       continue; /* invalid cell */
+               }
+
+               dprintf(("search %ld in %ld, %ld\n",*from,afrom,afrom+i));
+
+               if (*from >= afrom && *from < afrom+i) {
+                       match += *from - afrom;
+                       *rate += c->cnt * (1.0 - ((float) *from-afrom)/i);
+                       dprintf(("matched from: match %ld, rate %f\n",match,*rate));
+               }
+               else if (*from < afrom && *to >= afrom) {
+                       match += i;
+                       *rate += c->cnt;
+                       dprintf(("matched in: match %ld, rate %f\n",match,*rate));
+               }
+
+               if (*to >= afrom && *to < afrom+i) {
+                       match -= i-(*to-afrom);
+                       *rate -= c->cnt * (((float) i)-(*to - afrom))/i;
+
+                       dprintf(("matched to: match %ld, rate %f\n",match,*rate));
+
+               /* asi blbost 
+                       if (j == stats->archives[matchi].length - 1
+                               && *to > g->last_update)
+                       {
+                               match -= *to - g->last_update;
+                               *rate -= c->cnt * (((float) *to) - g->last_update)/i;
+                               dprintf(("corrected wrt. last_update: match %ld, rate %f\n",match,*rate));
+                       }
+               */
+
+                       break;
+               }
+       }
+       *rate /= match;
+
+cleanup:
+       free(sig);
+       flock(stats->fd,LOCK_UN);
+       return edg_wll_Error(ctx,NULL,NULL);
+}
+
+int edg_wll_StateDurationServer(
+       edg_wll_Context ctx,
+       const edg_wll_QueryRec  *group,
+       edg_wll_JobStatCode     major,
+       int                     minor,
+       time_t  *from, 
+       time_t  *to,
+       float   *duration,
+       int     *res_from,
+       int     *res_to
+)
+{
+       return edg_wll_SetError(ctx,ENOSYS,NULL);
+}
+
diff --git a/org.glite.lb.server/src/stats.h b/org.glite.lb.server/src/stats.h
new file mode 100644 (file)
index 0000000..dc869d3
--- /dev/null
@@ -0,0 +1,70 @@
+#ifndef __EDG_WORKLOAD_LOGGING_LBSERVER_STATS_H__
+#define __EDG_WORKLOAD_LOGGING_LBSERVER_STATS_H__
+
+int edg_wll_InitStatistics(edg_wll_Context);
+
+int edg_wll_UpdateStatistics(
+       edg_wll_Context,
+       const edg_wll_JobStat *,
+       const edg_wll_Event *,
+       const edg_wll_JobStat *);
+
+
+struct edg_wll_stats_cell {
+       int     cnt;
+       float   value;
+};
+
+struct edg_wll_stats_archive {
+       int     ptr;
+       struct edg_wll_stats_cell       cells[1];
+};
+
+struct edg_wll_stats_group {
+       int     grpno;
+       char    sig[33];
+       time_t  last_update;
+       struct edg_wll_stats_archive    archive[1];
+};
+
+
+typedef struct {
+       enum { STATS_UNDEF = 0, STATS_COUNT, STATS_DURATION }   type;
+       edg_wll_QueryRec        *group;
+       edg_wll_JobStatCode     major;
+       int                     minor;
+       struct _edg_wll_StatsArchive {
+               int     interval,length;
+       } *archives;
+
+       int     fd;
+       struct edg_wll_stats_group      *map;
+       int     grpno,grpsize;
+} edg_wll_Stats;
+
+int edg_wll_StateRateServer(
+       edg_wll_Context context,
+       const edg_wll_QueryRec  *group,
+       edg_wll_JobStatCode     major,
+       int                     minor,
+       time_t  *from, 
+       time_t  *to,
+       float   *rate,
+       int     *res_from,
+       int     *res_to
+);
+
+
+int edg_wll_StateDurationServer(
+       edg_wll_Context context,
+       const edg_wll_QueryRec  *group,
+       edg_wll_JobStatCode     major,
+       int                     minor,
+       time_t  *from, 
+       time_t  *to,
+       float   *duration,
+       int     *res_from,
+       int     *res_to
+);
+
+#endif