LB-plugin - reading line by line from JP and parsing to events
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Thu, 21 Apr 2005 12:02:33 +0000 (12:02 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Thu, 21 Apr 2005 12:02:33 +0000 (12:02 +0000)
org.glite.lb.server/src/lb_plugin.c

index 84ef774..358b6d7 100644 (file)
@@ -11,6 +11,7 @@
 #include "glite/lb/jobstat.h"
 
 #include "glite/lb/events.h"
+#include "glite/lb/events_parse.h"
 
 //#include "jobstat.h"
 
 #include "glite/jp/context.h"
 #include "glite/jp/file_plugin.h"
 #include "glite/jp/builtin_plugins.h"
+#include "glite/jp/backend.h"
+
+#define INITIAL_NUMBER_EVENTS 100
+
+typedef struct _lb_buffer_t {
+       char *buf;
+       size_t pos, size;
+       ssize_t offset;
+} lb_buffer_t;
 
 typedef struct _lb_handle {
-       edg_wll_Event   *events;
+       edg_wll_Event   **events;
        edg_wll_JobStat status;
 } lb_handle;
 
@@ -29,11 +39,12 @@ typedef struct _lb_handle {
 static int lb_query(void *fpctx,void *handle,glite_jp_attr_t attr,glite_jp_attrval_t **attrval);
 static int lb_open(void *,void *,void **);
 static int lb_close(void *,void *);
-static int lb_status(edg_wll_Event *event, edg_wll_JobStat *status);
+/*static int lb_status(edg_wll_Event *event, edg_wll_JobStat *status);*/
+static int read_line(glite_jp_context_t  ctx, void *handle, lb_buffer_t *buffer, char **line);
 
 
 
-static int lb_dummy()
+static int lb_dummy(void *fpctx, void *handle, int oper, ...)
 {
        puts("lb_dummy() - generic call not used; for testing purposes only...");
        return -1;
@@ -62,39 +73,75 @@ int init(glite_jp_context_t ctx, glite_jpps_fplug_data_t *data)
 
 
 
-static int lb_open(void *fpctx,void *bhandle,void **handle)
+static int lb_open(void *fpctx, void *bhandle, void **handle)
 {
-       lb_handle       *h;
-
+       lb_handle           *h;
+       lb_buffer_t         buffer;
+       glite_jp_context_t  ctx = (glite_jp_context_t) fpctx;
+       char *line;
+       int retval;
+       edg_wll_Context     context;
+       size_t              nevents, maxnevents, i;
        
        h = calloc(1, sizeof(lb_handle));
 
+       if ((retval = edg_wll_InitContext(&context)) != 0) return retval;
 
        // read the file given by bhandle
        // parse events into h->events array
+       memset(&buffer, sizeof(buffer), 0);
+       buffer.buf = malloc(BUFSIZ);
+       maxnevents = INITIAL_NUMBER_EVENTS;
+       nevents = 0;
+       h->events = malloc(maxnevents * sizeof(edg_wll_Event *));
+
+       if ((retval = read_line(ctx, bhandle, &buffer, &line)) != 0) goto fail;
+       while (line) {
+               printf("(DEBUG)lb plugin: '%s'\n", line);
+
+               if (nevents >= maxnevents) {
+                       maxnevents <<= 1;
+                       h->events = realloc(h->events, maxnevents * sizeof(edg_wll_Event *));
+               }
+               if ((retval = edg_wll_ParseEvent(context, line, &h->events[nevents])) != 0) goto fail;
+               nevents++;
+               free(line);
+
+               if ((retval = read_line(ctx, bhandle, &buffer, &line)) != 0) goto fail;
+       }
+
+       free(buffer.buf);
 
        // compute state of the job - still unclear if needed
+       // TODO
 
        handle = (void **) &h;
 
        return 0;
+
+fail:
+       for (i = 0; i < nevents; i++) edg_wll_FreeEvent(h->events[i]);
+       free(h->events);
+       free(buffer.buf);
+       edg_wll_FreeContext(context);
+       return retval;
 }
 
 
 
 static int lb_close(void *fpctx,void *handle)
 {
-       lb_handle       *h = (lb_handle *) handle;
-       int             i;
+       lb_handle           *h = (lb_handle *) handle;
+       int                 i;
 
        
        // Free allocated stuctures
        if (h->events)
        {
                i = 0;
-               while (h->events[i].type != EDG_WLL_EVENT_UNDEF)
+               while (h->events[i])
                {
-                       edg_wll_FreeEvent(&h->events[i]); 
+                       edg_wll_FreeEvent(h->events[i]); 
                        i++;
                }
                free(h->events);
@@ -103,6 +150,8 @@ static int lb_close(void *fpctx,void *handle)
        if (h->status.state != EDG_WLL_JOB_UNDEF) 
                edg_wll_FreeStatus(&h->status);
 
+       free(h);
+
        return 0;
 }
 
@@ -138,19 +187,19 @@ static int lb_query(void *fpctx,void *handle,glite_jp_attr_t attr,glite_jp_attrv
                                i = 0;
                                n_tags = 0;
 
-                               while (h->events[i].type != EDG_WLL_EVENT_UNDEF)
+                               while (h->events[i])
                                {
-                                       if (h->events[i].type == EDG_WLL_EVENT_USERTAG)
+                                       if (h->events[i]->type == EDG_WLL_EVENT_USERTAG)
                                        {
                                                av = realloc(av, (n_tags+2) * sizeof(glite_jp_attrval_t));
 
-                                               av[i].value.tag.name = strdup(h->events[i].userTag.name);
+                                               av[i].value.tag.name = strdup(h->events[i]->userTag.name);
                                                av[i].value.tag.sequence = -1;
                                                av[i].value.tag.timestamp = 
-                                                       h->events[i].any.timestamp.tv_sec;  
+                                                       h->events[i]->any.timestamp.tv_sec;
                                                av[i].value.tag.binary = 0;
                                                av[i].value.tag.size = -1;
-                                               av[i].value.tag.value = strdup(h->events[i].userTag.value);
+                                               av[i].value.tag.value = strdup(h->events[i]->userTag.value);
                                                av[i].attr.type = GLITE_JP_ATTR_TAG;
 
                                                av[i+1].attr.type = GLITE_JP_ATTR_UNDEF;
@@ -186,9 +235,9 @@ static int lb_status(edg_wll_Event *event, edg_wll_JobStat *status)
         calloc(1, sizeof(intJobStat));
         
        i = 0;
-        while (events[i].any.type != EDG_WLL_EVENT_UNDEF)  
+        while (events[i])  
         {
-               processEvent(js, &(events[i]), 0, be_strict, &errstring);
+               processEvent(js, events[i], 0, be_strict, &errstring);
                i++;
        }
 
@@ -201,3 +250,80 @@ err:
 }
 */
 
+
+/*
+ * realloc the line to double size if needed
+ *
+ * \return 0 if failed, did nothing
+ * \return 1 if success
+ */
+int check_realloc_line(char **line, size_t *maxlen, size_t len) {
+       void *tmp;
+
+       if (len >= *maxlen) {
+               *maxlen <<= 1;
+               tmp = realloc(*line, *maxlen);
+               if (!tmp) return 0;
+               *line = tmp;
+       }
+
+       return 1;
+}
+
+
+/*
+ * read next line from stream
+ *
+ * /return error code
+ */
+static int read_line(glite_jp_context_t  ctx, void *handle, lb_buffer_t *buffer, char **line) {
+       size_t maxlen, len, i;
+       ssize_t nbytes;
+       int retval;
+
+       maxlen = BUFSIZ;
+       i = 0;
+       len = 0;
+       *line = malloc(maxlen);
+
+       do {
+               /* read next portion */
+               if (buffer->pos >= buffer->size) {
+                       buffer->pos = 0;
+                       buffer->size = 0;
+                       if ((retval = glite_jppsbe_pread(ctx, handle, buffer->buf, BUFSIZ, buffer->offset, &nbytes)) == 0) {
+                               if (nbytes < 0) {
+                                       retval = EINVAL;
+                                       goto fail;
+                               } else {
+                                       buffer->size = (size_t)nbytes;
+                                       buffer->offset += nbytes;
+                               }
+                       } else goto fail;
+               }
+
+               /* we have buffer->size - buffer->pos bytes */
+               i = buffer->pos;
+               while (i < buffer->size && buffer->buf[i] != '\n' && buffer->buf[i] != '\0') {
+                       if (!check_realloc_line(line, &maxlen, len)) {
+                               retval = ENOMEM;
+                               goto fail;
+                       }
+                       *line[len++] = buffer->buf[i++];
+               }
+               buffer->pos = i;
+       } while (len && *line[len - 1] != '\n' && *line[len - 1] != '\0');
+
+       if (len) *line[len - 1] = '\0';
+       else {
+               free(*line);
+               *line = NULL;
+       }
+
+       return 0;
+
+fail:
+       free(*line);
+       *line = NULL;
+       return retval;
+}