Functions that is used only in lb_plugin has been moved into lb_plugin.c.
authorJiří Filipovič <fila@ics.muni.cz>
Fri, 28 Mar 2008 09:57:07 +0000 (09:57 +0000)
committerJiří Filipovič <fila@ics.muni.cz>
Fri, 28 Mar 2008 09:57:07 +0000 (09:57 +0000)
org.glite.lb.state-machine/src/lb_plugin.c

index 2a75df9..5298cea 100644 (file)
@@ -39,7 +39,7 @@
 #include "glite/jp/builtin_plugins.h"
 #include "glite/jp/backend.h"
 #include "glite/jp/attr.h"
-#include "glite/jp/utils.h"
+//#include "glite/jp/utils.h"
 #include "glite/jp/known_attr.h"
 #include "job_attrs.h"
 #include "job_attrs2.h"
@@ -66,6 +66,12 @@ typedef struct _lb_handle {
        glite_jpps_fplug_data_t*        classad_plugin;
 } lb_handle;
 
+typedef struct _rl_buffer_t {
+        char                    *buf;
+        size_t                  pos, size;
+        off_t                   offset;
+} rl_buffer_t;
+
 #define check_strdup(s) ((s) ? strdup(s) : NULL)
 
 extern int processEvent(intJobStat *, edg_wll_Event *, int, int, char **);
@@ -76,7 +82,14 @@ static int lb_open(void *fpctx, void *bhandle, const char *uri, void **handle);
 static int lb_close(void *fpctx, void *handle);
 static int lb_filecom(void *fpctx, void *handle);
 static int lb_status(void *handle);
-//static int read_line(glite_jp_context_t ctx, void *handle, lb_buffer_t *buffer, char **line);
+static int readline( 
+        glite_jp_context_t ctx,
+        void *handle,
+        rl_buffer_t *buffer,
+        char **line
+);
+char* get_namespace(const char* attr);
+
 
 static int lb_dummy(void *fpctx, void *handle, int oper, ...) {
        puts("lb_dummy() - generic call not used; for testing purposes only...");
@@ -149,7 +162,7 @@ static int lb_open(void *fpctx, void *bhandle, const char *uri, void **handle) {
        nevents = 0;
        h->events = malloc(maxnevents * sizeof(edg_wll_Event *));
 
-       if ((retval = glite_jppsbe_readline(ctx, bhandle, &buffer, &line)) != 0) {
+       if ((retval = readline(ctx, bhandle, &buffer, &line)) != 0) {
                err.code = retval;
                err.desc = "reading LB logline";
                err.source = "lb_plugin.c:read_line()";
@@ -199,7 +212,7 @@ static int lb_open(void *fpctx, void *bhandle, const char *uri, void **handle) {
                }
                free(line);
 
-               if ((retval = glite_jppsbe_readline(ctx, bhandle, &buffer, &line)) != 0) {
+               if ((retval = readline(ctx, bhandle, &buffer, &line)) != 0) {
                        err.code = retval;
                        err.desc = "reading LB logline";
                        err.source = "lb_plugin.c:read_line()";
@@ -349,7 +362,7 @@ static int lb_query(void *fpctx,void *handle, const char *attr,glite_jp_attrval_
        glite_jp_error_t        err; 
        glite_jp_attrval_t      *av = NULL;
        int                     i, j, n_tags;
-       char                    *ns = glite_jpps_get_namespace(attr);
+       char                    *ns = get_namespace(attr);
        char                    *tag;
 
         glite_jp_clear_error(ctx); 
@@ -1044,3 +1057,94 @@ static void edg_wll_SortPEvents(edg_wll_Event **e)
        qsort(e,n,sizeof(*e),compare_pevents_by_seq);
 }
 
+int check_realloc_line(char **line, size_t *maxlen, size_t len) {
+        void *tmp;
+
+        if (len > *maxlen) {
+                *maxlen <<= 1;
+                tmp = realloc(*line, *maxlen);
+                if (!tmp) return 0;
+                *line = tmp;
+        }
+
+        return 1;
+}
+
+int readline(
+        glite_jp_context_t ctx,
+        void *handle,
+        rl_buffer_t *buffer,
+        char **line
+)
+{
+        size_t maxlen, len, i;
+        ssize_t nbytes;
+        int retval, z, end;
+
+        maxlen = BUFSIZ;
+        i = 0;
+        len = 0;
+        *line = malloc(maxlen);
+        end = 0;
+
+        do {
+                /* read next portion */
+                if (buffer->pos >= buffer->size) {
+                        buffer->pos = 0;
+                        buffer->size = 0;
+                        if ((retval = glite_jppsbe_pread(ctx, handle, buffer->buf, BUFSIZ, buffer->offset, &nbytes)) == 0) {
+                                if (nbytes < 0) {
+                                        retval = EINVAL;
+                                        goto fail;
+                                } else {
+                                        if (nbytes) {
+                                                buffer->size = (size_t)nbytes;
+                                                buffer->offset += nbytes;
+                                        } else end = 1;
+                                }
+                        } else goto fail;
+                }
+
+                /* we have buffer->size - buffer->pos bytes */
+                i = buffer->pos;
+                do {
+                        if (i >= buffer->size) z = '\0';
+                        else {
+                                z = buffer->buf[i];
+                                if (z == '\n') z = '\0';
+                        }
+                        len++;
+
+                        if (!check_realloc_line(line, &maxlen, len)) {
+                                retval = ENOMEM;
+                                goto fail;
+                        }
+                        (*line)[len - 1] = z;
+                        i++;
+                } while (z && i < buffer->size);
+                buffer->pos = i;
+        } while (len && (*line)[len - 1] != '\0');
+
+        if ((!len || !(*line)[0]) && end) {
+                free(*line);
+                *line = NULL;
+        }
+
+        return 0;
+
+fail:
+        free(*line);
+        *line = NULL;
+        return retval;
+}
+
+char* get_namespace(const char* attr){
+        char* namespace = strdup(attr);
+        char* colon = strrchr(namespace, ':');
+        if (colon)
+                namespace[strrchr(namespace, ':') - namespace] = 0;
+        else
+                namespace[0] = 0;
+        return namespace;
+}
+