From: Aleš Křenek Date: Wed, 21 Nov 2007 12:59:18 +0000 (+0000) Subject: first implementation of indexed notifications X-Git-Tag: glite-yaim-lb_R_4_0_1_1~31 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=435d43b64277a029c4a68f4e1892242cbe204b84;p=jra1mw.git first implementation of indexed notifications - roughly works - no configuration tools yet --- diff --git a/org.glite.lb.server/config/glite-lb-dbsetup.sql b/org.glite.lb.server/config/glite-lb-dbsetup.sql index a533ec1..f8d5495 100644 --- a/org.glite.lb.server/config/glite-lb-dbsetup.sql +++ b/org.glite.lb.server/config/glite-lb-dbsetup.sql @@ -112,7 +112,12 @@ create table notif_registrations ( userid char(32) binary not null, conditions mediumblob not null, - primary key (notifid) + `STD_owner` varchar(200) null, + `STD_network_server` varchar(200) null, + + primary key (notifid), + index (`STD_owner`); + index (`STD_network_server`); ) engine=innodb; create table notif_jobs ( diff --git a/org.glite.lb.server/interface/index.h b/org.glite.lb.server/interface/index.h index 01341ba..761465c 100644 --- a/org.glite.lb.server/interface/index.h +++ b/org.glite.lb.server/interface/index.h @@ -6,6 +6,7 @@ #include "glite/lb/query_rec.h" int edg_wll_QueryJobIndices(edg_wll_Context,edg_wll_QueryRec ***,char ***); +int edg_wll_QueryNotifIndices(edg_wll_Context,edg_wll_QueryRec ***,char ***); int edg_wll_ColumnToQueryRec(const char *,edg_wll_QueryRec *); char * edg_wll_QueryRecToColumn(const edg_wll_QueryRec *); char * edg_wll_QueryRecToColumnExt(const edg_wll_QueryRec *); @@ -15,6 +16,9 @@ int edg_wll_DumpIndexConfig(edg_wll_Context,const char *,edg_wll_QueryRec * cons int edg_wll_CmpColumn(const edg_wll_QueryRec *,const edg_wll_QueryRec *); +char *edg_wll_StatToSQL(edg_wll_JobStat const *stat,edg_wll_QueryAttr attr); + + typedef struct _edg_wll_IColumnRec { edg_wll_QueryRec qrec; char * colname; diff --git a/org.glite.lb.server/src/bkserverd.c b/org.glite.lb.server/src/bkserverd.c index 4e316cc..7d5b844 100644 --- a/org.glite.lb.server/src/bkserverd.c +++ b/org.glite.lb.server/src/bkserverd.c @@ -357,8 +357,8 @@ struct clnt_data_t { #endif /* GLITE_LB_SERVER_WITH_WS */ glite_lbu_DBContext dbctx; int dbcaps; - edg_wll_QueryRec **job_index; - edg_wll_IColumnRec *job_index_cols; + edg_wll_QueryRec **job_index,**notif_index; + edg_wll_IColumnRec *job_index_cols,*notif_index_cols;; int mode; }; @@ -366,8 +366,7 @@ struct clnt_data_t { int main(int argc, char *argv[]) { - int fd, i; - int dtablesize; + int i; struct sockaddr_in a; int opt; char pidfile[PATH_MAX] = EDG_BKSERVERD_PIDFILE, @@ -798,13 +797,49 @@ int main(int argc, char *argv[]) return 0; } +static void list_index_cols(edg_wll_QueryRec **index,edg_wll_IColumnRec **index_cols_out) +{ + int i,j, k, maxncol, ncol; + edg_wll_IColumnRec *index_cols; + + ncol = maxncol = 0; + for ( i = 0; index[i]; i++ ) + for ( j = 0; index[i][j].attr; j++ ) + maxncol++; + + index_cols = calloc(maxncol+1, sizeof(edg_wll_IColumnRec)); + for ( i = 0; index[i]; i++ ) + { + for ( j = 0; index[i][j].attr; j++) + { + for ( k = 0; + k < ncol && edg_wll_CmpColumn(&index_cols[k].qrec, &index[i][j]); + k++); + + if ( k == ncol) + { + index_cols[ncol].qrec = index[i][j]; + if ( index[i][j].attr == EDG_WLL_QUERY_ATTR_USERTAG ) + { + index_cols[ncol].qrec.attr_id.tag = + strdup(index[i][j].attr_id.tag); + } + index_cols[ncol].colname = + edg_wll_QueryRecToColumn(&index_cols[ncol].qrec); + ncol++; + } + } + } + index_cols[ncol].qrec.attr = EDG_WLL_QUERY_ATTR_UNDEF; + index_cols[ncol].colname = NULL; + *index_cols_out = index_cols; +} int bk_clnt_data_init(void **data) { edg_wll_Context ctx; struct clnt_data_t *cdata; - edg_wll_QueryRec **job_index; - edg_wll_IColumnRec *job_index_cols; + edg_wll_QueryRec **job_index,**notif_index; if ( !(cdata = calloc(1, sizeof(*cdata))) ) @@ -834,46 +869,21 @@ int bk_clnt_data_init(void **data) free(et); free(ed); } - edg_wll_FreeContext(ctx); cdata->job_index = job_index; + if ( job_index ) list_index_cols(job_index,&cdata->job_index_cols); - if ( job_index ) - { - int i,j, k, maxncol, ncol; - - ncol = maxncol = 0; - for ( i = 0; job_index[i]; i++ ) - for ( j = 0; job_index[i][j].attr; j++ ) - maxncol++; - - job_index_cols = calloc(maxncol+1, sizeof(edg_wll_IColumnRec)); - for ( i = 0; job_index[i]; i++ ) - { - for ( j = 0; job_index[i][j].attr; j++) - { - for ( k = 0; - k < ncol && edg_wll_CmpColumn(&job_index_cols[k].qrec, &job_index[i][j]); - k++); + if (edg_wll_QueryNotifIndices(ctx,¬if_index,NULL)) { + char *et,*ed; + edg_wll_Error(ctx,&et,&ed); - if ( k == ncol) - { - job_index_cols[ncol].qrec = job_index[i][j]; - if ( job_index[i][j].attr == EDG_WLL_QUERY_ATTR_USERTAG ) - { - job_index_cols[ncol].qrec.attr_id.tag = - strdup(job_index[i][j].attr_id.tag); - } - job_index_cols[ncol].colname = - edg_wll_QueryRecToColumn(&job_index_cols[ncol].qrec); - ncol++; - } - } - } - job_index_cols[ncol].qrec.attr = EDG_WLL_QUERY_ATTR_UNDEF; - job_index_cols[ncol].colname = NULL; - cdata->job_index_cols = job_index_cols; + dprintf(("[%d]: query notif indices: %s: %s\n",getpid(),et,ed)); + free(et); free(ed); } + cdata->notif_index = notif_index; + if (notif_index) list_index_cols(notif_index,&cdata->notif_index_cols); + edg_wll_FreeContext(ctx); + #ifdef LB_PERF glite_wll_perftest_init(NULL, NULL, NULL, NULL, 0); #endif @@ -933,6 +943,8 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data) ctx->dbcaps = cdata->dbcaps; ctx->job_index_cols = cdata->job_index_cols; ctx->job_index = cdata->job_index; + ctx->notif_index_cols = cdata->notif_index_cols; + ctx->notif_index = cdata->notif_index; /* set globals */ diff --git a/org.glite.lb.server/src/index.c.T b/org.glite.lb.server/src/index.c.T index 2886dc1..61c1043 100644 --- a/org.glite.lb.server/src/index.c.T +++ b/org.glite.lb.server/src/index.c.T @@ -5,6 +5,7 @@ #include #include "glite/lb/context-int.h" +#include "glite/lbu/trio.h" #include "db_supp.h" #include "index.h" @@ -42,6 +43,33 @@ int edg_wll_QueryJobIndices(edg_wll_Context ctx,edg_wll_QueryRec *** index_out,c } +int edg_wll_QueryNotifIndices(edg_wll_Context ctx,edg_wll_QueryRec *** index_out,char ***keys_out) +{ + static const char *built_in_indices[] = { + "PRIMARY", + NULL + }; + char **keys = NULL; + char ***column_names = NULL; + + if (glite_lbu_QueryIndices(ctx->dbctx, "notif_registrations", &keys, &column_names) != 0) { + edg_wll_SetErrorDB(ctx); + return EIO; + } + + if (!keys) { + *index_out = NULL; + if (keys_out) *keys_out = NULL; + return edg_wll_ResetError(ctx); + } + +/* XXX: keys are passed up or freed, column_names are freed there */ + if (parse_indices(ctx,keys,column_names,built_in_indices,index_out,keys_out)) + return edg_wll_Error(ctx,NULL,NULL); + else return edg_wll_ResetError(ctx); + +} + /* TODO: - better error recovery (skip unrecognised indices etc.) - leaks memory on errors @@ -123,22 +151,52 @@ int edg_wll_CmpColumn(const edg_wll_QueryRec *r1,const edg_wll_QueryRec *r2) } } +static char *to_sql_string(edg_wll_JobStat const *stat,int offset) +{ + char *out = NULL, + *in = *((char **) (((char *) stat) + offset)); + + if (in) trio_asprintf(&out,"'%|Ss'",in); + return out; +} + +static char *to_sql_timeval(edg_wll_JobStat const *stat,int offset) +{ + char *out; + glite_lbu_TimeToDB( ((struct timeval *) (((char *) stat) + offset))->tv_sec, &out ); + return out; +} + +static edg_wll_JobStat fakestat; + static struct { const char *name; edg_wll_QueryAttr attr; + int offset; + char * (*to_sql)(edg_wll_JobStat const *,int); } std_attrs[] = { @@@{ for my $n ($status->getAllFieldsOrdered) { my $f = selectField $status $n; next unless $f->{index}; - my $u = uc getName $f; - gen "\t{ \"$n\", EDG_WLL_QUERY_ATTR_$u },\n"; + my $u = uc $n; + gen "\t{ \"$n\", EDG_WLL_QUERY_ATTR_$u, ((char *) \&fakestat.$n) - ((char *) \&fakestat),\&to_sql_$f->{type} },\n"; } @@@} { NULL, }, }; +/* TODO: use in queries */ +char *edg_wll_StatToSQL(edg_wll_JobStat const *stat,edg_wll_QueryAttr attr) +{ + int i; + + for (i=0; std_attrs[i].attr && std_attrs[i].attr != attr; i++); + + return std_attrs[i].attr ? std_attrs[i].to_sql(stat,std_attrs[i].offset) : (char *) -1; +} + int edg_wll_ColumnToQueryRec(const char *col_name,edg_wll_QueryRec *rec) { int i; diff --git a/org.glite.lb.server/src/notif_match.c b/org.glite.lb.server/src/notif_match.c index 6954152..a50a9db 100644 --- a/org.glite.lb.server/src/notif_match.c +++ b/org.glite.lb.server/src/notif_match.c @@ -6,6 +6,8 @@ #include #include #include +#include +#include #include "glite/lb/context-int.h" #include "glite/lbu/trio.h" @@ -15,6 +17,7 @@ #include "query.h" #include "il_notification.h" #include "db_supp.h" +#include "index.h" static int notif_match_conditions(edg_wll_Context,const edg_wll_JobStat *,const char *); static int notif_check_acl(edg_wll_Context,const edg_wll_JobStat *,const char *); @@ -28,20 +31,44 @@ int edg_wll_NotifMatch(edg_wll_Context ctx, const edg_wll_JobStat *stat) glite_lbu_Statement jobs = NULL; int ret,i; time_t now = time(NULL); + + char *cond_where = NULL; edg_wll_ResetError(ctx); + if (ctx->notif_index) { + cond_where = strdup(""); + edg_wll_IColumnRec *notif_index_cols = ctx->notif_index_cols; + + for (i=0; notif_index_cols[i].qrec.attr; i++) { + char *val = NULL; + + val = edg_wll_StatToSQL(stat,notif_index_cols[i].qrec.attr); + assert(val != (char *) -1); + + if (val) { + char *aux; + trio_asprintf(&aux, "%s or %s = %s",cond_where, + notif_index_cols[i].colname,val); + free(cond_where); + cond_where = aux; + free(val); + } + } + } + if ( (ret = edg_wll_NotifIdCreate(ctx->srvName, ctx->srvPort, &nid)) ) { edg_wll_SetError(ctx, ret, "edg_wll_NotifMatch()"); goto err; } + trio_asprintf(&jobq, "select distinct n.notifid,n.destination,n.valid,u.cert_subj,n.conditions " "from notif_jobs j,users u,notif_registrations n " "where j.notifid=n.notifid and n.userid=u.userid " - " and (j.jobid = '%|Ss' or j.jobid = '%|Ss')", - ju = edg_wlc_JobIdGetUnique(stat->jobId),NOTIF_ALL_JOBS); + " and (j.jobid = '%|Ss' or j.jobid = '%|Ss' %s)", + ju = edg_wlc_JobIdGetUnique(stat->jobId),NOTIF_ALL_JOBS,cond_where ? cond_where : ""); free(ju); @@ -94,6 +121,7 @@ int edg_wll_NotifMatch(edg_wll_Context ctx, const edg_wll_JobStat *stat) if (ret < 0) goto err; err: + free(ctx->p_instance); ctx->p_instance = NULL; if ( nid ) edg_wll_NotifIdFree(nid); free(jobq); glite_lbu_FreeStmt(&jobs); diff --git a/org.glite.lb.server/src/notification.c b/org.glite.lb.server/src/notification.c index f950bba..b26e9b4 100644 --- a/org.glite.lb.server/src/notification.c +++ b/org.glite.lb.server/src/notification.c @@ -14,6 +14,7 @@ #include "il_notification.h" #include "query.h" #include "db_supp.h" +#include "index.h" static char *get_user(edg_wll_Context ctx, int create); @@ -23,6 +24,8 @@ static int split_cond_list(edg_wll_Context, edg_wll_QueryRec const * const *, static int update_notif(edg_wll_Context, const edg_wll_NotifId, const char *, const char *, const char *); +static int get_indexed_cols(edg_wll_Context,char const *,edg_wll_QueryRec **,char **); + int edg_wll_NotifNewServer( edg_wll_Context ctx, @@ -40,6 +43,7 @@ int edg_wll_NotifNewServer( *owner = NULL, **jobs = NULL; edg_wll_QueryRec **nconds = NULL; + char *add_index = NULL; /* Format notification ID @@ -59,6 +63,7 @@ int edg_wll_NotifNewServer( if ( split_cond_list(ctx, conditions, &nconds, &jobs) ) goto cleanup; + /* * encode new cond. list into a XML string */ @@ -112,6 +117,11 @@ int edg_wll_NotifNewServer( if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 ) goto cleanup; + if (get_indexed_cols(ctx,nid_s,nconds,&add_index) || + edg_wll_ExecSQL(ctx,add_index,NULL) < 0 + ) goto cleanup; + + if (jobs) for ( i = 0; jobs[i]; i++ ) { free(q); @@ -154,6 +164,7 @@ cleanup: free(jobs); } if ( nconds ) free(nconds); + free(add_index); return edg_wll_Error(ctx, NULL, NULL); } @@ -662,3 +673,38 @@ cleanup: return edg_wll_Error(ctx, NULL, NULL); } + +static int get_indexed_cols(edg_wll_Context ctx,char const *notif,edg_wll_QueryRec **conds,char **update_out) +{ + int i,j; + edg_wll_IColumnRec * notif_cols = ctx->notif_index_cols; + char *cols = NULL,*aux; + + for (i=0; conds[i]; i++) { + for (j=0; notif_cols[j].qrec.attr && notif_cols[j].qrec.attr != conds[i]->attr; j++); + if (notif_cols[j].qrec.attr) { + if (conds[i][1].attr && conds[i][0].op != EDG_WLL_QUERY_OP_EQUAL) { + char buf[1000]; + sprintf(buf,"%s: indexed, only one and only `equals' condition supported", + notif_cols[j].colname); + + return edg_wll_SetError(ctx,EINVAL,buf); + } + trio_asprintf(&aux,"%s%c %s = '%|Ss'", + cols ? cols : "", + cols ? ',': ' ', + notif_cols[j].colname,conds[i]->value.c + ); + free(cols); + cols = aux; + + } + } + if (cols) trio_asprintf(&aux,"update notif_registrations set %s where notifid = '%s'", + cols,notif); + else aux = NULL; + + free(cols); + *update_out = aux; + return edg_wll_ResetError(ctx); +}