Deferred operations RegisterJob, RecordTag and CommitUpload.
authorJiří Filipovič <fila@ics.muni.cz>
Thu, 20 Sep 2007 13:35:02 +0000 (13:35 +0000)
committerJiří Filipovič <fila@ics.muni.cz>
Thu, 20 Sep 2007 13:35:02 +0000 (13:35 +0000)
Optimized using of plugins (in multiargument query each file is parsed only once).

org.glite.jp.primary/src/attrs.c
org.glite.jp.primary/src/feed.c
org.glite.jp.primary/src/is_client.c

index 01fac7f..fbdb150 100644 (file)
@@ -18,7 +18,10 @@ static struct {
        char *namespace;
        glite_jpps_fplug_data_t **plugins;
         int     nplugins;
-
+       char **opened_classes;          // for each plugin contains name of the open class (NULL for no open file)
+       char **opened_files;            // for each plugin contains name of the open file (NULL for no open file)
+       void **plugin_handles;          // contains handle for each opened plugin (NULL for not opened)
+       void **file_handles;            // contains handle for each opened file by plugin (NULL for not opened)
 } *known_namespaces;
 
 static void scan_namespaces(glite_jp_context_t ctx)
@@ -43,6 +46,18 @@ static void scan_namespaces(glite_jp_context_t ctx)
                                        known_namespaces[k].plugins[known_namespaces[k].nplugins++] = pd;
                                        known_namespaces[k].plugins[known_namespaces[k].nplugins] = NULL;
                                        known_namespaces[k].namespace = pd->namespaces[j];
+                                       known_namespaces[k].opened_classes = realloc(known_namespaces[k].opened_classes,
+                                               (known_namespaces[k].nplugins + 1) * sizeof(char*));
+                                       known_namespaces[k].opened_classes[known_namespaces[k].nplugins-1] = NULL;
+                                       known_namespaces[k].opened_files = realloc(known_namespaces[k].opened_files,
+                                                (known_namespaces[k].nplugins + 1) * sizeof(char*));
+                                       known_namespaces[k].opened_files[known_namespaces[k].nplugins-1] = NULL;
+                                       known_namespaces[k].plugin_handles = realloc(known_namespaces[k].plugin_handles,
+                                               (known_namespaces[k].nplugins + 1) * sizeof(void*));
+                                       known_namespaces[k].plugin_handles[known_namespaces[k].nplugins-1] = NULL;
+                                       known_namespaces[k].file_handles = realloc(known_namespaces[k].file_handles,
+                                                (known_namespaces[k].nplugins + 1) * sizeof(void*));
+                                        known_namespaces[k].file_handles[known_namespaces[k].nplugins-1] = NULL;
                                }
                                else {
                                        printf("Adding new namespace %s\n", pd->namespaces[j]);
@@ -53,6 +68,10 @@ static void scan_namespaces(glite_jp_context_t ctx)
                                        known_namespaces[k].nplugins = 1;
                                        known_namespaces[k].namespace = pd->namespaces[j];
                                        memset(known_namespaces+k+1,0,sizeof *known_namespaces);
+                                       known_namespaces[k].opened_classes = calloc(1, sizeof(char*));
+                                       known_namespaces[k].opened_files = calloc(1, sizeof(char*));
+                                       known_namespaces[k].plugin_handles = calloc(1, sizeof(void*));
+                                       known_namespaces[k].file_handles = calloc(1, sizeof(void*));
                                        }
                        }
                }
@@ -72,7 +91,7 @@ static int merge_attrvals(glite_jp_attrval_t **out,int nout,const glite_jp_attrv
        return nout+nin;
 }
 
