" RecordTag jobid tagname stringvalue\n"
" GetJobFiles jobid\n"
" GetJobAttr jobid attr\n"
- " FeedIndex destination query_number history continuous\n"
+ " FeedIndex \n"
" FeedIndexRefresh feedid\n"
,me);
/* OK */
}
}
-/* FIXME: new wsdl */
-#if 0
else if (!strcasecmp(argv[1],"FeedIndex")) {
- struct jpsrv__FeedIndexResponse r;
- struct jptype__Attribute *ap[2];
- struct jptype__Attributes attr = { 2, ap };
- struct jptype__PrimaryQueryElement *qp[100];
- struct jptype__PrimaryQuery qry = { 0, qp };
-
- int i,j,qi = atoi(argv[3])-1;
-
- if (argc != 6) usage(argv[0]);
-
- for (i=0; i<attr.__sizeitem; i++) ap[i] = sample_attr+i;
-
- for (i=0; sample_query[qi][i].attr; i++)
- qp[i] = &sample_query[qi][i];
- qry.__sizeitem = i;
-
- if (!check_fault(soap,soap_call_jpsrv__FeedIndex(soap,server,"",
- argv[2],&attr,&qry,!strcasecmp(argv[4],"true"),
- !strcasecmp(argv[5],"true"),
- &r)))
+ char *ap[2] = {
+ "http://egee.cesnet.cz/en/Schema/LB/Attributes:RB",
+ "http://egee.cesnet.cz/en/WSDL/jp-system:owner"
+ };
+
+ struct jptype__stringOrBlob vals[] = {
+ { "/O=CESNET/O=Masaryk University/CN=Ales Krenek", NULL },
+ { "Done", NULL }
+ };
+
+ struct jptype__primaryQuery q[] = {
+ {
+ "http://egee.cesnet.cz/en/WSDL/jp-system:owner",
+ jptype__queryOp__EQUAL,
+ NULL, vals, NULL
+ },
+ {
+ "http://egee.cesnet.cz/en/Schema/LB/Attributes:finalStatus",
+ jptype__queryOp__UNEQUAL,
+ NULL, vals+1, NULL
+ }
+ }, *qp[] = { q, q+1 };
+ struct _jpelem__FeedIndex in = {
+ "http://some.index//",
+ 2,ap,
+ 2,qp,
+ 0,
+ 1
+ };
+ struct _jpelem__FeedIndexResponse out;
+
+ if (!check_fault(soap,soap_call___jpsrv__FeedIndex(soap,server,"",&in,&out)))
{
- printf("FeedId: %s\nExpires: %s\n",r.feedId,ctime(&r.expires));
+ printf("FeedId: %s\nExpires: %s\n",out.feedId,ctime(&out.feedExpires));
}
+ }
+/* FIXME: new wsdl */
+#if 0
} else if (!strcasecmp(argv[1], "FeedIndexRefresh")) {
struct jpsrv__FeedIndexRefreshResponse r;
#include <errno.h>
#include <assert.h>
#include <fcntl.h>
+#include <signal.h>
#include "glite/jp/types.h"
#include "glite/jp/strmd5.h"
#include "file_plugin.h"
#include "builtin_plugins.h"
#include "is_client.h"
+#include "backend.h"
+
+extern pid_t master;
/*
* seconds before feed expires: should be
}
}
+/* retrieve all attributes for a feed */
+int full_feed(
+ glite_jp_context_t ctx,
+ const struct jpfeed *feed,
+ const char *job,
+ glite_jp_attrval_t **attrs)
+{
+ /* TODO: */
+ abort();
+}
+
/* XXX: limit on query size -- I'm lazy to malloc() */
#define QUERY_MAX 100
const glite_jp_attrval_t attrs[]
)
{
- int i;
+ int i,fed;
int qi[QUERY_MAX];
glite_jp_attrval_t *newattr = NULL;
}
/* matched completely */
- return glite_jpps_single_feed(ctx,feed->destination,job,attrs);
+ glite_jppsbe_check_fed(ctx,feed->id,job,&fed);
+ if (!fed) {
+ glite_jp_attrval_t *a;
+ full_feed(ctx,feed,job,&a);
+ glite_jpps_single_feed(ctx,feed->destination,job,a);
+ for (i=0; a[i].name; i++) glite_jp_attrval_free(a+i,0);
+ free(a);
+ }
+ else glite_jpps_single_feed(ctx,feed->destination,job,attrs);
return 0;
}
free(attrs);
+ if (bh) glite_jppsbe_close_file(ctx,bh);
+ free(pd);
+
for (f = ctx->feeds; f; f=f->next) {
- int k;
- glite_jp_attrval_t * fattr = malloc((nvals+1) * sizeof *fattr);
+ int k,fed;
+ glite_jp_attrval_t * fattr;
- j = 0;
- for (i=0; i<nvals; i++) for (k=0; f->attrs[k]; k++)
- if (!strcmp(f->attrs[k],vals[i].name))
- memcpy(fattr+j++,vals+i,sizeof *fattr);
+ glite_jppsbe_check_fed(ctx,f->id,job,&fed);
+ if (!fed) full_feed(ctx,f,job,&fattr);
+ else {
+ fattr = malloc((nvals+1) * sizeof *fattr);
- memset(fattr+j,0,sizeof *fattr);
+ j = 0;
+ for (i=0; i<nvals; i++) for (k=0; f->attrs[k]; k++)
+ if (!strcmp(f->attrs[k],vals[i].name))
+ memcpy(fattr+j++,vals+i,sizeof *fattr);
+
+ memset(fattr+j,0,sizeof *fattr);
+
+ }
glite_jpps_single_feed(ctx,f->destination,job,fattr);
+ if (!fed) for (i=0; fattr[i].name; i++) glite_jp_attrval_free(fattr+i,0);
free(fattr);
}
for (i=0; vals[i].name; i++) glite_jp_attrval_free(vals+i,0);
free(vals);
- if (bh) glite_jppsbe_close_file(ctx,bh);
- free(pd);
-
return 0;
}
for (i=0; f->attrs[i]; f++)
if (glite_jppsbe_is_metadata(ctx,f->attrs[i])) cnt++;
- f->meta_attr = cnt ? malloc(cnt * sizeof *f->meta_attr) : NULL;
+ f->meta_attr = cnt ? malloc((cnt+1) * sizeof *f->meta_attr) : NULL;
+ f->meta_attr[cnt] = NULL;
f->nmeta_attr = cnt;
f->other_attr = i-cnt ? malloc((i-cnt) * sizeof *f->other_attr) : NULL;
for (i=0; f->qry[i].attr; i++)
if (glite_jppsbe_is_metadata(ctx,f->qry[i].attr)) cnt++;
- f->meta_qry = malloc(cnt * sizeof *f->meta_qry);
+ f->meta_qry = malloc((cnt+1) * sizeof *f->meta_qry);
+ memset(f->meta_qry+cnt,0,sizeof *f->meta_qry);
f->nmeta_qry = cnt;
f->other_qry = malloc((i-cnt) * sizeof *f->other_qry);
f->nother_qry = i-cnt;
{
struct jpfeed *f = feed;
+/* FIXME:
+ * - volatile implementation: should store the registrations in a file
+ * and recover after restart
+ * - should communicate the data among all server slaves
+
f->next = ctx->feeds;
ctx->feeds = f;
+ */
+
+ if (glite_jppsbe_store_feed(ctx,f)) fputs(glite_jp_error_chain(ctx),stderr);
+ else kill(-master,SIGUSR1); /* gracefully terminate slaves
+ and let master restart them */
+
return 0;
}
-/* FIXME:
- * - volatile implementation: should store the registrations in a file
- * and recover after restart
- * - should communicate the data among all server slaves
- */
int glite_jpps_register_feed(
glite_jp_context_t ctx,
const char *destination,
#include "glite/jp/strmd5.h"
#include "glite/jp/known_attr.h"
#include "glite/jp/attr.h"
+#include "glite/jp/escape.h"
+#include "feed.h"
#include "tags.h"
#include "backend.h"
#include "db.h"
int glite_jppsbe_query(
glite_jp_context_t ctx,
const glite_jp_query_rec_t query[],
- const glite_jp_attrval_t metadata[],
+ char * attrs[],
void *arg,
int (*callback)(
glite_jp_context_t ctx,
char *qres[3] = { NULL, NULL, NULL };
int cmask = 0, owner_idx = -1, reg_idx = -1;
glite_jp_db_stmt_t q;
- glite_jp_attrval_t meta_clone[2];
+ glite_jp_attrval_t metadata[2];
memset(&err,0,sizeof err);
glite_jp_clear_error(ctx);
err.source = __FUNCTION__;
/* XXX: assuming not more than 2 */
- memcpy(meta_clone,metadata,sizeof meta_clone);
+ memset(metadata,0,sizeof metadata);
+
+ /* XXX: const discarding is OK */
+ for (i=0;attrs[i]; i++) metadata[i].name = (char *) attrs[i];
for (i=0; query[i].attr; i++) {
char *qitem;
while ((ret = glite_jp_db_fetchrow(q,qres)) > 0) {
if (cmask & 1) {
/* XXX: owner always first */
- meta_clone[owner_idx].value = qres[1];
+ metadata[owner_idx].value = qres[1];
qres[1] = NULL;
}
if (cmask & 2) {
int qi = cmask == 2 ? 1 : 2;
time_t t = glite_jp_db_dbtotime(qres[qi]);
- meta_clone[reg_idx].value = glite_jp_time2attr(t);
+ metadata[reg_idx].value = glite_jp_time2attr(t);
free(qres[qi]);
qres[qi] = NULL;
}
- if (callback(ctx,qres[0],meta_clone,arg)) {
+ if (callback(ctx,qres[0],metadata,arg)) {
err.code = EIO;
err.desc = qres[0];
glite_jp_stack_error(ctx,&err);
}
free(qres[0]);
- free(meta_clone[0].value);
- free(meta_clone[1].value);
- qres[0] = meta_clone[0].value = meta_clone[1].value = NULL;
+ free(metadata[0].value);
+ free(metadata[1].value);
+ qres[0] = metadata[0].value = metadata[1].value = NULL;
}
free(aux);
free(stmt);
free(qres[0]); free(qres[1]); free(qres[2]);
- free(meta_clone[0].value); free(meta_clone[1].value);
+ free(metadata[0].value); free(metadata[1].value);
if (q) glite_jp_db_freestmt(&q);
return err.code;
}
+/** mark the job as sent to this feed */
+int glite_jppsbe_set_fed(
+ glite_jp_context_t ctx,
+ const char *feed,
+ const char *job
+)
+{
+ char *stmt = NULL;
+ int rows;
+ glite_jp_error_t err;
+ memset(&err,0,sizeof err);
+
+ trio_asprintf(&stmt,"insert into fed_job(feedid,jobid) "
+ "values ('%|Ss','%|Ss')", feed,job);
+
+ if ((rows = glite_jp_db_execstmt(ctx,stmt,NULL)) < 0) {
+ err.source = __FUNCTION__;
+ err.code = EIO;
+ err.desc = "insert into fed_jobs";
+ glite_jp_stack_error(ctx,&err);
+ goto cleanup;
+ }
+
+ if (rows != 1) {
+ err.source = __FUNCTION__;
+ err.code = EIO;
+ err.desc = "inserted rows != 1";
+ glite_jp_stack_error(ctx,&err);
+ }
+
+cleanup:
+ free(stmt);
+ return err.code;
+}
+
+
+/** check whether the job has been already sent to this feed */
+int glite_jppsbe_check_fed(
+ glite_jp_context_t ctx,
+ const char *feed,
+ const char *job,
+ int *result
+)
+{
+ char *stmt = NULL;
+ int rows;
+ glite_jp_error_t err;
+ memset(&err,0,sizeof err);
+ trio_asprintf(&stmt,"select 'x' from fed_jobs "
+ "where jobid = '%|Ss' and feedid = '%|Ss'",
+ job,feed);
+
+ if ((rows = glite_jp_db_execstmt(ctx,stmt,NULL)) < 0) {
+ err.source = __FUNCTION__;
+ err.code = EIO;
+ err.desc = "select from fed_jobs";
+ glite_jp_stack_error(ctx,&err);
+ goto cleanup;
+ }
+
+ *result = rows;
+
+cleanup:
+ free(stmt);
+ return err.code;
+}
+
+
+/** store the feed to database */
+int glite_jppsbe_store_feed(
+ glite_jp_context_t ctx,
+ struct jpfeed *feed
+)
+{
+ char *stmt,*aux,*alist,*qlist,*e;
+ int i,rows;
+ glite_jp_error_t err;
+
+ memset(&err,0,sizeof err);
+
+ qlist, alist = stmt = aux = e = NULL;
+
+ for (i=0; feed->attrs[i]; i++) {
+ char *e;
+ trio_asprintf(&aux,"%s%s%s",
+ alist ? alist : "",
+ alist ? "\n" : "",
+ e = edg_wll_LogEscape(feed->attrs[i]));
+ free(e);
+ free(alist);
+ alist = aux;
+ aux = NULL;
+ }
+
+ for (i=0; feed->qry[i].attr; i++) {
+ char op,*e1,*e2 = NULL;
+
+ /* XXX */
+ assert(!feed->qry[i].binary);
+
+ switch (feed->qry[i].op) {
+ case GLITE_JP_QUERYOP_EQUAL: op = '='; break;
+ case GLITE_JP_QUERYOP_UNEQUAL: op = '!'; break;
+ case GLITE_JP_QUERYOP_LESS: op = '<'; break;
+ case GLITE_JP_QUERYOP_GREATER: op = '>'; break;
+ case GLITE_JP_QUERYOP_EXISTS: op = 'E'; break;
+ default: abort(); /* XXX */
+ }
+
+ trio_asprintf(&aux,"%s%s%s\n%c\n%s",
+ qlist ? qlist : "",
+ qlist ? "\n" : "",
+ e1 = edg_wll_LogEscape(feed->qry[i].attr),
+ op,
+ op != 'E' ? e2 = edg_wll_LogEscape(feed->qry[i].value) : "E");
+ free(e1); free(e2);
+
+ free(qlist);
+ qlist = aux;
+ aux = NULL;
+ }
+
+ trio_asprintf(&stmt,"insert into feeds(feedid,destination,expires,cols,query) "
+ "values ('%|Ss','%|Ss',%s,'%|Ss','%|Ss')",
+ feed->id,feed->destination,
+ e = glite_jp_db_timetodb(feed->expires),
+ alist,qlist);
+
+ free(alist); free(qlist); free(e);
+
+ if ((rows = glite_jp_db_execstmt(ctx,stmt,NULL)) < 0) {
+ err.source = __FUNCTION__;
+ err.code = EIO;
+ err.desc = "insert into fed_jobs";
+ glite_jp_stack_error(ctx,&err);
+ goto cleanup;
+ }
+
+ if (rows != 1) {
+ err.source = __FUNCTION__;
+ err.code = EIO;
+ err.desc = "inserted rows != 1";
+ glite_jp_stack_error(ctx,&err);
+ }
+
+cleanup:
+ free(stmt);
+ return err.code;
+
+}
+
+
+/** purge expired feeds */
+int glite_jppsbe_purge_feeds(
+ glite_jp_context_t ctx
+)
+{
+ /* TODO */
+ abort();
+}
+
+
+/** read stored feed into context */
+int glite_jppsbe_read_feeds(
+ glite_jp_context_t ctx
+)
+{
+ char *stmt,*res[5],*expires;
+ glite_jp_error_t err;
+ glite_jp_db_stmt_t q = NULL;
+ int rows;
+
+ stmt = expires = NULL;
+ memset(&err,0,sizeof err);
+ memset(&res,0,sizeof res);
+ err.source = __FUNCTION__;
+
+ expires = glite_jp_db_timetodb(time(NULL));
+ trio_asprintf(&stmt,"select feedid,destination,expires,cols,query "
+ "from feeds "
+ "where expires > %s",expires);
+ free(expires); expires = NULL;
+
+ if ((rows = glite_jp_db_execstmt(ctx, stmt, &q)) < 0) {
+ err.code = EIO;
+ err.desc = "select from feeds";
+ glite_jp_stack_error(ctx,&err);
+ goto cleanup;
+ }
+
+ while ((rows = glite_jp_db_fetchrow(q,res)) > 0) {
+ struct jpfeed *f = calloc(1,sizeof *f);
+ int n;
+ char *p;
+
+ f->id = res[0]; res[0] = NULL;
+ f->destination = res[1]; res[1] = NULL;
+ f->expires = glite_jp_db_dbtotime(res[2]); free(res[2]); res[2] = NULL;
+
+ n = 0;
+ for (p = strtok(res[3],"\n"); p; p = strtok(NULL,"\n")) {
+ f->attrs = realloc(f->attrs,(n+2) * sizeof *f->attrs);
+ f->attrs[n] = edg_wll_LogUnescape(p);
+ f->attrs[++n] = NULL;
+ }
+
+ n = 0;
+ for (p = strtok(res[4],"\n"); p; p = strtok(NULL,"\n")) {
+ f->qry = realloc(f->qry,(n+2) * sizeof *f->qry);
+ memset(&f->qry[n],0,sizeof *f->qry);
+ f->qry[n].attr = edg_wll_LogUnescape(p);
+ p = strtok(NULL,"\n");
+ switch (*p) {
+ case '=': f->qry[n].op = GLITE_JP_QUERYOP_EQUAL; break;
+ case '<': f->qry[n].op = GLITE_JP_QUERYOP_LESS; break;
+ case '>': f->qry[n].op = GLITE_JP_QUERYOP_GREATER; break;
+ case '!': f->qry[n].op = GLITE_JP_QUERYOP_UNEQUAL; break;
+ case 'E': f->qry[n].op = GLITE_JP_QUERYOP_EXISTS; break;
+ default: abort(); /* XXX: internal inconsistency */
+ }
+ p = strtok(NULL,"\n");
+ if (f->qry[n].op != GLITE_JP_QUERYOP_EXISTS)
+ f->qry[n].value = edg_wll_LogUnescape(p);
+
+ memset(&f->qry[++n],0,sizeof *f->qry);
+ }
+ f->next = ctx->feeds;
+ ctx->feeds = f;
+ }
+
+ if (rows < 0) {
+ err.code = EIO;
+ err.desc = "fetch from feeds";
+ glite_jp_stack_error(ctx,&err);
+ goto cleanup;
+ }
+
+cleanup:
+ glite_jp_db_freestmt(&q);
+ free(res[0]); free(res[1]); free(res[2]); free(res[3]); free(res[4]);
+ return err.code;
+}
+
+
+
/* XXX:
- no primary authorization yet