From 3d97095710d2ebdd34228950e89fed287f17bd1b Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ale=C5=A1=20K=C5=99enek?= Date: Tue, 6 Sep 2005 15:02:21 +0000 Subject: [PATCH] partial implementation of batch feed (not complete neither working yet) --- org.glite.jp.primary/src/feed.c | 138 +++++++++++++++++++++++++++++++++++----- org.glite.jp.primary/src/feed.h | 8 ++- 2 files changed, 128 insertions(+), 18 deletions(-) diff --git a/org.glite.jp.primary/src/feed.c b/org.glite.jp.primary/src/feed.c index 1c80a8e..f59be5e 100644 --- a/org.glite.jp.primary/src/feed.c +++ b/org.glite.jp.primary/src/feed.c @@ -275,6 +275,118 @@ static char *generate_feedid(void) return str2md5base64(buf); } +static struct jpfeed *make_jpfeed( + const char *destination, + char const *const *attrs, + const glite_jp_query_rec_t *qry, + char *id, + time_t expires) +{ + int i; + struct jpfeed *f = calloc(1,sizeof *f); + + f->id = id ? strdup(id) : NULL; + f->destination = strdup(destination); + f->expires = expires; + for (i=0; attrs[i]; i++) { + f->attrs = realloc(f->attrs,(i+2) * sizeof *f->attrs); + f->attrs[i] = strdup(attrs[i]); + f->attrs[i+1] = NULL; + } + for (i=0; qry[i].attr; i++) { + f->qry = realloc(f->qry,(i+2) * sizeof *f->qry); + glite_jp_queryrec_copy(f->qry+i,qry+i); + memset(f->qry+i+1,0,sizeof *f->qry); + } + + return f; +} + +void jpfeed_free(struct jpfeed *f) +{ + /* TODO */ + abort(); +} + +static int feed_query_callback( + glite_jp_context_t ctx, + const char *job, + const glite_jp_attrval_t attr[], + void *arg) +{ + struct jpfeed *f = arg; + glite_jp_attrval_t *attrs = NULL; + + glite_jpps_get_attrs(ctx,job,f->other_attr,f->nother_attr,&attrs); +} + +static int run_feed_deferred(glite_jp_context_t ctx,void *feed) +{ + struct jpfeed *f = feed; + int i,m,o,cnt,ret = 0; + + glite_jp_clear_error(ctx); +/* count "meta" attributes */ + cnt = 0; + for (i=0; f->attrs[i]; f++) + if (glite_jppsbe_is_metadata(ctx,f->attrs[i])) cnt++; + + f->meta_attr = cnt ? malloc(cnt * sizeof *f->meta_attr) : NULL; + f->nmeta_attr = cnt; + + f->other_attr = i-cnt ? malloc((i-cnt) * sizeof *f->other_attr) : NULL; + +/* sort attributes to "meta" and others */ + m = o = 0; + for (i=0; f->attrs[i]; i++) + if (glite_jppsbe_is_metadata(ctx,f->attrs[i])) f->meta_attr[m++] = f->attrs[i]; + else f->other_attr[o++] = f->attrs[i]; + + if (f->meta_attr) f->meta_attr[m] = NULL; + if (f->other_attr) f->other_attr[o] = NULL; + +/* the same for query records */ + cnt = 0; + for (i=0; f->qry[i].attr; i++) + if (glite_jppsbe_is_metadata(ctx,f->qry[i].attr)) cnt++; + + f->meta_qry = malloc(cnt * sizeof *f->meta_qry); + f->nmeta_qry = cnt; + f->other_qry = malloc((i-cnt) * sizeof *f->other_qry); + f->nother_qry = i-cnt; + + m = o = 0; + for (i=0; f->qry[i].attr; i++) + if (glite_jppsbe_is_metadata(ctx,f->qry[i].attr)) memcpy(f->meta_qry+m++,f->qry+i,sizeof *f->meta_qry); + else memcpy(f->other_qry+o++,f->qry+i,sizeof *f->other_qry); + + memset(f->meta_qry+m,0,sizeof *f->meta_qry); + memset(f->other_qry+m,0,sizeof *f->other_qry); + + free(f->attrs); free(f->qry); + f->attrs = NULL; + f->qry = NULL; + +/* extract other_qry items that are not present in other_attr */ + f->int_other_attr = o; + for (i=0; inother_qry; i++) { + int j; + for (j=0; jint_other_attr && strcmp(f->other_attr[j],f->other_qry[i].attr); j++); + if (j == f->int_other_attr) { + f->other_attr = realloc(f->other_attr,(o+1) * sizeof *f->other_attr); + f->other_attr[o++] = strdup(f->other_qry[i].attr); + } + } + if (f->other_attr) f->other_attr[o] = NULL; + f->nother_attr = o; + + ret = glite_jppsbe_query(ctx,f->meta_qry,f->meta_attr,f,feed_query_callback); + +cleanup: + + jpfeed_free(f); + return ret; +} int glite_jpps_run_feed( glite_jp_context_t ctx, @@ -283,7 +395,14 @@ int glite_jpps_run_feed( const glite_jp_query_rec_t *qry, char **feed_id) { + struct jpfeed *f; + fprintf(stderr,"%s: \n",__FUNCTION__); + if (!*feed_id) *feed_id = generate_feedid(); + + f = make_jpfeed(destination,attrs,qry,*feed_id,(time_t) 0); + glite_jp_add_deferred(ctx,run_feed_deferred,f); + return 0; } @@ -296,6 +415,7 @@ static int register_feed_deferred(glite_jp_context_t ctx,void *feed) return 0; } + /* FIXME: * - volatile implementation: should store the registrations in a file * and recover after restart @@ -309,26 +429,12 @@ int glite_jpps_register_feed( char **feed_id, time_t *expires) { - int i; - struct jpfeed *f = calloc(1,sizeof *f); + struct jpfeed *f; if (!*feed_id) *feed_id = generate_feedid(); time(expires); *expires += FEED_TTL; - f->id = strdup(*feed_id); - f->destination = strdup(destination); - f->expires = *expires; - for (i=0; attrs[i]; i++) { - f->attrs = realloc(f->attrs,(i+2) * sizeof *f->attrs); - f->attrs[i] = strdup(attrs[i]); - f->attrs[i+1] = NULL; - } - for (i=0; qry[i].attr; i++) { - f->qry = realloc(f->qry,(i+2) * sizeof *f->qry); - glite_jp_queryrec_copy(f->qry+i,qry+i); - memset(f->qry+i+1,0,sizeof *f->qry); - } - + f = make_jpfeed(destination,attrs,qry,*feed_id,*expires); glite_jp_add_deferred(ctx,register_feed_deferred,f); return 0; diff --git a/org.glite.jp.primary/src/feed.h b/org.glite.jp.primary/src/feed.h index d141c5b..5348a09 100644 --- a/org.glite.jp.primary/src/feed.h +++ b/org.glite.jp.primary/src/feed.h @@ -5,8 +5,12 @@ struct jpfeed { char *id,*destination; time_t expires; - char **attrs; - glite_jp_query_rec_t *qry; + char **attrs,**meta_attr,**other_attr; + int int_other_attr; /* index from where other_attr is extended + with attributes from other_query */ + + int nother_attr, nmeta_attr, nmeta_qry, nother_qry; + glite_jp_query_rec_t *qry,*meta_qry,*other_qry; struct jpfeed *next; }; -- 1.8.2.3