partial implementation of batch feed (not complete neither working yet)
authorAleš Křenek <ljocha@ics.muni.cz>
Tue, 6 Sep 2005 15:02:21 +0000 (15:02 +0000)
committerAleš Křenek <ljocha@ics.muni.cz>
Tue, 6 Sep 2005 15:02:21 +0000 (15:02 +0000)
org.glite.jp.primary/src/feed.c
org.glite.jp.primary/src/feed.h

index 1c80a8e..f59be5e 100644 (file)
@@ -275,6 +275,118 @@ static char *generate_feedid(void)
        return str2md5base64(buf);
 }
 
+static struct jpfeed *make_jpfeed(
+       const char *destination,
+       char const *const *attrs,
+       const glite_jp_query_rec_t *qry,
+       char *id,
+       time_t expires)
+{
+       int     i;
+       struct jpfeed   *f = calloc(1,sizeof *f);
+
+       f->id = id ? strdup(id) : NULL;
+       f->destination = strdup(destination);
+       f->expires = expires;
+       for (i=0; attrs[i]; i++) {
+               f->attrs = realloc(f->attrs,(i+2) * sizeof *f->attrs);
+               f->attrs[i] = strdup(attrs[i]);
+               f->attrs[i+1] = NULL;
+       }
+       for (i=0; qry[i].attr; i++) {
+               f->qry = realloc(f->qry,(i+2) * sizeof *f->qry);
+               glite_jp_queryrec_copy(f->qry+i,qry+i);
+               memset(f->qry+i+1,0,sizeof *f->qry);
+       }
+
+       return f;
+}
+
+void jpfeed_free(struct jpfeed *f)
+{
+       /* TODO */
+       abort();
+}
+
+static int feed_query_callback(
+               glite_jp_context_t ctx,
+               const char *job,
+               const glite_jp_attrval_t attr[],
+               void *arg)
+{
+       struct jpfeed   *f = arg;
+       glite_jp_attrval_t      *attrs = NULL;
+
+       glite_jpps_get_attrs(ctx,job,f->other_attr,f->nother_attr,&attrs);
+}
+
+static int run_feed_deferred(glite_jp_context_t ctx,void *feed)
+{
+       struct jpfeed   *f = feed;
+       int     i,m,o,cnt,ret = 0;
+
+       glite_jp_clear_error(ctx);
+/* count "meta" attributes */
+       cnt = 0;
+       for (i=0; f->attrs[i]; f++)
+               if (glite_jppsbe_is_metadata(ctx,f->attrs[i])) cnt++;
+
+       f->meta_attr = cnt ? malloc(cnt * sizeof *f->meta_attr) : NULL;
+       f->nmeta_attr = cnt;
+
+       f->other_attr = i-cnt ? malloc((i-cnt) * sizeof *f->other_attr) : NULL;
+
+/* sort attributes to "meta" and others */
+       m = o = 0;
+       for (i=0; f->attrs[i]; i++)
+               if (glite_jppsbe_is_metadata(ctx,f->attrs[i])) f->meta_attr[m++] = f->attrs[i];
+               else f->other_attr[o++] = f->attrs[i];
+
+       if (f->meta_attr) f->meta_attr[m] = NULL;
+       if (f->other_attr) f->other_attr[o] = NULL;
+
+/* the same for query records */
+       cnt = 0;
+       for (i=0; f->qry[i].attr; i++)
+               if (glite_jppsbe_is_metadata(ctx,f->qry[i].attr)) cnt++;
+
+       f->meta_qry = malloc(cnt * sizeof *f->meta_qry);
+       f->nmeta_qry = cnt;
+       f->other_qry = malloc((i-cnt) * sizeof *f->other_qry);
+       f->nother_qry = i-cnt;
+
+       m = o = 0;
+       for (i=0; f->qry[i].attr; i++) 
+               if (glite_jppsbe_is_metadata(ctx,f->qry[i].attr)) memcpy(f->meta_qry+m++,f->qry+i,sizeof *f->meta_qry);
+               else memcpy(f->other_qry+o++,f->qry+i,sizeof *f->other_qry);
+
+       memset(f->meta_qry+m,0,sizeof *f->meta_qry);
+       memset(f->other_qry+m,0,sizeof *f->other_qry);
+
+       free(f->attrs); free(f->qry);
+       f->attrs = NULL;
+       f->qry = NULL;
+
+/* extract other_qry items that are not present in other_attr */
+       f->int_other_attr = o;
+       for (i=0; i<f->nother_qry; i++) {
+               int     j;
+               for (j=0; j<f->int_other_attr && strcmp(f->other_attr[j],f->other_qry[i].attr); j++);
+               if (j == f->int_other_attr) {
+                       f->other_attr = realloc(f->other_attr,(o+1) * sizeof *f->other_attr);
+                       f->other_attr[o++] = strdup(f->other_qry[i].attr);
+               }
+       }
+       if (f->other_attr) f->other_attr[o] = NULL;
+       f->nother_attr = o;
+
+       ret = glite_jppsbe_query(ctx,f->meta_qry,f->meta_attr,f,feed_query_callback);
+
+cleanup:
+
+       jpfeed_free(f);
+       return ret;
+}
 
 int glite_jpps_run_feed(
        glite_jp_context_t ctx,
@@ -283,7 +395,14 @@ int glite_jpps_run_feed(
        const glite_jp_query_rec_t *qry,
        char **feed_id)
 {
+       struct jpfeed   *f;
+
        fprintf(stderr,"%s: \n",__FUNCTION__);
+       if (!*feed_id) *feed_id = generate_feedid();
+
+               f = make_jpfeed(destination,attrs,qry,*feed_id,(time_t) 0);
+       glite_jp_add_deferred(ctx,run_feed_deferred,f);
+
        return 0;
 }
 
@@ -296,6 +415,7 @@ static int register_feed_deferred(glite_jp_context_t ctx,void *feed)
        return 0;
 }
 
