From 4da2726da8a59e29fdb369d92459ec49a3f70346 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ale=C5=A1=20K=C5=99enek?= Date: Wed, 7 Sep 2005 08:45:52 +0000 Subject: [PATCH] batch feed implementation - query and filtering - attribute assembly - actual feed NOT done yet --- org.glite.jp.primary/examples/README.test | 6 +++ org.glite.jp.primary/src/feed.c | 81 +++++++++++++++++++++++++++++-- org.glite.jp.primary/src/feed.h | 10 ++++ 3 files changed, 94 insertions(+), 3 deletions(-) diff --git a/org.glite.jp.primary/examples/README.test b/org.glite.jp.primary/examples/README.test index 6bfe1d8..cef2692 100644 --- a/org.glite.jp.primary/examples/README.test +++ b/org.glite.jp.primary/examples/README.test @@ -34,6 +34,12 @@ Returns: - Operation not permitted (you are not a trusted peer) +Check system attributes known from job registration: + +$ ./jpps-test GetJobAttr JOBID http://egee.cesnet.cz/en/WSDL/jp-system:owner +$ ./jpps-test GetJobAttr JOBID http://egee.cesnet.cz/en/WSDL/jp-system:regtime + + Record JP tag: -------------- diff --git a/org.glite.jp.primary/src/feed.c b/org.glite.jp.primary/src/feed.c index 3797c48..8283ae8 100644 --- a/org.glite.jp.primary/src/feed.c +++ b/org.glite.jp.primary/src/feed.c @@ -19,6 +19,9 @@ */ #define FEED_TTL 120 +/* XXX: configurable */ +#define BATCH_FEED_SIZE 200 + static int check_qry_item( glite_jp_context_t ctx, const glite_jp_query_rec_t *qry, @@ -136,6 +139,10 @@ static int match_feed( return 0; } +/* TODO: overit, ze do dalsich atributu se leze az kdyz matchuji metadata + * kdyby ne, stejne se to nepovede ; + * totez pro match_file */ + int glite_jpps_match_attr( glite_jp_context_t ctx, const char *job, @@ -318,18 +325,85 @@ void jpfeed_free(struct jpfeed *f) abort(); } +static int drain_feed(glite_jp_context_t ctx, struct jpfeed *f) +{ + /* TODO */ abort(); +} + static int feed_query_callback( glite_jp_context_t ctx, const char *job, - const glite_jp_attrval_t attr[], + const glite_jp_attrval_t meta[], void *arg) { + int i,j,nout = 0; + glite_jp_error_t err; struct jpfeed *f = arg; - glite_jp_attrval_t *attrs = NULL; + glite_jp_attrval_t *other = NULL,*out = NULL; + + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + glite_jp_clear_error(ctx); + +/* retrieve other attributes */ + if (glite_jpps_get_attrs(ctx,job,f->other_attr,f->nother_attr,&other)) { + err.code = EIO; + err.desc = "retrieve job attributes"; + glite_jp_stack_error(ctx,&err); + goto cleanup; + } + +/* no attributes known -- can't match */ + if (!other) goto cleanup; + +/* filter on non-meta query items */ + for (i=0; inother_qry; i++) { + for (j=0; other[j].name; j++) + if (check_qry_item(ctx,f->other_qry+i,other+j)) break; + if (!other[j].name) goto cleanup; /* no match is not an error */ + } + +/* extract attributes to be fed, stack the job for a batch feed */ + for (i=0; meta && meta[i].name; i++) + for (j=0; jnmeta_attr; j++) + if (!strcmp(meta[i].name,f->meta_attr[j])) { + out = realloc(out,(nout+2) * sizeof(out)); + glite_jp_attrval_copy(out+nout,meta+i); + nout++; + } + + for (i=0; other[i].name; i++) + for (j=0; jint_other_attr; j++) + if (!strcmp(other[i].name,f->other_attr[j])) { + out = realloc(out,(nout+2) * sizeof(out)); + glite_jp_attrval_copy(out+nout,other+i); + nout++; + } + + if (nout) { + f->jobs = realloc(f->jobs,(f->njobs+1)*sizeof *f->jobs); + f->jobs[f->njobs] = strdup(job); + f->job_attrs = realloc(f->job_attrs,(f->njobs+1)*sizeof *f->job_attrs); + f->job_attrs[f->njobs] = out; + out = NULL; + f->njobs++; + } + +/* run the feed eventually */ + if (f->njobs >= BATCH_FEED_SIZE && drain_feed(ctx,f)) { + err.code = EIO; + err.desc = "sending batch feed"; + glite_jp_stack_error(ctx,&err); + } - glite_jpps_get_attrs(ctx,job,f->other_attr,f->nother_attr,&attrs); +cleanup: + for (i=0; other && other[i].name; i++) glite_jp_attrval_free(other+i,0); + free(other); + + return err.code; } + static int run_feed_deferred(glite_jp_context_t ctx,void *feed) { struct jpfeed *f = feed; @@ -391,6 +465,7 @@ static int run_feed_deferred(glite_jp_context_t ctx,void *feed) f->nother_attr = o; ret = glite_jppsbe_query(ctx,f->meta_qry,f->meta_attr,f,feed_query_callback); + if (!ret) ret = drain_feed(ctx,f); cleanup: diff --git a/org.glite.jp.primary/src/feed.h b/org.glite.jp.primary/src/feed.h index 5348a09..a9204ef 100644 --- a/org.glite.jp.primary/src/feed.h +++ b/org.glite.jp.primary/src/feed.h @@ -3,14 +3,24 @@ struct jpfeed { +/* feed data */ char *id,*destination; time_t expires; + +/* complete and split query and attribute list */ char **attrs,**meta_attr,**other_attr; int int_other_attr; /* index from where other_attr is extended with attributes from other_query */ int nother_attr, nmeta_attr, nmeta_qry, nother_qry; glite_jp_query_rec_t *qry,*meta_qry,*other_qry; + +/* jobs stacked for feed */ + int njobs; + char **jobs; + glite_jp_attrval_t **job_attrs; + +/* next feed */ struct jpfeed *next; }; -- 1.8.2.3