From b14c7047f456d90e229c128f5fa109e4a86a509a Mon Sep 17 00:00:00 2001 From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?= Date: Thu, 21 Apr 2005 12:02:33 +0000 Subject: [PATCH] LB-plugin - reading line by line from JP and parsing to events --- org.glite.lb.server/src/lb_plugin.c | 160 ++++++++++++++++++++++++++++++++---- 1 file changed, 143 insertions(+), 17 deletions(-) diff --git a/org.glite.lb.server/src/lb_plugin.c b/org.glite.lb.server/src/lb_plugin.c index 84ef774..358b6d7 100644 --- a/org.glite.lb.server/src/lb_plugin.c +++ b/org.glite.lb.server/src/lb_plugin.c @@ -11,6 +11,7 @@ #include "glite/lb/jobstat.h" #include "glite/lb/events.h" +#include "glite/lb/events_parse.h" //#include "jobstat.h" @@ -18,9 +19,18 @@ #include "glite/jp/context.h" #include "glite/jp/file_plugin.h" #include "glite/jp/builtin_plugins.h" +#include "glite/jp/backend.h" + +#define INITIAL_NUMBER_EVENTS 100 + +typedef struct _lb_buffer_t { + char *buf; + size_t pos, size; + ssize_t offset; +} lb_buffer_t; typedef struct _lb_handle { - edg_wll_Event *events; + edg_wll_Event **events; edg_wll_JobStat status; } lb_handle; @@ -29,11 +39,12 @@ typedef struct _lb_handle { static int lb_query(void *fpctx,void *handle,glite_jp_attr_t attr,glite_jp_attrval_t **attrval); static int lb_open(void *,void *,void **); static int lb_close(void *,void *); -static int lb_status(edg_wll_Event *event, edg_wll_JobStat *status); +/*static int lb_status(edg_wll_Event *event, edg_wll_JobStat *status);*/ +static int read_line(glite_jp_context_t ctx, void *handle, lb_buffer_t *buffer, char **line); -static int lb_dummy() +static int lb_dummy(void *fpctx, void *handle, int oper, ...) { puts("lb_dummy() - generic call not used; for testing purposes only..."); return -1; @@ -62,39 +73,75 @@ int init(glite_jp_context_t ctx, glite_jpps_fplug_data_t *data) -static int lb_open(void *fpctx,void *bhandle,void **handle) +static int lb_open(void *fpctx, void *bhandle, void **handle) { - lb_handle *h; - + lb_handle *h; + lb_buffer_t buffer; + glite_jp_context_t ctx = (glite_jp_context_t) fpctx; + char *line; + int retval; + edg_wll_Context context; + size_t nevents, maxnevents, i; h = calloc(1, sizeof(lb_handle)); + if ((retval = edg_wll_InitContext(&context)) != 0) return retval; // read the file given by bhandle // parse events into h->events array + memset(&buffer, sizeof(buffer), 0); + buffer.buf = malloc(BUFSIZ); + maxnevents = INITIAL_NUMBER_EVENTS; + nevents = 0; + h->events = malloc(maxnevents * sizeof(edg_wll_Event *)); + + if ((retval = read_line(ctx, bhandle, &buffer, &line)) != 0) goto fail; + while (line) { + printf("(DEBUG)lb plugin: '%s'\n", line); + + if (nevents >= maxnevents) { + maxnevents <<= 1; + h->events = realloc(h->events, maxnevents * sizeof(edg_wll_Event *)); + } + if ((retval = edg_wll_ParseEvent(context, line, &h->events[nevents])) != 0) goto fail; + nevents++; + free(line); + + if ((retval = read_line(ctx, bhandle, &buffer, &line)) != 0) goto fail; + } + + free(buffer.buf); // compute state of the job - still unclear if needed + // TODO handle = (void **) &h; return 0; + +fail: + for (i = 0; i < nevents; i++) edg_wll_FreeEvent(h->events[i]); + free(h->events); + free(buffer.buf); + edg_wll_FreeContext(context); + return retval; } static int lb_close(void *fpctx,void *handle) { - lb_handle *h = (lb_handle *) handle; - int i; + lb_handle *h = (lb_handle *) handle; + int i; // Free allocated stuctures if (h->events) { i = 0; - while (h->events[i].type != EDG_WLL_EVENT_UNDEF) + while (h->events[i]) { - edg_wll_FreeEvent(&h->events[i]); + edg_wll_FreeEvent(h->events[i]); i++; } free(h->events); @@ -103,6 +150,8 @@ static int lb_close(void *fpctx,void *handle) if (h->status.state != EDG_WLL_JOB_UNDEF) edg_wll_FreeStatus(&h->status); + free(h); + return 0; } @@ -138,19 +187,19 @@ static int lb_query(void *fpctx,void *handle,glite_jp_attr_t attr,glite_jp_attrv i = 0; n_tags = 0; - while (h->events[i].type != EDG_WLL_EVENT_UNDEF) + while (h->events[i]) { - if (h->events[i].type == EDG_WLL_EVENT_USERTAG) + if (h->events[i]->type == EDG_WLL_EVENT_USERTAG) { av = realloc(av, (n_tags+2) * sizeof(glite_jp_attrval_t)); - av[i].value.tag.name = strdup(h->events[i].userTag.name); + av[i].value.tag.name = strdup(h->events[i]->userTag.name); av[i].value.tag.sequence = -1; av[i].value.tag.timestamp = - h->events[i].any.timestamp.tv_sec; + h->events[i]->any.timestamp.tv_sec; av[i].value.tag.binary = 0; av[i].value.tag.size = -1; - av[i].value.tag.value = strdup(h->events[i].userTag.value); + av[i].value.tag.value = strdup(h->events[i]->userTag.value); av[i].attr.type = GLITE_JP_ATTR_TAG; av[i+1].attr.type = GLITE_JP_ATTR_UNDEF; @@ -186,9 +235,9 @@ static int lb_status(edg_wll_Event *event, edg_wll_JobStat *status) calloc(1, sizeof(intJobStat)); i = 0; - while (events[i].any.type != EDG_WLL_EVENT_UNDEF) + while (events[i]) { - processEvent(js, &(events[i]), 0, be_strict, &errstring); + processEvent(js, events[i], 0, be_strict, &errstring); i++; } @@ -201,3 +250,80 @@ err: } */ + +/* + * realloc the line to double size if needed + * + * \return 0 if failed, did nothing + * \return 1 if success + */ +int check_realloc_line(char **line, size_t *maxlen, size_t len) { + void *tmp; + + if (len >= *maxlen) { + *maxlen <<= 1; + tmp = realloc(*line, *maxlen); + if (!tmp) return 0; + *line = tmp; + } + + return 1; +} + + +/* + * read next line from stream + * + * /return error code + */ +static int read_line(glite_jp_context_t ctx, void *handle, lb_buffer_t *buffer, char **line) { + size_t maxlen, len, i; + ssize_t nbytes; + int retval; + + maxlen = BUFSIZ; + i = 0; + len = 0; + *line = malloc(maxlen); + + do { + /* read next portion */ + if (buffer->pos >= buffer->size) { + buffer->pos = 0; + buffer->size = 0; + if ((retval = glite_jppsbe_pread(ctx, handle, buffer->buf, BUFSIZ, buffer->offset, &nbytes)) == 0) { + if (nbytes < 0) { + retval = EINVAL; + goto fail; + } else { + buffer->size = (size_t)nbytes; + buffer->offset += nbytes; + } + } else goto fail; + } + + /* we have buffer->size - buffer->pos bytes */ + i = buffer->pos; + while (i < buffer->size && buffer->buf[i] != '\n' && buffer->buf[i] != '\0') { + if (!check_realloc_line(line, &maxlen, len)) { + retval = ENOMEM; + goto fail; + } + *line[len++] = buffer->buf[i++]; + } + buffer->pos = i; + } while (len && *line[len - 1] != '\n' && *line[len - 1] != '\0'); + + if (len) *line[len - 1] = '\0'; + else { + free(*line); + *line = NULL; + } + + return 0; + +fail: + free(*line); + *line = NULL; + return retval; +} -- 1.8.2.3