+
 /* FIXME:
  * - volatile implementation: should store the registrations in a file
  *   and recover after restart
@@ -309,26 +429,12 @@ int glite_jpps_register_feed(
        char **feed_id,
        time_t *expires)
 {
-       int     i;
-       struct jpfeed   *f = calloc(1,sizeof *f);
+       struct jpfeed   *f;
 
        if (!*feed_id) *feed_id = generate_feedid();
        time(expires); *expires += FEED_TTL;
 
-       f->id = strdup(*feed_id);
-       f->destination = strdup(destination);
-       f->expires = *expires;
-       for (i=0; attrs[i]; i++) {
-               f->attrs = realloc(f->attrs,(i+2) * sizeof *f->attrs);
-               f->attrs[i] = strdup(attrs[i]);
-               f->attrs[i+1] = NULL;
-       }
-       for (i=0; qry[i].attr; i++) {
-               f->qry = realloc(f->qry,(i+2) * sizeof *f->qry);
-               glite_jp_queryrec_copy(f->qry+i,qry+i);
-               memset(f->qry+i+1,0,sizeof *f->qry);
-       }
-
+               f = make_jpfeed(destination,attrs,qry,*feed_id,*expires);
        glite_jp_add_deferred(ctx,register_feed_deferred,f);
 
        return 0;
index d141c5b..5348a09 100644 (file)
@@ -5,8 +5,12 @@
 struct jpfeed {
        char    *id,*destination;
        time_t  expires;
-       char    **attrs;
-       glite_jp_query_rec_t    *qry;
+       char    **attrs,**meta_attr,**other_attr;
+       int     int_other_attr; /* index from where other_attr is extended
+                                 with attributes from other_query */ 
+
+       int     nother_attr, nmeta_attr, nmeta_qry, nother_qry;
+       glite_jp_query_rec_t    *qry,*meta_qry,*other_qry;
        struct jpfeed   *next;
 };