-void process_files(glite_jp_context_t ctx, const char *job, glite_jp_attrval_t** out, int* nout, const char* attr, const glite_jpps_fplug_data_t* plugin, const char* class, const char* uri){
+void process_files(glite_jp_context_t ctx, const char *job, glite_jp_attrval_t** out, int* nout, const char* attr, const glite_jpps_fplug_data_t* plugin, const char* class, const char* uri, const char **opened_class, const char **opened_file, void **op_handle, void **of_handle){
        void *ph, *beh; 
        char** names = NULL;
         int nnames;
@@ -81,29 +100,56 @@ void process_files(glite_jp_context_t ctx, const char *job, glite_jp_attrval_t**
 
        nnames = glite_jppsbe_get_names(ctx, job, class, &names);
 
-        for (n = 0; n < nnames; n++)
-               if (! glite_jppsbe_open_file(ctx,job,class, names[n], O_RDONLY, &beh)) {
-                       if (!plugin->ops.open(plugin->fpctx,beh,uri,&ph)) {
-                               glite_jp_attrval_t* myattr;
-                               // XXX: ignore errors
-                               if (!plugin->ops.attr(plugin->fpctx,ph,attr,&myattr) && myattr) {
-                                       int k;
-                                       for (k=0; myattr[k].name; k++) {
-                                               myattr[k].origin = GLITE_JP_ATTR_ORIG_FILE;
-                                               if (!myattr[k].origin_detail) 
-                                                       trio_asprintf(&myattr[k].origin_detail,"%s %s", uri, names[n] ? names[n] : "");
-                                       }
-                                       *nout = merge_attrvals(out,*nout,myattr);
-                                       free(myattr);
-                               }
-                               keep_err = ctx->error; ctx->error = NULL;
-                               plugin->ops.close(plugin->fpctx, ph);
-                               if (keep_err) { ctx->error = keep_err; keep_err = NULL; }
-                               }
+        for (n = 0; n < nnames; n++){
+               int plugin_ok = 0;
+               if (*opened_class && !strcmp(*opened_class, class) && ((!*opened_file && !names[n]) || !strcmp(*opened_file, names[n]))){
+                       ph = *op_handle;
+                       beh = *of_handle;
+                       plugin_ok = 1;
+               }
+               else{
+                       if (*opened_class){
+                               free(*opened_class);
+                               *opened_class = NULL;
+                               free(*opened_file);
+                                *opened_file = NULL;
+                               plugin->ops.close(plugin->fpctx, *op_handle);
+                               *op_handle = NULL;
+                               glite_jppsbe_close_file(ctx, *of_handle);
+                               *of_handle = NULL;
+                       }
+                       if (! glite_jppsbe_open_file(ctx,job,class, names[n], O_RDONLY, &beh) 
+                       && !plugin->ops.open(plugin->fpctx,beh,uri,&ph)){
+                               plugin_ok = 1;
+                               *opened_class = strdup(class);
+                               if (names[n])
+                                       *opened_file = strdup(names[n]);
+                               else
+                                       *opened_file = NULL;
+                               *op_handle = (void*)ph;
+                               *of_handle = (void*)beh;
+                               printf("opening plugin %i at class %s, file %s\n", *op_handle, class, names[n]);
+                       }
+               }
+               if (plugin_ok){
+                       glite_jp_attrval_t* myattr;
+                       // XXX: ignore errors
+                       if (!plugin->ops.attr(plugin->fpctx,ph,attr,&myattr) && myattr) {
+                               int k;
+                               for (k=0; myattr[k].name; k++) {
+                                       myattr[k].origin = GLITE_JP_ATTR_ORIG_FILE;
+                                       if (!myattr[k].origin_detail) 
+                                               trio_asprintf(&myattr[k].origin_detail,"%s %s", uri, names[n] ? names[n] : "");
+                               }
+                               *nout = merge_attrvals(out,*nout,myattr);
+                               free(myattr);
+                       }
                        keep_err = ctx->error; ctx->error = NULL;
-                       glite_jppsbe_close_file(ctx,beh);
                        if (keep_err) { ctx->error = keep_err; keep_err = NULL; }
-               }
+                       }
+               keep_err = ctx->error; ctx->error = NULL;
+               if (keep_err) { ctx->error = keep_err; keep_err = NULL; }
+       }
 }
 
 int glite_jpps_get_attrs(glite_jp_context_t ctx,const char *job,char **attr,int nattr,glite_jp_attrval_t **attrs_out)
@@ -118,6 +164,7 @@ int glite_jpps_get_attrs(glite_jp_context_t ctx,const char *job,char **attr,int
 /* sort the queried attributes to backend metadata and others -- retrived by plugins 
  * XXX: assumes unique values for metadata.
  */
+
        for (i=0; i<nattr; i++) {
                if (glite_jppsbe_is_metadata(ctx,attr[i])) {
                        meta = realloc(meta,(nmeta+2) * sizeof *meta);
@@ -144,20 +191,38 @@ int glite_jpps_get_attrs(glite_jp_context_t ctx,const char *job,char **attr,int
                        nout = merge_attrvals(&out, nout, tag_out);
                        free(tag_out); tag_out = NULL;
                }
+               char* attr_namespace = glite_jpps_get_namespace(other[i]);
                for (j = 0; known_namespaces && known_namespaces[j].namespace; j++) {
-                       char* attr_namespace = glite_jpps_get_namespace(other[i]);
                        if (strcmp(attr_namespace, known_namespaces[j].namespace) == 0){
                                for (k = 0; known_namespaces[j].plugins[k]; k++)
                                        for (l = 0; known_namespaces[j].plugins[k]->classes[l]; l++)
                                                process_files(ctx, job, &out, &nout, other[i], known_namespaces[j].plugins[k]
                                                        , known_namespaces[j].plugins[k]->classes[l]
-                                                       , known_namespaces[j].plugins[k]->uris[l]);
+                                                       , known_namespaces[j].plugins[k]->uris[l]
+                                                       , &known_namespaces[j].opened_classes[k]
+                                                       , &known_namespaces[j].opened_files[k]
+                                                       , &known_namespaces[j].plugin_handles[k]
+                                                       , &known_namespaces[j].file_handles[k]);
                                break;
                        }
-                       free(attr_namespace);
                }
+               free(attr_namespace);
        }
 
+/* close plugins */
+       for (i = 0; known_namespaces && known_namespaces[i].namespace; i++)
+                for (j = 0; known_namespaces[i].plugins[j]; j++)
+                       if (known_namespaces[i].opened_classes[j]){
+                               known_namespaces[i].plugins[j]->ops.close(known_namespaces[i].plugins[j]->fpctx
+                                       , known_namespaces[i].plugin_handles[j]);
+                               printf("closing plugin %i at class %s, file %s\n", known_namespaces[i].plugin_handles[j], known_namespaces[i].opened_classes[j], known_namespaces[i].opened_files[j]);
+                               glite_jppsbe_close_file(ctx, known_namespaces[i].file_handles[j]);
+                               known_namespaces[i].opened_classes[j] = NULL;
+                               known_namespaces[i].opened_files[j] = NULL;
+                               known_namespaces[i].plugin_handles[j] = NULL;
+                               known_namespaces[i].file_handles[j] = NULL;
+                       }
+
        nout = merge_attrvals(&out,nout,meta);
 
        free(meta); meta = NULL;
index 46d27df..be9ab1f 100644 (file)
@@ -207,12 +207,20 @@ cleanup:
  * kdyby ne, stejne se to nepovede ;
  * totez pro match_file */
 
-int glite_jpps_match_attr(
+typedef struct{
+       char *job;
+       glite_jp_attrval_t *attrs;
+} match_attr;
+
+
+int match_attr_deferred(
                glite_jp_context_t ctx,
-               const char *job,
-               const glite_jp_attrval_t attrs[]
+               void *ma
 )
 {
+       char *job = ((match_attr*)ma)->job;
+       glite_jp_attrval_t *attrs = ((match_attr*)ma)->attrs;
+
        struct jpfeed   *f = (struct jpfeed *) ctx->feeds;
        int     i,j,doit;
 
@@ -227,9 +235,30 @@ int glite_jpps_match_attr(
                if (doit) match_feed(ctx,f,job,attrs);
        }
 
+       free(((match_attr*)ma)->job);
+       //free(((match_attr*)ma)->attrs);
+
        return glite_jp_clear_error(ctx);
 }
 
+int glite_jpps_match_attr(
+                glite_jp_context_t ctx,
+                const char *job,
+                const glite_jp_attrval_t attrs[]
+)
+{
+       match_attr *ma = malloc(sizeof *ma);
+       ma->job = strdup(job);
+       ma->attrs = malloc(sizeof(*ma->attrs));
+       ma->attrs[0].name = NULL;
+       int i;
+       for (i = 0; attrs[i].name; i++){
+               ma->attrs = realloc(ma->attrs, (i+2)*sizeof(*ma->attrs));
+               ma->attrs[i+1].name = NULL;
+       }
+       glite_jp_add_deferred(ctx, match_attr_deferred, ma);
+}
+
 static int attr_void_cmp(const void *a, const void *b)
 {
        char const * const *ca = (char const * const *) a;
@@ -261,13 +290,21 @@ static void attr_union(char **a, char **b, char ***c)
        *c = out;
 }
 
-int glite_jpps_match_file(
+typedef struct{
+       char *job;
+       char *class;
+       char *name;
+} match_file;
+
+int match_file_deferred(
        glite_jp_context_t ctx,
-       const char *job,
-       const char *class,
-       const char *name
+       void *mf
 )
 {
+       char *job = ((match_file*)mf)->job;
+       char *class = ((match_file*)mf)->class;
+       char *name = ((match_file*)mf)->name;
+
        glite_jpps_fplug_data_t **pd = NULL;
        int     pi;
        void    *bh = NULL;
@@ -372,9 +409,31 @@ int glite_jpps_match_file(
 
        for (i=0; meta[i].name; i++) glite_jp_attrval_free(meta+i,0);
 
+       free(((match_file*)mf)->job);
+       free(((match_file*)mf)->class);
+       free(((match_file*)mf)->name);
+
        return 0;
 }
 
+int glite_jpps_match_file(
+        glite_jp_context_t ctx,
+        const char *job,
+        const char *class,
+        const char *name
+)
+{
+       match_file* mf = malloc(sizeof(*mf));
+       mf->job = strdup(job);
+       mf->class = strdup(class);
+       if (name)
+               mf->name = strdup(name);
+       else
+               mf->name = NULL;
+
+       glite_jp_add_deferred(ctx, match_file_deferred, mf);
+}
+
 static char *generate_feedid(void)
 {
        char    hname[200],buf[1000];
index a128659..ca7553f 100644 (file)
@@ -174,6 +174,7 @@ int glite_jpps_single_feed(
 )
 {
        int     retry,ret;
+       assert(owner);
        for (retry = 0; retry < MAX_RETRY; retry++) {
                if ((ret = glite_jpps_single_feed_wrapped(ctx,feed,done,destination,job,owner,attrs)) == 0) break;
                sleep(RETRY_SLEEP);
@@ -221,6 +222,8 @@ static int glite_jpps_multi_feed_wrapped(
                jr->jobid = jobs[i];
                jr->owner = owners[i];
 
+               assert(jr->owner);
+
                jr->__sizeattributes = jp2s_attrValues(ctx->other_soap,
                        attrs[i],
                        &jr->attributes,0);
@@ -230,6 +233,7 @@ static int glite_jpps_multi_feed_wrapped(
                jr->primaryStorage = &ctx->myURL;
        }
 
+       //#ifndef JP_PERF
        SWITCH_SOAP_CTX
        check_fault(ctx,ctx->other_soap,
                soap_call___jpsrv__UpdateJobs(ctx->other_soap,destination,"", &in,&out));
@@ -239,6 +243,7 @@ static int glite_jpps_multi_feed_wrapped(
                attrValues_free(ctx->other_soap,jr->attributes,jr->__sizeattributes);
        }
        GLITE_SECURITY_GSOAP_LIST_DESTROY(ctx->other_soap, &in, jobAttributes);
+       //#endif
 
        return err.code;
 }