#include "glite/jp/types.h"
#include "glite/jp/context.h"
#include "glite/jp/strmd5.h"
+#include "glite/jp/known_attr.h"
#include "feed.h"
#include "file_plugin.h"
#include "builtin_plugins.h"
free(f);
}
+static void drop_jobs(struct jpfeed *f)
+{
+ int i,j;
+ for (i=0; i<f->njobs; i++) {
+ for (j=0; f->job_attrs[i][j].name; j++)
+ glite_jp_attrval_free(&f->job_attrs[i][j],0);
+ free(f->job_attrs[i]);
+ free(f->jobs[i]);
+ free(f->owners[i]);
+ }
+ free(f->job_attrs);
+ free(f->jobs);
+ free(f->owners);
+ f->job_attrs = NULL;
+ f->jobs = NULL;
+ f->njobs = 0;
+}
+
static int drain_feed(glite_jp_context_t ctx, struct jpfeed *f,int done)
{
int ret = 0;
glite_jp_clear_error(ctx);
if (f->njobs) {
- int i,j;
- ret = glite_jpps_multi_feed(ctx,f->id,done,f->njobs,f->destination,f->jobs,f->job_attrs);
-
- for (i=0; i<f->njobs; i++) {
- for (j=0; f->job_attrs[i][j].name; j++)
- glite_jp_attrval_free(&f->job_attrs[i][j],0);
- free(f->job_attrs[i]);
- free(f->jobs[i]);
- }
- free(f->job_attrs);
- free(f->jobs);
- f->job_attrs = NULL;
- f->jobs = NULL;
- f->njobs = 0;
+ ret = glite_jpps_multi_feed(ctx,f->id,done,f->njobs,f->destination,f->jobs,f->owners,f->job_attrs);
+ drop_jobs(f);
}
return ret;
}
const glite_jp_attrval_t meta[],
void *arg)
{
- int i,j,nout = 0;
+ int i,j,nout = 0,ec;
glite_jp_error_t err;
struct jpfeed *f = arg;
glite_jp_attrval_t *other = NULL,*out = NULL;
glite_jp_clear_error(ctx);
/* retrieve other attributes */
- if (glite_jpps_get_attrs(ctx,job,f->other_attr,f->nother_attr,&other)) {
- err.code = EIO;
- err.desc = "retrieve job attributes";
- glite_jp_stack_error(ctx,&err);
- goto cleanup;
+ ec = glite_jpps_get_attrs(ctx,job,f->other_attr,f->nother_attr,&other);
+ switch (ec) {
+ case 0: break;
+ case ENOENT: glite_jp_clear_error(ctx); break;
+ default:
+ err.code = EIO;
+ err.desc = "retrieve job attributes";
+ glite_jp_stack_error(ctx,&err);
+ goto cleanup;
}
/* no attributes known -- can't match */
}
if (nout) {
+ int oi;
+
memset(out+nout,0,sizeof *out);
f->jobs = realloc(f->jobs,(f->njobs+1)*sizeof *f->jobs);
f->jobs[f->njobs] = strdup(job);
f->job_attrs = realloc(f->job_attrs,(f->njobs+1)*sizeof *f->job_attrs);
f->job_attrs[f->njobs] = out;
out = NULL;
+
+ for (oi=0; strcmp(meta[oi].name,GLITE_JP_ATTR_OWNER); oi++);
+ assert(meta[oi].name);
+ f->owners = realloc(f->owners,(f->njobs+1)*sizeof *f->owners);
+ f->owners[f->njobs] = strdup(meta[oi].value);
+
f->njobs++;
}
{
struct jpfeed *f = feed;
int i,m,o,cnt,ret = 0;
+ char **meta;
glite_jp_clear_error(ctx);
/* count "meta" attributes */
if (glite_jppsbe_is_metadata(ctx,f->attrs[i])) cnt++;
f->meta_attr = cnt ? malloc((cnt+1) * sizeof *f->meta_attr) : NULL;
- f->meta_attr[cnt] = NULL;
f->nmeta_attr = cnt;
f->other_attr = i-cnt ? malloc((i-cnt+1) * sizeof *f->other_attr) : NULL;
+ f->nother_attr = i-cnt;
/* sort attributes to "meta" and others */
m = o = 0;
for (i=0; f->attrs[i]; i++)
- if (glite_jppsbe_is_metadata(ctx,f->attrs[i])) f->meta_attr[m++] = f->attrs[i];
- else f->other_attr[o++] = f->attrs[i];
+ if (glite_jppsbe_is_metadata(ctx,f->attrs[i]))
+ if (!strcmp(f->attrs[i],GLITE_JP_ATTR_OWNER)) {
+ free(f->attrs[i]);
+ f->nmeta_attr--;
+ }
+ else f->meta_attr[m++] = f->attrs[i];
+ else {
+ /* XXX: jobid and owner are sent anyway */
+ if (!strcmp(f->attrs[i],GLITE_JP_ATTR_JOBID)) {
+ free(f->attrs[i]);
+ f->nother_attr--;
+ }
+ else f->other_attr[o++] = f->attrs[i];
+ }
+
+ if (f->nmeta_attr == 0) { free(f->meta_attr); f->meta_attr = NULL; }
+ if (f->nother_attr == 0) { free(f->other_attr); f->other_attr = NULL; }
if (f->meta_attr) f->meta_attr[m] = NULL;
if (f->other_attr) f->other_attr[o] = NULL;
+
/* the same for query records */
cnt = 0;
for (i=0; f->qry[i].attr; i++)
if (glite_jppsbe_is_metadata(ctx,f->qry[i].attr)) cnt++;
- f->meta_qry = malloc((cnt+1) * sizeof *f->meta_qry);
- memset(f->meta_qry+cnt,0,sizeof *f->meta_qry);
+ f->meta_qry = cnt ? malloc((cnt+1) * sizeof *f->meta_qry) : NULL;
+ if (f->meta_qry) memset(f->meta_qry+cnt,0,sizeof *f->meta_qry);
f->nmeta_qry = cnt;
- f->other_qry = malloc((i-cnt+1) * sizeof *f->other_qry);
+ f->other_qry = i-cnt ? malloc((i-cnt+1) * sizeof *f->other_qry) : NULL;
f->nother_qry = i-cnt;
m = o = 0;
if (glite_jppsbe_is_metadata(ctx,f->qry[i].attr)) memcpy(f->meta_qry+m++,f->qry+i,sizeof *f->meta_qry);
else memcpy(f->other_qry+o++,f->qry+i,sizeof *f->other_qry);
- memset(f->meta_qry+m,0,sizeof *f->meta_qry);
- memset(f->other_qry+m,0,sizeof *f->other_qry);
-
free(f->attrs); free(f->qry);
f->attrs = NULL;
f->qry = NULL;
+ if (f->meta_qry) memset(f->meta_qry+m,0,sizeof *f->meta_qry);
+ else {
+ glite_jp_error_t err;
+ err.code = EINVAL;
+ err.source = __FUNCTION__;
+ err.desc = "at least one metadata query item required";
+ ret = glite_jp_stack_error(ctx,&err);
+ goto cleanup;
+ }
+
+ if (f->other_qry) memset(f->other_qry+o,0,sizeof *f->other_qry);
+
/* extract other_qry items that are not present in other_attr */
- f->int_other_attr = o;
+ f->int_other_attr = o = f->nother_attr;
for (i=0; i<f->nother_qry; i++) {
int j;
for (j=0; j<f->int_other_attr && strcmp(f->other_attr[j],f->other_qry[i].attr); j++);
if (f->other_attr) f->other_attr[o] = NULL;
f->nother_attr = o;
- ret = glite_jppsbe_query(ctx,f->meta_qry,f->meta_attr,f,feed_query_callback);
+ meta = calloc(f->nmeta_attr+2,sizeof *meta);
+ meta[0] = GLITE_JP_ATTR_OWNER;
+ if (f->meta_attr) memcpy(meta+1,f->meta_attr,(f->nmeta_attr+1) * sizeof *meta);
+ else meta[1] = NULL;
+
+ ret = glite_jppsbe_query(ctx,f->meta_qry,meta,f,feed_query_callback);
if (!ret) ret = drain_feed(ctx,f,1);
+ else drop_jobs(f);
cleanup:
return 0;
}
+static check_fault(glite_jp_context_t ctx,struct soap *soap,int ec)
+{
+ glite_jp_error_t err;
+ char buf[1000] = "unknown fault";
+
+ switch (ec) {
+ case SOAP_OK: return 0;
+ default:
+ err.code = EIO;
+ err.source = __FUNCTION__;
+ err.desc = buf;
+ if (soap->fault) snprintf(buf,sizeof buf,"%s %s\n",
+ soap->fault->faultcode,
+ soap->fault->faultstring);
+ buf[999] = 0;
+ glite_jp_stack_error(ctx,&err);
+ return err.code;
+ }
+}
+
+
int glite_jpps_single_feed(
glite_jp_context_t ctx,
const char *feed,
int njobs,
const char *destination,
char **jobs,
+ char **owners,
glite_jp_attrval_t **attrs)
{
int i,j;
in.jobAttributes[i] = jr = malloc(sizeof *jr);
jr->jobid = jobs[i];
+ jr->owner = owners[i];
jr->__sizeattributes = jp2s_attrValues(ctx->other_soap,
attrs[i],
jr->primaryStorage = &ctx->myURL;
}
- if (soap_call___jpsrv__UpdateJobs(ctx->other_soap,destination,"",
- &in,&out
- )) {
- char buf[1000];
- err.code = EIO;
- err.source = __FUNCTION__;
- err.desc = buf;
- snprintf(buf,sizeof buf,"%s %s\n",
- ctx->other_soap->fault->faultcode,
- ctx->other_soap->fault->faultstring);
- buf[999] = 0;
- glite_jp_stack_error(ctx,&err);
- }
-
+ check_fault(ctx,ctx->other_soap,
+ soap_call___jpsrv__UpdateJobs(ctx->other_soap,destination,"", &in,&out));
for (i=0; i<njobs; i++) {
jr = in.jobAttributes[i];
attrValues_free(ctx->other_soap,jr->attributes,jr->__sizeattributes);