From 955d791380834f6219fb5560611a7e6a13dfe1e3 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ji=C5=99=C3=AD=20Filipovi=C4=8D?= Date: Thu, 20 Sep 2007 13:35:02 +0000 Subject: [PATCH] Deferred operations RegisterJob, RecordTag and CommitUpload. Optimized using of plugins (in multiargument query each file is parsed only once). --- org.glite.jp.primary/src/attrs.c | 117 +++++++++++++++++++++++++++-------- org.glite.jp.primary/src/feed.c | 73 +++++++++++++++++++--- org.glite.jp.primary/src/is_client.c | 5 ++ 3 files changed, 162 insertions(+), 33 deletions(-) diff --git a/org.glite.jp.primary/src/attrs.c b/org.glite.jp.primary/src/attrs.c index 01fac7f..fbdb150 100644 --- a/org.glite.jp.primary/src/attrs.c +++ b/org.glite.jp.primary/src/attrs.c @@ -18,7 +18,10 @@ static struct { char *namespace; glite_jpps_fplug_data_t **plugins; int nplugins; - + char **opened_classes; // for each plugin contains name of the open class (NULL for no open file) + char **opened_files; // for each plugin contains name of the open file (NULL for no open file) + void **plugin_handles; // contains handle for each opened plugin (NULL for not opened) + void **file_handles; // contains handle for each opened file by plugin (NULL for not opened) } *known_namespaces; static void scan_namespaces(glite_jp_context_t ctx) @@ -43,6 +46,18 @@ static void scan_namespaces(glite_jp_context_t ctx) known_namespaces[k].plugins[known_namespaces[k].nplugins++] = pd; known_namespaces[k].plugins[known_namespaces[k].nplugins] = NULL; known_namespaces[k].namespace = pd->namespaces[j]; + known_namespaces[k].opened_classes = realloc(known_namespaces[k].opened_classes, + (known_namespaces[k].nplugins + 1) * sizeof(char*)); + known_namespaces[k].opened_classes[known_namespaces[k].nplugins-1] = NULL; + known_namespaces[k].opened_files = realloc(known_namespaces[k].opened_files, + (known_namespaces[k].nplugins + 1) * sizeof(char*)); + known_namespaces[k].opened_files[known_namespaces[k].nplugins-1] = NULL; + known_namespaces[k].plugin_handles = realloc(known_namespaces[k].plugin_handles, + (known_namespaces[k].nplugins + 1) * sizeof(void*)); + known_namespaces[k].plugin_handles[known_namespaces[k].nplugins-1] = NULL; + known_namespaces[k].file_handles = realloc(known_namespaces[k].file_handles, + (known_namespaces[k].nplugins + 1) * sizeof(void*)); + known_namespaces[k].file_handles[known_namespaces[k].nplugins-1] = NULL; } else { printf("Adding new namespace %s\n", pd->namespaces[j]); @@ -53,6 +68,10 @@ static void scan_namespaces(glite_jp_context_t ctx) known_namespaces[k].nplugins = 1; known_namespaces[k].namespace = pd->namespaces[j]; memset(known_namespaces+k+1,0,sizeof *known_namespaces); + known_namespaces[k].opened_classes = calloc(1, sizeof(char*)); + known_namespaces[k].opened_files = calloc(1, sizeof(char*)); + known_namespaces[k].plugin_handles = calloc(1, sizeof(void*)); + known_namespaces[k].file_handles = calloc(1, sizeof(void*)); } } } @@ -72,7 +91,7 @@ static int merge_attrvals(glite_jp_attrval_t **out,int nout,const glite_jp_attrv return nout+nin; } -void process_files(glite_jp_context_t ctx, const char *job, glite_jp_attrval_t** out, int* nout, const char* attr, const glite_jpps_fplug_data_t* plugin, const char* class, const char* uri){ +void process_files(glite_jp_context_t ctx, const char *job, glite_jp_attrval_t** out, int* nout, const char* attr, const glite_jpps_fplug_data_t* plugin, const char* class, const char* uri, const char **opened_class, const char **opened_file, void **op_handle, void **of_handle){ void *ph, *beh; char** names = NULL; int nnames; @@ -81,29 +100,56 @@ void process_files(glite_jp_context_t ctx, const char *job, glite_jp_attrval_t** nnames = glite_jppsbe_get_names(ctx, job, class, &names); - for (n = 0; n < nnames; n++) - if (! glite_jppsbe_open_file(ctx,job,class, names[n], O_RDONLY, &beh)) { - if (!plugin->ops.open(plugin->fpctx,beh,uri,&ph)) { - glite_jp_attrval_t* myattr; - // XXX: ignore errors - if (!plugin->ops.attr(plugin->fpctx,ph,attr,&myattr) && myattr) { - int k; - for (k=0; myattr[k].name; k++) { - myattr[k].origin = GLITE_JP_ATTR_ORIG_FILE; - if (!myattr[k].origin_detail) - trio_asprintf(&myattr[k].origin_detail,"%s %s", uri, names[n] ? names[n] : ""); - } - *nout = merge_attrvals(out,*nout,myattr); - free(myattr); - } - keep_err = ctx->error; ctx->error = NULL; - plugin->ops.close(plugin->fpctx, ph); - if (keep_err) { ctx->error = keep_err; keep_err = NULL; } - } + for (n = 0; n < nnames; n++){ + int plugin_ok = 0; + if (*opened_class && !strcmp(*opened_class, class) && ((!*opened_file && !names[n]) || !strcmp(*opened_file, names[n]))){ + ph = *op_handle; + beh = *of_handle; + plugin_ok = 1; + } + else{ + if (*opened_class){ + free(*opened_class); + *opened_class = NULL; + free(*opened_file); + *opened_file = NULL; + plugin->ops.close(plugin->fpctx, *op_handle); + *op_handle = NULL; + glite_jppsbe_close_file(ctx, *of_handle); + *of_handle = NULL; + } + if (! glite_jppsbe_open_file(ctx,job,class, names[n], O_RDONLY, &beh) + && !plugin->ops.open(plugin->fpctx,beh,uri,&ph)){ + plugin_ok = 1; + *opened_class = strdup(class); + if (names[n]) + *opened_file = strdup(names[n]); + else + *opened_file = NULL; + *op_handle = (void*)ph; + *of_handle = (void*)beh; + printf("opening plugin %i at class %s, file %s\n", *op_handle, class, names[n]); + } + } + if (plugin_ok){ + glite_jp_attrval_t* myattr; + // XXX: ignore errors + if (!plugin->ops.attr(plugin->fpctx,ph,attr,&myattr) && myattr) { + int k; + for (k=0; myattr[k].name; k++) { + myattr[k].origin = GLITE_JP_ATTR_ORIG_FILE; + if (!myattr[k].origin_detail) + trio_asprintf(&myattr[k].origin_detail,"%s %s", uri, names[n] ? names[n] : ""); + } + *nout = merge_attrvals(out,*nout,myattr); + free(myattr); + } keep_err = ctx->error; ctx->error = NULL; - glite_jppsbe_close_file(ctx,beh); if (keep_err) { ctx->error = keep_err; keep_err = NULL; } - } + } + keep_err = ctx->error; ctx->error = NULL; + if (keep_err) { ctx->error = keep_err; keep_err = NULL; } + } } int glite_jpps_get_attrs(glite_jp_context_t ctx,const char *job,char **attr,int nattr,glite_jp_attrval_t **attrs_out) @@ -118,6 +164,7 @@ int glite_jpps_get_attrs(glite_jp_context_t ctx,const char *job,char **attr,int /* sort the queried attributes to backend metadata and others -- retrived by plugins * XXX: assumes unique values for metadata. */ + for (i=0; iclasses[l]; l++) process_files(ctx, job, &out, &nout, other[i], known_namespaces[j].plugins[k] , known_namespaces[j].plugins[k]->classes[l] - , known_namespaces[j].plugins[k]->uris[l]); + , known_namespaces[j].plugins[k]->uris[l] + , &known_namespaces[j].opened_classes[k] + , &known_namespaces[j].opened_files[k] + , &known_namespaces[j].plugin_handles[k] + , &known_namespaces[j].file_handles[k]); break; } - free(attr_namespace); } + free(attr_namespace); } +/* close plugins */ + for (i = 0; known_namespaces && known_namespaces[i].namespace; i++) + for (j = 0; known_namespaces[i].plugins[j]; j++) + if (known_namespaces[i].opened_classes[j]){ + known_namespaces[i].plugins[j]->ops.close(known_namespaces[i].plugins[j]->fpctx + , known_namespaces[i].plugin_handles[j]); + printf("closing plugin %i at class %s, file %s\n", known_namespaces[i].plugin_handles[j], known_namespaces[i].opened_classes[j], known_namespaces[i].opened_files[j]); + glite_jppsbe_close_file(ctx, known_namespaces[i].file_handles[j]); + known_namespaces[i].opened_classes[j] = NULL; + known_namespaces[i].opened_files[j] = NULL; + known_namespaces[i].plugin_handles[j] = NULL; + known_namespaces[i].file_handles[j] = NULL; + } + nout = merge_attrvals(&out,nout,meta); free(meta); meta = NULL; diff --git a/org.glite.jp.primary/src/feed.c b/org.glite.jp.primary/src/feed.c index 46d27df..be9ab1f 100644 --- a/org.glite.jp.primary/src/feed.c +++ b/org.glite.jp.primary/src/feed.c @@ -207,12 +207,20 @@ cleanup: * kdyby ne, stejne se to nepovede ; * totez pro match_file */ -int glite_jpps_match_attr( +typedef struct{ + char *job; + glite_jp_attrval_t *attrs; +} match_attr; + + +int match_attr_deferred( glite_jp_context_t ctx, - const char *job, - const glite_jp_attrval_t attrs[] + void *ma ) { + char *job = ((match_attr*)ma)->job; + glite_jp_attrval_t *attrs = ((match_attr*)ma)->attrs; + struct jpfeed *f = (struct jpfeed *) ctx->feeds; int i,j,doit; @@ -227,9 +235,30 @@ int glite_jpps_match_attr( if (doit) match_feed(ctx,f,job,attrs); } + free(((match_attr*)ma)->job); + //free(((match_attr*)ma)->attrs); + return glite_jp_clear_error(ctx); } +int glite_jpps_match_attr( + glite_jp_context_t ctx, + const char *job, + const glite_jp_attrval_t attrs[] +) +{ + match_attr *ma = malloc(sizeof *ma); + ma->job = strdup(job); + ma->attrs = malloc(sizeof(*ma->attrs)); + ma->attrs[0].name = NULL; + int i; + for (i = 0; attrs[i].name; i++){ + ma->attrs = realloc(ma->attrs, (i+2)*sizeof(*ma->attrs)); + ma->attrs[i+1].name = NULL; + } + glite_jp_add_deferred(ctx, match_attr_deferred, ma); +} + static int attr_void_cmp(const void *a, const void *b) { char const * const *ca = (char const * const *) a; @@ -261,13 +290,21 @@ static void attr_union(char **a, char **b, char ***c) *c = out; } -int glite_jpps_match_file( +typedef struct{ + char *job; + char *class; + char *name; +} match_file; + +int match_file_deferred( glite_jp_context_t ctx, - const char *job, - const char *class, - const char *name + void *mf ) { + char *job = ((match_file*)mf)->job; + char *class = ((match_file*)mf)->class; + char *name = ((match_file*)mf)->name; + glite_jpps_fplug_data_t **pd = NULL; int pi; void *bh = NULL; @@ -372,9 +409,31 @@ int glite_jpps_match_file( for (i=0; meta[i].name; i++) glite_jp_attrval_free(meta+i,0); + free(((match_file*)mf)->job); + free(((match_file*)mf)->class); + free(((match_file*)mf)->name); + return 0; } +int glite_jpps_match_file( + glite_jp_context_t ctx, + const char *job, + const char *class, + const char *name +) +{ + match_file* mf = malloc(sizeof(*mf)); + mf->job = strdup(job); + mf->class = strdup(class); + if (name) + mf->name = strdup(name); + else + mf->name = NULL; + + glite_jp_add_deferred(ctx, match_file_deferred, mf); +} + static char *generate_feedid(void) { char hname[200],buf[1000]; diff --git a/org.glite.jp.primary/src/is_client.c b/org.glite.jp.primary/src/is_client.c index a128659..ca7553f 100644 --- a/org.glite.jp.primary/src/is_client.c +++ b/org.glite.jp.primary/src/is_client.c @@ -174,6 +174,7 @@ int glite_jpps_single_feed( ) { int retry,ret; + assert(owner); for (retry = 0; retry < MAX_RETRY; retry++) { if ((ret = glite_jpps_single_feed_wrapped(ctx,feed,done,destination,job,owner,attrs)) == 0) break; sleep(RETRY_SLEEP); @@ -221,6 +222,8 @@ static int glite_jpps_multi_feed_wrapped( jr->jobid = jobs[i]; jr->owner = owners[i]; + assert(jr->owner); + jr->__sizeattributes = jp2s_attrValues(ctx->other_soap, attrs[i], &jr->attributes,0); @@ -230,6 +233,7 @@ static int glite_jpps_multi_feed_wrapped( jr->primaryStorage = &ctx->myURL; } + //#ifndef JP_PERF SWITCH_SOAP_CTX check_fault(ctx,ctx->other_soap, soap_call___jpsrv__UpdateJobs(ctx->other_soap,destination,"", &in,&out)); @@ -239,6 +243,7 @@ static int glite_jpps_multi_feed_wrapped( attrValues_free(ctx->other_soap,jr->attributes,jr->__sizeattributes); } GLITE_SECURITY_GSOAP_LIST_DESTROY(ctx->other_soap, &in, jobAttributes); + //#endif return err.code; } -- 1.8.2.3