return str2md5base64(buf);
}
+static struct jpfeed *make_jpfeed(
+ const char *destination,
+ char const *const *attrs,
+ const glite_jp_query_rec_t *qry,
+ char *id,
+ time_t expires)
+{
+ int i;
+ struct jpfeed *f = calloc(1,sizeof *f);
+
+ f->id = id ? strdup(id) : NULL;
+ f->destination = strdup(destination);
+ f->expires = expires;
+ for (i=0; attrs[i]; i++) {
+ f->attrs = realloc(f->attrs,(i+2) * sizeof *f->attrs);
+ f->attrs[i] = strdup(attrs[i]);
+ f->attrs[i+1] = NULL;
+ }
+ for (i=0; qry[i].attr; i++) {
+ f->qry = realloc(f->qry,(i+2) * sizeof *f->qry);
+ glite_jp_queryrec_copy(f->qry+i,qry+i);
+ memset(f->qry+i+1,0,sizeof *f->qry);
+ }
+
+ return f;
+}
+
+void jpfeed_free(struct jpfeed *f)
+{
+ /* TODO */
+ abort();
+}
+
+static int feed_query_callback(
+ glite_jp_context_t ctx,
+ const char *job,
+ const glite_jp_attrval_t attr[],
+ void *arg)
+{
+ struct jpfeed *f = arg;
+ glite_jp_attrval_t *attrs = NULL;
+
+ glite_jpps_get_attrs(ctx,job,f->other_attr,f->nother_attr,&attrs);
+}
+
+static int run_feed_deferred(glite_jp_context_t ctx,void *feed)
+{
+ struct jpfeed *f = feed;
+ int i,m,o,cnt,ret = 0;
+
+ glite_jp_clear_error(ctx);
+/* count "meta" attributes */
+ cnt = 0;
+ for (i=0; f->attrs[i]; f++)
+ if (glite_jppsbe_is_metadata(ctx,f->attrs[i])) cnt++;
+
+ f->meta_attr = cnt ? malloc(cnt * sizeof *f->meta_attr) : NULL;
+ f->nmeta_attr = cnt;
+
+ f->other_attr = i-cnt ? malloc((i-cnt) * sizeof *f->other_attr) : NULL;
+
+/* sort attributes to "meta" and others */
+ m = o = 0;
+ for (i=0; f->attrs[i]; i++)
+ if (glite_jppsbe_is_metadata(ctx,f->attrs[i])) f->meta_attr[m++] = f->attrs[i];
+ else f->other_attr[o++] = f->attrs[i];
+
+ if (f->meta_attr) f->meta_attr[m] = NULL;
+ if (f->other_attr) f->other_attr[o] = NULL;
+
+/* the same for query records */
+ cnt = 0;
+ for (i=0; f->qry[i].attr; i++)
+ if (glite_jppsbe_is_metadata(ctx,f->qry[i].attr)) cnt++;
+
+ f->meta_qry = malloc(cnt * sizeof *f->meta_qry);
+ f->nmeta_qry = cnt;
+ f->other_qry = malloc((i-cnt) * sizeof *f->other_qry);
+ f->nother_qry = i-cnt;
+
+ m = o = 0;
+ for (i=0; f->qry[i].attr; i++)
+ if (glite_jppsbe_is_metadata(ctx,f->qry[i].attr)) memcpy(f->meta_qry+m++,f->qry+i,sizeof *f->meta_qry);
+ else memcpy(f->other_qry+o++,f->qry+i,sizeof *f->other_qry);
+
+ memset(f->meta_qry+m,0,sizeof *f->meta_qry);
+ memset(f->other_qry+m,0,sizeof *f->other_qry);
+
+ free(f->attrs); free(f->qry);
+ f->attrs = NULL;
+ f->qry = NULL;
+
+/* extract other_qry items that are not present in other_attr */
+ f->int_other_attr = o;
+ for (i=0; i<f->nother_qry; i++) {
+ int j;
+ for (j=0; j<f->int_other_attr && strcmp(f->other_attr[j],f->other_qry[i].attr); j++);
+ if (j == f->int_other_attr) {
+ f->other_attr = realloc(f->other_attr,(o+1) * sizeof *f->other_attr);
+ f->other_attr[o++] = strdup(f->other_qry[i].attr);
+ }
+ }
+ if (f->other_attr) f->other_attr[o] = NULL;
+ f->nother_attr = o;
+
+ ret = glite_jppsbe_query(ctx,f->meta_qry,f->meta_attr,f,feed_query_callback);
+
+cleanup:
+
+ jpfeed_free(f);
+ return ret;
+}
int glite_jpps_run_feed(
glite_jp_context_t ctx,
const glite_jp_query_rec_t *qry,
char **feed_id)
{
+ struct jpfeed *f;
+
fprintf(stderr,"%s: \n",__FUNCTION__);
+ if (!*feed_id) *feed_id = generate_feedid();
+
+ f = make_jpfeed(destination,attrs,qry,*feed_id,(time_t) 0);
+ glite_jp_add_deferred(ctx,run_feed_deferred,f);
+
return 0;
}
return 0;
}
+
/* FIXME:
* - volatile implementation: should store the registrations in a file
* and recover after restart
char **feed_id,
time_t *expires)
{
- int i;
- struct jpfeed *f = calloc(1,sizeof *f);
+ struct jpfeed *f;
if (!*feed_id) *feed_id = generate_feedid();
time(expires); *expires += FEED_TTL;
- f->id = strdup(*feed_id);
- f->destination = strdup(destination);
- f->expires = *expires;
- for (i=0; attrs[i]; i++) {
- f->attrs = realloc(f->attrs,(i+2) * sizeof *f->attrs);
- f->attrs[i] = strdup(attrs[i]);
- f->attrs[i+1] = NULL;
- }
- for (i=0; qry[i].attr; i++) {
- f->qry = realloc(f->qry,(i+2) * sizeof *f->qry);
- glite_jp_queryrec_copy(f->qry+i,qry+i);
- memset(f->qry+i+1,0,sizeof *f->qry);
- }
-
+ f = make_jpfeed(destination,attrs,qry,*feed_id,*expires);
glite_jp_add_deferred(ctx,register_feed_deferred,f);
return 0;