GetJobAttributes roughly working, tested wrt. user tags only
authorAleš Křenek <ljocha@ics.muni.cz>
Tue, 23 Aug 2005 15:16:11 +0000 (15:16 +0000)
committerAleš Křenek <ljocha@ics.muni.cz>
Tue, 23 Aug 2005 15:16:11 +0000 (15:16 +0000)
org.glite.jp.primary/examples/jpps-test.c
org.glite.jp.primary/src/attrs.c
org.glite.jp.primary/src/new_ftp_backend.c
org.glite.jp.primary/src/tags_plugin.c

index 5e42cf1..5a9077a 100644 (file)
@@ -97,6 +97,16 @@ static struct jptype__PrimaryQueryElement sample_query[][5] = {
 };
 #endif
 
+static const char *orig2str(enum jptype__attrOrig orig)
+{
+       switch (orig) {
+               case jptype__attrOrig__SYSTEM: return "SYSTEM";
+               case jptype__attrOrig__USER: return "USER";
+               case jptype__attrOrig__FILE_: return "FILE";
+               default: return "unknown";
+       }
+}
+
 int main(int argc,char *argv[])
 {
        char    *server = "http://localhost:8901";
@@ -224,7 +234,7 @@ int main(int argc,char *argv[])
                {
                        int     i;
 
-                       printf("JobLog:\n");
+                       printf("JobFiles:\n");
 
                        for (i=0; i<out.__sizefiles;i++) {
                                printf("\tclass = %s, name = %s, url = %s\n",
@@ -236,9 +246,29 @@ int main(int argc,char *argv[])
 
        }
        else if (!strcasecmp(argv[1],"GetJobAttr")) {
+               struct _jpelem__GetJobAttributes        in;
+               struct _jpelem__GetJobAttributesResponse        out;
                
                if (argc != 4) usage(argv[0]);
-               /* TODO */
+               in.jobid = argv[2];
+               in.__sizeattributes = 1;
+               in.attributes = &argv[3];
+
+               if (!check_fault(soap,soap_call___jpsrv__GetJobAttributes(soap,server,"",&in,&out)))
+               {
+                       int     i;
+
+                       puts("Attribute values:");
+                       for (i=0; i<out.__sizeattrValues; i++)
+                               printf("\t%s\t%s\t%s",
+                                       out.attrValues[i]->value->string ?
+                                               out.attrValues[i]->value->string :
+                                               "binary",
+                                       orig2str(out.attrValues[i]->origin),
+                                       ctime(&out.attrValues[i]->timestamp));
+
+               }
+               
        }
        else usage(argv[0]);
 
index 629f9b6..e0277ec 100644 (file)
@@ -6,8 +6,10 @@
 #include "glite/jp/types.h"
 #include "glite/jp/attr.h"
 
+#include "backend.h"
 #include "attrs.h"
 #include "file_plugin.h"
+#include "builtin_plugins.h"
 
 static struct {
        char    *class,*uri;
@@ -15,6 +17,8 @@ static struct {
        int     nplugins;
 } *known_classes;
 
+static int tags_index;
+
 
 static void scan_classes(glite_jp_context_t ctx)
 {
@@ -29,7 +33,7 @@ static void scan_classes(glite_jp_context_t ctx)
                        for (k=0; known_classes && known_classes[k].class
                                        && strcmp(pd->classes[j],known_classes[k].class);
                                k++);
-                       if (known_classes[k].class) {
+                       if (known_classes && known_classes[k].class) {
                                known_classes[k].plugins = realloc(known_classes[k].plugins,
                                                (known_classes[k].nplugins + 2) * sizeof(glite_jpps_fplug_data_t *));
                                known_classes[k].plugins[known_classes[k].nplugins++] = pd;
@@ -43,6 +47,8 @@ static void scan_classes(glite_jp_context_t ctx)
                                known_classes[k].plugins[0] = pd;
                                known_classes[k].plugins[1] = NULL;
                                known_classes[k].nplugins = 1;
+                               memset(known_classes+k+1,0,sizeof *known_classes);
+                               if (!strcmp(known_classes[k].uri,GLITE_JP_FILETYPE_TAGS)) tags_index = k;
                        }
                }
        }
@@ -72,6 +78,7 @@ glite_jpps_get_attrs(glite_jp_context_t ctx,const char *job,char const *const *a
        int     nfiles = 0;
 
        nmeta = nother = 0;
+       glite_jp_clear_error(ctx);
 
 /* sort the queried attributes to backend metadata and others -- retrived by plugins 
  * XXX: assumes unique values for metadata.
@@ -91,19 +98,28 @@ glite_jpps_get_attrs(glite_jp_context_t ctx,const char *job,char const *const *a
        }
 
 /* retrieve the metadata */
-       if (err = glite_jppsbe_get_job_metadata(ctx,job,meta)) goto cleanup;
+       if (meta && (err = glite_jppsbe_get_job_metadata(ctx,job,meta))) goto cleanup;
 
        if (!known_classes) scan_classes(ctx);
 
 /* build a list of available files for this job */
-       for (i=0; known_classes[i].class; i++) {
-               char    **names;
-               glite_jppsbe_get_names(ctx,job,known_classes[i].class,&names);
+       files = malloc(sizeof *files);
+       files->class_idx = tags_index;
+       files->name = NULL;
+       nfiles = 1;
 
-               if (names) for (j=0; names[j]; j++) {
-                       files = realloc(files,(nfiles+1) * sizeof *files);
-                       files[nfiles].class_idx = i;
-                       files[nfiles++].name = names[j];
+       for (i=0; known_classes[i].class; i++) {
+               char    **names = NULL;
+               int     nnames = 
+                       glite_jppsbe_get_names(ctx,job,known_classes[i].class,&names);
+               if (nnames < 0) continue; /* XXX: error ignored */
+
+               if (nnames > 0) {
+                       files = realloc(files,nfiles+nnames+1 * sizeof *files);
+                       for (j=0; names[j]; j++) {
+                               files[nfiles].class_idx = i;
+                               files[nfiles++].name = names[j];
+                       }
                }
                free(names);
        }
@@ -115,10 +131,10 @@ glite_jpps_get_attrs(glite_jp_context_t ctx,const char *job,char const *const *a
 
                /* XXX: ignore error */
                if (!glite_jppsbe_open_file(ctx,job,
-                       known_classes[ci = files[i].class_idx],
+                       known_classes[ci = files[i].class_idx].class,
                        files[i].name,O_RDONLY,&beh))
                {
-                       for (j=0; known_classes[ci].plugins[j]; j++) {
+                       for (j=0; j<known_classes[ci].nplugins; j++) {
                                void    *ph;
 
                                glite_jpps_fplug_data_t *p = 
@@ -145,8 +161,17 @@ glite_jpps_get_attrs(glite_jp_context_t ctx,const char *job,char const *const *a
        nout = merge_attrvals(&out,nout,meta);
        free(meta); meta = NULL;
 
-       *attrs_out = out;
-       err = 0;
+       if (nout) {
+               *attrs_out = out;
+               err = 0;
+       }
+       else {
+               glite_jp_error_t        e;
+               e.code = ENOENT;
+               e.source = __FUNCTION__;
+               e.desc = "no attributes found";
+               err = glite_jp_stack_error(ctx,&e);
+       }
 
 cleanup:
        if (meta) for (i=0; i<nmeta; i++) glite_jp_attrval_free(meta+i,0);
index 94501d4..134f805 100644 (file)
@@ -1801,8 +1801,64 @@ int glite_jppsbe_get_names(
        char    ***names_out
 )
 {
-       /* TODO */
-       abort();
+       char    *qry = NULL,*file = NULL,*dot;
+       char    **out = NULL;
+       glite_jp_db_stmt_t      s = NULL;
+       int rows,nout = 0;
+       glite_jp_error_t        err;
+
+       glite_jp_clear_error(ctx);
+       memset(&err,0,sizeof err);
+       err.source = __FUNCTION__;
+
+       trio_asprintf(&qry,"select filename from files "
+                       "where jobid = '%|Ss' and state = 'comitted'",job);
+
+       if ((rows = glite_jp_db_execstmt(ctx,qry,&s)) <= 0) {
+               if (rows == 0) {
+                       err.code = ENOENT;
+                       err.desc = "No files for this job";
+               }
+               else {
+                       err.code = EIO;
+                       err.desc = "DB call fail retrieving job files";
+               }
+               glite_jp_stack_error(ctx,&err);
+               goto cleanup;
+       }
+
+       while ((rows = glite_jp_db_fetchrow(s,&file))) {
+               int     l;
+
+               if (rows < 0) {
+                       err.code = EIO;
+                       err.desc = "DB call fail retrieving job files";
+                       goto cleanup;
+               }
+
+               dot = strchr(file,'.'); /* XXX: can class contain dot? */
+
+               if (dot) *dot = 0;
+               if (!strcmp(file,class)) out[nout++] = dot ? dot+1 : NULL;
+
+               free(file);
+               file = NULL;
+       }
+
+cleanup:
+       if (s) glite_jp_db_freestmt(&s);
+       free(qry);
+       free(file);
+
+       if (ctx->error) {
+               int     i;
+               for (i=0; out && out[i]; i++) free(out[i]);
+               free(out);
+               return -ctx->error->code;
+       }
+
+       *names_out = out;
+       return nout;
 }
 
 
index 4fa112d..2cc3d3d 100644 (file)
@@ -14,6 +14,9 @@
 static int tagappend(void *,void *,int,...);
 static int tagopen(void *,void *,const char *uri,void **);
 static int tagclose(void *,void *);
+static int tagattr(void *,void *,const char *,glite_jp_attrval_t **);
+
+static int tagsread(void *,struct tags_handle *);
 
 #define TAGS_MAGIC 0x74c016f2  /* two middle digits encode version, i.e. 01 */
 
@@ -41,7 +44,7 @@ int init(glite_jp_context_t ctx, glite_jpps_fplug_data_t *data)
 
        data->ops.open = tagopen;
        data->ops.close = tagclose;
-       data->ops.attr = tagdummy;
+       data->ops.attr = tagattr;
        data->ops.generic = tagappend;
        
        printf("tags_plugin: URI: \"%s\"; magic number: 0x%08lx\n",GLITE_JP_FILETYPE_TAGS,TAGS_MAGIC);
@@ -51,7 +54,7 @@ int init(glite_jp_context_t ctx, glite_jpps_fplug_data_t *data)
 static int tagopen(void *fpctx,void *bhandle,const char *uri,void **handle)
 {
        struct tags_handle *h = calloc(1,sizeof *h);
-       h->n = -1;
+       h->n = 0;
        h->bhandle = bhandle;
 
        *handle = h;
@@ -64,10 +67,7 @@ static int tagclose(void *fpctx,void *handle)
        int     i;
        struct tags_handle *h = handle;
 
-       for (i=0; i<h->n; i++) {
-               free(h->tags[i].name);
-               free(h->tags[i].value);
-       }
+       for (i=0; i<h->n; i++) glite_jp_attrval_free(h->tags+i,0);
        free(h->tags);
        free(h);
 
@@ -139,12 +139,160 @@ static int tagappend(void *fpctx,void *handle,int oper,...)
        if (r) memcpy(rec + hlen + strlen(hdr) + 1,tag->value,r);
        free(hdr);
 
+/* record format:
+ * - 4B length, net byte order
+ * - attr name, \0
+ * - %ld %c \0 (timestamp, B/S)
+ * - value
+ */
        if (glite_jppsbe_append(ctx,h->bhandle,rec,rlen + sizeof rlen_n)) {
                err.code = EIO;
                err.desc = "writing tag record";
                free(rec);
                return glite_jp_stack_error(ctx,&err);
        }
+
+       /* XXX: should add tag also to handle->tags, but it is never used 
+        * currently */
+       
+       return 0;
+}
+
+static int tagattr(void *fpctx,void *handle,const char *attr,glite_jp_attrval_t **attrval)
+{
+       struct tags_handle      *h = handle;
+       glite_jp_error_t        err;
+       glite_jp_context_t      ctx = fpctx;
+       glite_jp_attrval_t      *out = NULL;
+       int     i,nout = 0;
+
+       memset(&err,0,sizeof err);
+       err.source = __FUNCTION__;
+
+       if (!h->tags) tagsread(fpctx,handle);
+
+       if (!h->tags) {
+               err.code = ENOENT;
+               err.desc = "no tags for this job";
+               return glite_jp_stack_error(ctx,&err);
+       }
+
+       for (i=0; i<h->n; i++) if (!strcmp(h->tags[i].name,attr)) {
+               out = realloc(out,(nout+2) * sizeof *out);
+               glite_jp_attrval_copy(out+nout,h->tags+i);
+               nout++;
+               memset(out+nout,0,sizeof *out);
+       }
+
+       if (nout) {
+               *attrval = out;
+               return 0;
+       }
+       else {
+               err.code = ENOENT;
+               err.desc = "no value for this tag";
+               return glite_jp_stack_error(ctx,&err);
+       }
+}
+
+static int tagsread(void *fpctx,struct tags_handle *h)
+{
+       glite_jp_context_t      ctx = fpctx;
+       uint32_t                magic,rlen;
+       glite_jp_error_t        err;
+       int                     r;
+       size_t                  off = sizeof rlen;
+       glite_jp_attrval_t      *tp;
+       char                    *rp;
+
+       memset(&err,0,sizeof err);
+       err.source = __FUNCTION__;
        
+       glite_jp_clear_error(ctx);
+
+/* read magic number */
+       if (glite_jppsbe_pread(ctx,h->bhandle,&magic,sizeof magic,0,&r)) {
+               err.code = EIO;
+               err.desc = "reading magic number";
+               return glite_jp_stack_error(ctx,&err);
+       }
+
+       if (r != sizeof magic) {
+               err.code = EIO;
+               err.desc = "can't read magic number";
+               return glite_jp_stack_error(ctx,&err);
+       }
+       else if (magic != htonl(TAGS_MAGIC)) {
+               err.code = EINVAL;
+               err.desc = "invalid magic number";
+               return glite_jp_stack_error(ctx,&err);
+       }
+
+
+       while (1) {
+               char    *rec,type;
+               int     rd;
+
+       /* read record header */
+               if (glite_jppsbe_pread(ctx,h->bhandle,&rlen,sizeof rlen,off,&r)) {
+                       err.code = EIO;
+                       err.desc = "reading record header";
+                       return glite_jp_stack_error(ctx,&err);
+               }
+               if (r == 0) break;
+
+               if (r != sizeof rlen) {
+                       err.code = EIO;
+                       err.desc = "can't read record header";
+                       return glite_jp_stack_error(ctx,&err);
+               }
+
+               off += r;
+               rec = malloc(rlen = ntohl(rlen));
+
+       /* read whole record body thoroughly */
+               for (rd=0; rd<rlen; rd+=r) /* XXX: will loop on 0 bytes read */
+                       if (glite_jppsbe_pread(ctx,h->bhandle,rec+rd,rlen-rd,off+rd,&r)) {
+                               err.code = EIO;
+                               err.desc = "reading record body";
+                               free(rec);
+                               return glite_jp_stack_error(ctx,&err);
+                       }
+
+               off += rlen;
+
+       /* parse the record */
+               h->tags = realloc(h->tags,(h->n+2) * sizeof *h->tags);
+               tp = h->tags+h->n++;
+               memset(tp,0,sizeof *tp);
+
+               tp->name = strdup(rec);
+               rp = rec + strlen(rec) + 1;
+
+               sscanf(rp,"%ld %c",&tp->timestamp,&type);
+               rp += strlen(rp) + 1;
+               switch (type) {
+                       int     i;
+
+                       case 'B': tp->binary = 1; break;
+                       case 'S': tp->binary = 0; break;
+                       default: free(rec);
+                                for (i=0; i<h->n; i++)
+                                        glite_jp_attrval_free(h->tags+i,0);
+                                free(h->tags);
+                                h->tags = NULL;
+                                h->n = 0;
+
+                                err.code = EINVAL;
+                                err.desc = "invalid attr type (B/S)";
+                                return glite_jp_stack_error(ctx,&err);
+               }
+               tp->value = malloc((r=rlen - (rp - rec)) + 1);
+               memcpy(tp->value,rp,r);
+               if (!tp->binary) tp->value[r] = 0;
+               tp->origin = GLITE_JP_ATTR_ORIG_USER;
+
+               free(rec);
+       }
        return 0;
 }