#include "glite/lb/context.h"
#include "glite/lb/jobstat.h"
-
#include "glite/lb/events.h"
#include "glite/lb/events_parse.h"
-
#include "glite/lb/trio.h"
#include "jobstat.h"
+#include "get_events.h"
#include "glite/jp/types.h"
#include "glite/jp/context.h"
#include "jp_job_attrs.h"
#define INITIAL_NUMBER_EVENTS 100
+#define INITIAL_NUMBER_STATES EDG_WLL_NUMBER_OF_STATCODES
#define LB_PLUGIN_NAMESPACE "urn:org.glite.lb"
typedef struct _lb_buffer_t {
- char *buf;
- size_t pos, size;
- off_t offset;
+ char *buf;
+ size_t pos, size;
+ off_t offset;
} lb_buffer_t;
+typedef struct _lb_historyStatus {
+ edg_wll_JobStatCode state;
+ struct timeval timestamp;
+ char *reason;
+} lb_historyStatus;
+
typedef struct _lb_handle {
- edg_wll_Event **events;
- edg_wll_JobStat status;
+ edg_wll_Event **events;
+ edg_wll_JobStat status;
+ lb_historyStatus **fullStatusHistory, **lastStatusHistory;
} lb_handle;
#define check_strdup(s) ((s) ? strdup(s) : NULL)
extern int processEvent(intJobStat *, edg_wll_Event *, int, int, char **);
-static int lb_query(void *fpctx,void *handle,const char * attr,glite_jp_attrval_t **attrval);
-static int lb_open(void *,void *, const char *uri, void **);
-static int lb_close(void *,void *);
-static int lb_status(edg_wll_Event **event, edg_wll_JobStat *status);
-static int read_line(glite_jp_context_t ctx, void *handle, lb_buffer_t *buffer, char **line);
-
+static int lb_query(void *fpctx, void *handle, const char *attr, glite_jp_attrval_t **attrval);
+static int lb_open(void *fpctx, void *bhandle, const char *uri, void **handle);
+static int lb_close(void *fpctx, void *handle);
+static int lb_status(void *handle);
+static int read_line(glite_jp_context_t ctx, void *handle, lb_buffer_t *buffer, char **line);
static int lb_dummy(void *fpctx, void *handle, int oper, ...) {
puts("lb_dummy() - generic call not used; for testing purposes only...");
fprintf(stderr,"lb_plugin: opened %d events\n", nevents);
#endif
- /* count state of job given by loaded events */
- if ((retval = lb_status(h->events, &(h->status))) != 0) goto fail;
+ /* count state and status hiftory of the job given by the loaded events */
+ if ((retval = lb_status(h)) != 0) goto fail;
*handle = (void *)h;
if (h->status.state != EDG_WLL_JOB_UNDEF)
edg_wll_FreeStatus(&h->status);
+ if (h->fullStatusHistory) {
+ i = 0;
+ while (h->fullStatusHistory[i]) {
+ if (h->fullStatusHistory[i]->reason) free(h->fullStatusHistory[i]->reason);
+ free (h->fullStatusHistory[i]);
+ i++;
+ }
+ }
+
free(h);
#ifdef PLUGIN_DEBUG
h->status.children_num);
av[0].size = -1;
av[0].timestamp = h->status.lastUpdateTime.tv_sec;
- } else if (strcmp(attr, GLITE_JP_LB_lastStatusHistory) == 0 ||
- strcmp(attr, GLITE_JP_LB_fullStatusHistory) == 0) {
- /* complex types */
+ } else if (strcmp(attr, GLITE_JP_LB_lastStatusHistory) == 0) {
+/*
av = calloc(1 + EDG_WLL_NUMBER_OF_STATCODES, sizeof(glite_jp_attrval_t));
av[0].name = strdup(attr);
av[0].value = check_strdup(h->status.reason);
av[i].timestamp = h->status.stateEnterTimes[i+1];
av[i].size = -1;
}
+*/
+ i = 0; while (h->lastStatusHistory[i]) i++;
+ av = calloc(i+2, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ av[0].value = check_strdup(h->status.reason);
+ av[0].timestamp = h->status.stateEnterTime.tv_sec;
+ av[0].size = -1;
+ if (h->fullStatusHistory[0]) {
+ av[1].name = edg_wll_StatToString(h->fullStatusHistory[0]->state);
+ av[1].value = check_strdup(h->fullStatusHistory[0]->reason);
+ av[1].timestamp = h->fullStatusHistory[0]->timestamp.tv_sec;
+ av[1].size = -1;
+ }
+ i = 0;
+ while (h->lastStatusHistory[i]) {
+ av[i+2].name = edg_wll_StatToString(h->lastStatusHistory[i]->state);
+ av[i+2].value = check_strdup(h->lastStatusHistory[i]->reason);
+ av[i+2].timestamp = h->lastStatusHistory[i]->timestamp.tv_sec;
+ av[i+2].size = -1;
+ i++;
+ }
+
+ } else if (strcmp(attr, GLITE_JP_LB_fullStatusHistory) == 0) {
+ i = 0; while (h->fullStatusHistory[i]) i++;
+ av = calloc(i+1, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ av[0].value = check_strdup(h->status.reason);
+ av[0].timestamp = h->status.stateEnterTime.tv_sec;
+ av[0].size = -1;
+ i = 0;
+ while (h->fullStatusHistory[i]) {
+ av[i+1].name = edg_wll_StatToString(h->fullStatusHistory[i]->state);
+ av[i+1].value = check_strdup(h->fullStatusHistory[i]->reason);
+ av[i+1].timestamp = h->fullStatusHistory[i]->timestamp.tv_sec;
+ av[i+1].size = -1;
+ i++;
+ }
} else if (strncmp(attr, GLITE_JP_LBTAG_NS, sizeof(GLITE_JP_LBTAG_NS)-1) == 0) {
tag = strrchr(attr, ':');
if (h->events && tag) {
}
-static int lb_status(edg_wll_Event **events, edg_wll_JobStat *status) {
+// static int lb_status(edg_wll_Event **events, edg_wll_JobStat *status) {
+static int lb_status(void *handle) {
+ lb_handle *h = (lb_handle *) handle;
intJobStat *js;
- int i, be_strict = 0;
+ int maxnstates, nstates, i, be_strict = 0;
char *errstring;
+ edg_wll_JobStatCode old_state = EDG_WLL_JOB_UNDEF;
js = calloc(1, sizeof(intJobStat));
init_intJobStat(js);
- edg_wll_SortPEvents(events);
+ edg_wll_SortPEvents(h->events);
+ maxnstates = INITIAL_NUMBER_STATES;
+ nstates = 0;
+ h->fullStatusHistory = calloc(maxnstates, sizeof(lb_historyStatus *));
i = 0;
- while (events[i])
+ while (h->events[i])
{
- /* XXX: job owner and jobId not filled from events normally */
- if (events[i]->any.type == EDG_WLL_EVENT_REGJOB) {
- js->pub.owner = check_strdup(events[i]->any.user);
- if (edg_wlc_JobIdDup(events[i]->any.jobId,&js->pub.jobId)) {
+ /* realloc the fullStatusHistory if needed */
+ if (nstates >= maxnstates) {
+ maxnstates <<= 1;
+ h->fullStatusHistory = realloc(h->fullStatusHistory, maxnstates * sizeof(lb_historyStatus *));
+ }
+
+ /* job owner and jobId not filled from events normally */
+ if (h->events[i]->any.type == EDG_WLL_EVENT_REGJOB) {
+ js->pub.owner = check_strdup(h->events[i]->any.user);
+ if (edg_wlc_JobIdDup(h->events[i]->any.jobId,&js->pub.jobId)) {
goto err;
}
}
- if (processEvent(js, events[i], 0, be_strict, &errstring) == RET_FATAL) {
+ /* Process Event and update the state */
+ if (processEvent(js, h->events[i], 0, be_strict, &errstring) == RET_FATAL) {
goto err;
}
+
+ /* if the state has changed, update the status history */
+ if (js->pub.state != old_state) {
+ h->fullStatusHistory[nstates] = calloc(1,sizeof(lb_historyStatus));
+ h->fullStatusHistory[nstates]->state = js->pub.state;
+ h->fullStatusHistory[nstates]->timestamp.tv_sec = js->pub.stateEnterTime.tv_sec;
+ h->fullStatusHistory[nstates]->timestamp.tv_usec = js->pub.stateEnterTime.tv_usec;
+ h->fullStatusHistory[nstates]->reason = check_strdup(js->pub.reason);
+ if (js->pub.state == EDG_WLL_JOB_WAITING) {
+ h->lastStatusHistory = &h->fullStatusHistory[nstates];
+ }
+ old_state = js->pub.state;
+ nstates++;
+ }
+
i++;
}
- memcpy(status, &js->pub, sizeof(edg_wll_JobStat));
+ memcpy(&h->status, &js->pub, sizeof(edg_wll_JobStat));
// XXX: awful, hopefully working
memset(&js->pub, 0, sizeof(edg_wll_JobStat));
plugin_data.ops.attr(jpctx, data_handle, GLITE_JP_LB_lastStatusHistory, &attrval);
if (attrval) {
fprintf(stdout,"\t<lastStatusHistory>\n");
+/*
for (i = 1; i < EDG_WLL_NUMBER_OF_STATCODES; i++) {
char *stat = edg_wll_StatToString(i);
fprintf(stdout,"\t<status>\n");
fprintf(stdout,"\t</status>\n");
if (stat) free(stat);
}
+*/
+ i = 1;
+ while (attrval[i].name) {
+ fprintf(stdout,"\t<status>\n");
+ fprintf(stdout,"\t\t<status>%s</status>\n", attrval[i].name);
+ fprintf(stdout,"\t\t<timestamp>%ld.%06ld</timestamp>\n", attrval[i].timestamp,0);
+ fprintf(stdout,"\t\t<reason>%s</reason>\n", attrval[i].value ? attrval[i].value : "");
+ fprintf(stdout,"\t</status>\n");
+ i++;
+ }
fprintf(stdout,"\t</lastStatusHistory>\n");
free_attrs(attrval);
}
plugin_data.ops.attr(jpctx, data_handle, GLITE_JP_LB_fullStatusHistory, &attrval);
if (attrval) {
fprintf(stdout,"\t<fullStatusHistory>\n");
- for (i = 1; i < EDG_WLL_NUMBER_OF_STATCODES; i++) {
- char *stat = edg_wll_StatToString(i);
+ i = 1;
+ while (attrval[i].name) {
fprintf(stdout,"\t<status>\n");
- fprintf(stdout,"\t\t<status>%s</status>\n", stat);
+ fprintf(stdout,"\t\t<status>%s</status>\n", attrval[i].name);
fprintf(stdout,"\t\t<timestamp>%ld.%06ld</timestamp>\n", attrval[i].timestamp,0);
fprintf(stdout,"\t\t<reason>%s</reason>\n", attrval[i].value ? attrval[i].value : "");
fprintf(stdout,"\t</status>\n");
- if (stat) free(stat);
+ i++;
}
fprintf(stdout,"\t</fullStatusHistory>\n");
free_attrs(attrval);