From: Aleš Křenek Date: Fri, 10 Dec 2004 13:01:20 +0000 (+0000) Subject: imported statistics counting X-Git-Tag: GLITE_RELEASE_1_1~1 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=6496765f5438ed6dc5d43697992c91d79bb3fde0;p=jra1mw.git imported statistics counting --- diff --git a/org.glite.lb.server/Makefile b/org.glite.lb.server/Makefile index ac6cd76..4489c06 100644 --- a/org.glite.lb.server/Makefile +++ b/org.glite.lb.server/Makefile @@ -122,7 +122,7 @@ SERVER_OBJS:= bkserverd.o il_lbproxy.o get_events.o index.o jobstat.o jobstat_su lb_xml_parse_V21.o \ lock.o openserver.o query.o userjobs.o db_store.o request.o store.o \ stored_master.o srv_purge.o server_state.o dump.o lb_authz.o load.o \ - notification.o il_notification.o notif_match.o + notification.o il_notification.o notif_match.o stats.o INDEX_OBJS:= index.o index_parse.o jobstat_supp.o lbs_db.o openserver.o \ jobstat.o query.o lock.o get_events.o write2rgma.o index_lex.o \ diff --git a/org.glite.lb.server/src/bkserverd.c b/org.glite.lb.server/src/bkserverd.c index caa56f4..820f3f2 100644 --- a/org.glite.lb.server/src/bkserverd.c +++ b/org.glite.lb.server/src/bkserverd.c @@ -95,6 +95,7 @@ static const int one = 1; static int noAuth = 0; static int noIndex = 0; static int strict_locking = 0; +static int count_statistics = 0; static int hardJobsLimit = 0; static int hardEventsLimit = 0; static int hardRespSizeLimit = 0; @@ -146,13 +147,14 @@ static struct option opts[] = { {"notif-dur", 1, NULL, 'N'}, {"notif-il-sock", 1, NULL, 'X'}, {"notif-il-fprefix", 1, NULL, 'Y'}, + {"count-statistics", 1, NULL, 'T'}, {NULL,0,NULL,0} }; #ifdef GLITE_LB_SERVER_WITH_WS -static const char *get_opt_string = "a:c:k:C:V:p:w:drm:ns:l:L:N:i:S:D:X:Y:"; +static const char *get_opt_string = "a:c:k:C:V:p:w:drm:ns:l:L:N:i:S:D:X:Y:T:"; #else -static const char *get_opt_string = "a:c:k:C:V:p:drm:ns:l:L:N:i:S:D:X:Y:"; +static const char *get_opt_string = "a:c:k:C:V:p:drm:ns:l:L:N:i:S:D:X:Y:T:"; #endif /* GLITE_LB_SERVER_WITH_WS */ static void usage(char *me) @@ -185,6 +187,8 @@ static void usage(char *me) "\t--strict-locking=1\t lock jobs also on storing events (may be slow)\n" "\t--notif-il-sock\t socket to send notifications\n" "\t--notif-il-fprefix\t file prefix for notifications\n" + "\t--count-statistics=1\t count certain statistics on jobs\n" + "\t =2\t ... and allow anonymous access\n" ,me); } @@ -347,6 +351,8 @@ int main(int argc, char *argv[]) break; case 'P': strict_locking = 1; break; + case 'T': count_statistics = atoi(optarg); + break; case '?': usage(name); return 1; } @@ -517,6 +523,7 @@ a.sin_addr.s_addr = INADDR_ANY; fprintf(stderr,"%s: open database: %s (%s)\n",argv[0],et,ed); return 1; } + if (count_statistics) edg_wll_InitStatistics(ctx); edg_wll_Close(ctx); edg_wll_FreeContext(ctx); @@ -716,6 +723,7 @@ int bk_handle_connection(int conn, struct timeval client_start, void *data) getpeername(conn, (struct sockaddr *)&a, &alen); ctx->connPool[ctx->connToUse].peerName = strdup(inet_ntoa(a.sin_addr)); ctx->connPool[ctx->connToUse].peerPort = ntohs(a.sin_port); + ctx->count_statistics = count_statistics; /* not a critical operation, do not waste all SLAVE_TIMEOUT */ h_errno = asyn_gethostbyaddr(&name, (char *)&a.sin_addr.s_addr,sizeof(a.sin_addr.s_addr), AF_INET, &dns_to); diff --git a/org.glite.lb.server/src/stats.c b/org.glite.lb.server/src/stats.c new file mode 100644 index 0000000..1e8f6cb --- /dev/null +++ b/org.glite.lb.server/src/stats.c @@ -0,0 +1,427 @@ +#include +#include +#include +#include +#include +#include + +#include "glite/lb/events.h" +#include "glite/lb/jobstat.h" +#include "glite/lb/context-int.h" + +#include "glite/wmsutils/jobid/strmd5.h" + +#include "stats.h" +static int stats_inc_counter(edg_wll_Context,const edg_wll_JobStat *,edg_wll_Stats *); +static int stats_record_duration(edg_wll_Context,const edg_wll_JobStat *,const edg_wll_JobStat *,edg_wll_Stats *); + +#define dprintf(x) { printf("[%d] ",getpid()); printf x; } + +/* XXX: should be configurable at run time */ +static struct _edg_wll_StatsArchive default_archives[] = { + { 10, 60 }, + { 60, 30 }, + { 900, 12 }, + { 0, 0 } +}; + +static edg_wll_QueryRec default_group[2] = { + { EDG_WLL_QUERY_ATTR_DESTINATION, }, + { EDG_WLL_QUERY_ATTR_UNDEF, } +}; + +static edg_wll_Stats default_stats[] = { + { STATS_COUNT, default_group, EDG_WLL_JOB_READY, 0, default_archives }, + { STATS_COUNT, default_group, EDG_WLL_JOB_SCHEDULED, 0, default_archives }, + { STATS_COUNT, default_group, EDG_WLL_JOB_RUNNING, 0, default_archives }, + { STATS_COUNT, default_group, EDG_WLL_JOB_DONE, EDG_WLL_STAT_OK, default_archives }, + { STATS_COUNT, default_group, EDG_WLL_JOB_DONE, EDG_WLL_STAT_FAILED, default_archives }, + { STATS_DURATION, default_group, EDG_WLL_JOB_SCHEDULED, 0, default_archives }, + { STATS_DURATION, default_group, EDG_WLL_JOB_RUNNING, 0, default_archives }, + { STATS_UNDEF, } + +}; + +int edg_wll_InitStatistics(edg_wll_Context ctx) +{ + edg_wll_Stats *stats = default_stats; /* XXX: hardcoded */ + int i,j; + + for (i=0; stats[i].type; i++) { + char fname[50],*zero; + + strcpy(fname,"/tmp/lb_stats.XXXXXX"); + stats[i].fd = mkstemp(fname); + if (stats[i].fd < 0) return edg_wll_SetError(ctx,errno,fname); + /* XXX: should be initialized from LB data */ + stats[i].grpno = 0; + + stats[i].grpsize = sizeof(struct edg_wll_stats_group)-sizeof(struct edg_wll_stats_archive); + + for (j=0; stats[i].archives[j].interval; j++) { + stats[i].grpsize += sizeof(struct edg_wll_stats_archive)-sizeof(struct edg_wll_stats_cell); + stats[i].grpsize += stats[i].archives[j].length * sizeof(struct edg_wll_stats_cell); + } + zero = calloc(1,stats[i].grpsize); + write(stats[i].fd,zero,stats[i].grpsize); + stats[i].map = mmap(NULL,sizeof zero,PROT_READ|PROT_WRITE,MAP_SHARED,stats[i].fd,0); + if (stats[i].map == MAP_FAILED) return edg_wll_SetError(ctx,errno,"mmap()"); + + dprintf(("stats: using %s\n",fname)); + unlink(fname); + } + return 0; +} + +int edg_wll_UpdateStatistics( + edg_wll_Context ctx, + const edg_wll_JobStat *from, + const edg_wll_Event *e, + const edg_wll_JobStat *to) +{ + int i; + + /* XXX: should be propagated somhow via context, not hardcoded */ + edg_wll_Stats *stats = default_stats; + + if (!ctx->count_statistics) return 0; + + for (i=0; stats[i].type; i++) switch (stats[i].type) { + case STATS_COUNT: + if (!to) continue; + if (to->state == stats[i].major && (!from || from->state != to->state)) { + switch (to->state) { + case EDG_WLL_JOB_DONE: + if (to->done_code != stats[i].minor) continue; + break; + default: break; + } + stats_inc_counter(ctx,to,stats+i); + } + break; + case STATS_DURATION: + if (!to || !from) continue; + if (from->state == stats[i].major && from->state != to->state) + stats_record_duration(ctx,from,to,stats+i); + default: break; + } + return 0; +} + +static struct edg_wll_stats_archive *archive_skip( + const struct edg_wll_stats_archive *a, + int len) +{ + return (struct edg_wll_stats_archive *) + (((char *) a) + sizeof(struct edg_wll_stats_archive) + - sizeof(struct edg_wll_stats_cell) + + len * sizeof(struct edg_wll_stats_cell) + ); +} + +static int stats_inc_counter(edg_wll_Context ctx,const edg_wll_JobStat *jobstat,edg_wll_Stats *stats) +{ + int i,j; + char *sig; + struct edg_wll_stats_group *g; + struct edg_wll_stats_archive *a; + time_t now = jobstat->stateEnterTime.tv_sec; + + /* XXX: we support destination grouping only */ + if (!jobstat->destination) return 0; + edg_wll_ResetError(ctx); + + dprintf(("inc_counter: destination %s, stats %d\n",jobstat->destination, + (int) (stats - (edg_wll_Stats *) default_stats))); + + if (flock(stats->fd,LOCK_EX)) return edg_wll_SetError(ctx,errno,"flock()"); + + /* remap the file if someone changed its size */ + if (stats->map->grpno != stats->grpno) { + int newgrpno = stats->map->grpno; + munmap(stats->map,(stats->grpno ? stats->grpno : 1) * stats->grpsize); + stats->map = mmap(NULL,newgrpno * stats->grpsize, + PROT_READ|PROT_WRITE,MAP_SHARED,stats->fd,0); + if (stats->map == MAP_FAILED) { + edg_wll_SetError(ctx,errno,"mmap()"); + goto cleanup; + } + assert(stats->map->grpno == newgrpno); + stats->grpno = newgrpno; + } + + sig = str2md5base64(jobstat->destination); + + for (i=0; igrpno; i++) { + g = (struct edg_wll_stats_group *) ( + ((char *) stats->map) + stats->grpsize * i + ); + if (!strcmp(sig,g->sig)) break; + } + + /* not found, initialize new */ + if (i == stats->grpno) { + dprintf(("group %s not found\n",sig)); + if (stats->grpno) { + char *zero = calloc(1,stats->grpsize); + munmap(stats->map,stats->grpno * stats->grpsize); + lseek(stats->fd,0,SEEK_END); + write(stats->fd,zero,stats->grpsize); + free(zero); + stats->map = mmap(NULL,stats->grpno * stats->grpsize, + PROT_READ|PROT_WRITE,MAP_SHARED,stats->fd,0); + + if (stats->map == MAP_FAILED) { + edg_wll_SetError(ctx,errno,"mmap()"); + goto cleanup; + } + } + stats->grpno++; + stats->map->grpno++; + dprintf(("allocated\n")); + + g = (struct edg_wll_stats_group *) ( + ((char *) stats->map) + stats->grpsize * i); + + /* invalidate all cells in all archives */ + a = g->archive; + for (i=0; stats->archives[i].interval; i++) { + for (j=0; jarchives[i].length; j++) a->cells[j].cnt = -1; + a = archive_skip(a,stats->archives[i].length); + } + + strcpy(g->sig,sig); + g->last_update = now; + } + else dprintf(("group %s found at %d\n",sig,i)); + + a = g->archive; + for (i=0; stats->archives[i].interval; i++) { + time_t pt = g->last_update; + + pt -= pt % stats->archives[i].interval; + + /* nothing happened longer than is history of this archive */ + if (now-pt > stats->archives[i].interval * stats->archives[i].length) { + for (j=0; jarchives[i].length; j++) a->cells[j].cnt = 0; + a->ptr = 0; + } + /* catch up eventually, cleaning not touched cells */ + else for (pt += stats->archives[i].interval; pt < now; + pt += stats->archives[i].interval) + { + if (++(a->ptr) == stats->archives[i].length) a->ptr = 0; + a->cells[a->ptr].cnt = 0; + } + + /* validate an unused cell */ + if (a->cells[a->ptr].cnt < 0) a->cells[a->ptr].cnt = 0; + + /* now we can do IT */ + a->cells[a->ptr].cnt++; + dprintf(("update archive %d, cell %d to %d\n",i,a->ptr,a->cells[a->ptr].cnt)); + + /* go to next archive */ + a = archive_skip(a,stats->archives[i].length); + } + + g->last_update = now; + + +cleanup: + free(sig); + flock(stats->fd,LOCK_UN); + return edg_wll_Error(ctx,NULL,NULL); +} + +static int stats_record_duration( + edg_wll_Context ctx, + const edg_wll_JobStat *from, + const edg_wll_JobStat *to, + edg_wll_Stats *stats) +{ + return 0; +} + +int edg_wll_StateRateServer( + edg_wll_Context ctx, + const edg_wll_QueryRec *group, + edg_wll_JobStatCode major, + int minor, + time_t *from, + time_t *to, + float *rate, + int *res_from, + int *res_to +) +{ + edg_wll_Stats *stats = default_stats; /* XXX: hardcoded */ + struct edg_wll_stats_group *g; + struct edg_wll_stats_archive *a; + int i,j,matchi; + char *sig = NULL; + time_t afrom,ato; + long match; + + edg_wll_ResetError(ctx); + + switch (ctx->count_statistics) { + case 0: return edg_wll_SetError(ctx,ENOSYS,NULL); + case 1: if (!ctx->noAuth) return edg_wll_SetError(ctx,EPERM,NULL); + break; + case 2: break; + default: abort(); + } + + if (group[0].attr != EDG_WLL_QUERY_ATTR_DESTINATION + || group[1].attr != EDG_WLL_QUERY_ATTR_UNDEF) + return edg_wll_SetError(ctx,ENOSYS, + "the only supported grouping is by destination"); + + if (*from >= *to) return edg_wll_SetError(ctx,EINVAL,"from >= to"); + + for (;stats->type; stats++) { + if (stats->type != STATS_COUNT || stats->major != major) continue; + switch (major) { + case EDG_WLL_JOB_DONE: + if (stats->minor != minor) continue; + break; + default: break; + } + break; + } + + if (!stats->type) return edg_wll_SetError(ctx,ENOENT,"no matching state counter"); + + if (flock(stats->fd,LOCK_SH)) return edg_wll_SetError(ctx,errno,"flock()"); + + /* XXX */ + sig = str2md5base64(group->value.c); + + + for (i=0, g=stats->map; igrpno; i++) { + if (!strcmp(sig,g->sig)) break; + g = (struct edg_wll_stats_group *) (((char *) g) + stats->grpsize); + } + + if (i == stats->grpno) { + edg_wll_SetError(ctx,ENOENT,"no matching group"); + goto cleanup; + } + + match = 0; + matchi = -1; + /* XXX: assumes decreasing resolution of archives */ + for (j=0; stats->archives[j].interval; j++) { + afrom = ato = g->last_update; + + ato += stats->archives[j].interval - ato % stats->archives[j].interval; + afrom -= afrom % stats->archives[j].interval; + afrom -= stats->archives[j].interval * (stats->archives[j].length-1); + + /* intersection of [from,to] and [afrom,ato] */ + if (afrom < *from) afrom = *from; + if (ato > *to) ato = *to; + + /* find the best match */ + if (ato-afrom > match) { + match = ato - afrom; + matchi = j; + } + } + + dprintf(("best match: archive %d, interval %ld\n",matchi,match)); + + if (matchi < 0) { + if (*from > g->last_update) { + /* special case -- we are sure that nothing arrived */ + *rate = 0; + *res_from = *res_to = stats->archives[0].interval; + goto cleanup; + } + edg_wll_SetError(ctx,ENOENT,"no data available"); + goto cleanup; + } + + *res_from = *res_to = stats->archives[matchi].interval; + + a = g->archive; + for (j=0; jarchives[j].length); + + i = stats->archives[matchi].interval; + afrom = g->last_update - g->last_update % i + - (stats->archives[matchi].length-1)*i; + + dprintf(("archive from %ld = %s",afrom,ctime(&afrom))); + + if (afrom > *from) *from = afrom; + if (afrom + stats->archives[matchi].length * i < *to) *to = afrom + stats->archives[matchi].length * i; + + *rate = 0; + match = 0; + + + for (j=0; jarchives[matchi].length; j++,afrom += i) { + struct edg_wll_stats_cell *c = a->cells + ((a->ptr+j+1) % stats->archives[matchi].length); + + dprintf(("cell %d (abs %d): ",j,(a->ptr+j+1) % stats->archives[matchi].length)); + if (c->cnt < 0) { + dprintf(("invalid\n")); + continue; /* invalid cell */ + } + + dprintf(("search %ld in %ld, %ld\n",*from,afrom,afrom+i)); + + if (*from >= afrom && *from < afrom+i) { + match += *from - afrom; + *rate += c->cnt * (1.0 - ((float) *from-afrom)/i); + dprintf(("matched from: match %ld, rate %f\n",match,*rate)); + } + else if (*from < afrom && *to >= afrom) { + match += i; + *rate += c->cnt; + dprintf(("matched in: match %ld, rate %f\n",match,*rate)); + } + + if (*to >= afrom && *to < afrom+i) { + match -= i-(*to-afrom); + *rate -= c->cnt * (((float) i)-(*to - afrom))/i; + + dprintf(("matched to: match %ld, rate %f\n",match,*rate)); + + /* asi blbost + if (j == stats->archives[matchi].length - 1 + && *to > g->last_update) + { + match -= *to - g->last_update; + *rate -= c->cnt * (((float) *to) - g->last_update)/i; + dprintf(("corrected wrt. last_update: match %ld, rate %f\n",match,*rate)); + } + */ + + break; + } + } + *rate /= match; + +cleanup: + free(sig); + flock(stats->fd,LOCK_UN); + return edg_wll_Error(ctx,NULL,NULL); +} + +int edg_wll_StateDurationServer( + edg_wll_Context ctx, + const edg_wll_QueryRec *group, + edg_wll_JobStatCode major, + int minor, + time_t *from, + time_t *to, + float *duration, + int *res_from, + int *res_to +) +{ + return edg_wll_SetError(ctx,ENOSYS,NULL); +} + diff --git a/org.glite.lb.server/src/stats.h b/org.glite.lb.server/src/stats.h new file mode 100644 index 0000000..dc869d3 --- /dev/null +++ b/org.glite.lb.server/src/stats.h @@ -0,0 +1,70 @@ +#ifndef __EDG_WORKLOAD_LOGGING_LBSERVER_STATS_H__ +#define __EDG_WORKLOAD_LOGGING_LBSERVER_STATS_H__ + +int edg_wll_InitStatistics(edg_wll_Context); + +int edg_wll_UpdateStatistics( + edg_wll_Context, + const edg_wll_JobStat *, + const edg_wll_Event *, + const edg_wll_JobStat *); + + +struct edg_wll_stats_cell { + int cnt; + float value; +}; + +struct edg_wll_stats_archive { + int ptr; + struct edg_wll_stats_cell cells[1]; +}; + +struct edg_wll_stats_group { + int grpno; + char sig[33]; + time_t last_update; + struct edg_wll_stats_archive archive[1]; +}; + + +typedef struct { + enum { STATS_UNDEF = 0, STATS_COUNT, STATS_DURATION } type; + edg_wll_QueryRec *group; + edg_wll_JobStatCode major; + int minor; + struct _edg_wll_StatsArchive { + int interval,length; + } *archives; + + int fd; + struct edg_wll_stats_group *map; + int grpno,grpsize; +} edg_wll_Stats; + +int edg_wll_StateRateServer( + edg_wll_Context context, + const edg_wll_QueryRec *group, + edg_wll_JobStatCode major, + int minor, + time_t *from, + time_t *to, + float *rate, + int *res_from, + int *res_to +); + + +int edg_wll_StateDurationServer( + edg_wll_Context context, + const edg_wll_QueryRec *group, + edg_wll_JobStatCode major, + int minor, + time_t *from, + time_t *to, + float *duration, + int *res_from, + int *res_to +); + +#endif