first implementation of indexed notifications
authorAleš Křenek <ljocha@ics.muni.cz>
Wed, 21 Nov 2007 12:59:18 +0000 (12:59 +0000)
committerAleš Křenek <ljocha@ics.muni.cz>
Wed, 21 Nov 2007 12:59:18 +0000 (12:59 +0000)
- roughly works
- no configuration tools yet

org.glite.lb.server/config/glite-lb-dbsetup.sql
org.glite.lb.server/interface/index.h
org.glite.lb.server/src/bkserverd.c
org.glite.lb.server/src/index.c.T
org.glite.lb.server/src/notif_match.c
org.glite.lb.server/src/notification.c

index a533ec1..f8d5495 100644 (file)
@@ -112,7 +112,12 @@ create table notif_registrations (
        userid          char(32)        binary not null,
        conditions      mediumblob      not null,
 
-       primary key (notifid)
+       `STD_owner`     varchar(200)    null,
+       `STD_network_server`    varchar(200)    null,
+
+       primary key (notifid),
+       index (`STD_owner`);
+       index (`STD_network_server`);
 ) engine=innodb;
 
 create table notif_jobs (
index 01341ba..761465c 100644 (file)
@@ -6,6 +6,7 @@
 #include "glite/lb/query_rec.h"
 
 int edg_wll_QueryJobIndices(edg_wll_Context,edg_wll_QueryRec ***,char ***);
+int edg_wll_QueryNotifIndices(edg_wll_Context,edg_wll_QueryRec ***,char ***);
 int edg_wll_ColumnToQueryRec(const char *,edg_wll_QueryRec *);
 char * edg_wll_QueryRecToColumn(const edg_wll_QueryRec *);
 char * edg_wll_QueryRecToColumnExt(const edg_wll_QueryRec *);
@@ -15,6 +16,9 @@ int edg_wll_DumpIndexConfig(edg_wll_Context,const char *,edg_wll_QueryRec * cons
 
 int edg_wll_CmpColumn(const edg_wll_QueryRec *,const edg_wll_QueryRec *);
 
+char *edg_wll_StatToSQL(edg_wll_JobStat const *stat,edg_wll_QueryAttr attr);
+
+
 typedef struct _edg_wll_IColumnRec {
        edg_wll_QueryRec        qrec;
        char *                  colname;
index 4e316cc..7d5b844 100644 (file)
@@ -357,8 +357,8 @@ struct clnt_data_t {
 #endif /* GLITE_LB_SERVER_WITH_WS */
        glite_lbu_DBContext     dbctx;
        int                     dbcaps;
-       edg_wll_QueryRec          **job_index;
-       edg_wll_IColumnRec         *job_index_cols;
+       edg_wll_QueryRec          **job_index,**notif_index;
+       edg_wll_IColumnRec         *job_index_cols,*notif_index_cols;;
        int                     mode;
 };
 
@@ -366,8 +366,7 @@ struct clnt_data_t {
 
 int main(int argc, char *argv[])
 {
-       int                                     fd, i;
-       int                     dtablesize;
+       int                     i;
        struct sockaddr_in      a;
        int                                     opt;
        char                            pidfile[PATH_MAX] = EDG_BKSERVERD_PIDFILE,
@@ -798,13 +797,49 @@ int main(int argc, char *argv[])
        return 0;
 }
 
+static void list_index_cols(edg_wll_QueryRec **index,edg_wll_IColumnRec **index_cols_out)
+{
+       int i,j, k, maxncol, ncol;
+       edg_wll_IColumnRec      *index_cols;
+
+       ncol = maxncol = 0;
+       for ( i = 0; index[i]; i++ )
+               for ( j = 0; index[i][j].attr; j++ )
+                       maxncol++;
+
+       index_cols = calloc(maxncol+1, sizeof(edg_wll_IColumnRec));
+       for ( i = 0; index[i]; i++ )
+       {
+               for ( j = 0; index[i][j].attr; j++)
+               {
+                       for ( k = 0;
+                                 k < ncol && edg_wll_CmpColumn(&index_cols[k].qrec, &index[i][j]);
+                                 k++);
+
+                       if ( k == ncol)
+                       {
+                               index_cols[ncol].qrec = index[i][j];
+                               if ( index[i][j].attr == EDG_WLL_QUERY_ATTR_USERTAG )
+                               {
+                                       index_cols[ncol].qrec.attr_id.tag =
+                                                       strdup(index[i][j].attr_id.tag);
+                               }
+                               index_cols[ncol].colname =
+                                               edg_wll_QueryRecToColumn(&index_cols[ncol].qrec);
+                               ncol++;
+                       }
+               }
+       }
+       index_cols[ncol].qrec.attr = EDG_WLL_QUERY_ATTR_UNDEF;
+       index_cols[ncol].colname = NULL;
+       *index_cols_out = index_cols;
+}
 
 int bk_clnt_data_init(void **data)
 {
        edg_wll_Context                 ctx;
        struct clnt_data_t         *cdata;
-       edg_wll_QueryRec          **job_index;
-       edg_wll_IColumnRec         *job_index_cols;
+       edg_wll_QueryRec          **job_index,**notif_index;
 
 
        if ( !(cdata = calloc(1, sizeof(*cdata))) )
@@ -834,46 +869,21 @@ int bk_clnt_data_init(void **data)
                free(et);
                free(ed);
        }
-       edg_wll_FreeContext(ctx);
        cdata->job_index = job_index;
+       if ( job_index ) list_index_cols(job_index,&cdata->job_index_cols);
 
-       if ( job_index )
-       {
-               int i,j, k, maxncol, ncol;
-
-               ncol = maxncol = 0;
-               for ( i = 0; job_index[i]; i++ )
-                       for ( j = 0; job_index[i][j].attr; j++ )
-                               maxncol++;
-
-               job_index_cols = calloc(maxncol+1, sizeof(edg_wll_IColumnRec));
-               for ( i = 0; job_index[i]; i++ )
-               {
-                       for ( j = 0; job_index[i][j].attr; j++)
-                       {
-                               for ( k = 0;
-                                         k < ncol && edg_wll_CmpColumn(&job_index_cols[k].qrec, &job_index[i][j]);
-                                         k++);
+       if (edg_wll_QueryNotifIndices(ctx,&notif_index,NULL)) {
+               char    *et,*ed;
+               edg_wll_Error(ctx,&et,&ed);
 
-                               if ( k == ncol)
-                               {
-                                       job_index_cols[ncol].qrec = job_index[i][j];
-                                       if ( job_index[i][j].attr == EDG_WLL_QUERY_ATTR_USERTAG )
-                                       {
-                                               job_index_cols[ncol].qrec.attr_id.tag =
-                                                               strdup(job_index[i][j].attr_id.tag);
-                                       }
-                                       job_index_cols[ncol].colname =
-                                                       edg_wll_QueryRecToColumn(&job_index_cols[ncol].qrec);
-                                       ncol++;
-                               }
-                       }
-               }
-               job_index_cols[ncol].qrec.attr = EDG_WLL_QUERY_ATTR_UNDEF;
-               job_index_cols[ncol].colname = NULL;
-               cdata->job_index_cols = job_index_cols;
+               dprintf(("[%d]: query notif indices: %s: %s\n",getpid(),et,ed));
+               free(et); free(ed);
        }
+       cdata->notif_index = notif_index;
+       if (notif_index) list_index_cols(notif_index,&cdata->notif_index_cols);
 
+       edg_wll_FreeContext(ctx);
+               
 #ifdef LB_PERF
        glite_wll_perftest_init(NULL, NULL, NULL, NULL, 0);
 #endif
@@ -933,6 +943,8 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data)
        ctx->dbcaps = cdata->dbcaps;
        ctx->job_index_cols = cdata->job_index_cols;
        ctx->job_index = cdata->job_index; 
+       ctx->notif_index_cols = cdata->notif_index_cols;
+       ctx->notif_index = cdata->notif_index; 
        
        /*      set globals
         */
index 2886dc1..61c1043 100644 (file)
@@ -5,6 +5,7 @@
 #include <errno.h>
 
 #include "glite/lb/context-int.h"
+#include "glite/lbu/trio.h"
 #include "db_supp.h"
 #include "index.h"
 
@@ -42,6 +43,33 @@ int edg_wll_QueryJobIndices(edg_wll_Context ctx,edg_wll_QueryRec *** index_out,c
 
 }
 
+int edg_wll_QueryNotifIndices(edg_wll_Context ctx,edg_wll_QueryRec *** index_out,char ***keys_out)
+{
+       static const char *built_in_indices[] = {
+               "PRIMARY",
+               NULL
+       };
+       char    **keys = NULL;
+       char    ***column_names = NULL;
+
+       if (glite_lbu_QueryIndices(ctx->dbctx, "notif_registrations", &keys, &column_names) != 0) {
+               edg_wll_SetErrorDB(ctx);
+               return EIO;
+       }
+
+       if (!keys) {
+               *index_out = NULL;
+               if (keys_out) *keys_out = NULL;
+               return edg_wll_ResetError(ctx);
+       }
+
+/* XXX: keys are passed up or freed, column_names are freed there */
+       if (parse_indices(ctx,keys,column_names,built_in_indices,index_out,keys_out)) 
+               return edg_wll_Error(ctx,NULL,NULL);
+       else return edg_wll_ResetError(ctx);
+
+}
+
 /* TODO: 
    - better error recovery (skip unrecognised indices etc.)
    - leaks memory on errors
@@ -123,22 +151,52 @@ int edg_wll_CmpColumn(const edg_wll_QueryRec *r1,const edg_wll_QueryRec *r2)
        }
 }
 
+static char *to_sql_string(edg_wll_JobStat const *stat,int offset)
+{
+       char    *out = NULL,
+               *in = *((char **) (((char *) stat) + offset));
+
+               if (in) trio_asprintf(&out,"'%|Ss'",in);
+       return out;
+}
+
+static char *to_sql_timeval(edg_wll_JobStat const *stat,int offset)
+{
+       char    *out;
+       glite_lbu_TimeToDB( ((struct timeval *) (((char *) stat) + offset))->tv_sec, &out );
+       return out;
+}
+
+static edg_wll_JobStat fakestat;
+
 static struct {
        const char      *name;
        edg_wll_QueryAttr       attr;
+       int             offset;
+       char    * (*to_sql)(edg_wll_JobStat const *,int);
 } std_attrs[] =
 {
 @@@{
        for my $n ($status->getAllFieldsOrdered) {
                my $f = selectField $status $n;
                next unless $f->{index};
-               my $u = uc getName $f;
-               gen "\t{ \"$n\", EDG_WLL_QUERY_ATTR_$u },\n";
+               my $u = uc $n;
+               gen "\t{ \"$n\", EDG_WLL_QUERY_ATTR_$u, ((char *) \&fakestat.$n) - ((char *) \&fakestat),\&to_sql_$f->{type} },\n";
        }
 @@@}
        { NULL, },
 };
 
+/* TODO: use in queries */
+char *edg_wll_StatToSQL(edg_wll_JobStat const *stat,edg_wll_QueryAttr attr)
+{
+       int     i;
+
+       for (i=0; std_attrs[i].attr && std_attrs[i].attr != attr; i++);
+
+       return std_attrs[i].attr ? std_attrs[i].to_sql(stat,std_attrs[i].offset) : (char *) -1;
+}
+
 int edg_wll_ColumnToQueryRec(const char *col_name,edg_wll_QueryRec *rec)
 {
        int     i;
index 6954152..a50a9db 100644 (file)
@@ -6,6 +6,8 @@
 #include <stdio.h>
 #include <syslog.h>
 #include <errno.h>
+#include <unistd.h>
+#include <assert.h>
 
 #include "glite/lb/context-int.h"
 #include "glite/lbu/trio.h"
@@ -15,6 +17,7 @@
 #include "query.h"
 #include "il_notification.h"
 #include "db_supp.h"
+#include "index.h"
 
 static int notif_match_conditions(edg_wll_Context,const edg_wll_JobStat *,const char *);
 static int notif_check_acl(edg_wll_Context,const edg_wll_JobStat *,const char *);
@@ -28,20 +31,44 @@ int edg_wll_NotifMatch(edg_wll_Context ctx, const edg_wll_JobStat *stat)
        glite_lbu_Statement     jobs = NULL;
        int     ret,i;
        time_t  now = time(NULL);
+       
+       char *cond_where = NULL;
 
        edg_wll_ResetError(ctx);
 
+       if (ctx->notif_index) {
+               cond_where = strdup("");
+               edg_wll_IColumnRec *notif_index_cols = ctx->notif_index_cols;
+
+               for (i=0; notif_index_cols[i].qrec.attr; i++) {
+                       char    *val = NULL;
+
+                       val = edg_wll_StatToSQL(stat,notif_index_cols[i].qrec.attr);
+                       assert(val != (char *) -1);
+
+                       if (val) {
+                               char    *aux;
+                               trio_asprintf(&aux, "%s or %s = %s",cond_where,
+                                               notif_index_cols[i].colname,val);
+                               free(cond_where);
+                               cond_where = aux;
+                               free(val);
+                       }
+               }
+       }
+
        if ( (ret = edg_wll_NotifIdCreate(ctx->srvName, ctx->srvPort, &nid)) )
        {
                edg_wll_SetError(ctx, ret, "edg_wll_NotifMatch()");
                goto err;
        }
+
        trio_asprintf(&jobq,
                "select distinct n.notifid,n.destination,n.valid,u.cert_subj,n.conditions "
                "from notif_jobs j,users u,notif_registrations n "
                "where j.notifid=n.notifid and n.userid=u.userid "
-               "   and (j.jobid = '%|Ss' or j.jobid = '%|Ss')",
-               ju = edg_wlc_JobIdGetUnique(stat->jobId),NOTIF_ALL_JOBS);
+               "   and (j.jobid = '%|Ss' or j.jobid = '%|Ss' %s)",
+               ju = edg_wlc_JobIdGetUnique(stat->jobId),NOTIF_ALL_JOBS,cond_where ? cond_where : "");
 
        free(ju);
 
@@ -94,6 +121,7 @@ int edg_wll_NotifMatch(edg_wll_Context ctx, const edg_wll_JobStat *stat)
        if (ret < 0) goto err;
        
 err:
+       free(ctx->p_instance); ctx->p_instance = NULL;
        if ( nid ) edg_wll_NotifIdFree(nid);
        free(jobq);
        glite_lbu_FreeStmt(&jobs);
index f950bba..b26e9b4 100644 (file)
@@ -14,6 +14,7 @@
 #include "il_notification.h"
 #include "query.h"
 #include "db_supp.h"
+#include "index.h"
 
 
 static char *get_user(edg_wll_Context ctx, int create);
@@ -23,6 +24,8 @@ static int split_cond_list(edg_wll_Context, edg_wll_QueryRec const * const *,
 static int update_notif(edg_wll_Context, const edg_wll_NotifId,
                                                const char *, const char *, const char *);
 
+static int get_indexed_cols(edg_wll_Context,char const *,edg_wll_QueryRec **,char **);
+
 
 int edg_wll_NotifNewServer(
        edg_wll_Context                                 ctx,
@@ -40,6 +43,7 @@ int edg_wll_NotifNewServer(
                                           *owner               = NULL,
                                          **jobs                = NULL;
        edg_wll_QueryRec  **nconds              = NULL;
+       char            *add_index = NULL;
 
 
        /*      Format notification ID
@@ -59,6 +63,7 @@ int edg_wll_NotifNewServer(
        if ( split_cond_list(ctx, conditions, &nconds, &jobs) )
                goto cleanup;
 
+
        /*
         *      encode new cond. list into a XML string
         */
@@ -112,6 +117,11 @@ int edg_wll_NotifNewServer(
        if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 )
                goto cleanup;
 
+       if (get_indexed_cols(ctx,nid_s,nconds,&add_index) ||
+               edg_wll_ExecSQL(ctx,add_index,NULL) < 0
+       ) goto cleanup;
+
+
        if (jobs) for ( i = 0; jobs[i]; i++ )
        {
                free(q);
@@ -154,6 +164,7 @@ cleanup:
                free(jobs);
        }
        if ( nconds ) free(nconds);
+       free(add_index);
 
        return edg_wll_Error(ctx, NULL, NULL);
 }
@@ -662,3 +673,38 @@ cleanup:
 
        return edg_wll_Error(ctx, NULL, NULL);
 }
+
+static int get_indexed_cols(edg_wll_Context ctx,char const *notif,edg_wll_QueryRec **conds,char **update_out)
+{
+       int     i,j;
+       edg_wll_IColumnRec      * notif_cols = ctx->notif_index_cols;
+       char    *cols = NULL,*aux;
+
+       for (i=0; conds[i]; i++) {
+               for (j=0; notif_cols[j].qrec.attr && notif_cols[j].qrec.attr != conds[i]->attr; j++);
+               if (notif_cols[j].qrec.attr) {
+                       if (conds[i][1].attr && conds[i][0].op != EDG_WLL_QUERY_OP_EQUAL) {
+                               char    buf[1000];
+                               sprintf(buf,"%s: indexed, only one and only `equals' condition supported",
+                                               notif_cols[j].colname);
+
+                               return edg_wll_SetError(ctx,EINVAL,buf);
+                       }
+                       trio_asprintf(&aux,"%s%c %s = '%|Ss'",
+                                       cols ? cols : "",
+                                       cols ? ',': ' ',
+                                       notif_cols[j].colname,conds[i]->value.c
+                       );
+                       free(cols);
+                       cols = aux; 
+
+               }
+       }
+       if (cols) trio_asprintf(&aux,"update notif_registrations set %s where notifid = '%s'",
+                                       cols,notif);
+       else aux = NULL;
+
+       free(cols);
+       *update_out = aux;
+       return edg_wll_ResetError(ctx);
+}