batch feed implementation
authorAleš Křenek <ljocha@ics.muni.cz>
Wed, 7 Sep 2005 08:45:52 +0000 (08:45 +0000)
committerAleš Křenek <ljocha@ics.muni.cz>
Wed, 7 Sep 2005 08:45:52 +0000 (08:45 +0000)
- query and filtering
- attribute assembly
- actual feed NOT done yet

org.glite.jp.primary/examples/README.test
org.glite.jp.primary/src/feed.c
org.glite.jp.primary/src/feed.h

index 6bfe1d8..cef2692 100644 (file)
@@ -34,6 +34,12 @@ Returns:
 - Operation not permitted (you are not a trusted peer)
 
 
+Check system attributes known from job registration:
+
+$ ./jpps-test GetJobAttr JOBID http://egee.cesnet.cz/en/WSDL/jp-system:owner
+$ ./jpps-test GetJobAttr JOBID http://egee.cesnet.cz/en/WSDL/jp-system:regtime
+
+
 Record JP tag:
 --------------
 
index 3797c48..8283ae8 100644 (file)
@@ -19,6 +19,9 @@
  */
 #define FEED_TTL       120
 
+/* XXX: configurable */
+#define BATCH_FEED_SIZE        200
+
 static int check_qry_item(
                glite_jp_context_t ctx,
                const glite_jp_query_rec_t    *qry,
@@ -136,6 +139,10 @@ static int match_feed(
        return 0;
 }
 
+/* TODO: overit, ze do dalsich atributu se leze az kdyz matchuji metadata
+ * kdyby ne, stejne se to nepovede ;
+ * totez pro match_file */
+
 int glite_jpps_match_attr(
                glite_jp_context_t ctx,
                const char *job,
@@ -318,18 +325,85 @@ void jpfeed_free(struct jpfeed *f)
        abort();
 }
 
+static int drain_feed(glite_jp_context_t ctx, struct jpfeed *f)
+{
+       /* TODO */ abort();
+}
+
 static int feed_query_callback(
                glite_jp_context_t ctx,
                const char *job,
-               const glite_jp_attrval_t attr[],
+               const glite_jp_attrval_t meta[],
                void *arg)
 {
+       int     i,j,nout = 0;
+       glite_jp_error_t        err;
        struct jpfeed   *f = arg;
-       glite_jp_attrval_t      *attrs = NULL;
+       glite_jp_attrval_t      *other = NULL,*out = NULL;
+
+       memset(&err,0,sizeof err);
+       err.source = __FUNCTION__;
+       glite_jp_clear_error(ctx);
+
+/* retrieve other attributes */
+       if (glite_jpps_get_attrs(ctx,job,f->other_attr,f->nother_attr,&other)) {
+               err.code = EIO;
+               err.desc = "retrieve job attributes";
+               glite_jp_stack_error(ctx,&err);
+               goto cleanup;
+       }
+
+/* no attributes known -- can't match */
+       if (!other) goto cleanup;
+
+/* filter on non-meta query items */
+       for (i=0; i<f->nother_qry; i++) {
+               for (j=0; other[j].name; j++) 
+                       if (check_qry_item(ctx,f->other_qry+i,other+j)) break;
+               if (!other[j].name) goto cleanup; /* no match is not an error */
+       }
+
+/* extract attributes to be fed, stack the job for a batch feed */
+       for (i=0; meta && meta[i].name; i++)
+               for (j=0; j<f->nmeta_attr; j++) 
+                       if (!strcmp(meta[i].name,f->meta_attr[j])) {
+                               out = realloc(out,(nout+2) * sizeof(out));
+                               glite_jp_attrval_copy(out+nout,meta+i);
+                               nout++;
+                       }
+
+       for (i=0; other[i].name; i++) 
+               for (j=0; j<f->int_other_attr; j++)
+                       if (!strcmp(other[i].name,f->other_attr[j])) {
+                               out = realloc(out,(nout+2) * sizeof(out));
+                               glite_jp_attrval_copy(out+nout,other+i);
+                               nout++;
+                       }
+
+       if (nout) {
+               f->jobs = realloc(f->jobs,(f->njobs+1)*sizeof *f->jobs);
+               f->jobs[f->njobs] = strdup(job);
+               f->job_attrs = realloc(f->job_attrs,(f->njobs+1)*sizeof *f->job_attrs);
+               f->job_attrs[f->njobs] = out;
+               out = NULL;
+               f->njobs++;
+       }
+
+/* run the feed eventually */
+       if (f->njobs >= BATCH_FEED_SIZE && drain_feed(ctx,f)) {
+               err.code = EIO;
+               err.desc = "sending batch feed";
+               glite_jp_stack_error(ctx,&err);
+       }
 
-       glite_jpps_get_attrs(ctx,job,f->other_attr,f->nother_attr,&attrs);
+cleanup:
+       for (i=0; other && other[i].name; i++) glite_jp_attrval_free(other+i,0);
+       free(other);
+
+       return err.code;
 }
 
+
 static int run_feed_deferred(glite_jp_context_t ctx,void *feed)
 {
        struct jpfeed   *f = feed;
@@ -391,6 +465,7 @@ static int run_feed_deferred(glite_jp_context_t ctx,void *feed)
        f->nother_attr = o;
 
        ret = glite_jppsbe_query(ctx,f->meta_qry,f->meta_attr,f,feed_query_callback);
+       if (!ret) ret = drain_feed(ctx,f);
 
 cleanup:
 
index 5348a09..a9204ef 100644 (file)
@@ -3,14 +3,24 @@
 
 
 struct jpfeed {
+/* feed data */
        char    *id,*destination;
        time_t  expires;
+
+/* complete and split query and attribute list */
        char    **attrs,**meta_attr,**other_attr;
        int     int_other_attr; /* index from where other_attr is extended
                                  with attributes from other_query */ 
 
        int     nother_attr, nmeta_attr, nmeta_qry, nother_qry;
        glite_jp_query_rec_t    *qry,*meta_qry,*other_qry;
+
+/* jobs stacked for feed */
+       int     njobs;
+       char    **jobs;
+       glite_jp_attrval_t      **job_attrs;
+
+/* next feed */
        struct jpfeed   *next;
 };