From c796e5c11392b15f9678911538a169900f90e1c7 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ji=C5=99=C3=AD=20Filipovi=C4=8D?= Date: Thu, 6 Mar 2008 12:54:31 +0000 Subject: [PATCH] merge --- org.glite.jp.primary/Makefile | 16 +- org.glite.jp.primary/examples/getjobattr.pl | 44 +++ org.glite.jp.primary/examples/jpps-test.c | 25 +- org.glite.jp.primary/src/attrs.c | 120 ++++-- org.glite.jp.primary/src/classad_plugin.c | 8 +- org.glite.jp.primary/src/feed.c | 107 +++++- org.glite.jp.primary/src/is_client.c | 6 +- org.glite.jp.primary/src/new_ftp_backend.c | 564 ++-------------------------- 8 files changed, 295 insertions(+), 595 deletions(-) create mode 100644 org.glite.jp.primary/examples/getjobattr.pl diff --git a/org.glite.jp.primary/Makefile b/org.glite.jp.primary/Makefile index eb43ad7..0e8fa45 100644 --- a/org.glite.jp.primary/Makefile +++ b/org.glite.jp.primary/Makefile @@ -27,6 +27,12 @@ ifeq (${host_cpu},x86_64) archlib:=lib64 endif +archlib:=lib +host_cpu:=${shell uname -m} +ifeq (${host_cpu},x86_64) + archlib:=lib64 +endif + ifneq (${classads_prefix},/usr) classadslib := -L${classads_prefix}/${archlib} -lclassad @@ -36,9 +42,13 @@ CLASSADPLUGIN_LIBS:= ${classadslib} -lstdc++ CLASSADPLUGIN_LOBJS:= classad_plugin.lo +GLOBUS_CFLAGS:=-I${globus_prefix}/include/${nothrflavour} + DEBUG:=-g -O0 -DDEBUG -CFLAGS:=${DEBUG} -I. -I${top_srcdir}/interface -I${top_srcdir}/src -I${gsoap_prefix}/include -I${gsoap_prefix} -I${stagedir}/include -I${classads_prefix}/include -I${libtar_prefix}/include +CFLAGS:=${DEBUG} -I. -I${top_srcdir}/interface -I${top_srcdir}/src -I${gsoap_prefix}/include -I${gsoap_prefix} -I${stagedir}/include -I${classads_prefix}/include -I${libtar_prefix}/include ${GLOBUS_CFLAGS} +# 3.1 CFLAGS:=${DEBUG} -I. -I${top_srcdir}/interface -I${top_srcdir}/src -I${gsoap_prefix}/include -I${gsoap_prefix} -I${stagedir}/include ${GLOBUS_CFLAGS} -I${mysql_prefix}/include -I${mysql_prefix}/include/mysql -I${classads_prefix}/include -I${libtar_prefix}/include + LDFLAGS:=-L${stagedir}/lib LINK:=libtool --mode=link ${CC} ${LDFLAGS} @@ -151,6 +161,7 @@ install: -mkdir -p ${PREFIX}/bin ${PREFIX}/etc ${PREFIX}/examples ${PREFIX}/etc/init.d ${PREFIX}/lib ${INSTALL} -m 755 ${daemon} ${PREFIX}/bin ${INSTALL} -m 755 jpps-test ${PREFIX}/examples/glite-jp-primary-test + ${INSTALL} -m 755 ${top_srcdir}/examples/getjobattr.pl ${PREFIX}/examples/glite-jpps-getjobattr.pl if [ x${DOSTAGE} = xyes ]; then \ mkdir -p ${PREFIX}/include/${globalprefix}/${jpprefix} ; \ (cd ${top_srcdir}/interface && install -m 644 ${HDRS_I} ${PREFIX}/include/${globalprefix}/${jpprefix}) ; \ @@ -189,6 +200,9 @@ glite-jp-ftpdauth.la: ftpd_auth.lo #glite-jp-classad.lo: classad_plugin.c # ${LTCOMPILE} -DPLUGIN_DEBUG -o $@ -c $< +#glite-jp-classad.lo: classad_plugin.c +# ${LTCOMPILE} -DPLUGIN_DEBUG -o $@ -c $< + %.lo: %.c ${LTCOMPILE} -o $@ -c $< diff --git a/org.glite.jp.primary/examples/getjobattr.pl b/org.glite.jp.primary/examples/getjobattr.pl new file mode 100644 index 0000000..fb4be1c --- /dev/null +++ b/org.glite.jp.primary/examples/getjobattr.pl @@ -0,0 +1,44 @@ +#!/usr/bin/perl + +use SOAP::Lite; +use Data::Dumper; + +$ENV{HTTPS_CA_DIR}='/etc/grid-security/certificates'; +$ENV{HTTPS_VERSION}='3'; + +$ENV{HTTPS_CERT_FILE}="$ENV{HOME}/.globus/usercert.pem"; +$ENV{HTTPS_KEY_FILE}="$ENV{HOME}/.globus/userkey.pem"; + +$proxy = shift; +$job = shift; + +die "usage: $0 https://jp.primary.storage.org:8901/jpps https://some.nice.job/id attr attr ...\n" + unless $ARGV[0]; + +$c = SOAP::Lite + -> proxy($proxy) + -> uri('http://glite.org/wsdl/services/jp'); + +service $c 'http://egee.cesnet.cz/cms/export/sites/egee/en/WSDL/3.1/JobProvenancePS.wsdl' or die "service: $1\n"; + +ns $c 'http://glite.org/wsdl/elements/jp'; + +print "WSDL OK\n"; + +push @attr,SOAP::Data->name(attributes => $_) for (@ARGV); + +$req = SOAP::Data->value( + SOAP::Data->name(jobid => $job), + @attr +# SOAP::Data->name(attributes => 'http://egee.cesnet.cz/en/Schema/LB/Attributes:CE'), +# SOAP::Data->name(attributes => 'http://egee.cesnet.cz/en/Schema/JP/System:owner') +); + + + +on_fault $c sub { print Dumper($_[1]->fault); $fault = 1; }; + +$resp = GetJobAttributes $c $req; + +print Dumper $resp->body unless $fault; + diff --git a/org.glite.jp.primary/examples/jpps-test.c b/org.glite.jp.primary/examples/jpps-test.c index c03df55..1af5271 100644 --- a/org.glite.jp.primary/examples/jpps-test.c +++ b/org.glite.jp.primary/examples/jpps-test.c @@ -31,7 +31,7 @@ static void usage(const char *me) " RegisterJob jobid owner\n" " StartUpload jobid class commit_before mimetype\n" " CommitUpload destination\n" - " RecordTag jobid tagname stringvalue\n" + " RecordTag jobid tagname stringvalue ...\n" " GetJobFiles jobid\n" " GetJobAttr jobid attr\n" " FeedIndex [yes (history)]\n" @@ -138,21 +138,24 @@ int main(int argc,char *argv[]) struct _jpelem__RecordTagResponse empty; struct jptype__tagValue tagval; struct jptype__stringOrBlob val; - - int seq = 0; + int idx; - if (argc != 5) usage(argv[0]); - + if (argc < 5 && argc % 2 == 0) usage(argv[0]); + in.jobid = argv[2]; in.tag = &tagval; - tagval.name = argv[3]; tagval.value = &val; - memset(&val, 0, sizeof(val)); - GSOAP_SETSTRING(&val, argv[4]); - if (!(ret = check_fault(soap, - soap_call___jpsrv__RecordTag(soap, server, "",&in, &empty)))) { - /* OK */ + for (idx = 3; idx < argc; idx += 2) { + tagval.name = argv[idx]; + memset(&val, 0, sizeof(val)); + GSOAP_SETSTRING(&val, argv[idx+1]); + + printf("%s ... ",tagval.name); + if (!(ret = check_fault(soap, + soap_call___jpsrv__RecordTag(soap, server, "",&in, &empty)))) { + /* OK */ + } } } else if (!strcasecmp(argv[1],"FeedIndex")) { diff --git a/org.glite.jp.primary/src/attrs.c b/org.glite.jp.primary/src/attrs.c index 8cc0f50..4d31864 100644 --- a/org.glite.jp.primary/src/attrs.c +++ b/org.glite.jp.primary/src/attrs.c @@ -19,7 +19,10 @@ static struct { char *namespace; glite_jpps_fplug_data_t **plugins; int nplugins; - + char **opened_classes; // for each plugin contains name of the open class (NULL for no open file) + char **opened_files; // for each plugin contains name of the open file (NULL for no open file) + void **plugin_handles; // contains handle for each opened plugin (NULL for not opened) + void **file_handles; // contains handle for each opened file by plugin (NULL for not opened) } *known_namespaces; static void scan_namespaces(glite_jp_context_t ctx) @@ -44,6 +47,18 @@ static void scan_namespaces(glite_jp_context_t ctx) known_namespaces[k].plugins[known_namespaces[k].nplugins++] = pd; known_namespaces[k].plugins[known_namespaces[k].nplugins] = NULL; known_namespaces[k].namespace = pd->namespaces[j]; + known_namespaces[k].opened_classes = realloc(known_namespaces[k].opened_classes, + (known_namespaces[k].nplugins + 1) * sizeof(char*)); + known_namespaces[k].opened_classes[known_namespaces[k].nplugins-1] = NULL; + known_namespaces[k].opened_files = realloc(known_namespaces[k].opened_files, + (known_namespaces[k].nplugins + 1) * sizeof(char*)); + known_namespaces[k].opened_files[known_namespaces[k].nplugins-1] = NULL; + known_namespaces[k].plugin_handles = realloc(known_namespaces[k].plugin_handles, + (known_namespaces[k].nplugins + 1) * sizeof(void*)); + known_namespaces[k].plugin_handles[known_namespaces[k].nplugins-1] = NULL; + known_namespaces[k].file_handles = realloc(known_namespaces[k].file_handles, + (known_namespaces[k].nplugins + 1) * sizeof(void*)); + known_namespaces[k].file_handles[known_namespaces[k].nplugins-1] = NULL; } else { printf("Adding new namespace %s\n", pd->namespaces[j]); @@ -54,6 +69,10 @@ static void scan_namespaces(glite_jp_context_t ctx) known_namespaces[k].nplugins = 1; known_namespaces[k].namespace = pd->namespaces[j]; memset(known_namespaces+k+1,0,sizeof *known_namespaces); + known_namespaces[k].opened_classes = calloc(1, sizeof(char*)); + known_namespaces[k].opened_files = calloc(1, sizeof(char*)); + known_namespaces[k].plugin_handles = calloc(1, sizeof(void*)); + known_namespaces[k].file_handles = calloc(1, sizeof(void*)); } } } @@ -73,7 +92,7 @@ static int merge_attrvals(glite_jp_attrval_t **out,int nout,const glite_jp_attrv return nout+nin; } -void process_files(glite_jp_context_t ctx, const char *job, glite_jp_attrval_t** out, int* nout, const char* attr, const glite_jpps_fplug_data_t* plugin, const char* class, const char* uri){ +void process_files(glite_jp_context_t ctx, const char *job, glite_jp_attrval_t** out, int* nout, const char* attr, const glite_jpps_fplug_data_t* plugin, const char* class, const char* uri, const char **opened_class, const char **opened_file, void **op_handle, void **of_handle){ void *ph, *beh; char** names = NULL; int nnames; @@ -82,29 +101,56 @@ void process_files(glite_jp_context_t ctx, const char *job, glite_jp_attrval_t** nnames = glite_jppsbe_get_names(ctx, job, class, &names); - for (n = 0; n < nnames; n++) - if (! glite_jppsbe_open_file(ctx,job,class, names[n], O_RDONLY, &beh)) { - if (!plugin->ops.open(plugin->fpctx,beh,uri,&ph)) { - glite_jp_attrval_t* myattr; - // XXX: ignore errors - if (!plugin->ops.attr(plugin->fpctx,ph,attr,&myattr) && myattr) { - int k; - for (k=0; myattr[k].name; k++) { - myattr[k].origin = GLITE_JP_ATTR_ORIG_FILE; - if (!myattr[k].origin_detail) - trio_asprintf(&myattr[k].origin_detail,"%s %s", uri, names[n] ? names[n] : ""); - } - *nout = merge_attrvals(out,*nout,myattr); - free(myattr); - } - keep_err = ctx->error; ctx->error = NULL; - plugin->ops.close(plugin->fpctx, ph); - if (keep_err) { ctx->error = keep_err; keep_err = NULL; } - } + for (n = 0; n < nnames; n++){ + int plugin_ok = 0; + if (*opened_class && !strcmp(*opened_class, class) && ((!*opened_file && !names[n]) || !strcmp(*opened_file, names[n]))){ + ph = *op_handle; + beh = *of_handle; + plugin_ok = 1; + } + else{ + if (*opened_class){ + free(*opened_class); + *opened_class = NULL; + free(*opened_file); + *opened_file = NULL; + plugin->ops.close(plugin->fpctx, *op_handle); + *op_handle = NULL; + glite_jppsbe_close_file(ctx, *of_handle); + *of_handle = NULL; + } + if (! glite_jppsbe_open_file(ctx,job,class, names[n], O_RDONLY, &beh) + && !plugin->ops.open(plugin->fpctx,beh,uri,&ph)){ + plugin_ok = 1; + *opened_class = strdup(class); + if (names[n]) + *opened_file = strdup(names[n]); + else + *opened_file = NULL; + *op_handle = (void*)ph; + *of_handle = (void*)beh; + printf("opening plugin %i at class %s, file %s\n", *op_handle, class, names[n]); + } + } + if (plugin_ok){ + glite_jp_attrval_t* myattr; + // XXX: ignore errors + if (!plugin->ops.attr(plugin->fpctx,ph,attr,&myattr) && myattr) { + int k; + for (k=0; myattr[k].name; k++) { + myattr[k].origin = GLITE_JP_ATTR_ORIG_FILE; + if (!myattr[k].origin_detail) + trio_asprintf(&myattr[k].origin_detail,"%s %s", uri, names[n] ? names[n] : ""); + } + *nout = merge_attrvals(out,*nout,myattr); + free(myattr); + } keep_err = ctx->error; ctx->error = NULL; - glite_jppsbe_close_file(ctx,beh); if (keep_err) { ctx->error = keep_err; keep_err = NULL; } - } + } + keep_err = ctx->error; ctx->error = NULL; + if (keep_err) { ctx->error = keep_err; keep_err = NULL; } + } } int glite_jpps_get_attrs(glite_jp_context_t ctx,const char *job,char **attr,int nattr,glite_jp_attrval_t **attrs_out) @@ -119,6 +165,7 @@ int glite_jpps_get_attrs(glite_jp_context_t ctx,const char *job,char **attr,int /* sort the queried attributes to backend metadata and others -- retrived by plugins * XXX: assumes unique values for metadata. */ + for (i=0; iclasses[l]; l++) process_files(ctx, job, &out, &nout, other[i], known_namespaces[j].plugins[k] , known_namespaces[j].plugins[k]->classes[l] - , known_namespaces[j].plugins[k]->uris[l]); + , known_namespaces[j].plugins[k]->uris[l] + , &known_namespaces[j].opened_classes[k] + , &known_namespaces[j].opened_files[k] + , &known_namespaces[j].plugin_handles[k] + , &known_namespaces[j].file_handles[k]); break; } - free(attr_namespace); + free(attr_namespace); attr_namespace = NULL; } + free(attr_namespace); } +/* close plugins */ + for (i = 0; known_namespaces && known_namespaces[i].namespace; i++) + for (j = 0; known_namespaces[i].plugins[j]; j++) + if (known_namespaces[i].opened_classes[j]){ + known_namespaces[i].plugins[j]->ops.close(known_namespaces[i].plugins[j]->fpctx + , known_namespaces[i].plugin_handles[j]); + printf("closing plugin %i at class %s, file %s\n", known_namespaces[i].plugin_handles[j], known_namespaces[i].opened_classes[j], known_namespaces[i].opened_files[j]); + glite_jppsbe_close_file(ctx, known_namespaces[i].file_handles[j]); + known_namespaces[i].opened_classes[j] = NULL; + known_namespaces[i].opened_files[j] = NULL; + known_namespaces[i].plugin_handles[j] = NULL; + known_namespaces[i].file_handles[j] = NULL; + } + nout = merge_attrvals(&out,nout,meta); free(meta); meta = NULL; diff --git a/org.glite.jp.primary/src/classad_plugin.c b/org.glite.jp.primary/src/classad_plugin.c index 6bd8328..1d7b6ec 100644 --- a/org.glite.jp.primary/src/classad_plugin.c +++ b/org.glite.jp.primary/src/classad_plugin.c @@ -176,13 +176,17 @@ static int classad_query(void *fpctx,void *handle, const char *attr,glite_jp_att return glite_jp_stack_error(ctx,&err); } - if (cclassad_evaluate_to_string(h->ad, strrchr(attr, ':')+1, &str)) { + if (!cclassad_evaluate_to_string(h->ad, strrchr(attr, ':')+1, &str) && + cclassad_evaluate_to_expr(h->ad, strrchr(attr, ':')+1, &str) && + !strcasecmp(str,"undefined")) { free(str); str = NULL; } + + if (str) { //struct stat fattr; /*XXX ignore error */ //glite_jppsbe_file_attrs(ctx, h->bhandle, &fattr); av = calloc(2, sizeof(glite_jp_attrval_t)); av[0].name = strdup(attr); - av[0].value = strdup(str); + av[0].value = str; str = NULL; av[0].size = -1; av[0].timestamp = h->timestamp; av[0].origin = GLITE_JP_ATTR_ORIG_FILE; diff --git a/org.glite.jp.primary/src/feed.c b/org.glite.jp.primary/src/feed.c index 47e9fea..b461403 100644 --- a/org.glite.jp.primary/src/feed.c +++ b/org.glite.jp.primary/src/feed.c @@ -23,7 +23,6 @@ extern pid_t master; * seconds before feed expires: should be * XXX: should be configurable, default for real deployment sort of 1 hour */ -//#define FEED_TTL 36000000 #define FEED_TTL 360 /* XXX: configurable */ @@ -190,8 +189,11 @@ static int match_feed( } else { if (!owner) { + for (i=0; meta[i].name; i++) glite_jp_attrval_free(meta+i,0); + memset(meta,0,sizeof meta); + meta[0].name = strdup(GLITE_JP_ATTR_OWNER); glite_jppsbe_get_job_metadata(ctx,job,meta); - for (i=0; meta[i].name && strcmp(meta[i].name,GLITE_JP_ATTR_OWNER); i++); + owner = meta[0].value; } glite_jpps_single_feed(ctx,feed->id,0,feed->destination,job,owner,attrs); } @@ -205,12 +207,20 @@ cleanup: * kdyby ne, stejne se to nepovede ; * totez pro match_file */ -int glite_jpps_match_attr( +typedef struct{ + char *job; + glite_jp_attrval_t *attrs; +} match_attr; + + +int match_attr_deferred( glite_jp_context_t ctx, - const char *job, - const glite_jp_attrval_t attrs[] + void *ma ) { + char *job = ((match_attr*)ma)->job; + glite_jp_attrval_t *attrs = ((match_attr*)ma)->attrs; + struct jpfeed *f = (struct jpfeed *) ctx->feeds; int i,j,doit; @@ -225,9 +235,35 @@ int glite_jpps_match_attr( if (doit) match_feed(ctx,f,job,attrs); } + free(((match_attr*)ma)->job); + //free(((match_attr*)ma)->attrs); + return glite_jp_clear_error(ctx); } +int glite_jpps_match_attr( + glite_jp_context_t ctx, + const char *job, + const glite_jp_attrval_t attrs[] +) +{ + match_attr *ma = malloc(sizeof *ma); + ma->job = strdup(job); + ma->attrs = malloc(sizeof(*ma->attrs)); + ma->attrs[0].name = NULL; + int i; + for (i = 0; attrs[i].name; i++){ + ma->attrs = realloc(ma->attrs, (i+2)*sizeof(*ma->attrs)); + + memcpy(&(ma->attrs[i]), &(attrs[i]), sizeof(*ma->attrs)); + ma->attrs[i].name = strdup(attrs[i].name); + ma->attrs[i].value = strdup(attrs[i].value); + + ma->attrs[i+1].name = NULL; + } + glite_jp_add_deferred(ctx, match_attr_deferred, ma); +} + static int attr_void_cmp(const void *a, const void *b) { char const * const *ca = (char const * const *) a; @@ -259,13 +295,21 @@ static void attr_union(char **a, char **b, char ***c) *c = out; } -int glite_jpps_match_file( +typedef struct{ + char *job; + char *class; + char *name; +} match_file; + +int match_file_deferred( glite_jp_context_t ctx, - const char *job, - const char *class, - const char *name + void *mf ) { + char *job = ((match_file*)mf)->job; + char *class = ((match_file*)mf)->class; + char *name = ((match_file*)mf)->name; + glite_jpps_fplug_data_t **pd = NULL; int pi; void *bh = NULL; @@ -298,6 +342,9 @@ int glite_jpps_match_file( attrs = attrs2; } + vals = malloc(sizeof *vals); + vals[0].name = NULL; + for (pi=0; pd[pi]; pi++) { int ci; for (ci=0; pd[pi]->uris[ci]; ci++) if (!strcmp(pd[pi]->classes[ci],class)) { @@ -367,9 +414,31 @@ int glite_jpps_match_file( for (i=0; meta[i].name; i++) glite_jp_attrval_free(meta+i,0); + free(((match_file*)mf)->job); + free(((match_file*)mf)->class); + free(((match_file*)mf)->name); + return 0; } +int glite_jpps_match_file( + glite_jp_context_t ctx, + const char *job, + const char *class, + const char *name +) +{ + match_file* mf = malloc(sizeof(*mf)); + mf->job = strdup(job); + mf->class = strdup(class); + if (name) + mf->name = strdup(name); + else + mf->name = NULL; + + glite_jp_add_deferred(ctx, match_file_deferred, mf); +} + static char *generate_feedid(void) { char hname[200],buf[1000]; @@ -496,10 +565,9 @@ static int feed_query_callback( goto cleanup; } -/* no attributes known -- can't match */ - if (!other) goto cleanup; - /* filter on non-meta query items */ + if (f->nother_qry && !other) goto cleanup; /* unknown values can't match */ + for (i=0; inother_qry; i++) { for (j=0; other[j].name; j++) if (check_qry_item(ctx,f->other_qry+i,other+j)) break; @@ -515,13 +583,14 @@ static int feed_query_callback( nout++; } - for (i=0; other[i].name; i++) - for (j=0; jint_other_attr; j++) - if (!strcmp(other[i].name,f->other_attr[j])) { - out = realloc(out,(nout+2) * sizeof *out); - glite_jp_attrval_copy(out+nout,other+i); - nout++; - } + if (other) + for (i=0; other[i].name; i++) + for (j=0; jint_other_attr; j++) + if (!strcmp(other[i].name,f->other_attr[j])) { + out = realloc(out,(nout+2) * sizeof *out); + glite_jp_attrval_copy(out+nout,other+i); + nout++; + } if (nout) { int oi; diff --git a/org.glite.jp.primary/src/is_client.c b/org.glite.jp.primary/src/is_client.c index b26a18e..9d93345 100644 --- a/org.glite.jp.primary/src/is_client.c +++ b/org.glite.jp.primary/src/is_client.c @@ -168,7 +168,6 @@ static int glite_jpps_single_feed_wrapped( return err.code; } - int glite_jpps_single_feed( glite_jp_context_t ctx, const char *feed, @@ -180,6 +179,7 @@ int glite_jpps_single_feed( ) { int retry,ret; + assert(owner); for (retry = 0; retry < MAX_RETRY; retry++) { if ((ret = glite_jpps_single_feed_wrapped(ctx,feed,done,destination,job,owner,attrs)) == 0) break; sleep(RETRY_SLEEP); @@ -227,6 +227,8 @@ static int glite_jpps_multi_feed_wrapped( jr->jobid = jobs[i]; jr->owner = owners[i]; + assert(jr->owner); + jr->__sizeattributes = jp2s_attrValues(ctx->other_soap, attrs[i], &jr->attributes,0); @@ -236,6 +238,7 @@ static int glite_jpps_multi_feed_wrapped( jr->primaryStorage = &ctx->myURL; } + //#ifndef JP_PERF SWITCH_SOAP_CTX check_fault(ctx,ctx->other_soap, soap_call___jpsrv__UpdateJobs(ctx->other_soap,destination,"", &in,&out)); @@ -245,6 +248,7 @@ static int glite_jpps_multi_feed_wrapped( attrValues_free(ctx->other_soap,jr->attributes,jr->__sizeattributes); } GLITE_SECURITY_GSOAP_LIST_DESTROY(ctx->other_soap, &in, jobAttributes); + //#endif return err.code; } diff --git a/org.glite.jp.primary/src/new_ftp_backend.c b/org.glite.jp.primary/src/new_ftp_backend.c index 353c81b..84de773 100644 --- a/org.glite.jp.primary/src/new_ftp_backend.c +++ b/org.glite.jp.primary/src/new_ftp_backend.c @@ -415,6 +415,8 @@ int glite_jppsbe_start_upload( char *peername = NULL; char *peerhash = NULL; char *commit_before_inout_str; + time_t now; + char *now_str = NULL; char *stmt = NULL; glite_lbu_Statement db_res; @@ -504,7 +506,7 @@ int glite_jppsbe_start_upload( /* XXX: gsoap does not like so much, one year should be enough *commit_before_inout = (time_t) LONG_MAX; */ - *commit_before_inout = time(NULL) + 365*24*60*60; + *commit_before_inout = time(NULL) + 5*60;//365*24*60*60; /* if (add_to_gridmap(ctx, peername)) { @@ -513,6 +515,8 @@ int glite_jppsbe_start_upload( goto error_out; } */ + else if (*commit_before_inout > time(NULL) + 5*60) + *commit_before_inout = time(NULL) + 5*60; peerhash = str2md5(peername); /* static buffer */ if (store_user(ctx, peerhash, peername)) { @@ -522,10 +526,29 @@ int glite_jppsbe_start_upload( } free(stmt); stmt = NULL; + + time(&now); + glite_lbu_TimeToDB(now,&now_str); + + trio_asprintf(&stmt,"delete from files where jobid = '%|Ss' and state = 'uploading' and deadline < %s", ju, now_str); + free(now_str); + + if (!stmt) { + err.code = ENOMEM; + goto error_out; + } + if (glite_jp_db_ExecSQL(ctx, stmt, NULL) < 0) { + err.code = EIO; + err.desc = "DB access failed"; + goto error_out; + } + + free(stmt); stmt = NULL; + glite_lbu_TimeToDB(*commit_before_inout, &commit_before_inout_str); trio_asprintf(&stmt,"insert into files" "(jobid,filename,int_path,ext_url,state,deadline,ul_userid) " - "values ('%|Ss','%|Ss','%|Ss','%|Ss','%|Ss', '%|Ss', '%|Ss')", + "values ('%|Ss','%|Ss','%|Ss','%|Ss','%|Ss', %s, '%|Ss')", ju, data_basename, data_fname, *destination_out, "uploading", commit_before_inout_str, peerhash); free(commit_before_inout_str); @@ -544,7 +567,7 @@ int glite_jppsbe_start_upload( } goto error_out; } - + error_out: free(db_row[0]); free(db_row[1]); free(stmt); @@ -1078,6 +1101,7 @@ error_out: } } + int glite_jppsbe_file_attrs(glite_jp_context_t ctx, void *handle, struct stat *buf){ glite_jp_error_t err; @@ -1294,70 +1318,6 @@ cleanup: return err.code; } -#if 0 /* called from query */ -static int get_job_info_int( - glite_jp_context_t ctx, - const char *int_fname, - char **jobid, - char **owner, - struct timeval *tv_reg -) -{ - FILE *regfile = NULL; - long reg_time_sec; - long reg_time_usec; - int ownerlen = 0; - int info_version; - char jobid_buf[256]; - glite_jp_error_t err; - - glite_jp_clear_error(ctx); - memset(&err,0,sizeof err); - err.source = __FUNCTION__; - - regfile = fopen(int_fname, "r"); - if (regfile == NULL) { - err.code = errno; - err.desc = "Cannot open jobs's reg info file"; - goto error_out; - } - if (fscanf(regfile, "%d %ld.%ld %s %*s %d ", &info_version, - ®_time_sec, ®_time_usec, jobid_buf, &ownerlen) < 5 || ferror(regfile)) { - fclose(regfile); - err.code = errno; - err.desc = "Cannot read jobs's reg info file"; - goto error_out; - } - *jobid = strdup(jobid_buf); - if (ownerlen) { - *owner = (char *) calloc(1, ownerlen+1); - if (!*owner) { - err.code = ENOMEM; - goto error_out; - } - if (fgets(*owner, ownerlen+1, regfile) == NULL) { - fclose(regfile); - free(*owner); - err.code = errno; - err.desc = "Cannot read jobs's reg info file"; - goto error_out; - } - } - fclose(regfile); - - tv_reg->tv_sec = reg_time_sec; - tv_reg->tv_usec = reg_time_usec; - -error_out: - if (err.code) { - return glite_jp_stack_error(ctx,&err); - } else { - return 0; - } -} - -#endif - int glite_jppsbe_get_job_metadata( glite_jp_context_t ctx, const char *job, @@ -1367,12 +1327,7 @@ int glite_jppsbe_get_job_metadata( int got_info = 0; struct timeval tv_reg; char *owner = NULL; -/* do in plugin - int got_tags = 0; - void *tags_handle = NULL; - glite_jp_tagval_t* tags = NULL; -*/ - int i; + int i,j; glite_jp_error_t err; assert(job != NULL); @@ -1385,6 +1340,7 @@ int glite_jppsbe_get_job_metadata( for (i = 0; attrs_inout[i].name; i++) { /* must be implemented via filetype plugin case GLITE_JP_ATTR_TIME: + case GLITE_JP_ATTR_TAG: */ if (!strcmp(attrs_inout[i].name,GLITE_JP_ATTR_OWNER) || !strcmp(attrs_inout[i].name,GLITE_JP_ATTR_REGTIME)) { @@ -1397,27 +1353,6 @@ int glite_jppsbe_get_job_metadata( got_info = 1; } } - -/* must be implemented via filetype plugin - case GLITE_JP_ATTR_TAG: - if (!got_tags) { - if (glite_jppsbe_open_file(ctx, job, GLITE_JP_FILECLASS_TAGS, - O_RDONLY, &tags_handle)) { - err.code = ctx->error->code; - err.desc = "Cannot open tag file"; - goto error_out; - } - if (glite_jpps_tag_readall(ctx, tags_handle, &tags)) { - err.code = ctx->error->code; - err.desc = "Cannot read tags"; - glite_jppsbe_close_file(ctx, tags_handle); - goto error_out; - } - glite_jppsbe_close_file(ctx, tags_handle); - got_tags = 1; - } - break; -*/ else { err.code = EINVAL; err.desc = "Invalid attribute type"; @@ -1446,40 +1381,14 @@ int glite_jppsbe_get_job_metadata( attrs_inout[i].timestamp = tv_reg.tv_sec; } -/* TODO: - case GLITE_JP_ATTR_TIME: - attrs_inout[i].value.time = tv_reg; - break; -*/ - /* must be implemented via filetype plugin + case GLITE_JP_ATTR_TIME: case GLITE_JP_ATTR_TAG: - for (j = 0; tags[j].name != NULL; j++) { - if (!strcmp(tags[j].name, attrs_inout[i].attr.name)) { - if (glite_jpps_tagval_copy(ctx, &tags[j], - &attrs_inout[i].value.tag)) { - err.code = ENOMEM; - err.desc = "Cannot copy tag value"; - goto error_out; - } - break; - } - } - if (!tags[j].name) attrs_inout[i].value.tag.name = NULL; - break; */ } error_out: free(owner); -/* plugin - if (tags) for (j = 0; tags[j].name != NULL; j++) { - free(tags[j].name); - free(tags[j].value); - } - free(tags); -*/ - if (err.code) { while (i > 0) { i--; @@ -1500,417 +1409,6 @@ static int compare_timeval(struct timeval a, struct timeval b) } -/* FIXME: disabled -- clarification wrt. filetype plugin needed */ - -#if 0 - -static int query_phase2( - glite_jp_context_t ctx, - const char *ownerhash, - long regtime_tr, - int q_tags, - int md_tags, - const glite_jp_query_rec_t query[], - glite_jp_attrval_t metadata[], - int (*callback)( - glite_jp_context_t ctx, - const char *job, - const glite_jp_attrval_t metadata[] - ) -); - -static int query_phase2( - glite_jp_context_t ctx, - const char *ownerhash, - long regtime_tr, - int q_tags, - int md_tags, - const glite_jp_query_rec_t query[], - glite_jp_attrval_t metadata[], - int (*callback)( - glite_jp_context_t ctx, - const char *job, - const glite_jp_attrval_t metadata[] - ) -) -{ - char *time_dirname = NULL; - DIR *time_dirp = NULL; - struct dirent *jobent; - char *info_fname = NULL; - char *jobid = NULL; - char *owner = NULL; - struct timeval tv_reg; - void *tags_handle = NULL; - int matching; - int i, j; - glite_jp_tagval_t* tags = NULL; - glite_jp_error_t err; - - glite_jp_clear_error(ctx); - memset(&err,0,sizeof err); - err.source = __FUNCTION__; - - if (asprintf(&time_dirname, "%s/data/%s/%d", config->internal_path, - ownerhash, regtime_tr) == -1) { - err.code = ENOMEM; - return glite_jp_stack_error(ctx,&err); - } - time_dirp = opendir(time_dirname); - if (!time_dirp) { - free(time_dirname); - return 0; /* found nothing */ - } - while ((jobent = readdir(time_dirp)) != NULL) { - if (!strcmp(jobent->d_name, ".")) continue; - if (!strcmp(jobent->d_name, "..")) continue; - if (asprintf(&info_fname, "%s/%s/_info", time_dirname, - jobent->d_name) == -1) { - err.code = ENOMEM; - goto error_out; - } - if (get_job_info_int(ctx, info_fname, &jobid, &owner, &tv_reg)) { - err.code = EIO; - err.desc = "Cannot retrieve job info"; - goto error_out; - } - if (q_tags || md_tags) { - if (glite_jppsbe_open_file(ctx, jobid, GLITE_JP_FILECLASS_TAGS, - O_RDONLY, &tags_handle)) { - err.code = ctx->error->code; - err.desc = "Cannot open tag file"; - goto error_out; - } - if (glite_jpps_tag_readall(ctx, tags_handle, &tags)) { - err.code = ctx->error->code; - err.desc = "Cannot read tags"; - glite_jppsbe_close_file(ctx, tags_handle); - goto error_out; - } - glite_jppsbe_close_file(ctx, tags_handle); - tags_handle = NULL; - } - - matching = 1; - for (i = 0; matching && query[i].attr.type != GLITE_JP_ATTR_UNDEF; i++) { - switch (query[i].attr.type) { - case GLITE_JP_ATTR_OWNER: - if (query[i].value.s == NULL || - strcmp(query[i].value.s, owner)) matching = 0; - break; - case GLITE_JP_ATTR_TIME: - switch (query[i].op) { - case GLITE_JP_QUERYOP_EQUAL: - matching = !compare_timeval(tv_reg, query[i].value.time); - break; - case GLITE_JP_QUERYOP_UNEQUAL: - matching = compare_timeval(tv_reg, query[i].value.time); - break; - case GLITE_JP_QUERYOP_LESS: - matching = compare_timeval(tv_reg, query[i].value.time) < 0; - break; - case GLITE_JP_QUERYOP_GREATER: - matching = compare_timeval(tv_reg, query[i].value.time) > 0; - break; - case GLITE_JP_QUERYOP_WITHIN: - matching = compare_timeval(tv_reg, query[i].value.time) >= 0 - && compare_timeval(tv_reg, query[i].value2.time) <= 0; - break; - } - break; - case GLITE_JP_ATTR_TAG: - if (!tags) { - matching = 0; - break; - } - for (j = 0; tags[j].name != NULL; j++) { - if (!strcmp(tags[j].name, query[i].attr.name)) { - switch (query[i].op) { - case GLITE_JP_QUERYOP_EQUAL: - matching = !strcmp(tags[j].value, query[i].value.s); - break; - case GLITE_JP_QUERYOP_UNEQUAL: - matching = strcmp(tags[j].value, query[i].value.s); - break; - case GLITE_JP_QUERYOP_LESS: - matching = strcmp(tags[j].value, query[i].value.s) < 0; - break; - case GLITE_JP_QUERYOP_GREATER: - matching = strcmp(tags[j].value, query[i].value.s) > 0; - break; - case GLITE_JP_QUERYOP_WITHIN: - matching = strcmp(tags[j].value, query[i].value.s) >= 0 \ - && strcmp(tags[j].value, query[i].value2.s) <= 0 ; - break; - default: - break; - } - } - } - break; - default: - break; - } - } - if (!matching) { - free(info_fname); info_fname = NULL; - free(jobid); jobid = NULL; - if (tags) for (j = 0; tags[j].name != NULL; j++) { - free(tags[j].name); - free(tags[j].value); - } - free(tags); tags = NULL; - continue; - } - - for (i = 0; metadata[i].attr.type != GLITE_JP_ATTR_UNDEF; i++) { - switch (metadata[i].attr.type) { - case GLITE_JP_ATTR_OWNER: - metadata[i].value.s = owner; - break; - case GLITE_JP_ATTR_TIME: - metadata[i].value.time = tv_reg; - break; - case GLITE_JP_ATTR_TAG: - for (j = 0; tags[j].name != NULL; j++) { - if (!strcmp(tags[j].name, metadata[i].attr.name)) { - if (glite_jpps_tagval_copy(ctx, &tags[j], - &metadata[i].value.tag)) { - err.code = ENOMEM; - err.desc = "Cannot copy tag value"; - goto error_out; - } - break; - } - } - if (!tags[j].name) { - metadata[i].value.tag.name = NULL; - metadata[i].value.tag.value = NULL; - } - break; - default: - break; - } - } - (*callback)(ctx, jobid, metadata); - free(jobid); jobid = NULL; - while (i > 0) { - i--; - switch (metadata[i].attr.type) { - case GLITE_JP_ATTR_TAG: - free(metadata[i].value.tag.name); - free(metadata[i].value.tag.value); - default: - break; - } - } - } - -error_out: - if (tags) for (j = 0; tags[j].name != NULL; j++) { - free(tags[j].name); - free(tags[j].value); - } - if (tags_handle) glite_jppsbe_close_file(ctx, tags_handle); - free(info_fname); - free(owner); - free(jobid); - closedir(time_dirp); - free(time_dirname); - if (err.code) { - while (i > 0) { - i--; - switch (metadata[i].attr.type) { - case GLITE_JP_ATTR_TAG: - free(metadata[i].value.tag.name); - free(metadata[i].value.tag.value); - default: - break; - } - } - return glite_jp_stack_error(ctx,&err); - } else - return 0; -} - -int glite_jppsbe_query( - glite_jp_context_t ctx, - const glite_jp_query_rec_t query[], - const glite_jp_attrval_t metadata[], - void *arg, - int (*callback)( - glite_jp_context_t ctx, - const char *job, - const glite_jp_attrval_t metadata[], - void *arg - ) -) -{ - /* XXX clone metadata */ - int i; - char *q_exact_owner = NULL; - char *ownerhash = NULL; - long q_min_time = 0; - long q_max_time = LONG_MAX; - long q_min_time_tr; - long q_max_time_tr; - int q_with_tags = 0; - int md_info = 0; - int md_tags = 0; - char *owner_dirname = NULL; - DIR *owner_dirp = NULL; - struct dirent *ttimeent; - char *data_dirname = NULL; - DIR *data_dirp = NULL; - struct dirent *ownerent; - long ttime = 0; - glite_jp_attrval_t *metadata_templ = NULL; - glite_jp_error_t err; - - glite_jp_clear_error(ctx); - memset(&err,0,sizeof err); - err.source = __FUNCTION__; - - for (i = 0; query[i].attr.type != GLITE_JP_ATTR_UNDEF; i++) { - if (query[i].attr.type == GLITE_JP_ATTR_OWNER && query[i].op == GLITE_JP_QUERYOP_EQUAL) { - q_exact_owner = query[i].value.s; - } - if (query[i].attr.type == GLITE_JP_ATTR_TIME) { - switch (query[i].op) { - case GLITE_JP_QUERYOP_EQUAL: - q_min_time = query[i].value.time.tv_sec; - q_max_time = query[i].value.time.tv_sec + 1; - break; - case GLITE_JP_QUERYOP_LESS: - if (q_max_time > query[i].value.time.tv_sec + 1) - q_max_time = query[i].value.time.tv_sec + 1; - break; - case GLITE_JP_QUERYOP_WITHIN: - if (q_max_time > query[i].value2.time.tv_sec + 1) - q_max_time = query[i].value2.time.tv_sec + 1; - /* fallthrough */ - case GLITE_JP_QUERYOP_GREATER: - if (q_min_time < query[i].value.time.tv_sec) - q_min_time = query[i].value.time.tv_sec; - break; - default: - err.code = EINVAL; - err.desc = "Invalid query op"; - return glite_jp_stack_error(ctx,&err); - break; - } - } - if (query[i].attr.type == GLITE_JP_ATTR_TAG) - q_with_tags = 1; - - } - - for (i = 0; metadata[i].attr.type != GLITE_JP_ATTR_UNDEF; i++) { - switch (metadata[i].attr.type) { - case GLITE_JP_ATTR_OWNER: - case GLITE_JP_ATTR_TIME: - md_info = 1; - break; - case GLITE_JP_ATTR_TAG: - md_tags = 1; - break; - default: - err.code = EINVAL; - err.desc = "Invalid attribute type in metadata parameter"; - return glite_jp_stack_error(ctx,&err); - break; - } - } - metadata_templ = (glite_jp_attrval_t *) calloc(i + 1, sizeof(glite_jp_attrval_t)); - if (!metadata_templ) { - err.code = ENOMEM; - return glite_jp_stack_error(ctx,&err); - } - memcpy(metadata_templ, metadata, (i + 1) * sizeof(glite_jp_attrval_t)); - - q_min_time_tr = regtime_trunc(q_min_time); - q_max_time_tr = regtime_ceil(q_max_time); - - if (q_exact_owner) { - ownerhash = str2md5(q_exact_owner); /* static buffer */ - if (asprintf(&owner_dirname, "%s/data/%s", config->internal_path, ownerhash) == -1) { - err.code = ENOMEM; - return glite_jp_stack_error(ctx,&err); - } - owner_dirp = opendir(owner_dirname); - free(owner_dirname); - if (!owner_dirp) { - free(metadata_templ); - return 0; /* found nothing */ - } - while ((ttimeent = readdir(owner_dirp)) != NULL) { - if (!strcmp(ttimeent->d_name, ".")) continue; - if (!strcmp(ttimeent->d_name, "..")) continue; - ttime = atol(ttimeent->d_name); - if (ttime >= q_min_time_tr && ttime < q_max_time_tr) { - if (query_phase2(ctx, ownerhash, ttime, q_with_tags, md_tags, - query, metadata_templ, callback)) { - err.code = EIO; - err.desc = "query_phase2() error"; - goto error_out; - } - } - } - } else { /* !q_exact_owner */ - if (asprintf(&data_dirname, "%s/data", config->internal_path) == -1) { - err.code = ENOMEM; - goto error_out; - } - data_dirp = opendir(data_dirname); - if (!data_dirp) { - err.code = EIO; - err.desc = "Cannot open data directory"; - goto error_out; - } - while ((ownerent = readdir(data_dirp)) != NULL) { - if (!strcmp(ownerent->d_name, ".")) continue; - if (!strcmp(ownerent->d_name, "..")) continue; - if (asprintf(&owner_dirname, "%s/data/%s", config->internal_path, - ownerent->d_name) == -1) { - err.code = ENOMEM; - goto error_out; - } - owner_dirp = opendir(owner_dirname); - free(owner_dirname); - if (!owner_dirp) { - err.code = EIO; - err.desc = "Cannot open owner data directory"; - goto error_out; - } - while ((ttimeent = readdir(owner_dirp)) != NULL) { - if (!strcmp(ttimeent->d_name, ".")) continue; - if (!strcmp(ttimeent->d_name, "..")) continue; - ttime = atol(ttimeent->d_name); - if (ttime >= q_min_time_tr && ttime < q_max_time_tr) { - if (query_phase2(ctx, ownerent->d_name, ttime, q_with_tags, md_tags, - query, metadata_templ, callback)) { - err.code = EIO; - err.desc = "query_phase2() error"; - goto error_out; - } - } - } - closedir(owner_dirp); owner_dirp = NULL; - } - closedir(data_dirp); data_dirp = NULL; - } - return 0; - -error_out: - if (owner_dirp) closedir(owner_dirp); - if (data_dirp) closedir(data_dirp); - free(data_dirname); - free(metadata_templ); - return glite_jp_stack_error(ctx,&err); -} - -#else - int glite_jppsbe_query( glite_jp_context_t ctx, const glite_jp_query_rec_t query[], @@ -2095,8 +1593,6 @@ cleanup: return err.code; } -#endif - int glite_jppsbe_is_metadata(glite_jp_context_t ctx,const char *attr) { -- 1.8.2.3