*/
#define FEED_TTL 120
+/* XXX: configurable */
+#define BATCH_FEED_SIZE 200
+
static int check_qry_item(
glite_jp_context_t ctx,
const glite_jp_query_rec_t *qry,
return 0;
}
+/* TODO: overit, ze do dalsich atributu se leze az kdyz matchuji metadata
+ * kdyby ne, stejne se to nepovede ;
+ * totez pro match_file */
+
int glite_jpps_match_attr(
glite_jp_context_t ctx,
const char *job,
abort();
}
+static int drain_feed(glite_jp_context_t ctx, struct jpfeed *f)
+{
+ /* TODO */ abort();
+}
+
static int feed_query_callback(
glite_jp_context_t ctx,
const char *job,
- const glite_jp_attrval_t attr[],
+ const glite_jp_attrval_t meta[],
void *arg)
{
+ int i,j,nout = 0;
+ glite_jp_error_t err;
struct jpfeed *f = arg;
- glite_jp_attrval_t *attrs = NULL;
+ glite_jp_attrval_t *other = NULL,*out = NULL;
+
+ memset(&err,0,sizeof err);
+ err.source = __FUNCTION__;
+ glite_jp_clear_error(ctx);
+
+/* retrieve other attributes */
+ if (glite_jpps_get_attrs(ctx,job,f->other_attr,f->nother_attr,&other)) {
+ err.code = EIO;
+ err.desc = "retrieve job attributes";
+ glite_jp_stack_error(ctx,&err);
+ goto cleanup;
+ }
+
+/* no attributes known -- can't match */
+ if (!other) goto cleanup;
+
+/* filter on non-meta query items */
+ for (i=0; i<f->nother_qry; i++) {
+ for (j=0; other[j].name; j++)
+ if (check_qry_item(ctx,f->other_qry+i,other+j)) break;
+ if (!other[j].name) goto cleanup; /* no match is not an error */
+ }
+
+/* extract attributes to be fed, stack the job for a batch feed */
+ for (i=0; meta && meta[i].name; i++)
+ for (j=0; j<f->nmeta_attr; j++)
+ if (!strcmp(meta[i].name,f->meta_attr[j])) {
+ out = realloc(out,(nout+2) * sizeof(out));
+ glite_jp_attrval_copy(out+nout,meta+i);
+ nout++;
+ }
+
+ for (i=0; other[i].name; i++)
+ for (j=0; j<f->int_other_attr; j++)
+ if (!strcmp(other[i].name,f->other_attr[j])) {
+ out = realloc(out,(nout+2) * sizeof(out));
+ glite_jp_attrval_copy(out+nout,other+i);
+ nout++;
+ }
+
+ if (nout) {
+ f->jobs = realloc(f->jobs,(f->njobs+1)*sizeof *f->jobs);
+ f->jobs[f->njobs] = strdup(job);
+ f->job_attrs = realloc(f->job_attrs,(f->njobs+1)*sizeof *f->job_attrs);
+ f->job_attrs[f->njobs] = out;
+ out = NULL;
+ f->njobs++;
+ }
+
+/* run the feed eventually */
+ if (f->njobs >= BATCH_FEED_SIZE && drain_feed(ctx,f)) {
+ err.code = EIO;
+ err.desc = "sending batch feed";
+ glite_jp_stack_error(ctx,&err);
+ }
- glite_jpps_get_attrs(ctx,job,f->other_attr,f->nother_attr,&attrs);
+cleanup:
+ for (i=0; other && other[i].name; i++) glite_jp_attrval_free(other+i,0);
+ free(other);
+
+ return err.code;
}
+
static int run_feed_deferred(glite_jp_context_t ctx,void *feed)
{
struct jpfeed *f = feed;
f->nother_attr = o;
ret = glite_jppsbe_query(ctx,f->meta_qry,f->meta_attr,f,feed_query_callback);
+ if (!ret) ret = drain_feed(ctx,f);
cleanup: