start rewrite of store & auth logic; half done, dedicated branch
authorAleš Křenek <ljocha@ics.muni.cz>
Tue, 8 May 2007 08:52:04 +0000 (08:52 +0000)
committerAleš Křenek <ljocha@ics.muni.cz>
Tue, 8 May 2007 08:52:04 +0000 (08:52 +0000)
org.glite.lb.server/src/bkserverd.c
org.glite.lb.server/src/jobstat.c
org.glite.lb.server/src/store.c.T

index c50d946..0630f71 100644 (file)
@@ -146,6 +146,7 @@ char                                   *cadir = NULL,
                                           *server_key = NULL,
                                           *server_cert = NULL;
 
+static void *gridmap;
 
 static struct option opts[] = {
        {"cert",        1, NULL,        'c'},
@@ -184,10 +185,11 @@ static struct option opts[] = {
 #endif
        {"transactions",        1,      NULL,   'b'},
        {"greyjobs",    0,      NULL,   'g'},
+       {"grid-mapfile",        1, NULL,        'G'},
        {NULL,0,NULL,0}
 };
 
-static const char *get_opt_string = "a:c:k:C:V:p:drm:ns:l:L:N:i:S:D:X:Y:T:t:J:jzb:g"
+static const char *get_opt_string = "a:c:k:C:V:p:drm:ns:l:L:N:i:S:D:X:Y:T:t:J:jzb:gG:"
 #ifdef GLITE_LB_SERVER_WITH_WS
        "w:"
 #endif
@@ -237,7 +239,7 @@ static void usage(char *me)
                "\t--perf-sink\t where to sink events\n"
 #endif
                "\t-g,--greyjobs\t allow delayed registration (grey jobs), implies --strict-locking\n"
-
+               "\t-G,--grid-mapfile\t grid-mapfile to map X509 identities to unix users\n"
        ,me);
 }
 
@@ -248,6 +250,7 @@ static int asyn_gethostbyaddr(char **, const char *, int, int, struct timeval *)
 static int amIroot(const char *);
 static int parse_limits(char *, int *, int *, int *);
 static int check_mkdir(const char *);
+static void read_gridmap(const char *);
 
 
 /*
@@ -405,6 +408,8 @@ int main(int argc, char *argv[])
 #endif
                case 'g': greyjobs = strict_locking = 1;
                          break;
+               case 'G': read_gridmap(optarg);
+                         break;
                case '?': usage(name); return 1;
        }
 
@@ -1531,3 +1536,6 @@ static int decrement_timeout(struct timeval *timeout, struct timeval before, str
         else return(0);
 }
 
+static void read_gridmap(const char *fname)
+{
+}
index 052bb62..7b63b47 100644 (file)
 #include <regex.h>
 #include <syslog.h>
 
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+
+#include <search.h>
+
 #include "glite/lb/producer.h"
 #include "glite/lb/context-int.h"
 #include "glite/lb/trio.h"
@@ -93,6 +99,7 @@ int edg_wll_JobStatus(
        edg_wll_Stmt sh;
        int num_sub, num_f, i, ii;
 
+
        edg_wll_ResetError(ctx);
 
        string_jobid = edg_wlc_JobIdUnparse(job);
@@ -113,21 +120,34 @@ int edg_wll_JobStatus(
                return edg_wll_Error(ctx,NULL,NULL);
        }
 
+
+
        /* authorization check */
        if ( !(ctx->noAuth) &&
-           (!(ctx->peerName) ||  strcmp(ctx->peerName, jobstat.pub.owner))) {
-               intErr = (acl == NULL) || edg_wll_CheckACL(ctx, acl, EDG_WLL_PERM_READ);
-             if (intErr) {
-                free(string_jobid);
-                free(md5_jobid);
-                free(jobstat.pub.owner); jobstat.pub.owner = NULL;
-                if (acl) {
-                       edg_wll_FreeAcl(acl);
-                       return edg_wll_Error(ctx, NULL, NULL);
-                } else {
-                       return edg_wll_SetError(ctx,EPERM, "not owner, no ACL is set");
-                }
-             }
+           (!(ctx->peerName) ||  strcmp(ctx->peerName, jobstat.pub.owner) )
+       ) {
+               ENTRY   gmap_search, *gmap_return;
+
+               gmap_search.key = ctx->peerName;
+               gmap_search.data = NULL;
+
+               if (!ctx->gridmap || 
+                       !hsearch_r(gmap_search,FIND,&gmap_return,ctx->gridmap) ||
+                       strcmp(jobstat.pub.owner,(const char *) gmap_return->data)
+               ) {
+                       intErr = (acl == NULL) || edg_wll_CheckACL(ctx, acl, EDG_WLL_PERM_READ);
+                       if (intErr) {
+                               free(string_jobid);
+                               free(md5_jobid);
+                               free(jobstat.pub.owner); jobstat.pub.owner = NULL;
+                               if (acl) {
+                                       edg_wll_FreeAcl(acl);
+                                       return edg_wll_Error(ctx, NULL, NULL);
+                               } else {
+                                       return edg_wll_SetError(ctx,EPERM, "not owner, no ACL is set");
+                               }
+                       }
+               }
        }
 
        intErr = edg_wll_LoadIntState(ctx, job, -1 /*all events*/, &ijsp);
index a8339c1..9666b94 100644 (file)
@@ -28,6 +28,7 @@
 #include "glite/lb/trio.h"
 
 #include "store.h"
+#include "store_int.h"
 #include "get_events.h"
 #include "lbs_db.h"
 #include "lock.h"
 #include "jobstat.h"
 
 static int store_user(edg_wll_Context,const char *,const char *); 
-static int store_job(edg_wll_Context,const edg_wlc_JobId,const char *);
+static int store_job(edg_wll_Context,const edg_wll_Event *,struct glite_lb_StoreAux *);
+static int store_job_grey(edg_wll_Context,const edg_wll_Event *,struct glite_lb_StoreAux *);
 #ifdef LB_BUF
 static int store_job_block(edg_wll_Context, const edg_wlc_JobId, const char *, edg_wll_bufInsert *);
 #endif
-static int store_job_grey(edg_wll_Context,const edg_wlc_JobId,time_t);
 static int store_flesh(edg_wll_Context,edg_wll_Event *,char *,int);
 static int store_seq(edg_wll_Context,edg_wll_Event *,int);
 static int check_dup(edg_wll_Context,edg_wll_Event *);
@@ -49,12 +50,176 @@ static int register_subjobs(edg_wll_Context,const edg_wll_RegJobEvent *);
 #endif
 static int register_subjobs_embryonic(edg_wll_Context,const edg_wll_RegJobEvent *);
 
+static int store_event(edg_wll_Context ctx,edg_wll_Event *e,struct glite_lb_StoreAux *aux);
+
 void edg_wll_StoreAnonymous(edg_wll_Context ctx,int anon) {
        ctx->allowAnonymous = anon;
 }
 
 int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,int *seq)
 {
+       int     err = 0;
+       struct glite_lb_StoreAux        aux;
+
+       edg_wll_ResetError(ctx);
+       memset(&aux,0,sizeof aux);
+
+       aux.now_s = edg_wll_TimeToDB(ctx->event_load ? e->any.arrived.tv_sec : time(NULL));
+       aux.jobid = edg_wlc_JobIdGetUnique(e->any.jobId);
+
+       if (e->type == EDG_WLL_EVENT_REGJOB) switch(err = store_job(ctx,e,&aux)) {
+               case 0: break;
+               case EEXIST: 
+                       if ((err = glite_lb_AuthEvent(ctx,e,&aux))) goto clean;
+               default: goto clean;
+       }
+       else switch ((err = glite_lb_AuthEvent(ctx,e,&aux))) {
+               case 0: break;
+               case ENOENT:
+                       if ((err = store_job_grey(ctx,e,&aux))) goto clean;
+                       break;
+               default: goto clean;
+       }
+
+       if ((err = store_event(ctx,e,&aux)) ||
+               (err = glite_lb_AfterRegistration(ctx,e,&aux))
+       ) goto clean;
+
+       *seq = aux.seq;
+clean:
+       /* TODO: free aux */
+       free(aux.now_s);
+       free(aux.jobid);
+       return err;
+}
+
+static int store_event(edg_wll_Context ctx,edg_wll_Event *e,struct glite_lb_StoreAux *aux)
+{
+       char    *select_max,*ssrc,*stmt;
+       int     next,err = 0;
+       edg_wll_Stmt    sh = NULL;
+
+       edg_wll_ResetError(ctx);
+
+/* obtain next event sequence number */
+       trio_asprintf(&select_max,
+               "select max(event) from events "
+               "where jobid = '%|Ss'",aux->jobid);
+
+       ssrc = edg_wll_SourceToString(e->any.source);
+
+/* try to insert (someone else may be doing the same) */
+       while (1) {
+               char    *max;
+
+               if (edg_wll_ExecStmt(ctx,select_max,&sh) < 0 ||
+                   edg_wll_FetchRow(sh,&max) < 0)
+               {
+                       err = edg_wll_Error(ctx,NULL,NULL);
+                       goto clean;
+               }
+               edg_wll_FreeStmt(&sh); 
+               
+               next = max && *max ? atoi(max)+1 : 0;
+               
+       /* 
+        * 1) when using transactions:
+        *   Store the whole event right now.
+        *
+        * 2) when not using transaction:
+        *   Store an UNDEF event first in order to prevent race condition
+        *   with readers and update event code later.
+        */
+               trio_asprintf(&stmt,
+                       "insert into events(jobid,event,code,prog,host,time_stamp,usec,arrived,level,userid) "
+                       "values ('%|Ss',%d,%d,'%|Ss','%|Ss',%s,%d,%s,%d,'%|Ss')",
+                       aux->jobid,next,
+                       ctx->use_transactions ? (int) e->any.type : EDG_WLL_EVENT_UNDEF,
+                       ssrc,e->any.host,
+                       edg_wll_TimeToDB(e->any.timestamp.tv_sec),e->any.timestamp.tv_usec,
+                       aux->now_s, e->any.level,aux->userid);
+
+               if (edg_wll_ExecStmt(ctx,stmt,NULL) < 0) {
+                       if ((err = edg_wll_Error(ctx,NULL,NULL)) != EEXIST)
+                               goto clean;
+               } else break; /* successful insert */
+
+       /* we were late -- try once again */
+               free(stmt); stmt = NULL;
+       }
+
+       free(ssrc); ssrc = NULL;
+       free(select_max); select_max = NULL;
+       free(stmt); stmt = NULL;
+
+       if ((err = store_seq(ctx,e,next)) ||
+               (err = store_flesh(ctx,e,aux->jobid,next))) {
+       /* attempt to cleanup, ignore new errors */
+               char            *desc;
+               edg_wll_ErrorCode       oerr = edg_wll_Error(ctx,NULL,&desc);
+
+               edg_wll_delete_event(ctx,aux->jobid,next);
+               edg_wll_SetError(ctx,oerr,desc);
+               free(desc);
+       } else
+       if (!ctx->use_transactions) {
+       /* emulate commit, i.e. swith to a real event type to make
+        * the record valid */
+               trio_asprintf(&stmt,
+                       "update events set code=%d "
+                       "where jobid='%|Ss' and event=%d",
+                       (int) e->any.type,aux->jobid,next);
+               switch (edg_wll_ExecStmt(ctx,stmt,NULL)) {
+                       case 0: if (ctx->strict_locking)
+                                       err = edg_wll_SetError(ctx,ENOENT,"event disappeared on store while strict locking");
+                       /* purge in progres: drop the garbage, ignore errors */
+                               else {
+                                       edg_wll_delete_event(ctx,aux->jobid,next);
+                                       err = edg_wll_SetError(ctx,ENOENT,"job being purged");
+                               }
+                               break;
+                       case 1: if (ctx->strict_locking) err = 0;
+                               else {
+                       /* check whether the job is still there to prevent garbage
+                        * left while there is a concurrent purge 
+                        */
+                                       free(stmt);
+                                       trio_asprintf(&stmt,
+                                               "select 'x' from jobs where jobid='%|Ss'",
+                                               aux->jobid);
+                                       switch (edg_wll_ExecStmt(ctx,stmt,NULL)) {
+                                               case 1: break;
+                                               case 0: /* purge in progres */
+                                                       edg_wll_delete_event(ctx,aux->jobid,next);
+                                                       err = edg_wll_SetError(ctx,ENOENT,"job being purged");
+                                                       break;
+                                               default: err = edg_wll_SetError(ctx,EDG_WLL_ERROR_DB_CALL,
+                                                               "more job records, what is that?");
+                                                       break;
+                                       }
+                               }
+                               break;
+                       case -1: err = edg_wll_Error(ctx,NULL,NULL);
+                               break;
+
+                       default: err = edg_wll_SetError(ctx,EDG_WLL_ERROR_DB_CALL,
+                               "more event records, what is that?");
+                               break;
+               }
+       } /* if !ctx->use_transactions */
+
+       aux->seq = next;
+
+clean:
+       free(ssrc);
+       free(select_max);
+       free(stmt);
+       edg_wll_FreeStmt(&sh);
+       return err;
+}
+
+int OLD_edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,int *seq)
+{
        edg_wll_ErrorCode       err = 0;
        char            *userid = NULL,*jobid,*stmt;
        char            *select_max,*ssrc;