From 4d9651a024140d49452f5aee549a78033930f8da Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ale=C5=A1=20K=C5=99enek?= Date: Mon, 26 Sep 2005 18:24:07 +0000 Subject: [PATCH] store into and read feeds from database --- .../config/glite-jp-primary-dbsetup.sql | 17 ++ org.glite.jp.primary/examples/jpps-test.c | 61 +++-- org.glite.jp.primary/src/attrs.c | 1 + org.glite.jp.primary/src/backend.h | 33 ++- org.glite.jp.primary/src/bones_server.c | 8 + org.glite.jp.primary/src/feed.c | 77 ++++-- org.glite.jp.primary/src/new_ftp_backend.c | 270 ++++++++++++++++++++- 7 files changed, 413 insertions(+), 54 deletions(-) diff --git a/org.glite.jp.primary/config/glite-jp-primary-dbsetup.sql b/org.glite.jp.primary/config/glite-jp-primary-dbsetup.sql index f253f1d..0e2dc70 100644 --- a/org.glite.jp.primary/config/glite-jp-primary-dbsetup.sql +++ b/org.glite.jp.primary/config/glite-jp-primary-dbsetup.sql @@ -44,3 +44,20 @@ create table users ( create table backend_info ( version char(32) binary not null ); + +create table feeds ( + feedid char(32) binary not null, + destination varchar(255) binary not null, + expires datetime not null, + cols mediumblob not null, + query mediumblob not null, + + primary key (feedid) +); + +create table fed_jobs ( + feedid char(32) binary not null, + jobid char(32) binary not null, + + primary key (jobid,feedid) +); diff --git a/org.glite.jp.primary/examples/jpps-test.c b/org.glite.jp.primary/examples/jpps-test.c index 5a9077a..23bc8ad 100644 --- a/org.glite.jp.primary/examples/jpps-test.c +++ b/org.glite.jp.primary/examples/jpps-test.c @@ -32,7 +32,7 @@ static void usage(const char *me) " RecordTag jobid tagname stringvalue\n" " GetJobFiles jobid\n" " GetJobAttr jobid attr\n" - " FeedIndex destination query_number history continuous\n" + " FeedIndex \n" " FeedIndexRefresh feedid\n" ,me); @@ -185,32 +185,45 @@ int main(int argc,char *argv[]) /* OK */ } } -/* FIXME: new wsdl */ -#if 0 else if (!strcasecmp(argv[1],"FeedIndex")) { - struct jpsrv__FeedIndexResponse r; - struct jptype__Attribute *ap[2]; - struct jptype__Attributes attr = { 2, ap }; - struct jptype__PrimaryQueryElement *qp[100]; - struct jptype__PrimaryQuery qry = { 0, qp }; - - int i,j,qi = atoi(argv[3])-1; - - if (argc != 6) usage(argv[0]); - - for (i=0; i #include "glite/security/glite_gsplugin.h" +#include "feed.h" #include "backend.h" #include "file_plugin.h" @@ -46,6 +47,8 @@ static int call_opts(glite_jp_context_t,char *,char *,int (*)(glite_jp_context_t char *glite_jp_default_namespace; +pid_t master; + int main(int argc, char *argv[]) { int one = 1,opt,i; @@ -149,6 +152,9 @@ int main(int argc, char *argv[]) /* XXX: daemonise */ + setpgrp(); /* needs for signalling */ + master = getpid(); + glite_srvbones_set_param(GLITE_SBPARAM_SLAVES_COUNT,1); glite_srvbones_run(data_init,&stab,1 /* XXX: entries in stab */,debug); @@ -161,6 +167,8 @@ static int data_init(void **data) printf("[%d] slave started\n",getpid()); glite_jppsbe_init_slave(ctx); /* XXX: global but slave's */ + sleep(10); + if (glite_jppsbe_read_feeds(ctx)) fputs(glite_jp_error_chain(ctx),stderr); return 0; } diff --git a/org.glite.jp.primary/src/feed.c b/org.glite.jp.primary/src/feed.c index 8283ae8..b4bf9dd 100644 --- a/org.glite.jp.primary/src/feed.c +++ b/org.glite.jp.primary/src/feed.c @@ -5,6 +5,7 @@ #include #include #include +#include #include "glite/jp/types.h" #include "glite/jp/strmd5.h" @@ -12,6 +13,9 @@ #include "file_plugin.h" #include "builtin_plugins.h" #include "is_client.h" +#include "backend.h" + +extern pid_t master; /* * seconds before feed expires: should be @@ -61,6 +65,17 @@ static int check_qry_item( } } +/* retrieve all attributes for a feed */ +int full_feed( + glite_jp_context_t ctx, + const struct jpfeed *feed, + const char *job, + glite_jp_attrval_t **attrs) +{ + /* TODO: */ + abort(); +} + /* XXX: limit on query size -- I'm lazy to malloc() */ #define QUERY_MAX 100 @@ -74,7 +89,7 @@ static int match_feed( const glite_jp_attrval_t attrs[] ) { - int i; + int i,fed; int qi[QUERY_MAX]; glite_jp_attrval_t *newattr = NULL; @@ -135,7 +150,15 @@ static int match_feed( } /* matched completely */ - return glite_jpps_single_feed(ctx,feed->destination,job,attrs); + glite_jppsbe_check_fed(ctx,feed->id,job,&fed); + if (!fed) { + glite_jp_attrval_t *a; + full_feed(ctx,feed,job,&a); + glite_jpps_single_feed(ctx,feed->destination,job,a); + for (i=0; a[i].name; i++) glite_jp_attrval_free(a+i,0); + free(a); + } + else glite_jpps_single_feed(ctx,feed->destination,job,attrs); return 0; } @@ -259,26 +282,34 @@ int glite_jpps_match_file( free(attrs); + if (bh) glite_jppsbe_close_file(ctx,bh); + free(pd); + for (f = ctx->feeds; f; f=f->next) { - int k; - glite_jp_attrval_t * fattr = malloc((nvals+1) * sizeof *fattr); + int k,fed; + glite_jp_attrval_t * fattr; - j = 0; - for (i=0; iattrs[k]; k++) - if (!strcmp(f->attrs[k],vals[i].name)) - memcpy(fattr+j++,vals+i,sizeof *fattr); + glite_jppsbe_check_fed(ctx,f->id,job,&fed); + if (!fed) full_feed(ctx,f,job,&fattr); + else { + fattr = malloc((nvals+1) * sizeof *fattr); - memset(fattr+j,0,sizeof *fattr); + j = 0; + for (i=0; iattrs[k]; k++) + if (!strcmp(f->attrs[k],vals[i].name)) + memcpy(fattr+j++,vals+i,sizeof *fattr); + + memset(fattr+j,0,sizeof *fattr); + + } glite_jpps_single_feed(ctx,f->destination,job,fattr); + if (!fed) for (i=0; fattr[i].name; i++) glite_jp_attrval_free(fattr+i,0); free(fattr); } for (i=0; vals[i].name; i++) glite_jp_attrval_free(vals+i,0); free(vals); - if (bh) glite_jppsbe_close_file(ctx,bh); - free(pd); - return 0; } @@ -415,7 +446,8 @@ static int run_feed_deferred(glite_jp_context_t ctx,void *feed) for (i=0; f->attrs[i]; f++) if (glite_jppsbe_is_metadata(ctx,f->attrs[i])) cnt++; - f->meta_attr = cnt ? malloc(cnt * sizeof *f->meta_attr) : NULL; + f->meta_attr = cnt ? malloc((cnt+1) * sizeof *f->meta_attr) : NULL; + f->meta_attr[cnt] = NULL; f->nmeta_attr = cnt; f->other_attr = i-cnt ? malloc((i-cnt) * sizeof *f->other_attr) : NULL; @@ -434,7 +466,8 @@ static int run_feed_deferred(glite_jp_context_t ctx,void *feed) for (i=0; f->qry[i].attr; i++) if (glite_jppsbe_is_metadata(ctx,f->qry[i].attr)) cnt++; - f->meta_qry = malloc(cnt * sizeof *f->meta_qry); + f->meta_qry = malloc((cnt+1) * sizeof *f->meta_qry); + memset(f->meta_qry+cnt,0,sizeof *f->meta_qry); f->nmeta_qry = cnt; f->other_qry = malloc((i-cnt) * sizeof *f->other_qry); f->nother_qry = i-cnt; @@ -495,17 +528,23 @@ static int register_feed_deferred(glite_jp_context_t ctx,void *feed) { struct jpfeed *f = feed; +/* FIXME: + * - volatile implementation: should store the registrations in a file + * and recover after restart + * - should communicate the data among all server slaves + f->next = ctx->feeds; ctx->feeds = f; + */ + + if (glite_jppsbe_store_feed(ctx,f)) fputs(glite_jp_error_chain(ctx),stderr); + else kill(-master,SIGUSR1); /* gracefully terminate slaves + and let master restart them */ + return 0; } -/* FIXME: - * - volatile implementation: should store the registrations in a file - * and recover after restart - * - should communicate the data among all server slaves - */ int glite_jpps_register_feed( glite_jp_context_t ctx, const char *destination, diff --git a/org.glite.jp.primary/src/new_ftp_backend.c b/org.glite.jp.primary/src/new_ftp_backend.c index fc57771..eb68804 100644 --- a/org.glite.jp.primary/src/new_ftp_backend.c +++ b/org.glite.jp.primary/src/new_ftp_backend.c @@ -18,7 +18,9 @@ #include "glite/jp/strmd5.h" #include "glite/jp/known_attr.h" #include "glite/jp/attr.h" +#include "glite/jp/escape.h" +#include "feed.h" #include "tags.h" #include "backend.h" #include "db.h" @@ -1754,7 +1756,7 @@ error_out: int glite_jppsbe_query( glite_jp_context_t ctx, const glite_jp_query_rec_t query[], - const glite_jp_attrval_t metadata[], + char * attrs[], void *arg, int (*callback)( glite_jp_context_t ctx, @@ -1771,14 +1773,17 @@ int glite_jppsbe_query( char *qres[3] = { NULL, NULL, NULL }; int cmask = 0, owner_idx = -1, reg_idx = -1; glite_jp_db_stmt_t q; - glite_jp_attrval_t meta_clone[2]; + glite_jp_attrval_t metadata[2]; memset(&err,0,sizeof err); glite_jp_clear_error(ctx); err.source = __FUNCTION__; /* XXX: assuming not more than 2 */ - memcpy(meta_clone,metadata,sizeof meta_clone); + memset(metadata,0,sizeof metadata); + + /* XXX: const discarding is OK */ + for (i=0;attrs[i]; i++) metadata[i].name = (char *) attrs[i]; for (i=0; query[i].attr; i++) { char *qitem; @@ -1877,17 +1882,17 @@ int glite_jppsbe_query( while ((ret = glite_jp_db_fetchrow(q,qres)) > 0) { if (cmask & 1) { /* XXX: owner always first */ - meta_clone[owner_idx].value = qres[1]; + metadata[owner_idx].value = qres[1]; qres[1] = NULL; } if (cmask & 2) { int qi = cmask == 2 ? 1 : 2; time_t t = glite_jp_db_dbtotime(qres[qi]); - meta_clone[reg_idx].value = glite_jp_time2attr(t); + metadata[reg_idx].value = glite_jp_time2attr(t); free(qres[qi]); qres[qi] = NULL; } - if (callback(ctx,qres[0],meta_clone,arg)) { + if (callback(ctx,qres[0],metadata,arg)) { err.code = EIO; err.desc = qres[0]; glite_jp_stack_error(ctx,&err); @@ -1895,9 +1900,9 @@ int glite_jppsbe_query( } free(qres[0]); - free(meta_clone[0].value); - free(meta_clone[1].value); - qres[0] = meta_clone[0].value = meta_clone[1].value = NULL; + free(metadata[0].value); + free(metadata[1].value); + qres[0] = metadata[0].value = metadata[1].value = NULL; } @@ -1915,7 +1920,7 @@ cleanup: free(aux); free(stmt); free(qres[0]); free(qres[1]); free(qres[2]); - free(meta_clone[0].value); free(meta_clone[1].value); + free(metadata[0].value); free(metadata[1].value); if (q) glite_jp_db_freestmt(&q); return err.code; @@ -2003,6 +2008,251 @@ cleanup: } +/** mark the job as sent to this feed */ +int glite_jppsbe_set_fed( + glite_jp_context_t ctx, + const char *feed, + const char *job +) +{ + char *stmt = NULL; + int rows; + glite_jp_error_t err; + memset(&err,0,sizeof err); + + trio_asprintf(&stmt,"insert into fed_job(feedid,jobid) " + "values ('%|Ss','%|Ss')", feed,job); + + if ((rows = glite_jp_db_execstmt(ctx,stmt,NULL)) < 0) { + err.source = __FUNCTION__; + err.code = EIO; + err.desc = "insert into fed_jobs"; + glite_jp_stack_error(ctx,&err); + goto cleanup; + } + + if (rows != 1) { + err.source = __FUNCTION__; + err.code = EIO; + err.desc = "inserted rows != 1"; + glite_jp_stack_error(ctx,&err); + } + +cleanup: + free(stmt); + return err.code; +} + + +/** check whether the job has been already sent to this feed */ +int glite_jppsbe_check_fed( + glite_jp_context_t ctx, + const char *feed, + const char *job, + int *result +) +{ + char *stmt = NULL; + int rows; + glite_jp_error_t err; + memset(&err,0,sizeof err); + trio_asprintf(&stmt,"select 'x' from fed_jobs " + "where jobid = '%|Ss' and feedid = '%|Ss'", + job,feed); + + if ((rows = glite_jp_db_execstmt(ctx,stmt,NULL)) < 0) { + err.source = __FUNCTION__; + err.code = EIO; + err.desc = "select from fed_jobs"; + glite_jp_stack_error(ctx,&err); + goto cleanup; + } + + *result = rows; + +cleanup: + free(stmt); + return err.code; +} + + +/** store the feed to database */ +int glite_jppsbe_store_feed( + glite_jp_context_t ctx, + struct jpfeed *feed +) +{ + char *stmt,*aux,*alist,*qlist,*e; + int i,rows; + glite_jp_error_t err; + + memset(&err,0,sizeof err); + + qlist, alist = stmt = aux = e = NULL; + + for (i=0; feed->attrs[i]; i++) { + char *e; + trio_asprintf(&aux,"%s%s%s", + alist ? alist : "", + alist ? "\n" : "", + e = edg_wll_LogEscape(feed->attrs[i])); + free(e); + free(alist); + alist = aux; + aux = NULL; + } + + for (i=0; feed->qry[i].attr; i++) { + char op,*e1,*e2 = NULL; + + /* XXX */ + assert(!feed->qry[i].binary); + + switch (feed->qry[i].op) { + case GLITE_JP_QUERYOP_EQUAL: op = '='; break; + case GLITE_JP_QUERYOP_UNEQUAL: op = '!'; break; + case GLITE_JP_QUERYOP_LESS: op = '<'; break; + case GLITE_JP_QUERYOP_GREATER: op = '>'; break; + case GLITE_JP_QUERYOP_EXISTS: op = 'E'; break; + default: abort(); /* XXX */ + } + + trio_asprintf(&aux,"%s%s%s\n%c\n%s", + qlist ? qlist : "", + qlist ? "\n" : "", + e1 = edg_wll_LogEscape(feed->qry[i].attr), + op, + op != 'E' ? e2 = edg_wll_LogEscape(feed->qry[i].value) : "E"); + free(e1); free(e2); + + free(qlist); + qlist = aux; + aux = NULL; + } + + trio_asprintf(&stmt,"insert into feeds(feedid,destination,expires,cols,query) " + "values ('%|Ss','%|Ss',%s,'%|Ss','%|Ss')", + feed->id,feed->destination, + e = glite_jp_db_timetodb(feed->expires), + alist,qlist); + + free(alist); free(qlist); free(e); + + if ((rows = glite_jp_db_execstmt(ctx,stmt,NULL)) < 0) { + err.source = __FUNCTION__; + err.code = EIO; + err.desc = "insert into fed_jobs"; + glite_jp_stack_error(ctx,&err); + goto cleanup; + } + + if (rows != 1) { + err.source = __FUNCTION__; + err.code = EIO; + err.desc = "inserted rows != 1"; + glite_jp_stack_error(ctx,&err); + } + +cleanup: + free(stmt); + return err.code; + +} + + +/** purge expired feeds */ +int glite_jppsbe_purge_feeds( + glite_jp_context_t ctx +) +{ + /* TODO */ + abort(); +} + + +/** read stored feed into context */ +int glite_jppsbe_read_feeds( + glite_jp_context_t ctx +) +{ + char *stmt,*res[5],*expires; + glite_jp_error_t err; + glite_jp_db_stmt_t q = NULL; + int rows; + + stmt = expires = NULL; + memset(&err,0,sizeof err); + memset(&res,0,sizeof res); + err.source = __FUNCTION__; + + expires = glite_jp_db_timetodb(time(NULL)); + trio_asprintf(&stmt,"select feedid,destination,expires,cols,query " + "from feeds " + "where expires > %s",expires); + free(expires); expires = NULL; + + if ((rows = glite_jp_db_execstmt(ctx, stmt, &q)) < 0) { + err.code = EIO; + err.desc = "select from feeds"; + glite_jp_stack_error(ctx,&err); + goto cleanup; + } + + while ((rows = glite_jp_db_fetchrow(q,res)) > 0) { + struct jpfeed *f = calloc(1,sizeof *f); + int n; + char *p; + + f->id = res[0]; res[0] = NULL; + f->destination = res[1]; res[1] = NULL; + f->expires = glite_jp_db_dbtotime(res[2]); free(res[2]); res[2] = NULL; + + n = 0; + for (p = strtok(res[3],"\n"); p; p = strtok(NULL,"\n")) { + f->attrs = realloc(f->attrs,(n+2) * sizeof *f->attrs); + f->attrs[n] = edg_wll_LogUnescape(p); + f->attrs[++n] = NULL; + } + + n = 0; + for (p = strtok(res[4],"\n"); p; p = strtok(NULL,"\n")) { + f->qry = realloc(f->qry,(n+2) * sizeof *f->qry); + memset(&f->qry[n],0,sizeof *f->qry); + f->qry[n].attr = edg_wll_LogUnescape(p); + p = strtok(NULL,"\n"); + switch (*p) { + case '=': f->qry[n].op = GLITE_JP_QUERYOP_EQUAL; break; + case '<': f->qry[n].op = GLITE_JP_QUERYOP_LESS; break; + case '>': f->qry[n].op = GLITE_JP_QUERYOP_GREATER; break; + case '!': f->qry[n].op = GLITE_JP_QUERYOP_UNEQUAL; break; + case 'E': f->qry[n].op = GLITE_JP_QUERYOP_EXISTS; break; + default: abort(); /* XXX: internal inconsistency */ + } + p = strtok(NULL,"\n"); + if (f->qry[n].op != GLITE_JP_QUERYOP_EXISTS) + f->qry[n].value = edg_wll_LogUnescape(p); + + memset(&f->qry[++n],0,sizeof *f->qry); + } + f->next = ctx->feeds; + ctx->feeds = f; + } + + if (rows < 0) { + err.code = EIO; + err.desc = "fetch from feeds"; + glite_jp_stack_error(ctx,&err); + goto cleanup; + } + +cleanup: + glite_jp_db_freestmt(&q); + free(res[0]); free(res[1]); free(res[2]); free(res[3]); free(res[4]); + return err.code; +} + + + /* XXX: - no primary authorization yet -- 1.8.2.3