#include "glite/jp/builtin_plugins.h"
#include "glite/jp/backend.h"
#include "glite/jp/attr.h"
-#include "glite/jp/utils.h"
+//#include "glite/jp/utils.h"
#include "glite/jp/known_attr.h"
#include "job_attrs.h"
#include "job_attrs2.h"
glite_jpps_fplug_data_t* classad_plugin;
} lb_handle;
+typedef struct _rl_buffer_t {
+ char *buf;
+ size_t pos, size;
+ off_t offset;
+} rl_buffer_t;
+
#define check_strdup(s) ((s) ? strdup(s) : NULL)
extern int processEvent(intJobStat *, edg_wll_Event *, int, int, char **);
static int lb_close(void *fpctx, void *handle);
static int lb_filecom(void *fpctx, void *handle);
static int lb_status(void *handle);
-//static int read_line(glite_jp_context_t ctx, void *handle, lb_buffer_t *buffer, char **line);
+static int readline(
+ glite_jp_context_t ctx,
+ void *handle,
+ rl_buffer_t *buffer,
+ char **line
+);
+char* get_namespace(const char* attr);
+
static int lb_dummy(void *fpctx, void *handle, int oper, ...) {
puts("lb_dummy() - generic call not used; for testing purposes only...");
nevents = 0;
h->events = malloc(maxnevents * sizeof(edg_wll_Event *));
- if ((retval = glite_jppsbe_readline(ctx, bhandle, &buffer, &line)) != 0) {
+ if ((retval = readline(ctx, bhandle, &buffer, &line)) != 0) {
err.code = retval;
err.desc = "reading LB logline";
err.source = "lb_plugin.c:read_line()";
}
free(line);
- if ((retval = glite_jppsbe_readline(ctx, bhandle, &buffer, &line)) != 0) {
+ if ((retval = readline(ctx, bhandle, &buffer, &line)) != 0) {
err.code = retval;
err.desc = "reading LB logline";
err.source = "lb_plugin.c:read_line()";
glite_jp_error_t err;
glite_jp_attrval_t *av = NULL;
int i, j, n_tags;
- char *ns = glite_jpps_get_namespace(attr);
+ char *ns = get_namespace(attr);
char *tag;
glite_jp_clear_error(ctx);
qsort(e,n,sizeof(*e),compare_pevents_by_seq);
}
+int check_realloc_line(char **line, size_t *maxlen, size_t len) {
+ void *tmp;
+
+ if (len > *maxlen) {
+ *maxlen <<= 1;
+ tmp = realloc(*line, *maxlen);
+ if (!tmp) return 0;
+ *line = tmp;
+ }
+
+ return 1;
+}
+
+int readline(
+ glite_jp_context_t ctx,
+ void *handle,
+ rl_buffer_t *buffer,
+ char **line
+)
+{
+ size_t maxlen, len, i;
+ ssize_t nbytes;
+ int retval, z, end;
+
+ maxlen = BUFSIZ;
+ i = 0;
+ len = 0;
+ *line = malloc(maxlen);
+ end = 0;
+
+ do {
+ /* read next portion */
+ if (buffer->pos >= buffer->size) {
+ buffer->pos = 0;
+ buffer->size = 0;
+ if ((retval = glite_jppsbe_pread(ctx, handle, buffer->buf, BUFSIZ, buffer->offset, &nbytes)) == 0) {
+ if (nbytes < 0) {
+ retval = EINVAL;
+ goto fail;
+ } else {
+ if (nbytes) {
+ buffer->size = (size_t)nbytes;
+ buffer->offset += nbytes;
+ } else end = 1;
+ }
+ } else goto fail;
+ }
+
+ /* we have buffer->size - buffer->pos bytes */
+ i = buffer->pos;
+ do {
+ if (i >= buffer->size) z = '\0';
+ else {
+ z = buffer->buf[i];
+ if (z == '\n') z = '\0';
+ }
+ len++;
+
+ if (!check_realloc_line(line, &maxlen, len)) {
+ retval = ENOMEM;
+ goto fail;
+ }
+ (*line)[len - 1] = z;
+ i++;
+ } while (z && i < buffer->size);
+ buffer->pos = i;
+ } while (len && (*line)[len - 1] != '\0');
+
+ if ((!len || !(*line)[0]) && end) {
+ free(*line);
+ *line = NULL;
+ }
+
+ return 0;
+
+fail:
+ free(*line);
+ *line = NULL;
+ return retval;
+}
+
+char* get_namespace(const char* attr){
+ char* namespace = strdup(attr);
+ char* colon = strrchr(namespace, ':');
+ if (colon)
+ namespace[strrchr(namespace, ':') - namespace] = 0;
+ else
+ namespace[0] = 0;
+ return namespace;
+}
+