- added new statistics (duration from any state to another, grouped by destination)
authorJiří Filipovič <fila@ics.muni.cz>
Mon, 21 Jun 2010 11:05:16 +0000 (11:05 +0000)
committerJiří Filipovič <fila@ics.muni.cz>
Mon, 21 Jun 2010 11:05:16 +0000 (11:05 +0000)
- a few refactoring
- fixed old statistics (jobs rate)

org.glite.lb.common/interface/xml_conversions.h
org.glite.lb.common/interface/xml_parse.h
org.glite.lb.common/src/xml_conversions.c
org.glite.lb.common/src/xml_parse.c.T
org.glite.lb.server/src/lb_proto.c
org.glite.lb.server/src/lb_xml_parse.c.T
org.glite.lb.server/src/lb_xml_parse.h
org.glite.lb.server/src/stats.c
org.glite.lb.server/src/stats.h

index c2aeaeb..d685184 100644 (file)
@@ -93,7 +93,8 @@ typedef struct _edg_wll_XML_ctx {
        time_t                  notifValidity;
        char                    *statsFunction;
        edg_wll_QueryRec        **statsConditions;
-       edg_wll_JobStatCode     statsMajor;
+       edg_wll_JobStatCode     statsBaseState;
+       edg_wll_JobStatCode     statsFinalState;
        int                     statsMinor;
        time_t                  statsFrom, statsTo;
        float                   statsRate, statsDuration;
index 3d3601f..4a832ea 100644 (file)
@@ -89,7 +89,7 @@ extern int edg_wll_NotifRequestToXML( edg_wll_Context ctx, const char *function,
 
 extern int edg_wll_QuerySequenceCodeToXML(edg_wll_Context ctx, glite_jobid_const_t jobId, char **message);
 
-extern int edg_wll_StatsRequestToXML(edg_wll_Context,const char *,const edg_wll_QueryRec *,edg_wll_JobStatCode,int,time_t *,time_t *,char **); 
+extern int edg_wll_StatsRequestToXML(edg_wll_Context,const char *,const edg_wll_QueryRec *,edg_wll_JobStatCode,edg_wll_JobStatCode,int,time_t *,time_t *,char **); 
        
 extern edg_wll_ErrorCode edg_wll_ParseStatsResult(edg_wll_Context ctx, char *messageBody, time_t *from, time_t *to, float *rate, float *duration, int *res_from, int *res_to);
 
index 8deb8ca..795b73c 100644 (file)
@@ -86,7 +86,8 @@ void edg_wll_initXMLCtx(edg_wll_XML_ctx *c) {
        c->bound = 0;
        c->statsFunction = NULL;
        c->statsConditions = NULL;
-       c->statsMajor = EDG_WLL_JOB_UNDEF;
+       c->statsBaseState = EDG_WLL_JOB_UNDEF;
+       c->statsFinalState = EDG_WLL_JOB_UNDEF;
        c->statsMinor = 0;
        c->statsRate = 0;
        c->statsDuration = 0;
index 4e81d9d..23b54cf 100644 (file)
@@ -2978,7 +2978,8 @@ int edg_wll_StatsRequestToXML(
                 edg_wll_Context        ctx, 
                const char              *function,
                const edg_wll_QueryRec  *cond,
-               edg_wll_JobStatCode     major,
+               edg_wll_JobStatCode     base_state,
+               edg_wll_JobStatCode     final_state,
                int                     minor,
                time_t                  *from,
                time_t                  *to,
@@ -2996,7 +2997,8 @@ int edg_wll_StatsRequestToXML(
 
        pomA = strdup("");
        
-       edg_wll_add_edg_wll_JobStatCode_to_XMLBody(&pomA, major, "major", EDG_WLL_JOB_UNDEF);
+       edg_wll_add_edg_wll_JobStatCode_to_XMLBody(&pomA, base_state, "base_state", EDG_WLL_JOB_UNDEF);
+       edg_wll_add_edg_wll_JobStatCode_to_XMLBody(&pomA, final_state, "final_state", EDG_WLL_JOB_UNDEF);
        edg_wll_add_int_to_XMLBody(&pomA, minor, "minor", -1);
        edg_wll_add_time_t_to_XMLBody(&pomA, *from, "from", 0); 
        edg_wll_add_time_t_to_XMLBody(&pomA, *to, "to", 0); 
index cfba7a2..3955926 100644 (file)
@@ -1190,15 +1190,14 @@ edg_wll_ErrorCode edg_wll_Proto(edg_wll_Context ctx,
                else if (!strncmp(requestPTR,KEY_STATS_REQUEST,sizeof(KEY_STATS_REQUEST)-1)) {
                        char *function;
                        edg_wll_QueryRec **conditions;
-                       edg_wll_JobStatCode major = EDG_WLL_JOB_UNDEF;
+                       edg_wll_JobStatCode base = EDG_WLL_JOB_UNDEF;
+                       edg_wll_JobStatCode final = EDG_WLL_JOB_UNDEF;
                        time_t from, to;
                        int i, j, minor, res_from, res_to;
                        float rate = 0, duration = 0;
                        
-                       
-                       
                        if (parseStatsRequest(ctx, messageBody, &function, &conditions, 
-                                               &major, &minor, &from, &to))
+                                               &base, &final, &minor, &from, &to))
                                ret = HTTP_BADREQ;
                        else {
                                int     fatal = 0, err = 0;
@@ -1207,12 +1206,17 @@ edg_wll_ErrorCode edg_wll_Proto(edg_wll_Context ctx,
                                // navratove chyby nejsou zname, nutno predelat dle aktualni situace
                                if (!strcmp(function,"Rate")) 
                                        err = edg_wll_StateRateServer(ctx,
-                                               conditions[0], major, minor, 
+                                               conditions[0], base, minor, 
                                                &from, &to, &rate, &res_from, &res_to); 
                                else if (!strcmp(function,"Duration"))
                                        err = edg_wll_StateDurationServer(ctx,
-                                               conditions[0], major, minor, 
+                                               conditions[0], base, minor, 
                                                &from, &to, &duration, &res_from, &res_to); 
+                               else if (!strcmp(function, "DurationFromTo"))
+                                       err = edg_wll_StateDurationFromToServer(
+                                               ctx, conditions[0], base, final,
+                                                minor, &from, &to, &duration,
+                                                &res_from, &res_to);
                                
                                switch (err) {
                                        case 0: if (html) ret = HTTP_NOTIMPL;
index 73f274d..c61c137 100644 (file)
@@ -442,7 +442,7 @@ static void startStatsRequest(void *data, const char *el, const char **attr)
                case 1: if (!strcasecmp(el,"and")) {
                                XMLCtx->jobQueryRec_begin = XML_GetCurrentByteIndex(XMLCtx->p);
                        }
-                       else if ( (strcasecmp(el,"major")) && (strcasecmp(el,"minor")) && 
+                       else if ( (strcasecmp(el,"base_state")) && (strcasecmp(el,"final_state")) && (strcasecmp(el,"minor")) && 
                                (strcasecmp(el,"from")) && (strcasecmp(el,"to")) ) unexp()
                        break;
                case 2: /* fall through */
@@ -919,10 +919,14 @@ static void endStatsRequest(void *data, const char *el UNUSED_VAR)
         edg_wll_XML_ctx *XMLCtx = data;
 
         if (XMLCtx->level == 2) {
-                if (!strcmp(XMLCtx->element,"major")) {
-                       XMLCtx->statsMajor = 
+                if (!strcmp(XMLCtx->element,"base_state")) {
+                       XMLCtx->statsBaseState = 
                                edg_wll_from_string_to_edg_wll_JobStatCode(XMLCtx);
                }
+               else if (!strcmp(XMLCtx->element,"final_state")) {
+                        XMLCtx->statsFinalState =
+                                edg_wll_from_string_to_edg_wll_JobStatCode(XMLCtx);
+                }
                else if (!strcmp(XMLCtx->element,"minor")) {
                        XMLCtx->statsMinor = edg_wll_from_string_to_int(XMLCtx);
                }
@@ -1284,7 +1288,7 @@ int parseLoadRequest(edg_wll_Context ctx, char *messageBody, edg_wll_LoadRequest
 
 
 /* parse Stats request from client */
-int parseStatsRequest(edg_wll_Context ctx, char *messageBody, char **function, edg_wll_QueryRec ***conditions, edg_wll_JobStatCode *major, int *minor, time_t *from, time_t *to) 
+int parseStatsRequest(edg_wll_Context ctx, char *messageBody, char **function, edg_wll_QueryRec ***conditions, edg_wll_JobStatCode *base_state, edg_wll_JobStatCode *final_state, int *minor, time_t *from, time_t *to) 
 {
        int     ret;
        edg_wll_XML_ctx         XMLCtx;
@@ -1294,7 +1298,8 @@ int parseStatsRequest(edg_wll_Context ctx, char *messageBody, char **function, e
        /* returns emty variables as default; only some variables will be filled in */
        /* depending on vaule of XMLCtx.notifFunction */
        *function = NULL;
-       *major = EDG_WLL_JOB_UNDEF;
+       *base_state = EDG_WLL_JOB_UNDEF;
+       *final_state = EDG_WLL_JOB_UNDEF;
        *minor = 0;
        *from = 0;
        *to = 0;
@@ -1340,7 +1345,8 @@ int parseStatsRequest(edg_wll_Context ctx, char *messageBody, char **function, e
 
                *function = NULL;
                *conditions = NULL;
-               *major = EDG_WLL_JOB_UNDEF;
+               *base_state = EDG_WLL_JOB_UNDEF;
+               *final_state = EDG_WLL_JOB_UNDEF;
                *minor = 0;
                *from = 0;
                *to = 0;
@@ -1348,7 +1354,8 @@ int parseStatsRequest(edg_wll_Context ctx, char *messageBody, char **function, e
        else {
                *function = XMLCtx.statsFunction;
                *conditions = XMLCtx.statsConditions;
-               *major = XMLCtx.statsMajor;
+               *base_state = XMLCtx.statsBaseState;
+               *final_state = XMLCtx.statsFinalState;
                *minor = XMLCtx.statsMinor;
                *from = XMLCtx.statsFrom;
                *to = XMLCtx.statsTo;
index 4096062..93bfdad 100644 (file)
@@ -52,7 +52,7 @@ int edg_wll_QuerySequenceCodeResultToXML(edg_wll_Context ctx, char *source, char
 
 int edg_wll_StatsResultToXML(edg_wll_Context,time_t,time_t,float,float,int,int,char **);
 
-int parseStatsRequest(edg_wll_Context,char *,char **,edg_wll_QueryRec ***,edg_wll_JobStatCode *,int *,time_t *,time_t *);
+int parseStatsRequest(edg_wll_Context,char *,char **,edg_wll_QueryRec ***,edg_wll_JobStatCode *,edg_wll_JobStatCode *,int *,time_t *,time_t *);
 
 
 #ifdef __cplusplus
index 4319c7e..a092f01 100644 (file)
@@ -35,11 +35,9 @@ limitations under the License.
 #include "glite/jobid/strmd5.h"
 
 #include "stats.h"
-#include "authz_policy.h"
 static int stats_inc_counter(edg_wll_Context,const edg_wll_JobStat *,edg_wll_Stats *);
 static int stats_record_duration(edg_wll_Context,const edg_wll_JobStat *,const edg_wll_JobStat *,edg_wll_Stats *);
-
-#define dprintf(x) { printf("[%d] ",getpid()); printf x; }
+static int stats_record_duration_fromto(edg_wll_Context ctx, const edg_wll_JobStat *from, const edg_wll_JobStat *to, edg_wll_Stats *stats);
 
 /* XXX: should be configurable at run time */
 static struct _edg_wll_StatsArchive default_archives[] = { 
@@ -55,13 +53,14 @@ static edg_wll_QueryRec default_group[2] = {
 };
 
 static edg_wll_Stats default_stats[] = {
-       { STATS_COUNT, default_group, EDG_WLL_JOB_READY, 0, default_archives },
-       { STATS_COUNT, default_group, EDG_WLL_JOB_SCHEDULED, 0, default_archives },
-       { STATS_COUNT, default_group, EDG_WLL_JOB_RUNNING, 0, default_archives },
-       { STATS_COUNT, default_group, EDG_WLL_JOB_DONE, EDG_WLL_STAT_OK, default_archives },
-       { STATS_COUNT, default_group, EDG_WLL_JOB_DONE, EDG_WLL_STAT_FAILED, default_archives },
-       { STATS_DURATION, default_group, EDG_WLL_JOB_SCHEDULED, 0, default_archives },
-       { STATS_DURATION, default_group, EDG_WLL_JOB_RUNNING, 0, default_archives },
+       { STATS_COUNT, default_group, EDG_WLL_JOB_READY, 0, 0, default_archives },
+       { STATS_COUNT, default_group, EDG_WLL_JOB_SCHEDULED, 0, 0, default_archives },
+       { STATS_COUNT, default_group, EDG_WLL_JOB_RUNNING, 0, 0, default_archives },
+       { STATS_COUNT, default_group, EDG_WLL_JOB_DONE, 0, EDG_WLL_STAT_OK, default_archives },
+       { STATS_COUNT, default_group, EDG_WLL_JOB_DONE, 0, EDG_WLL_STAT_FAILED, default_archives },
+       { STATS_DURATION, default_group, EDG_WLL_JOB_SCHEDULED, 0, 0, default_archives },
+       { STATS_DURATION, default_group, EDG_WLL_JOB_RUNNING, 0, 0, default_archives },
+       { STATS_DURATION_FROMTO, default_group, EDG_WLL_JOB_SUBMITTED, EDG_WLL_JOB_RUNNING, 0, default_archives },
        { STATS_UNDEF, }
        
 };
@@ -116,7 +115,7 @@ int edg_wll_UpdateStatistics(
        for (i=0; stats[i].type; i++) switch (stats[i].type) {
                case STATS_COUNT:
                        if (!to) continue;
-                       if (to->state == stats[i].major && (!from || from->state != to->state)) {
+                       if (to->state == stats[i].base_state && (!from || from->state != to->state)) {
                                switch (to->state) {
                                        case EDG_WLL_JOB_DONE:
                                                if (to->done_code != stats[i].minor) continue;
@@ -128,8 +127,14 @@ int edg_wll_UpdateStatistics(
                        break;
                case STATS_DURATION:
                        if (!to || !from) continue;
-                       if (from->state == stats[i].major && from->state != to->state) 
+                       if (from->state == stats[i].base_state && from->state != to->state) 
                                stats_record_duration(ctx,from,to,stats+i);
+                       break;
+               case STATS_DURATION_FROMTO:
+                       if (!from) continue;
+                       if ((to->state == stats[i].final_state) && (from->state != to->state))
+                               stats_record_duration_fromto(ctx, from, to, stats+i);
+                       break;
                default: break;
        }
        return 0;
@@ -165,10 +170,70 @@ static int stats_remap(edg_wll_Stats *stats)
 }
 
 
+static int stats_search_group(edg_wll_Context ctx, const edg_wll_JobStat *jobstat,edg_wll_Stats *stats, struct edg_wll_stats_group **g)
+{
+       int     i, j;
+       char    *sig = NULL;
+       struct edg_wll_stats_archive    *a;
+
+       sig = str2md5base64(jobstat->destination);
+
+        for (i=0; i<stats->grpno; i++) {
+                *g = (struct edg_wll_stats_group *) (
+                                ((char *) stats->map) + stats->grpsize * i
+                        );
+                if (!strcmp(sig,(*g)->sig)) break;
+        }
+
+       /* not found, initialize new */
+        if (i == stats->grpno) {
+                glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_DEBUG,
+                        "group %s not found",sig);
+                if (stats->grpno) {
+                        char    *zero = calloc(1,stats->grpsize);
+                        munmap(stats->map,stats->grpno * stats->grpsize);
+                        lseek(stats->fd,0,SEEK_END);
+                        write(stats->fd,zero,stats->grpsize);
+                        free(zero);
+                        stats->map = mmap(NULL,(stats->grpno+1) * stats->grpsize,
+                                        PROT_READ|PROT_WRITE,MAP_SHARED,stats->fd,0);
+
+                        if (stats->map == MAP_FAILED) {
+                               free(sig);
+                                edg_wll_SetError(ctx,errno,"mmap()");
+                               return -1;
+                        }
+                }
+                stats->grpno++;
+                stats->map->grpno++;
+                glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_DEBUG,
+                        "allocated");
+
+                *g = (struct edg_wll_stats_group *) (
+                                ((char *) stats->map) + stats->grpsize * i);
+
+                /* invalidate all cells in all archives */
+                a = (*g)->archive;
+                for (i=0; stats->archives[i].interval; i++) {
+                        for (j=0; j<stats->archives[i].length; j++) a->cells[j].cnt = -1;
+                        a = archive_skip(a,stats->archives[i].length);
+                }
+
+                strcpy((*g)->sig,sig);
+                (*g)->last_update = jobstat->stateEnterTime.tv_sec; //now;
+        }
+        else
+                glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_DEBUG,
+                        "group %s found at %d", sig, i);
+
+       free(sig);
+
+       return 0;
+}
+
 static int stats_inc_counter(edg_wll_Context ctx,const edg_wll_JobStat *jobstat,edg_wll_Stats *stats)
 {
        int     i,j;
-       char    *sig = NULL;
        struct edg_wll_stats_group      *g;
        struct edg_wll_stats_archive    *a;
        time_t  now = jobstat->stateEnterTime.tv_sec;
@@ -189,55 +254,9 @@ static int stats_inc_counter(edg_wll_Context ctx,const edg_wll_JobStat *jobstat,
                goto cleanup;
        }
 
-       sig = str2md5base64(jobstat->destination);
-
-       for (i=0; i<stats->grpno; i++) {
-               g = (struct edg_wll_stats_group *) (
-                               ((char *) stats->map) + stats->grpsize * i
-                       );
-               if (!strcmp(sig,g->sig)) break;
-       }
-
-       /* not found, initialize new */
-       if (i == stats->grpno) {
-               glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_DEBUG,
-                       "group %s not found",sig);
-               if (stats->grpno) {
-                       char    *zero = calloc(1,stats->grpsize);
-                       munmap(stats->map,stats->grpno * stats->grpsize);
-                       lseek(stats->fd,0,SEEK_END);
-                       write(stats->fd,zero,stats->grpsize);
-                       free(zero);
-                       stats->map = mmap(NULL,(stats->grpno+1) * stats->grpsize,
-                                       PROT_READ|PROT_WRITE,MAP_SHARED,stats->fd,0);
-
-                       if (stats->map == MAP_FAILED) {
-                               edg_wll_SetError(ctx,errno,"mmap()");
-                               goto cleanup;
-                       }
-               }
-               stats->grpno++;
-               stats->map->grpno++;
-               glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_DEBUG,
-                       "allocated");
-
-               g = (struct edg_wll_stats_group *) (
-                               ((char *) stats->map) + stats->grpsize * i);
-
-               /* invalidate all cells in all archives */
-               a = g->archive;
-               for (i=0; stats->archives[i].interval; i++) {
-                       for (j=0; j<stats->archives[i].length; j++) a->cells[j].cnt = -1;
-                       a = archive_skip(a,stats->archives[i].length);
-               }
-
-               strcpy(g->sig,sig);
-               g->last_update = now;
-       }
-       else 
-               glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_DEBUG,
-                       "group %s found at %d", sig, i);
-
+       if (stats_search_group(ctx, jobstat, stats, &g))
+               goto cleanup;
        a = g->archive;
        for (i=0; stats->archives[i].interval; i++) {
                time_t  pt = g->last_update;
@@ -274,7 +293,6 @@ static int stats_inc_counter(edg_wll_Context ctx,const edg_wll_JobStat *jobstat,
 
 
 cleanup:
-       free(sig);
        flock(stats->fd,LOCK_UN);
        return edg_wll_Error(ctx,NULL,NULL);
 }
@@ -288,6 +306,137 @@ static int stats_record_duration(
        return 0;
 }
 
+static int stats_record_duration_fromto(
+                edg_wll_Context ctx,
+                const edg_wll_JobStat *from,
+                const edg_wll_JobStat *to,
+                edg_wll_Stats *stats)
+{
+       struct edg_wll_stats_group      *g;
+        struct edg_wll_stats_archive    *a;
+       int i, j;
+       time_t  now = to->stateEnterTime.tv_sec;
+
+       /* XXX: we support destination grouping only */
+        if (!to->destination) return 0;
+        edg_wll_ResetError(ctx);
+
+       glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_DEBUG,
+                "record_duration_fromto: destination %s, stats %d",
+                to->destination, (int) (stats - (edg_wll_Stats *) default_stats));
+
+       if (flock(stats->fd,LOCK_EX)) return edg_wll_SetError(ctx,errno,"flock()");
+
+        /* remap the file if someone changed its size */
+        if (stats->map->grpno != stats->grpno && stats_remap(stats)) {
+                edg_wll_SetError(ctx,errno,"shmem remap failed");
+                goto cleanup;
+        }
+
+       if (stats_search_group(ctx, to, stats, &g))
+               goto cleanup;
+
+       time_t base = to->stateEnterTimes[stats->base_state+1];
+       time_t final =  to->stateEnterTimes[stats->final_state+1];
+       time_t timedif = final-base;
+       if (base && final){  /* final should be always not null*/
+               a = g->archive;
+
+               for (i=0; stats->archives[i].interval; i++) {
+                       time_t  pt = g->last_update;
+
+                       pt -= pt % stats->archives[i].interval;
+       
+                       /* nothing happened longer than is history of this archive */
+                       if (now-pt > stats->archives[i].interval * stats->archives[i].length) {
+                               for (j=0; j<stats->archives[i].length; j++) a->cells[j].cnt = 0;
+                               a->ptr = 0;
+                       }
+                       /* catch up eventually, cleaning not touched cells */
+                       else for (pt += stats->archives[i].interval; pt < now;
+                                       pt += stats->archives[i].interval)
+                       {
+                               if (++(a->ptr) == stats->archives[i].length) a->ptr = 0;
+                               a->cells[a->ptr].cnt = 0;
+                       }
+       
+                       /* validate an unused cell */
+                       if (a->cells[a->ptr].cnt < 0) a->cells[a->ptr].cnt = 0;
+
+                       /* now we can do IT */
+                       a->cells[a->ptr].cnt++;
+                       a->cells[a->ptr].value += (float)timedif;
+                       glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_DEBUG,
+                               "update archive %d, cell %d incremented to %f",
+                               i, a->ptr, a->cells[a->ptr].value);
+
+                       /* go to next archive */
+                       a = archive_skip(a,stats->archives[i].length);
+               }
+       }
+       
+       g->last_update = now;
+cleanup:
+       flock(stats->fd,LOCK_UN);
+        return edg_wll_Error(ctx,NULL,NULL);
+}
+
+static int findStat(
+       edg_wll_Context ctx,
+        const edg_wll_QueryRec  *group,
+        edg_wll_JobStatCode     base_state,
+        edg_wll_JobStatCode     final_state,
+        int                     minor,
+       time_t                  *from,
+        time_t                 *to,
+       edg_wll_Stats           **stats
+)
+{
+       edg_wll_JobStatCode later_state;
+
+       switch (ctx->count_statistics) {
+                case 0: return edg_wll_SetError(ctx,ENOSYS,NULL);
+                case 1: if (!ctx->noAuth) return edg_wll_SetError(ctx,EPERM,NULL);
+                        break;
+                case 2: break;
+                default: abort();
+        }
+
+        if (group[0].attr != EDG_WLL_QUERY_ATTR_DESTINATION
+                || group[1].attr != EDG_WLL_QUERY_ATTR_UNDEF)
+                        return edg_wll_SetError(ctx,ENOSYS,
+                                "the only supported grouping is by destination");
+
+        if (*from >= *to) return edg_wll_SetError(ctx,EINVAL,"from >= to");
+
+        for (;(*stats)->type; (*stats)++) {
+                //if ((*stats)->type != STATS_COUNT || (*stats)->base_state != base_state || (*stats)->final_state != final_state) continue;
+               if ((*stats)->type == STATS_COUNT){
+                       if ((*stats)->base_state != base_state) continue;
+                       else later_state = base_state;
+               }
+               if ((*stats)->type == STATS_DURATION_FROMTO){
+                       if((*stats)->base_state != base_state || (*stats)->final_state != final_state) continue;
+                       else later_state = final_state;
+               }
+               if  ((*stats)->type == STATS_DURATION){
+                       continue;
+               } 
+                switch (later_state) {
+                        case EDG_WLL_JOB_DONE:
+                                if ((*stats)->minor != minor) continue;
+                                break;
+                        default: break;
+                }
+                break;
+        }
+
+       if (!(*stats)->type) return edg_wll_SetError(ctx,ENOENT,"no matching state counter");
+
+       return 0;
+}
+
 int edg_wll_StateRateServer(
        edg_wll_Context ctx,
        const edg_wll_QueryRec  *group,
@@ -306,41 +455,12 @@ int edg_wll_StateRateServer(
        int     i,j,matchi;
        char    *sig = NULL;
        time_t  afrom,ato;
-       long    match;
-       struct _edg_wll_GssPrincipal_data princ;
-
-        princ.name = ctx->peerName;
-        princ.fqans = ctx->fqans;
+       long    match, diff;
+       int     err;
 
        edg_wll_ResetError(ctx);
 
-       switch (ctx->count_statistics) {
-               case 0: return edg_wll_SetError(ctx,ENOSYS,NULL);
-               case 1: if (!ctx->noAuth && !check_authz_policy(&ctx->authz_policy, &princ, GET_STATISTICS)) return edg_wll_SetError(ctx,EPERM,NULL);
-                       break;
-               case 2: break;
-               default: abort();
-       }
-
-       if (group[0].attr != EDG_WLL_QUERY_ATTR_DESTINATION
-               || group[1].attr != EDG_WLL_QUERY_ATTR_UNDEF)
-                       return edg_wll_SetError(ctx,ENOSYS,
-                               "the only supported grouping is by destination");
-
-       if (*from >= *to) return edg_wll_SetError(ctx,EINVAL,"from >= to");
-
-       for (;stats->type; stats++) {
-               if (stats->type != STATS_COUNT || stats->major != major) continue;
-               switch (major) {
-                       case EDG_WLL_JOB_DONE:
-                               if (stats->minor != minor) continue;
-                               break;
-                       default: break;
-               }
-               break;
-       }
-
-       if (!stats->type) return edg_wll_SetError(ctx,ENOENT,"no matching state counter");
+       if (err = findStat(ctx, group, major, EDG_WLL_JOB_UNDEF, minor, from, to, &stats)) return err;
 
        /* remap the file if someone changed its size */
        if (stats->map->grpno != stats->grpno)
@@ -436,37 +556,31 @@ int edg_wll_StateRateServer(
                }
 
                glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_DEBUG,
-                       "search %ld in %ld, %ld", *from, afrom, afrom+i);
-
-               if (*from >= afrom && *from < afrom+i) {
-                       match += *from - afrom;
-                       *rate += c->cnt * (1.0 - ((float) *from-afrom)/i);
-                       glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_DEBUG, "matched from: match %ld, rate %f", match, *rate);
-               }
-               else if (*from < afrom && *to >= afrom) {
-                       match += i;
-                       *rate += c->cnt;
-                       glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_DEBUG, "matched in: match %ld, rate %f", match, *rate);
-               }
-
-               if (*to >= afrom && *to < afrom+i) {
-                       match -= i-(*to-afrom);
-                       *rate -= c->cnt * (((float) i)-(*to - afrom))/i;
-
-                       glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_DEBUG, "matched to: match %ld, rate %f", match, *rate);
-
-               /* asi blbost 
-                       if (j == stats->archives[matchi].length - 1
-                               && *to > g->last_update)
-                       {
-                               match -= *to - g->last_update;
-                               *rate -= c->cnt * (((float) *to) - g->last_update)/i;
-                               dprintf(("corrected wrt. last_update: match %ld, rate %f\n",match,*rate));
-                       }
-               */
-
-                       break;
-               }
+                        "search %ld in %ld, %ld", *from, afrom, afrom+i);
+
+                // (from, to) is inside (afrom, afrom+i)
+                if (*from >= afrom && *to < afrom+i) {
+                        diff = *to - *from;
+                }
+                // (afrom, afrom+i) is inside (from, to)
+                else if (*from < afrom && *to >= afrom+i) {
+                        diff = i;
+                }
+                // from is in (afrom, afrom+i)
+                else if (*from >= afrom && *from < afrom+i) {
+                        diff = afrom+i - *from;
+                }
+                // to is in (afrom, afrom+i)
+                else if (*to >= afrom && *to < afrom+i) {
+                        diff = afrom+i - *to;
+                }
+                match += diff;
+                *rate += c->cnt * (float)diff/i;
+
+                if (*to >= afrom && *to < afrom+i) {
+                        glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_DEBUG, "matched to: match %d, rate %f", match, rate);
+                        break;
+                }
        }
        *rate /= match;
 
@@ -491,3 +605,168 @@ int edg_wll_StateDurationServer(
        return edg_wll_SetError(ctx,ENOSYS,NULL);
 }
 
+int edg_wll_StateDurationFromToServer(
+        edg_wll_Context ctx,
+        const edg_wll_QueryRec  *group,
+        edg_wll_JobStatCode     base_state,
+       edg_wll_JobStatCode     final_state,
+        int                     minor,
+        time_t  *from,
+        time_t  *to,
+        float   *duration,
+        int     *res_from,
+        int     *res_to
+)
+{
+        edg_wll_Stats *stats = default_stats;   /* XXX: hardcoded */
+        struct edg_wll_stats_group      *g;
+        struct edg_wll_stats_archive    *a;
+        int     i,j,matchi;
+        char    *sig = NULL;
+        time_t  afrom,ato;
+        long    match, diff;
+       float   rate;
+       int     err;
+
+        edg_wll_ResetError(ctx);
+
+       if (err = findStat(ctx, group, base_state, final_state, minor, from, to, &stats)) return err;
+
+       /* remap the file if someone changed its size */
+        if (stats->map->grpno != stats->grpno)
+        {       
+                if (flock(stats->fd,LOCK_EX)) return edg_wll_SetError(ctx,errno,"flock()");
+                if (stats_remap(stats)) {
+                        edg_wll_SetError(ctx,errno,"shmem remap failed");
+                        goto cleanup;
+                }
+        }
+
+        if (flock(stats->fd,LOCK_SH)) return edg_wll_SetError(ctx,errno,"flock()");
+
+        /* XXX */
+        sig = str2md5base64(group->value.c);
+
+
+        for (i=0, g=stats->map; i<stats->grpno; i++) {
+                if (!strcmp(sig,g->sig)) break;
+                g = (struct edg_wll_stats_group *) (((char *) g) + stats->grpsize);
+        }
+
+        if (i == stats->grpno) {
+                glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_DEBUG,
+                        "no match: %s\n",sig);
+                edg_wll_SetError(ctx,ENOENT,"no matching group");
+                goto cleanup;
+        }
+
+       match = 0;
+        matchi = -1;
+        /* XXX: assumes decreasing resolution of archives */
+        for (j=0; stats->archives[j].interval; j++) {
+                afrom = ato = g->last_update;
+
+                ato += stats->archives[j].interval - ato % stats->archives[j].interval;
+                afrom -= afrom % stats->archives[j].interval;
+                afrom -= stats->archives[j].interval * (stats->archives[j].length-1);
+
+                /* intersection of [from,to] and [afrom,ato] */
+                if (afrom < *from) afrom = *from;
+                if (ato > *to) ato = *to;
+
+                /* find the best match */
+                if (ato-afrom > match) {
+                        match = ato - afrom;
+                        matchi = j;
+                }
+        }
+
+        glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_DEBUG,
+                "best match: archive %d, interval %ld", matchi, match);
+
+       if (matchi < 0) {
+                if (*from > g->last_update) {
+                        /* special case -- we are sure that nothing arrived */
+                       *duration = 0.0f;
+                        *res_from = *res_to = stats->archives[0].interval;
+                        goto cleanup;
+                }
+                edg_wll_SetError(ctx,ENOENT,"no data available");
+                goto cleanup;
+        }
+
+        *res_from = *res_to = stats->archives[matchi].interval;
+
+        a = g->archive;
+        for (j=0; j<matchi; j++) a = archive_skip(a,stats->archives[j].length);
+
+        i = stats->archives[matchi].interval;
+        afrom = g->last_update - g->last_update % i
+                        - (stats->archives[matchi].length-1)*i;
+
+        glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_DEBUG,
+                "archive from %ld = %s", afrom, ctime(&afrom));
+
+        if (afrom > *from) *from = afrom;
+        if (afrom + stats->archives[matchi].length * i < *to) *to = afrom + stats->archives[matchi].length * i;
+
+        rate = 0.0f;
+       *duration = 0.0f;
+        match = 0;
+
+       for (j=0; j<stats->archives[matchi].length; j++,afrom += i) {
+                struct edg_wll_stats_cell       *c = a->cells + ((a->ptr+j+1) % stats->archives[matchi].length);
+
+                 glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_DEBUG,
+                        "cell %d (abs %d): ",
+                        j, (a->ptr+j+1) % stats->archives[matchi].length);
+                if (c->cnt < 0) {
+                        glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_DEBUG, "invalid");
+                        continue; /* invalid cell */
+                }
+
+                glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_DEBUG,
+                        "search %ld in %ld, %ld", *from, afrom, afrom+i);
+
+               // (from, to) is inside (afrom, afrom+i)
+               if (*from >= afrom && *to < afrom+i) {
+                       printf("branch 1, (%ld %ld), %ld %ld\n", *from, *to, afrom, afrom+i);
+                       diff = *to - *from;
+               }
+               // (afrom, afrom+i) is inside (from, to)
+               else if (*from < afrom && *to >= afrom+i) {
+                       printf("branch 2, (%ld %ld), %ld %ld\n", *from, *to, afrom, afrom+i);                   
+                       diff = i;
+               }
+               // from is in (afrom, afrom+i)
+               else if (*from >= afrom && *from < afrom+i) {
+                       printf("branch 3, (%ld %ld), %ld %ld\n", *from, *to, afrom, afrom+i);
+                       diff = afrom+i - *from;
+               }
+               // to is in (afrom, afrom+i)
+               else if (*to >= afrom && *to < afrom+i) {
+                       printf("branch 4, (%ld %ld), (%ld %ld)\n", *from, *to, afrom, afrom+i);
+                       diff = afrom+i - *to;
+               }
+               printf("diff: %ld\n", diff);
+               match += diff;
+               rate += c->cnt * (float)diff/i;
+               if (c->cnt)
+                       *duration += (float)diff * c->value/c->cnt;
+
+               if (*to >= afrom && *to < afrom+i) { 
+                       glite_common_log(LOG_CATEGORY_LB_SERVER, LOG_PRIORITY_DEBUG, "matched to: match %d, rate %f, duration %f", match, rate, *duration);
+                       break;
+               }
+        }
+       printf("XXX %f %f %ld\n", rate, *duration, match);
+        rate /= match;
+       *duration /= match;
+       printf("YYY %f %f %ld\n", rate, *duration, match);
+
+cleanup:
+       free(sig);
+        flock(stats->fd,LOCK_UN);
+        return edg_wll_Error(ctx,NULL,NULL);
+}
+
index aad067e..b242bf9 100644 (file)
@@ -53,9 +53,10 @@ struct edg_wll_stats_group {
 
 
 typedef struct {
-       enum { STATS_UNDEF = 0, STATS_COUNT, STATS_DURATION }   type;
+       enum { STATS_UNDEF = 0, STATS_COUNT, STATS_DURATION, STATS_DURATION_FROMTO }    type;
        edg_wll_QueryRec        *group;
-       edg_wll_JobStatCode     major;
+       edg_wll_JobStatCode     base_state;
+       edg_wll_JobStatCode     final_state;
        int                     minor;
        struct _edg_wll_StatsArchive {
                int     interval,length;
@@ -91,4 +92,17 @@ int edg_wll_StateDurationServer(
        int     *res_to
 );
 
+int edg_wll_StateDurationFromToServer(
+        edg_wll_Context ctx,
+        const edg_wll_QueryRec  *group,
+        edg_wll_JobStatCode     base_state,
+        edg_wll_JobStatCode     final_state,
+        int                     minor,
+        time_t  *from,
+        time_t  *to,
+        float   *duration,
+        int     *res_from,
+        int     *res_to
+);
+
 #endif /* GLITE_LB_STATS_H */