switch to transactions
authorMiloš Mulač <mulac@civ.zcu.cz>
Thu, 14 Feb 2008 10:32:08 +0000 (10:32 +0000)
committerMiloš Mulač <mulac@civ.zcu.cz>
Thu, 14 Feb 2008 10:32:08 +0000 (10:32 +0000)
- compiles, but not completed yet
- no tests, only for sharing

14 files changed:
org.glite.lb.server/config/glite-lb-dbsetup.sql
org.glite.lb.server/interface/store.h
org.glite.lb.server/src/db_calls.c
org.glite.lb.server/src/db_calls.h
org.glite.lb.server/src/db_store.c
org.glite.lb.server/src/jobstat.c
org.glite.lb.server/src/jobstat.h
org.glite.lb.server/src/load.c
org.glite.lb.server/src/notification.c
org.glite.lb.server/src/purge.h
org.glite.lb.server/src/query.c
org.glite.lb.server/src/srv_purge.c
org.glite.lb.server/src/store.c.T
org.glite.lb.server/src/userjobs.c

index 530353b..7ff36cb 100644 (file)
@@ -5,22 +5,14 @@ create table jobs (
        aclid           char(32)        binary null,
        proxy           bool            not null,
        server          bool            not null,
-       
+       grey            bool            not null,
+       nevents         int             not null,
 
        primary key (jobid),
        unique (dg_jobid),
        index (userid)
 ) engine=innodb;
 
-create table grey_jobs (
-       jobid           char(32)        binary not null,
-       dg_jobid        varchar(255)    binary not null,
-       time_stamp      datetime        not null,
-
-       primary key (jobid),
-       unique (dg_jobid)
-) engine=innodb;
-
 create table users (
        userid          char(32)        binary not null,
        cert_subj       varchar(255)    binary not null,
@@ -42,6 +34,7 @@ create table events (
 
        arrived         datetime        not null,
        ulm             mediumblob      not null,   -- testing (1)
+       seqcode         varchar(255)    binary not null,
 
        primary key (jobid,event),
        index (time_stamp),
index d3ec61c..9518e00 100644 (file)
@@ -45,8 +45,13 @@ int db_store(edg_wll_Context, char *);
 int db_parent_store(edg_wll_Context, edg_wll_Event *, intJobStat *);
 int handle_request(edg_wll_Context,char *);
 int create_reply(const edg_wll_Context,char **);
-int is_job_local(edg_wll_Context, edg_wlc_JobId jobId);
-int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event);
+int is_job_local(edg_wll_Context, glite_jobid_const_t jobId);
+int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, int *register_to_JP);
+#ifndef LB_DAG_EMBRIONIC
+int register_subjobs(edg_wll_Context,const edg_wll_RegJobEvent *);
+#endif
+int register_subjobs_embryonic(edg_wll_Context,const edg_wll_RegJobEvent *);
+
 
 int edg_wll_delete_event(edg_wll_Context,const char *, int);
 
index 203ac2b..970ce55 100644 (file)
@@ -9,11 +9,11 @@
 #include "glite/lb/context-int.h"
 
 #include "db_calls.h"
-# include "db_supp.h"
+#include "db_supp.h"
 
 /** Returns bitmask of job membership in common server/proxy database 
  */
-int edg_wll_jobMembership(edg_wll_Context ctx, edg_wlc_JobId job)
+int edg_wll_jobMembership(edg_wll_Context ctx, glite_jobid_const_t job)
 {
         char            *dbjob;
         char            *stmt = NULL;
@@ -25,7 +25,7 @@ int edg_wll_jobMembership(edg_wll_Context ctx, edg_wlc_JobId job)
 
         dbjob = edg_wlc_JobIdGetUnique(job);
 
-        trio_asprintf(&stmt,"select proxy,server from jobs where jobid = '%|Ss'",dbjob);
+        trio_asprintf(&stmt,"select proxy,server from jobs where jobid = '%|Ss' for update",dbjob);
         ret = edg_wll_ExecSQL(ctx,stmt,&q);
         if (ret <= 0) {
                 if (ret == 0) {
@@ -53,3 +53,38 @@ clean:
         free(stmt);
         return(result);
 }
+
+
+/* just lock one row corresponding to job in table jobs
+ * lock_mode: 0 = in share mode / 1 = for update
+ */
+int edg_wll_LockJobRow(edg_wll_Context ctx, glite_jobid_const_t job, int lock_mode) 
+{
+       char                    *jobid_md5 = NULL;
+       char                    *stmt = NULL;
+       glite_lbu_Statement     sh;
+       int                     nr;
+
+
+       edg_wll_ResetError(ctx);
+       jobid_md5 = edg_wlc_JobIdGetUnique(job);
+
+       if (lock_mode) 
+               trio_asprintf(&stmt, "select count(*) from jobs where jobid='%|Ss' for update", jobid_md5);
+       else
+               trio_asprintf(&stmt, "select count(*) from jobs where jobid='%|Ss' in share mode", jobid_md5);
+
+       if ((nr = edg_wll_ExecSQL(ctx,stmt,&sh)) < 0) goto cleanup;
+       if (nr == 0) {
+                edg_wll_SetError(ctx,ENOENT,"no state in DB");
+                goto cleanup;
+       }
+       
+cleanup:
+       if (sh) glite_lbu_FreeStmt(&sh);
+       free(stmt); stmt = NULL;
+       free(jobid_md5);
+
+       return edg_wll_Error(ctx, NULL, NULL);
+}
+
index 1adcbcc..71a19da 100644 (file)
@@ -6,6 +6,11 @@
 #define DB_PROXY_JOB    1
 #define DB_SERVER_JOB   2
 
-int edg_wll_jobMembership(edg_wll_Context ctx, edg_wlc_JobId job);
+int edg_wll_jobMembership(edg_wll_Context ctx, glite_jobid_const_t job);
+
+#define edg_wll_LockJobRowInShareMode(X,Y) edg_wll_LockJobRow(X,Y,0)
+#define edg_wll_LockJobRowForUpdate(X,Y) edg_wll_LockJobRow(X,Y,1)
+int edg_wll_LockJobRow(edg_wll_Context ctx, glite_jobid_const_t job, int lock_mode);
+
 
 #endif /* GLITE_LB_LB_CALLS_H */
index a0d0a2f..2654e34 100644 (file)
@@ -12,9 +12,9 @@
 #include "glite/lb/lb_maildir.h"
 #include "purge.h"
 #include "store.h"
-#include "lock.h"
 #include "il_lbproxy.h"
 #include "jobstat.h"
+#include "db_supp.h"
 
 #ifdef LB_PERF
 #include "glite/lb/lb_perftest.h"
 #endif
 
 
-/* XXX */
-#define use_db 1
-
 extern int unset_proxy_flag(edg_wll_Context, edg_wlc_JobId);
 extern int edg_wll_NotifMatch(edg_wll_Context, const edg_wll_JobStat *);
 
-static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int seq, int reg_to_JP);
+static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int reg_to_JP);
 
 
 int
 db_store(edg_wll_Context ctx, char *event)
 {
-  edg_wll_Event *ev;
-  int  seq, reg_to_JP = 0;
-  int   err;
-  int  local_job;
+  edg_wll_Event        *ev = NULL;
+  int                  seq, reg_to_JP = 0, local_job;
   edg_wll_JobStat      newstat;
 
 
-  ev = NULL;
-
   edg_wll_ResetError(ctx);
   memset(&newstat,0,sizeof newstat);
 
-  if(edg_wll_ParseEvent(ctx, event, &ev))
-    goto err;
+  if(edg_wll_ParseEvent(ctx, event, &ev)) goto err;
+
   local_job = is_job_local(ctx, ev->any.jobId);
 
 #ifdef LB_PERF
@@ -59,101 +52,86 @@ db_store(edg_wll_Context ctx, char *event)
   }
 #endif
 
-  if(use_db) {
-    char       *ed;
-    int                code;
-
-    if (edg_wll_LockJob(ctx,ev->any.jobId)) goto err;
-    store_job_server_proxy(ctx, ev);
-    code = edg_wll_Error(ctx,NULL,&ed);
-    edg_wll_UnlockJob(ctx,ev->any.jobId);      /* XXX: ignore error */
-    if (code) {
-           edg_wll_SetError(ctx,code,ed);
-           free(ed);
-           goto err;
-    }
-  }
+  do {
+       if (edg_wll_Transaction(ctx)) goto err;
+
+       if (store_job_server_proxy(ctx, ev, &reg_to_JP)) goto rollback;
 
+       /* events logged to proxy and server (DIRECT flag) may be ignored on proxy
+       * if jobid prefix hostname matches server hostname -> they will
+       * sooner or later arrive to server too and are stored in common DB 
+       */
+       if (ctx->isProxy && local_job && (ev->any.priority & EDG_WLL_LOGFLAG_DIRECT)) {
+               goto commit;
+       }
 
-  /* events logged to proxy and server (DIRECT flag) may be ignored on proxy
-   * if jobid prefix hostname matches server hostname -> they will
-   * sooner or later arrive to server too and are stored in common DB 
-   */
-  if (ctx->isProxy && local_job) {
-       if  (ev->any.priority & EDG_WLL_LOGFLAG_DIRECT) {
-               edg_wll_FreeEvent(ev);
-               free(ev);
-               return 0;
+       if (edg_wll_StoreEvent(ctx, ev, event, &seq)) goto rollback;
+       
+       if ( ev->any.type == EDG_WLL_EVENT_CHANGEACL ) {
+               if (edg_wll_UpdateACL(ctx, ev->any.jobId,
+                       ev->changeACL.user_id, ev->changeACL.user_id_type,
+                       ev->changeACL.permission, ev->changeACL.permission_type,
+                       ev->changeACL.operation)) goto rollback;                
+                       
        }
        else {
-               /* these are re-registrations of subjobs on proxy               */
-               /* embryonic registrations does not trigger registration in JP  */
-               reg_to_JP = 1;
+#ifdef LB_PERF
+               if(sink_mode == GLITE_LB_SINK_STATE) {
+                       glite_wll_perftest_consumeEvent(ev);
+                       goto commit;
+               }
+#endif
+
+               if ( newstat.state )  { /* prevent memleaks in case of transaction retry */
+                       edg_wll_FreeStatus(&newstat);
+                       newstat.state = EDG_WLL_JOB_UNDEF;
+               }
+               if (edg_wll_StepIntState(ctx,ev->any.jobId, ev, seq, &newstat)) goto rollback;
+               
+               if (newstat.remove_from_proxy) 
+                       if (edg_wll_PurgeServerProxy(ctx, ev->any.jobId)) goto rollback;
        }
-  }
 
-  /* XXX: if event type is user tag, convert the tag name to lowercase!
-   *     (not sure whether to convert a value too is reasonable
-   *     or keep it 'case sensitive')
-   */
-  if ( ev->any.type == EDG_WLL_EVENT_USERTAG )
-  {
-       int i;
-       for ( i = 0; ev->userTag.name[i] != '\0'; i++ )
-         ev->userTag.name[i] = tolower(ev->userTag.name[i]);
-  }
-  
-  if(use_db) {
-    if (ctx->strict_locking && edg_wll_LockJob(ctx,ev->any.jobId)) goto err;
-    if(edg_wll_StoreEvent(ctx, ev, event, &seq)) {
-       edg_wll_UnlockJob(ctx,ev->any.jobId);
-       goto err;
-    }
-  }
+commit:
+rollback:;
+  } while (edg_wll_TransNeedRetry(ctx));
+       
+  if (edg_wll_Error(ctx, NULL, NULL)) goto err;
 
-  if (!ctx->strict_locking && edg_wll_LockJob(ctx,ev->any.jobId)) goto err;
 
-  if ( ev->any.type == EDG_WLL_EVENT_CHANGEACL ) {
-    err = edg_wll_UpdateACL(ctx, ev->any.jobId,
-                       ev->changeACL.user_id, ev->changeACL.user_id_type,
-                       ev->changeACL.permission, ev->changeACL.permission_type,
-                       ev->changeACL.operation);
+  do {
+       if (edg_wll_Transaction(ctx)) goto err;
 
-    edg_wll_UnlockJob(ctx,ev->any.jobId);
-  }
-  else {
-#ifdef LB_PERF
-    if(sink_mode == GLITE_LB_SINK_STATE) {
-            glite_wll_perftest_consumeEvent(ev);
-            edg_wll_UnlockJob(ctx,ev->any.jobId);
-            goto err;
-    }
+       if (ev->any.type == EDG_WLL_EVENT_REGJOB &&
+               (ev->regJob.jobtype == EDG_WLL_REGJOB_DAG ||
+                ev->regJob.jobtype == EDG_WLL_REGJOB_PARTITIONED ||
+                ev->regJob.jobtype == EDG_WLL_REGJOB_COLLECTION) &&
+               ev->regJob.nsubjobs > 0)  
+
+#ifdef LB_DAG_EMBRIONIC
+                       if (register_subjobs_embryonic(ctx,&ev->regJob)) goto rollback2;
+#else
+                       if (register_subjobs(ctx,&ev->regJob)) goto rollback2;
 #endif
 
-    err = edg_wll_StepIntState(ctx,ev->any.jobId, ev, seq, &newstat);
-  }
+rollback2:;
+  } while (edg_wll_TransNeedRetry(ctx));
 
-  /* XXX: in edg_wll_StepIntState() 
-   * if (edg_wll_UnlockJob(ctx,ev->any.jobId)) goto err;
-   */
-  if (err) goto err;
+  if (edg_wll_Error(ctx, NULL, NULL)) goto err;
 
-  db_store_finalize(ctx, event, ev, &newstat, seq, reg_to_JP);
 
-err:
+  db_store_finalize(ctx, event, ev, &newstat, reg_to_JP);
 
-  if(ev) {
-    edg_wll_FreeEvent(ev);
-    free(ev);
-  }
 
+err:
+  if(ev) { edg_wll_FreeEvent(ev); free(ev); }
   if ( newstat.state ) edg_wll_FreeStatus(&newstat);
 
-
   return edg_wll_Error(ctx,NULL,NULL);
 }
 
 
+
 /* Called only when CollectionStateEvent generated */
 int
 db_parent_store(edg_wll_Context ctx, edg_wll_Event *ev, intJobStat *is)
@@ -167,7 +145,7 @@ db_parent_store(edg_wll_Context ctx, edg_wll_Event *ev, intJobStat *is)
   edg_wll_ResetError(ctx);
   memset(&newstat,0,sizeof newstat);
 
-  /* Locked from load_parent_intJobStat() */
+  /* Transaction opened from db_store */
 
 #ifdef LB_PERF
   if (sink_mode == GLITE_LB_SINK_STORE) {
@@ -181,10 +159,8 @@ db_parent_store(edg_wll_Context ctx, edg_wll_Event *ev, intJobStat *is)
 
   assert(ev->any.user);
 
-  if(use_db) {
     if(edg_wll_StoreEvent(ctx, ev, NULL, &seq))
       goto err;
-  }
 
 #ifdef LB_PERF
   if(sink_mode == GLITE_LB_SINK_STATE) {
@@ -202,7 +178,7 @@ db_parent_store(edg_wll_Context ctx, edg_wll_Event *ev, intJobStat *is)
     assert(event);
   }
 
-  db_store_finalize(ctx, event, ev, &newstat, seq, 0);
+  db_store_finalize(ctx, event, ev, &newstat, 0);
 
 err:
 
@@ -213,42 +189,33 @@ err:
 }
 
 
-
-static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int seq, int reg_to_JP) 
+/* Send regitration to JP 
+ */
+static int register_to_JP(edg_wll_Context ctx, edg_wll_Event *ev)
 {
-       int     local_job = is_job_local(ctx, ev->any.jobId);
-
-
-#ifdef LB_PERF
-       if( sink_mode == GLITE_LB_SINK_SEND ) {
-               glite_wll_perftest_consumeEvent(ev);
-               return edg_wll_Error(ctx,NULL,NULL);
-       }
-#endif
+       char *jids, *msg;
        
-       /* Send regitration to JP 
-        */
-       if ( ctx->jpreg_dir && ev->any.type == EDG_WLL_EVENT_REGJOB && seq == 0 &&
-            (!ctx->isProxy || reg_to_JP) ) {
-               char *jids, *msg;
-               
-               if ( !(jids = edg_wlc_JobIdUnparse(ev->any.jobId)) ) {
-                       return edg_wll_SetError(ctx, errno, "Can't unparse jobid when registering to JP");
-               }
-               if ( !(msg = realloc(jids, strlen(jids)+strlen(ev->any.user)+2)) ) {
-                       free(jids);
-                       return edg_wll_SetError(ctx, errno, "Can't allocate buffer when registering to JP");
-               }
-               strcat(msg, "\n");
-               strcat(msg, ev->any.user);
-               if ( edg_wll_MaildirStoreMsg(ctx->jpreg_dir, ctx->srvName, msg) ) {
-                       free(msg);
-                       return edg_wll_SetError(ctx, errno, lbm_errdesc);
-               }
+       if ( !(jids = edg_wlc_JobIdUnparse(ev->any.jobId)) ) {
+               return edg_wll_SetError(ctx, errno, "Can't unparse jobid when registering to JP");
+       }
+       if ( !(msg = realloc(jids, strlen(jids)+strlen(ev->any.user)+2)) ) {
+               free(jids);
+               return edg_wll_SetError(ctx, errno, "Can't allocate buffer when registering to JP");
+       }
+       strcat(msg, "\n");
+       strcat(msg, ev->any.user);
+       if ( edg_wll_MaildirStoreMsg(ctx->jpreg_dir, ctx->srvName, msg) ) {
                free(msg);
+               return edg_wll_SetError(ctx, errno, lbm_errdesc);
        }
+       free(msg);
+
+       return edg_wll_Error(ctx,NULL,NULL);
+}
 
 
+static int forward_event_to_server(edg_wll_Context ctx, char *event, edg_wll_Event *ev, int local_job)
+{
        if ( ctx->isProxy ) {
                /*
                 *      send event to the proper BK server
@@ -266,32 +233,41 @@ static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev
                                return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "edg_wll_EventSendProxy() error.");
                        }
                }
-               else {
-                       /* event will not arrive to server, only flag was set           */
-                       /* check whether some pending notifications are not triggered   */
-                       if ( newstat->state ) {
+       }
+
+       return edg_wll_Error(ctx,NULL,NULL);
+}
+
+
+static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int reg_to_JP) 
+{
+       int     local_job = is_job_local(ctx, ev->any.jobId);
+
+
+#ifdef LB_PERF
+       if( sink_mode == GLITE_LB_SINK_SEND ) {
+               glite_wll_perftest_consumeEvent(ev);
+               return edg_wll_Error(ctx,NULL,NULL);
+       }
+#endif
+       
+       if (reg_to_JP) 
+               if (register_to_JP(ctx,ev)) goto err;
+
+       if (forward_event_to_server(ctx, event, ev, local_job)) goto err;
+       
+       if (newstat->state) {
+               if ( ctx->isProxy ) {
+                       if ((ev->any.priority & EDG_WLL_LOGFLAG_DIRECT) || local_job) 
+                               /* event will not arrive to server, only flag was set           */
+                               /* check whether some pending notifications are not triggered   */
                                edg_wll_NotifMatch(ctx, newstat);
                        }
-               }
-                       
-               /* LB proxy purge */
-               if (newstat->remove_from_proxy) {
-                               edg_wll_PurgeServerProxy(ctx, ev->any.jobId);
-               }
-       } else 
-       {
-               /* Purge proxy flag */
-               if ( newstat->remove_from_proxy && local_job ) {
-                       if (unset_proxy_flag(ctx, ev->any.jobId) < 0) {
-                               return(edg_wll_Error(ctx,NULL,NULL));
-                       }
-               }
-
-               if ( newstat->state ) {
-                       edg_wll_NotifMatch(ctx, newstat);
+               else {
+                               edg_wll_NotifMatch(ctx, newstat);
                }
        }
-       
 
+err:
        return edg_wll_Error(ctx,NULL,NULL);
 }
index 42192dd..b02e79c 100644 (file)
 #include "lb_authz.h"
 #include "stats.h"
 #include "db_supp.h"
+#include "db_calls.h"
 
 #define DAG_ENABLE     1
 
+#define        DONT_LOCK       0
+#define        LOCK            1
+
 /* TBD: share in whole logging or workload */
 #ifdef __GNUC__
 #define UNUSED_VAR __attribute__((unused))
@@ -74,245 +78,256 @@ int edg_wll_JobStatusServer(
 {
 
 /* Local variables */
-       char            *string_jobid;
-       char            *md5_jobid;
+       char            *string_jobid = NULL;
+       char            *md5_jobid = NULL;
 
        intJobStat      jobstat;
        intJobStat      *ijsp;
-       int             intErr = 0;
-       int             lockErr;
+       int             whole_cycle;
        edg_wll_Acl     acl = NULL;
 #if DAG_ENABLE 
        char            *stmt = NULL;
 #endif
-       char            *errdesc = NULL;
-       //The following declarations have originally been positioned in the funcion's code
-       //That was rather messy and lead to redeclaratios :-(
        char *stat_str, *s_out;
        intJobStat *js;
        char *out[1];
-       glite_lbu_Statement sh;
+       glite_lbu_Statement sh = NULL;
        int num_sub, num_f, i, ii;
 
+
        edg_wll_ResetError(ctx);
 
+       jobstat.pub.owner = NULL;       
        string_jobid = edg_wlc_JobIdUnparse(job);
        if (string_jobid == NULL || stat == NULL)
                return edg_wll_SetError(ctx,EINVAL, NULL);
        md5_jobid = edg_wlc_JobIdGetUnique(job);
 
-       if ( !(jobstat.pub.owner = job_owner(ctx,md5_jobid)) ) {
-               free(md5_jobid);
-               free(string_jobid);
-               return edg_wll_Error(ctx,NULL,NULL);
-       }
+       do {
+               whole_cycle = 0;
 
-       intErr = edg_wll_GetACL(ctx, job, &acl);
-       if (intErr) {
-               free(md5_jobid);
-               free(string_jobid);
-               free(jobstat.pub.owner);
-               return edg_wll_Error(ctx,NULL,NULL);
-       }
+               if (edg_wll_Transaction(ctx)) goto rollback;
+               if (edg_wll_LockJobRowInShareMode(ctx, job)) goto rollback;;
 
-       /* authorization check */
-       if ( !(ctx->noAuth) &&
-           (!(ctx->peerName) ||  !edg_wll_gss_equal_subj(ctx->peerName, jobstat.pub.owner))) {
-               intErr = (acl == NULL) || edg_wll_CheckACL(ctx, acl, EDG_WLL_PERM_READ);
-             if (intErr) {
-                free(string_jobid);
-                free(md5_jobid);
-                free(jobstat.pub.owner); jobstat.pub.owner = NULL;
-                if (acl) {
-                       edg_wll_FreeAcl(acl);
-                       return edg_wll_Error(ctx, NULL, NULL);
-                } else {
-                       return edg_wll_SetError(ctx,EPERM, "not owner, no ACL is set");
-                }
-             }
-       }
 
-       intErr = edg_wll_LoadIntState(ctx, job, -1 /*all events*/, &ijsp);
-       if (!intErr) {
-               *stat = ijsp->pub;
-               free(jobstat.pub.owner); jobstat.pub.owner = NULL;
-               destroy_intJobStat_extension(ijsp);
-               free(ijsp);
+               if (edg_wll_GetACL(ctx, job, &acl)) goto rollback;
 
-       } else {
-               lockErr = edg_wll_LockJob(ctx,job);
-               intErr = edg_wll_intJobStatus(ctx, job, flags,&jobstat, js_enable_store && !lockErr);
-               if (intErr) edg_wll_Error(ctx, NULL, &errdesc);
-               if (!lockErr) {
-                       edg_wll_UnlockJob(ctx,job);
+               /* authorization check */
+               if ( !(ctx->noAuth) &&
+                   (!(ctx->peerName) ||  !edg_wll_gss_equal_subj(ctx->peerName, jobstat.pub.owner))) {
+                     if ((acl == NULL) || edg_wll_CheckACL(ctx, acl, EDG_WLL_PERM_READ)) {
+                        if (acl) {
+                               goto rollback;
+                        } else {
+                               edg_wll_SetError(ctx,EPERM, "not owner, no ACL is set");
+                               goto rollback;
+                        }
+                     }
                }
-       
-               *stat = jobstat.pub;
-               if (intErr) edg_wll_FreeStatus(&jobstat.pub);
-               destroy_intJobStat_extension(&jobstat);
-       }
-       
-       if (intErr) {
-               free(string_jobid);
-               free(md5_jobid);
-               if (acl) edg_wll_FreeAcl(acl);
-               edg_wll_SetError(ctx, intErr, errdesc);
-               free(errdesc);
-               return edg_wll_UpdateError(ctx, EDG_WLL_ERROR_SERVER_RESPONSE, "Could not compute job status from events");
-       }
 
-       if (acl) {
-               stat->acl = strdup(acl->string);
-               edg_wll_FreeAcl(acl);
-       }
+               if (!edg_wll_LoadIntState(ctx, job, DONT_LOCK, -1 /*all events*/, &ijsp)) {
+                       *stat = ijsp->pub;
+                       free(jobstat.pub.owner); jobstat.pub.owner = NULL;
+                       destroy_intJobStat_extension(ijsp);
+                       free(ijsp);
+               } else {
+                       if (edg_wll_intJobStatus(ctx, job, flags,&jobstat, js_enable_store)) {
+                               edg_wll_UpdateError(ctx, EDG_WLL_ERROR_SERVER_RESPONSE, "Could not compute job status from events");
+                               goto rollback;
+                       }
+                       *stat = jobstat.pub;
+               }
+               
+               if (acl) {
+                       stat->acl = strdup(acl->string);
+                       edg_wll_FreeAcl(acl);
+               }
 
-       if ((flags & EDG_WLL_STAT_CLASSADS) == 0) {
-               char *null = NULL;
+               if ((flags & EDG_WLL_STAT_CLASSADS) == 0) {
+                       char *null = NULL;
 
-               mov(stat->jdl, null);
-               mov(stat->matched_jdl, null);
-               mov(stat->condor_jdl, null);
-               mov(stat->rsl, null);
-       }
+                       mov(stat->jdl, null);
+                       mov(stat->matched_jdl, null);
+                       mov(stat->condor_jdl, null);
+                       mov(stat->rsl, null);
+               }
 
-#if DAG_ENABLE
-       if (stat->jobtype == EDG_WLL_STAT_DAG || stat->jobtype == EDG_WLL_STAT_COLLECTION) {
+       #if DAG_ENABLE
+               if (stat->jobtype == EDG_WLL_STAT_DAG || stat->jobtype == EDG_WLL_STAT_COLLECTION) {
 
-//     XXX: The users does not want any histogram. What do we do about it? 
-//             if ((!(flags & EDG_WLL_STAT_CHILDHIST_FAST))&&(!(flags & EDG_WLL_STAT_CHILDHIST_THOROUGH))) { /* No Histogram */
-//                        if (stat->children_hist != NULL) {   /* No histogram will be sent even if there was one */
-//
-//                             printf("\nNo Histogram required\n\n");
-//
-//                              free(stat->children_hist);
-//                     }
-//                     
-//             }
+       //      XXX: The users does not want any histogram. What do we do about it? 
+       //              if ((!(flags & EDG_WLL_STAT_CHILDHIST_FAST))&&(!(flags & EDG_WLL_STAT_CHILDHIST_THOROUGH))) { /* No Histogram */
+       //                        if (stat->children_hist != NULL) {    /* No histogram will be sent even if there was one */
+       //
+       //                              printf("\nNo Histogram required\n\n");
+       //
+       //                              free(stat->children_hist);
+       //                      }
+       //                      
+       //              }
 
 
-               if (flags & EDG_WLL_STAT_CHILDSTAT) {
+                       if (flags & EDG_WLL_STAT_CHILDSTAT) {
 
-                       trio_asprintf(&stmt, "SELECT int_status FROM states WHERE parent_job='%|Ss'"
-                                               " AND version='%|Ss'",
-                                       md5_jobid, INTSTAT_VERSION);
-                       if (stmt != NULL) {
-                               num_sub = edg_wll_ExecSQL(ctx, stmt, &sh);
-                               if (num_sub >=0 ) {
-                                       i = 0;
-                                       stat->children_states = calloc(num_sub+1, sizeof(edg_wll_JobStat));
-                                       if (stat->children_states == NULL) {
-                                               glite_lbu_FreeStmt(&sh);
-                                               goto dag_enomem;
-                                       }
-                                       while ((num_f = edg_wll_FetchRow(ctx, sh, 1, NULL, &stat_str)) == 1
-                                               && i < num_sub) {
-                                               js = dec_intJobStat(stat_str, &s_out);
-                                               if (s_out != NULL && js != NULL) {
-                                                       stat->children_states[i] = js->pub;
-                                                       destroy_intJobStat_extension(js);
-                                                       free(js);
-                                                       i++; // Careful, this value will also be used further
+                               trio_asprintf(&stmt, "SELECT int_status FROM states WHERE parent_job='%|Ss'"
+                                                       " AND version='%|Ss'",
+                                               md5_jobid, INTSTAT_VERSION);
+                               if (stmt != NULL) {
+                                       num_sub = edg_wll_ExecSQL(ctx, stmt, &sh);
+                                       if (num_sub >=0 ) {
+                                               i = 0;
+                                               stat->children_states = calloc(num_sub+1, sizeof(edg_wll_JobStat));
+                                               if (stat->children_states == NULL) {
+                                                       edg_wll_SetError(ctx, ENOMEM, "edg_wll_JobStatusServer() calloc children_states failed!");
+                                                       goto rollback;
                                                }
-                                               free(stat_str);
+                                               while ((num_f = edg_wll_FetchRow(ctx, sh, 1, NULL, &stat_str)) == 1
+                                                       && i < num_sub) {
+                                                       js = dec_intJobStat(stat_str, &s_out);
+                                                       if (s_out != NULL && js != NULL) {
+                                                               stat->children_states[i] = js->pub;
+                                                               destroy_intJobStat_extension(js);
+                                                               free(js);
+                                                               i++; // Careful, this value will also be used further
+                                                       }
+                                                       free(stat_str);
+                                               }
+                                               if (num_f < 0) goto rollback;
+
+                                               glite_lbu_FreeStmt(&sh); sh = NULL;
                                        }
-                                       glite_lbu_FreeStmt(&sh);
+                                       else goto rollback;
+
+                                       free(stmt); stmt = NULL;
+                               } else {
+                                       edg_wll_SetError(ctx, ENOMEM, "edg_wll_JobStatusServer() trio_asprintf failed!");
+                                       goto rollback;
                                }
-                               free(stmt);
-                       } else goto dag_enomem;
-               }
+                       }
 
 
-               if (flags & EDG_WLL_STAT_CHILDHIST_THOROUGH) { /* Full (thorough) Histogram */
+                       if (flags & EDG_WLL_STAT_CHILDHIST_THOROUGH) { /* Full (thorough) Histogram */
 
 
-                       if (stat->children_hist == NULL) {
-                               stat->children_hist = (int*) calloc(1+EDG_WLL_NUMBER_OF_STATCODES, sizeof(int));
-                               stat->children_hist[0] = EDG_WLL_NUMBER_OF_STATCODES;
-                       }
-                       else {
-                               /* If hist is loaded, it probably contain only incomplete histogram
-                                * built in update_parent_status. Count it from scratch...*/
-                               for (ii=1; ii<=EDG_WLL_NUMBER_OF_STATCODES; ii++)
-                                       stat->children_hist[ii] = 0;
-                       }
+                               if (stat->children_hist == NULL) {
+                                       stat->children_hist = (int*) calloc(1+EDG_WLL_NUMBER_OF_STATCODES, sizeof(int));
+                                       if (stat->children_hist == NULL) {
+                                               edg_wll_SetError(ctx, ENOMEM, "edg_wll_JobStatusServer() calloc children_hist failed!");
+                                               goto rollback;
+                                       }
 
-                       if (flags & EDG_WLL_STAT_CHILDSTAT) { // Job states have already been loaded
-                               for ( ii = 0 ; ii < i ; ii++ ) {
-                                       stat->children_hist[(stat->children_states[ii].state)+1]++;
+                                       stat->children_hist[0] = EDG_WLL_NUMBER_OF_STATCODES;
                                }
-                       }
-                       else {
-                               // Get child states from the database
-                               trio_asprintf(&stmt, "SELECT status FROM states WHERE parent_job='%|Ss' AND version='%|Ss'",
-                                                       md5_jobid, INTSTAT_VERSION);
-                               out[1] = NULL;
-                               if (stmt != NULL) {
-                                       num_sub = edg_wll_ExecSQL(ctx, stmt, &sh);
-                                       if (num_sub >=0 ) {
-                                               while ((num_f = edg_wll_FetchRow(ctx, sh, sizeof(out)/sizeof(out[0]), NULL, out)) == 1 ) {
-                                                       num_f = atoi(out[0]);
-                                                       if (num_f > EDG_WLL_JOB_UNDEF && num_f < EDG_WLL_NUMBER_OF_STATCODES)
-                                                               stat->children_hist[num_f+1]++;
-                                                       free(out[0]); 
+                               else {
+                                       /* If hist is loaded, it probably contain only incomplete histogram
+                                        * built in update_parent_status. Count it from scratch...*/
+                                       for (ii=1; ii<=EDG_WLL_NUMBER_OF_STATCODES; ii++)
+                                               stat->children_hist[ii] = 0;
+                               }
+
+                               if (flags & EDG_WLL_STAT_CHILDSTAT) { // Job states have already been loaded
+                                       for ( ii = 0 ; ii < i ; ii++ ) {
+                                               stat->children_hist[(stat->children_states[ii].state)+1]++;
+                                       }
+                               }
+                               else {
+                                       // Get child states from the database
+                                       trio_asprintf(&stmt, "SELECT status FROM states WHERE parent_job='%|Ss' AND version='%|Ss'",
+                                                               md5_jobid, INTSTAT_VERSION);
+                                       out[1] = NULL;
+                                       if (stmt != NULL) {
+                                               num_sub = edg_wll_ExecSQL(ctx, stmt, &sh);
+                                               if (num_sub >=0 ) {
+                                                       while ((num_f = edg_wll_FetchRow(ctx, sh, sizeof(out)/sizeof(out[0]), NULL, out)) == 1 ) {
+                                                               num_f = atoi(out[0]);
+                                                               if (num_f > EDG_WLL_JOB_UNDEF && num_f < EDG_WLL_NUMBER_OF_STATCODES)
+                                                                       stat->children_hist[num_f+1]++;
+                                                               free(out[0]); 
+                                                       }
+                                                       if (num_f < 0) goto rollback;
+
+                                                       glite_lbu_FreeStmt(&sh); sh = NULL;
                                                }
-                                               glite_lbu_FreeStmt(&sh);
+                                               else goto rollback;
+
+                                               free(stmt); stmt = NULL;
+                                       } else {
+                                               edg_wll_SetError(ctx, ENOMEM, "edg_wll_JobStatusServer() trio_asprintf failed!");
+                                               goto rollback;
                                        }
-                                       free(stmt);
-                               } else goto dag_enomem;
-                       }
-               }
-               else {
-                       if (flags & EDG_WLL_STAT_CHILDHIST_FAST) { /* Fast Histogram */
-                               
-                               if (stat->children_hist == NULL) {
-                                       // If the histogram exists, assume that it was already filled during job state retrieval
-                                       stat->children_hist = (int*) calloc(1+EDG_WLL_NUMBER_OF_STATCODES, sizeof(int));
-                                       edg_wll_GetSubjobHistogram(ctx, job, stat->children_hist);
                                }
                        }
                        else {
-                               if (stat->children_hist) {
-                                       free (stat->children_hist);
-                                       stat->children_hist = NULL;
+                               if (flags & EDG_WLL_STAT_CHILDHIST_FAST) { /* Fast Histogram */
+                                       
+                                       if (stat->children_hist == NULL) {
+                                               // If the histogram exists, assume that it was already filled during job state retrieval
+                                               stat->children_hist = (int*) calloc(1+EDG_WLL_NUMBER_OF_STATCODES, sizeof(int));
+                                               if (stat->children_hist == NULL) {
+                                                       edg_wll_SetError(ctx, ENOMEM, "edg_wll_JobStatusServer() calloc children_hist failed!");
+                                                       goto rollback;
+                                               }
+
+                                               if (edg_wll_GetSubjobHistogram(ctx, job, stat->children_hist))
+                                                       goto rollback;
+                                       }
+                               }
+                               else {
+                                       if (stat->children_hist) {
+                                               free (stat->children_hist);
+                                               stat->children_hist = NULL;
+                                       }
                                }
+
                        }
 
-               }
 
+                       if (flags & EDG_WLL_STAT_CHILDREN) {
 
-               if (flags & EDG_WLL_STAT_CHILDREN) {
+                               trio_asprintf(&stmt, "SELECT j.dg_jobid FROM states s,jobs j "
+                                               "WHERE s.parent_job='%|Ss' AND s.version='%|Ss' AND s.jobid=j.jobid",
+                                       md5_jobid, INTSTAT_VERSION);
+                               if (stmt != NULL) {
+                                       num_sub = edg_wll_ExecSQL(ctx, stmt, &sh);
+                                       if (num_sub >=0 ) {
+                                               while ((num_f = edg_wll_FetchRow(ctx, sh, sizeof(out)/sizeof(out[0]), NULL, out)) == 1 ) {
+                                                       add_stringlist(&stat->children, out[0]);
+                                                       free(out[0]); 
+                                               }
+                                               if (num_f < 0) goto rollback;
 
-                       trio_asprintf(&stmt, "SELECT j.dg_jobid FROM states s,jobs j "
-                                       "WHERE s.parent_job='%|Ss' AND s.version='%|Ss' AND s.jobid=j.jobid",
-                               md5_jobid, INTSTAT_VERSION);
-                       if (stmt != NULL) {
-                               num_sub = edg_wll_ExecSQL(ctx, stmt, &sh);
-                               if (num_sub >=0 ) {
-                                       while (edg_wll_FetchRow(ctx, sh, sizeof(out)/sizeof(out[0]), NULL, out) == 1 ) {
-                                               add_stringlist(&stat->children, out[0]);
-                                               free(out[0]); 
+                                               glite_lbu_FreeStmt(&sh); sh = NULL;
                                        }
-                                       glite_lbu_FreeStmt(&sh);
+                                       else goto rollback;
+
+                                       free(stmt); stmt = NULL;
+                               } else {
+                                       edg_wll_SetError(ctx, ENOMEM, "edg_wll_JobStatusServer() trio_asprintf failed!");
+                                       goto rollback;
                                }
-                               free(stmt);
-                       } else goto dag_enomem;
+                       }
+               }
+#endif
 
+               whole_cycle = 1;
+commit:
+rollback:
+               if (!whole_cycle) {
+                               edg_wll_FreeStatus(&jobstat.pub);
+                               jobstat.pub.owner = NULL;
+                               destroy_intJobStat_extension(&jobstat);
                }
+               if (jobstat.pub.owner) { free(jobstat.pub.owner); jobstat.pub.owner = NULL; }
+               if (acl) { edg_wll_FreeAcl(acl); acl = NULL; }
+               if (stmt) { free(stmt); stmt = NULL; }
+               if (sh) { glite_lbu_FreeStmt(&sh); sh = NULL; }
 
-       }
-#endif
-       free(string_jobid);
-       free(md5_jobid);
-       return edg_wll_Error(ctx, NULL, NULL);
+        } while (edg_wll_TransNeedRetry(ctx));
 
-#if DAG_ENABLE
-dag_enomem:
        free(string_jobid);
        free(md5_jobid);
-       edg_wll_FreeStatus(stat);
-       free(stmt);
-       return edg_wll_SetError(ctx, ENOMEM, NULL);
-#endif
+
+       return edg_wll_Error(ctx, NULL, NULL);
 }
 
 int edg_wll_intJobStatus(
@@ -667,6 +682,7 @@ cleanup:
 
 edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context ctx,
                                    edg_wlc_JobId jobid,
+                                   int lock,
                                    int seq,
                                    intJobStat **stat)
 {
@@ -679,6 +695,10 @@ edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context ctx,
        edg_wll_ResetError(ctx);
        jobid_md5 = edg_wlc_JobIdGetUnique(jobid);
 
+       if (lock) {
+               edg_wll_LockJobRowForUpdate(ctx,jobid);
+       }
+
        if (seq == -1) {
                /* any sequence number */
                trio_asprintf(&stmt,
@@ -712,7 +732,8 @@ edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context ctx,
        free(res);
 cleanup:
        free(jobid_md5);
-       free(stmt); glite_lbu_FreeStmt(&sh);
+       free(stmt); 
+       if (sh) glite_lbu_FreeStmt(&sh);
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
@@ -755,9 +776,7 @@ static edg_wll_ErrorCode load_parent_intJobStat(edg_wll_Context ctx, intJobStat
 {
        if (*pis) return edg_wll_Error(ctx, NULL, NULL); // already loaded and locked
 
-       if (edg_wll_LockJob(ctx,cis->pub.parent_job)) goto err;
-       
-       if (edg_wll_LoadIntState(ctx, cis->pub.parent_job, - 1, pis))
+       if (edg_wll_LoadIntState(ctx, cis->pub.parent_job, LOCK, - 1, pis))
                goto err;
 
        assert(*pis);   // deadlock would happen with next call of this function
@@ -903,8 +922,6 @@ static edg_wll_ErrorCode update_parent_status(edg_wll_Context ctx, edg_wll_JobSt
        }
 
 err:
-       edg_wll_UnlockJob(ctx,cis->pub.parent_job);
-
        if (pis)
                destroy_intJobStat(pis);
 
@@ -986,7 +1003,7 @@ edg_wll_ErrorCode edg_wll_StepIntState(edg_wll_Context ctx,
 
        memset(&oldstat,0,sizeof oldstat);
 
-       if (!edg_wll_LoadIntState(ctx, job, seq - 1, &ijsp)) {
+       if (!edg_wll_LoadIntState(ctx, job, DONT_LOCK, seq - 1, &ijsp)) {
                edg_wll_CpyStatus(&ijsp->pub,&oldstat);
 
                if (ctx->rgma_export) oldstat_rgmaline = write2rgma_statline(&ijsp->pub);
@@ -994,11 +1011,9 @@ edg_wll_ErrorCode edg_wll_StepIntState(edg_wll_Context ctx,
                res = processEvent(ijsp, e, seq, be_strict, &errstring);
                if (res == RET_FATAL || res == RET_INTERNAL) { /* !strict */
                        edg_wll_FreeStatus(&oldstat);
-                       edg_wll_UnlockJob(ctx,job); /* XXX: error lost */
                        return edg_wll_SetError(ctx, EINVAL, errstring);
                }
                edg_wll_StoreIntState(ctx, ijsp, seq);
-               if (edg_wll_UnlockJob(ctx,job)) goto err;
 
                edg_wll_UpdateStatistics(ctx,&oldstat,e,&ijsp->pub);
 
@@ -1028,7 +1043,6 @@ edg_wll_ErrorCode edg_wll_StepIntState(edg_wll_Context ctx,
                   Right approach is computing parent status from scratch.
                */
 
-               if (edg_wll_UnlockJob(ctx,job)) goto err;
                edg_wll_UpdateStatistics(ctx,NULL,e,&jobstat.pub);
 
                if (ctx->rgma_export) write2rgma_status(&jobstat.pub);
@@ -1039,7 +1053,7 @@ edg_wll_ErrorCode edg_wll_StepIntState(edg_wll_Context ctx,
                }
                else destroy_intJobStat(&jobstat);
        }
-       else edg_wll_UnlockJob(ctx,job);
+
 err:
        return edg_wll_Error(ctx, NULL, NULL);
 }
index 964aa7a..7fba0fb 100644 (file)
@@ -103,7 +103,7 @@ void destroy_intJobStat_extension(intJobStat *p);
 int edg_wll_intJobStatus( edg_wll_Context, glite_jobid_const_t, int, intJobStat *, int);
 edg_wll_ErrorCode edg_wll_StoreIntState(edg_wll_Context, intJobStat *, int);
 edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, edg_wlc_JobId, char *icnames, char *values, glite_lbu_bufInsert *bi);
-edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context , edg_wlc_JobId , int, intJobStat **);
+edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context , edg_wlc_JobId , int, int, intJobStat **);
 
 edg_wll_ErrorCode edg_wll_StepIntState(edg_wll_Context ctx, edg_wlc_JobId job, edg_wll_Event *e, int seq, edg_wll_JobStat *stat_out);
 edg_wll_ErrorCode edg_wll_StepIntStateParent(edg_wll_Context,edg_wlc_JobId,edg_wll_Event *,int,intJobStat *,edg_wll_JobStat *);
index 21d8e48..68073bb 100644 (file)
@@ -32,7 +32,7 @@ int edg_wll_LoadEventsServer(edg_wll_Context ctx,const edg_wll_LoadRequest *req,
 {
        int                                     fd,
                                                reject_fd = -1,
-                                               readret, i;
+                                               readret, i, ret;
        size_t                                  maxsize;
        char                       *line = NULL,
                                                buff[30];
@@ -48,9 +48,6 @@ int edg_wll_LoadEventsServer(edg_wll_Context ctx,const edg_wll_LoadRequest *req,
        if ( (fd = open(req->server_file, O_RDONLY)) == -1 )
                return edg_wll_SetError(ctx, errno, "Server can not open the file");
 
-       if (edg_wll_Transaction(ctx) != 0) 
-               return edg_wll_Error(ctx, NULL, NULL);
-
        memset(result,0,sizeof(*result));
        i = 0;
        while ( 1 )
@@ -58,7 +55,6 @@ int edg_wll_LoadEventsServer(edg_wll_Context ctx,const edg_wll_LoadRequest *req,
                /*      Read one line
                 */
                if ( (readret = read_line(&line, &maxsize, fd)) == -1 ) {
-                       edg_wll_Rollback(ctx);
                        return edg_wll_SetError(ctx, errno, "reading dump file");
                }
 
@@ -84,8 +80,15 @@ int edg_wll_LoadEventsServer(edg_wll_Context ctx,const edg_wll_LoadRequest *req,
                        result->to = event->any.arrived.tv_sec;
                }
                ctx->event_load = 1;
-               if ( edg_wll_StoreEvent(ctx, event, line, NULL) )
-               {
+               
+               do {
+                       if (edg_wll_Transaction(ctx)) goto err;
+
+                       ret = edg_wll_StoreEvent(ctx, event, line, NULL); 
+
+               } while (edg_wll_TransNeedRetry(ctx));
+
+               if (ret) {
                        char            *errdesc;
                        int             len = strlen(line),
                                        total = 0,
@@ -127,8 +130,7 @@ int edg_wll_LoadEventsServer(edg_wll_Context ctx,const edg_wll_LoadRequest *req,
                        }
                        write(reject_fd,"\n",1);
                }
-               else
-               {
+               else {
                        result->to = event->any.arrived.tv_sec;
                        if ( jobid )
                        {
@@ -156,6 +158,7 @@ cycle_clean:
                edg_wll_FreeEvent(event);
        }
 
+err:
        if ( jobid )
        {
                edg_wll_JobStat st;
@@ -168,9 +171,6 @@ cycle_clean:
        if ( reject_fd != -1 )
                close(reject_fd);
 
-       if (edg_wll_Commit(ctx) != 0)
-               return edg_wll_Error(ctx, NULL, NULL);
-
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
index 6138881..074260c 100644 (file)
@@ -107,48 +107,45 @@ int edg_wll_NotifNewServer(
                        trio_asprintf(&addr_s, "%s:%s", ctx->connections->serverConnection->peerName, aux+1);
        }
 
-       /*      Format DB insert statement
-        */
-       trio_asprintf(&q,
-                               "insert into notif_registrations(notifid,destination,valid,userid,conditions) "
-                               "values ('%|Ss','%|Ss',%s,'%|Ss', '<and>%|Ss</and>')",
-                               nid_s, addr_s? addr_s: address_override, time_s, owner, xml_conds);
+       do {
+               if (edg_wll_Transaction(ctx) != 0) goto cleanup;
 
-       if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 )
-               goto cleanup;
+               /*      Format DB insert statement
+                */
+               trio_asprintf(&q,
+                                       "insert into notif_registrations(notifid,destination,valid,userid,conditions) "
+                                       "values ('%|Ss','%|Ss',%s,'%|Ss', '<and>%|Ss</and>')",
+                                       nid_s, addr_s? addr_s: address_override, time_s, owner, xml_conds);
 
-       if (get_indexed_cols(ctx,nid_s,nconds,&add_index) ||
-               (add_index && edg_wll_ExecSQL(ctx,add_index,NULL) < 0)
-       ) goto cleanup;
+               if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 )
+                       goto rollback;
 
+               if (get_indexed_cols(ctx,nid_s,nconds,&add_index) ||
+                       (add_index && edg_wll_ExecSQL(ctx,add_index,NULL) < 0)
+               ) goto rollback;
 
-       if (jobs) for ( i = 0; jobs[i]; i++ )
-       {
-               free(q);
-               trio_asprintf(&q,
-                               "insert into notif_jobs(notifid,jobid) values ('%|Ss','%|Ss')",
-                               nid_s, jobs[i]);
-               if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 )
+
+               if (jobs) for ( i = 0; jobs[i]; i++ )
                {
-                       /*      XXX: Remove uncoplete registration?
-                        *               Which error has to be returned?
-                        */
                        free(q);
-                       trio_asprintf(&q, "delete from notif_jobs where notifid='%|Ss'", nid_s);
-                       edg_wll_ExecSQL(ctx, q, NULL);
-                       free(q);
-                       trio_asprintf(&q, "delete from notif_registrations where notifid='%|Ss'", nid_s);
-                       edg_wll_ExecSQL(ctx, q, NULL);
-                       goto cleanup;
+                       trio_asprintf(&q,
+                                       "insert into notif_jobs(notifid,jobid) values ('%|Ss','%|Ss')",
+                                       nid_s, jobs[i]);
+                       if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 )
+                               goto rollback;
                }
-       }
-       else {
-               trio_asprintf(&q,"insert into notif_jobs(notifid,jobid) values ('%|Ss','%|Ss')",
-                               nid_s,NOTIF_ALL_JOBS);
-               if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 ) goto cleanup;
+               else {
+                       trio_asprintf(&q,"insert into notif_jobs(notifid,jobid) values ('%|Ss','%|Ss')",
+                                       nid_s,NOTIF_ALL_JOBS);
+                       if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 ) goto rollback;
 
-       }
+               }
 
+rollback:
+               free(q); q= NULL;
+               free(add_index); add_index = NULL;
+
+       } while (edg_wll_TransNeedRetry(ctx));
 
 cleanup:
        if ( q ) free(q);
@@ -176,57 +173,63 @@ int edg_wll_NotifBindServer(
        const char                                         *address_override,
        time_t                                             *valid)
 {
-       char       *time_s = NULL,
-                          *addr_s = NULL;
+       char    *time_s = NULL,
+               *addr_s = NULL;
 
 
        if ( !address_override )
        {
                edg_wll_SetError(ctx, EINVAL, "Address parameter not given");
-               goto cleanup;
+               goto err;
        }
+       
+       do {
+               if (edg_wll_Transaction(ctx) != 0) goto err;
 
-       if ( check_notif_request(ctx, nid, NULL) )
-               goto cleanup;
-
-       /*      Format time of validity
-        */
-       *valid = time(NULL);
-       if (   ctx->peerProxyValidity
-               && (ctx->peerProxyValidity - *valid) < ctx->notifDuration )
-               *valid = ctx->peerProxyValidity;
-       else
-               *valid += ctx->notifDuration;
+               if ( check_notif_request(ctx, nid, NULL) )
+                       goto rollback;
 
-       glite_lbu_TimeToDB(*valid, &time_s);
-       if ( !time_s )
-       {
-               edg_wll_SetError(ctx, errno, "Formating validity time");
-               goto cleanup;
-       }
+               /*      Format time of validity
+                */
+               *valid = time(NULL);
+               if (   ctx->peerProxyValidity
+                       && (ctx->peerProxyValidity - *valid) < ctx->notifDuration )
+                       *valid = ctx->peerProxyValidity;
+               else
+                       *valid += ctx->notifDuration;
 
-       /*      Format the address
-        */
-       if ( address_override )
-       {
-               char   *aux;
+               glite_lbu_TimeToDB(*valid, &time_s);
+               if ( !time_s )
+               {
+                       edg_wll_SetError(ctx, errno, "Formating validity time");
+                       goto rollback;
+               }
 
-               if ( !(aux = strchr(address_override, ':')) )
+               /*      Format the address
+                */
+               if ( address_override )
                {
-                       edg_wll_SetError(ctx, EINVAL, "Addres overrirde not in format host:port");
-                       goto cleanup;
+                       char   *aux;
+
+                       if ( !(aux = strchr(address_override, ':')) )
+                       {
+                               edg_wll_SetError(ctx, EINVAL, "Addres overrirde not in format host:port");
+                               goto rollback;
+                       }
+                       if ( !strncmp(address_override, "0.0.0.0", aux-address_override) )
+                               trio_asprintf(&addr_s, "%s:%s", ctx->connections->serverConnection->peerName, aux+1);
                }
-               if ( !strncmp(address_override, "0.0.0.0", aux-address_override) )
-                       trio_asprintf(&addr_s, "%s:%s", ctx->connections->serverConnection->peerName, aux+1);
-       }
 
 
-       update_notif(ctx, nid, NULL, addr_s? addr_s: address_override, (const char *)(time_s));
+               update_notif(ctx, nid, NULL, addr_s? addr_s: address_override, (const char *)(time_s));
 
-cleanup:
-       if ( time_s ) free(time_s);
-       if ( addr_s ) free(addr_s);
+rollback:
+       free(time_s); time_s = NULL;
+       free(addr_s); addr_s = NULL;
+
+       } while (edg_wll_TransNeedRetry(ctx));
 
+err:
        return edg_wll_Error(ctx, NULL, NULL);
 }
 
@@ -248,89 +251,94 @@ int edg_wll_NotifChangeServer(
        /*      Format notification ID
         */
        if ( !(nid_s = edg_wll_NotifIdGetUnique(nid)) )
-               goto cleanup;
+               goto err;
 
-       if ( check_notif_request(ctx, nid, NULL) )
-               goto cleanup;
+       do {
+               if (edg_wll_Transaction(ctx) != 0) goto err;
 
-       switch ( op )
-       {
-       case EDG_WLL_NOTIF_REPLACE:
-               /*      Format conditions
-                *      - separate all jobids
-                *      - format new condition list without jobids
-                */
-               if ( split_cond_list(ctx, conditions, &nconds, &jobs) )
-                       goto cleanup;
+               if ( check_notif_request(ctx, nid, NULL) )
+                       goto rollback;
 
-               /*
-                *      encode new cond. list into a XML string
-                */
-               if ( edg_wll_JobQueryRecToXML(ctx, (edg_wll_QueryRec const * const *) nconds, &xml_conds) )
+               switch ( op )
                {
-                       /*      XXX: edg_wll_JobQueryRecToXML() do not set errors in context!
-                        *                      can't get propper error number :(
+               case EDG_WLL_NOTIF_REPLACE:
+                       /*      Format conditions
+                        *      - separate all jobids
+                        *      - format new condition list without jobids
                         */
-                       edg_wll_SetError(ctx, errno, "Can't encode data into xml");
-                       goto cleanup;
-               }
+                       if ( split_cond_list(ctx, conditions, &nconds, &jobs) )
+                               goto rollback;
 
-               /*      Format DB insert statement
-                */
-               if ( update_notif(ctx, nid, xml_conds, NULL, NULL) )
-                       goto cleanup;
+                       /*
+                        *      encode new cond. list into a XML string
+                        */
+                       if ( edg_wll_JobQueryRecToXML(ctx, (edg_wll_QueryRec const * const *) nconds, &xml_conds) )
+                       {
+                               /*      XXX: edg_wll_JobQueryRecToXML() do not set errors in context!
+                                *                      can't get propper error number :(
+                                */
+                               edg_wll_SetError(ctx, errno, "Can't encode data into xml");
+                               goto rollback;
+                       }
 
-               if ( jobs )
-               {
                        /*      Format DB insert statement
                         */
-                       trio_asprintf(&q, "delete from  notif_jobs where notifid='%|Ss'", nid_s);
-                       if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 )
-                               goto cleanup;
+                       if ( update_notif(ctx, nid, xml_conds, NULL, NULL) )
+                               goto rollback;
 
-                       for ( i = 0; jobs[i]; i++ )
+                       if ( jobs )
                        {
-                               free(q);
-                               trio_asprintf(&q,
-                                               "insert into notif_jobs(notifid,jobid) values ('%|Ss','%|Ss')",
-                                               nid_s, jobs[i]);
+                               /*      Format DB insert statement
+                                */
+                               trio_asprintf(&q, "delete from  notif_jobs where notifid='%|Ss'", nid_s);
                                if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 )
+                                       goto rollback;
+
+                               for ( i = 0; jobs[i]; i++ )
                                {
-                                       /*      XXX: Remove uncoplete registration?
-                                        *               Which error has to be returned?
-                                        */
-                                       free(q);
-                                       trio_asprintf(&q, "delete from notif_jobs where notifid='%|Ss'", nid_s);
-                                       edg_wll_ExecSQL(ctx, q, NULL);
                                        free(q);
-                                       trio_asprintf(&q,"delete from notif_registrations where notifid='%|Ss'", nid_s);
-                                       edg_wll_ExecSQL(ctx, q, NULL);
-                                       goto cleanup;
+                                       trio_asprintf(&q,
+                                                       "insert into notif_jobs(notifid,jobid) values ('%|Ss','%|Ss')",
+                                                       nid_s, jobs[i]);
+                                       if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 )
+                                       {
+                                               /*      XXX: Remove uncoplete registration?
+                                                *               Which error has to be returned?
+                                                */
+                                               free(q);
+                                               trio_asprintf(&q, "delete from notif_jobs where notifid='%|Ss'", nid_s);
+                                               edg_wll_ExecSQL(ctx, q, NULL);
+                                               free(q);
+                                               trio_asprintf(&q,"delete from notif_registrations where notifid='%|Ss'", nid_s);
+                                               edg_wll_ExecSQL(ctx, q, NULL);
+                                               goto rollback;
+                                       }
                                }
                        }
+                       break;
+
+               case EDG_WLL_NOTIF_ADD:
+                       break;
+               case EDG_WLL_NOTIF_REMOVE:
+                       break;
+               default:
+                       break;
                }
-               break;
-
-       case EDG_WLL_NOTIF_ADD:
-               break;
-       case EDG_WLL_NOTIF_REMOVE:
-               break;
-       default:
-               break;
-       }
 
-cleanup:
-       if ( q ) free(q);
-       if ( xml_conds ) free(xml_conds);
-       if ( nid_s ) free(nid_s);
-       if ( jobs )
-       {
-               for ( i = 0; jobs[i]; i++ )
-                       free(jobs[i]);
-               free(jobs);
-       }
-       if ( nconds ) free(nconds);
+rollback:
+               free(q); q = NULL;
+               free(xml_conds); xml_conds = NULL;
+               free(nid_s); nid_s = NULL;
+               if ( jobs ) {
+                       for ( i = 0; jobs[i]; i++ )
+                               free(jobs[i]);
+                       free(jobs); jobs = NULL;
+               }
+               free(nconds); nconds = NULL;
+
+       } while (edg_wll_TransNeedRetry(ctx));
 
+err:
        return edg_wll_Error(ctx, NULL, NULL);
 }
 
@@ -341,31 +349,36 @@ int edg_wll_NotifRefreshServer(
 {
        char       *time_s = NULL;
 
+       do {
+               if (edg_wll_Transaction(ctx) != 0) goto err;            
 
-       if ( check_notif_request(ctx, nid, NULL) )
-               goto cleanup;
+               if ( check_notif_request(ctx, nid, NULL) )
+                       goto rollback;
 
-       /*      Format time of validity
-        */
-       *valid = time(NULL);
-       if (   ctx->peerProxyValidity
-               && (ctx->peerProxyValidity - *valid) < ctx->notifDuration )
-               *valid = ctx->peerProxyValidity;
-       else
-               *valid += ctx->notifDuration;
+               /*      Format time of validity
+                */
+               *valid = time(NULL);
+               if (   ctx->peerProxyValidity
+                       && (ctx->peerProxyValidity - *valid) < ctx->notifDuration )
+                       *valid = ctx->peerProxyValidity;
+               else
+                       *valid += ctx->notifDuration;
 
-       glite_lbu_TimeToDB(*valid, &time_s);
-       if ( !time_s )
-       {
-               edg_wll_SetError(ctx, errno, "Formating validity time");
-               goto cleanup;
-       }
+               glite_lbu_TimeToDB(*valid, &time_s);
+               if ( !time_s )
+               {
+                       edg_wll_SetError(ctx, errno, "Formating validity time");
+                       goto rollback;
+               }
 
-       update_notif(ctx, nid, NULL, NULL, time_s);
+               update_notif(ctx, nid, NULL, NULL, time_s);
 
-cleanup:
-       if ( time_s ) free(time_s);
+rollback:
+               free(time_s); time_s = NULL;
 
+       } while (edg_wll_TransNeedRetry(ctx));
+
+err:
        return edg_wll_Error(ctx, NULL, NULL);
 }
 
@@ -375,27 +388,33 @@ int edg_wll_NotifDropServer(
 {
        char       *nid_s = NULL,
                           *stmt = NULL;
-       int                     ret;
 
+       
+       do {
+               if (edg_wll_Transaction(ctx) != 0) goto err;
 
-       if ( check_notif_request(ctx, nid, NULL) )
-               goto cleanup;
+               if ( check_notif_request(ctx, nid, NULL) )
+                       goto rollback;
 
-       if ( !(nid_s = edg_wll_NotifIdGetUnique(nid)) )
-               goto cleanup;
+               if ( !(nid_s = edg_wll_NotifIdGetUnique(nid)) )
+                       goto rollback;
 
-       trio_asprintf(&stmt, "delete from notif_registrations where notifid='%|Ss'", nid_s);
-       if ( (ret = edg_wll_ExecSQL(ctx, stmt, NULL)) < 0 )
-               goto cleanup;
-       free(stmt);
-       trio_asprintf(&stmt, "delete from notif_jobs where notifid='%|Ss'", nid_s);
-       edg_wll_ExecSQL(ctx, stmt, NULL);
-       edg_wll_NotifCancelRegId(ctx, nid);
+               trio_asprintf(&stmt, "delete from notif_registrations where notifid='%|Ss'", nid_s);
+               if ( edg_wll_ExecSQL(ctx, stmt, NULL) < 0 )
+                       goto rollback;
+               free(stmt);
+               trio_asprintf(&stmt, "delete from notif_jobs where notifid='%|Ss'", nid_s);
+               if ( edg_wll_ExecSQL(ctx, stmt, NULL) < 0 ) 
+                       goto rollback;
+               edg_wll_NotifCancelRegId(ctx, nid);
 
-cleanup:
-       if ( nid_s ) free(nid_s);
-       if ( stmt ) free(stmt);
+rollback:
+               free(nid_s); nid_s = NULL;
+               free(stmt); stmt = NULL;
+
+       } while (edg_wll_TransNeedRetry(ctx));
 
+err:
        return edg_wll_Error(ctx, NULL, NULL);
 }
 
@@ -478,7 +497,7 @@ static int check_notif_request(
 
        trio_asprintf(&stmt,
                                "select notifid from notif_registrations "
-                               "where notifid='%|Ss' and userid='%|Ss'",
+                               "where notifid='%|Ss' and userid='%|Ss' FOR UPDATE",
                                nid_s, user);
 
        if ( (ret = edg_wll_ExecSQL(ctx, stmt, NULL)) < 0 )
index 697acdf..37e534e 100644 (file)
@@ -19,7 +19,7 @@ int edg_wll_PurgeServer(
  */
 int edg_wll_PurgeServerProxy(
        edg_wll_Context ctx,
-       edg_wlc_JobId job
+       glite_jobid_const_t job
 );
 
 #define                FILE_TYPE_ANY           ""
index 2ec8236..e0279de 100644 (file)
@@ -211,7 +211,7 @@ int edg_wll_QueryEventsServer(
                                }
                        }
 
-                       // Auth checked in edg_wll_JobStatus above
+                       // Auth checked in edg_wll_JobStatusServer above
                        if ( !(where_flags & FL_FILTER) && !noAuth )
                        {
                                if (!ctx->peerName || (strcmp(res[1],peerid) && strcmp(res[1], can_peerid))) {
index 47f1227..674b725 100644 (file)
@@ -42,9 +42,9 @@ static const char* const resp_headers[] = {
        NULL
 };
 
-static int purge_one(edg_wll_Context ctx,glite_jobid_const_t,int,int);
-int unset_proxy_flag(edg_wll_Context ctx, edg_wlc_JobId job);
-static int unset_server_flag(edg_wll_Context ctx, edg_wlc_JobId job);
+static int purge_one(edg_wll_Context ctx,glite_jobid_const_t,int,int,int);
+int unset_proxy_flag(edg_wll_Context ctx, glite_jobid_const_t job);
+static int unset_server_flag(edg_wll_Context ctx, glite_jobid_const_t job);
 
 
 int edg_wll_CreateTmpFileStorage(edg_wll_Context ctx, char *prefix, char **fname)
@@ -184,16 +184,16 @@ int edg_wll_CreateFileStorage(edg_wll_Context ctx, char *file_type, char *prefix
        return retfd;
 }
 
-int edg_wll_PurgeServerProxy(edg_wll_Context ctx, edg_wlc_JobId job)
+int edg_wll_PurgeServerProxy(edg_wll_Context ctx, glite_jobid_const_t job)
 {
-       switch ( purge_one(ctx, job, -1, 1) ) {
+       switch ( purge_one(ctx, job, -1, 1, 1) ) {
                case 0:
                case ENOENT:
-                       edg_wll_ResetError(ctx);
-                       return 0;
-
+                       return(edg_wll_ResetError(ctx));
+                       break;
                default:
-                       return -1;
+                       return(edg_wll_Error(ctx,NULL,NULL));
+                       break;
        }
 }
 
@@ -252,7 +252,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request)
                                parse = 1;
                        }
                        else {
-                               switch (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE)) {
+                               switch (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,0)) {
                                        case 0: if (request->flags & EDG_WLL_PURGE_LIST_JOBS) {
                                                        result.jobs = realloc(result.jobs,(naffected_jobs+2) * sizeof(*result.jobs));
                                                        result.jobs[naffected_jobs] = strdup(request->jobs[i]);
@@ -260,7 +260,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request)
                                                }
                                                naffected_jobs++;
                                                break;
-                                       case ENOENT: parse = 1;
+                                       case ENOENT: /* job does not exist, consider purged and ignore */
                                                     edg_wll_ResetError(ctx);
                                                     break;
                                        default: goto abort;
@@ -298,6 +298,11 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request)
 
                                memset(&stat,0,sizeof stat);
                                if (edg_wll_JobStatusServer(ctx,job,0,&stat)) {  /* FIXME: replace by intJobStatus ?? */
+                                       if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) {
+                                               /* job purged meanwhile, ignore */
+                                               edg_wll_ResetError(ctx);
+                                               continue;
+                                       }
                                        edg_wll_FreeStatus(&stat);
                                        goto abort; 
                                }
@@ -314,8 +319,13 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request)
 
                                if (now-stat.lastUpdateTime.tv_sec > timeout[i] && !check_strict_jobid(ctx,job))
                                {
-                                       if (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE)) {
+                                       if (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,0)) {
                                                edg_wll_FreeStatus(&stat);
+                                               if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) {
+                                                       /* job purged meanwhile, ignore */
+                                                       edg_wll_ResetError(ctx);
+                                                       continue;
+                                               }
                                                goto abort;
                                        }
 
@@ -424,214 +434,204 @@ static void unlock_and_check(edg_wll_Context ctx,edg_wlc_JobId job)
        }
 }
 
+static int dump_events(edg_wll_Context ctx, glite_jobid_const_t job, int dump, char **res)
+{
+       edg_wll_Event   e;
+       int             event;
+
+
+       event = atoi(res[0]);
+       free(res[0]); res[0] = NULL;
+
+       res[0] = edg_wlc_JobIdUnparse(job);
+       if (convert_event_head(ctx,res,&e) || edg_wll_get_event_flesh(ctx,event,&e))
+       {
+               char    *et,*ed, *dbjob;
+               int     i;
+
 
-int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge)
+       /* Most likely sort of internal inconsistency. 
+        * Must not be fatal -- just complain
+        */
+               edg_wll_Error(ctx,&et,&ed);
+               dbjob = edg_wlc_JobIdGetUnique(job);
+               fprintf(stderr,"%s event %d: %s (%s)\n",dbjob,event,et,ed);
+               syslog(LOG_WARNING,"%s event %d: %s (%s)",dbjob,event,et,ed);
+               free(et); free(ed); free(dbjob);
+               for (i=0; i<sizofa(res); i++) free(res[i]);
+               edg_wll_ResetError(ctx);
+       }
+       else {
+               char    *event_s = edg_wll_UnparseEvent(ctx,&e);
+               char    arr_s[100];
+               int     len, written, total;
+
+               strcpy(arr_s, "DG.ARRIVED=");
+               edg_wll_ULMTimevalToDate(e.any.arrived.tv_sec,
+                                               e.any.arrived.tv_usec,
+                                               arr_s+strlen("DG.ARRIVED="));
+
+               len = strlen(arr_s);
+               total = 0;
+               while (total != len) {
+                       written = write(dump,arr_s+total,len-total);
+                       if (written < 0 && errno != EAGAIN) {
+                               edg_wll_SetError(ctx,errno,"writing dump file");
+                               free(event_s);
+                               return edg_wll_Error(ctx,NULL,NULL);
+                       }
+                       total += written;
+               }
+               write(dump, " ", 1);
+               
+               len = strlen(event_s);
+               total = 0;
+               while (total != len) {
+                       written = write(dump,event_s+total,len-total);
+                       if (written < 0 && errno != EAGAIN) {
+                               perror("dump to file");
+                               syslog(LOG_ERR,"dump to file: %m");
+                               dump = -1; /* XXX: likely to be a permanent error
+                                           * give up writing but do purge */
+                               break;
+                       }
+                       total += written;
+               }
+               /* write(dump,"\n",1); edg_wll_UnparseEvent does so */
+               free(event_s);
+       }
+       edg_wll_FreeEvent(&e);
+
+
+       return edg_wll_Error(ctx,NULL,NULL);
+}
+
+int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, int purge_from_proxy_only)
 {
        char    *dbjob;
        char    *stmt = NULL;
        glite_lbu_Statement     q;
        int             ret,dumped = 0;
+       char    *res[9];
+
 
        edg_wll_ResetError(ctx);
        if ( !purge && dump < 0 ) return 0;
 
-       switch (edg_wll_jobMembership(ctx, job)) {
-               case DB_PROXY_JOB:
-                       if (!ctx->isProxy) {
-                               /* should not happen */
-                               return 0;
-                       }
-                       /* continue */
-                       break;
-               case DB_SERVER_JOB:
-                       if (ctx->isProxy) {
-                               /* should not happen */
-                               return 0;
-                       }
-                       /* continue */
-                       break;
-               case DB_PROXY_JOB+DB_SERVER_JOB:
-                       if (ctx->isProxy) {
-                               purge = 0;
-                               if (unset_proxy_flag(ctx, job) < 0) {
-                                       return(edg_wll_Error(ctx,NULL,NULL));
+       do {
+               if (edg_wll_Transaction(ctx)) goto err;
+
+               switch (edg_wll_jobMembership(ctx, job)) {
+                       case DB_PROXY_JOB:
+                               if (!ctx->isProxy) {
+                                       /* should not happen */
+                                       goto commit;
                                }
-                       }
-                       else {
-                               purge = 0;
-                               if (unset_server_flag(ctx, job) < 0) {
-                                       return(edg_wll_Error(ctx,NULL,NULL));
+                               /* continue */
+                               break;
+                       case DB_SERVER_JOB:
+                               if (ctx->isProxy) {
+                                       /* should not happen */
+                                       goto commit;
                                }
-                       }
-                       break;
-               case 0:
-                       // Zombie job (server=0, proxy=0)? should not happen;
-                       // clear it to keep DB healthy
-                       break;
-               default:
-                       return 0;
-                       break;
-       }
+                               /* continue */
+                               break;
+                       case DB_PROXY_JOB+DB_SERVER_JOB:
+                               if (ctx->isProxy) {
+                                       purge = 0;
+                                       if (unset_proxy_flag(ctx, job) < 0) {
+                                               goto rollback;
+                                       }
+                               }
+                               else {
+                                       purge = 0;
+                                       /* if server&proxy DB is shared ... */
+                                       if (is_job_local(ctx,job) && purge_from_proxy_only) {
+                                               if (unset_proxy_flag(ctx, job) < 0) {
+                                                       goto rollback;
+                                               }
+                                       }
+                                       else {
+                                               if (unset_server_flag(ctx, job) < 0) {
+                                                       goto rollback;
+                                               }
+                                       }
+                               }
+                               break;
+                       case 0:
+                               // Zombie job (server=0, proxy=0)? should not happen;
+                               // clear it to keep DB healthy
+                               break;
+                       default:
+                               goto rollback;
+                               break;
+               }
 
-       dbjob = edg_wlc_JobIdGetUnique(job);    /* XXX: strict jobid already checked */
-       if (edg_wll_LockJob(ctx,job)) goto clean;
+               dbjob = edg_wlc_JobIdGetUnique(job);    /* XXX: strict jobid already checked */
 
-       if ( purge )
-       {
-               trio_asprintf(&stmt,"delete from jobs where jobid = '%|Ss'",dbjob);
-               ret = edg_wll_ExecSQL(ctx,stmt,NULL);
-               if (ret <= 0) {
-                       unlock_and_check(ctx,job);
-                       if (ret == 0) {
-                               fprintf(stderr,"%s: no such job\n",dbjob);
-                               edg_wll_SetError(ctx,ENOENT,dbjob);
-                       }
-                       goto clean;
-               }
-               free(stmt); stmt = NULL;
+               if ( purge )
+               {
+                       trio_asprintf(&stmt,"delete from jobs where jobid = '%|Ss'",dbjob);
+                       if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto rollback;
+                       free(stmt); stmt = NULL;
 
-               trio_asprintf(&stmt,"delete from states where jobid = '%|Ss'",dbjob);
-               if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) {
-                       unlock_and_check(ctx,job);
-                       goto clean;
+                       trio_asprintf(&stmt,"delete from states where jobid = '%|Ss'",dbjob);
+                       if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto rollback; 
+                       free(stmt); stmt = NULL;
                }
-               free(stmt); stmt = NULL;
 
-/* Why on earth ?
-               trio_asprintf(&stmt,"delete from states where jobid = '%|Ss'",dbjob);
-               if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) {
-                       unlock_and_check(ctx,job);
-                       goto clean;
+               if ( purge )
+               {
+                       trio_asprintf(&stmt,"delete from status_tags where jobid = '%|Ss'",dbjob);
+                       if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto rollback;
+                       free(stmt); stmt = NULL;
                }
-               free(stmt); stmt = NULL;
-*/
 
-       }
-
-       if (!ctx->strict_locking) unlock_and_check(ctx,job);
+               if (dump >= 0) 
+                       trio_asprintf(&stmt,
+                               "select event,code,prog,host,u.cert_subj,time_stamp,usec,level,arrived "
+                               "from events e,users u "
+                               "where e.jobid='%|Ss' "
+                               "and u.userid=e.userid "
+                               "order by event", dbjob);
+               else
+                       trio_asprintf(&stmt,"select event from events "
+                               "where jobid='%|Ss' "
+                               "order by event", dbjob);
 
-       if ( purge )
-       {
-               trio_asprintf(&stmt,"delete from status_tags where jobid = '%|Ss'",dbjob);
-               if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto unlock;
+               if (edg_wll_ExecSQL(ctx,stmt,&q) < 0) goto rollback;
                free(stmt); stmt = NULL;
-       }
-
-       if (dump >= 0) 
-               trio_asprintf(&stmt,
-                       "select event,code,prog,host,u.cert_subj,time_stamp,usec,level,arrived "
-                       "from events e,users u "
-                       "where e.jobid='%|Ss' "
-                       "and u.userid=e.userid "
-                       "order by event", dbjob);
-       else
-               trio_asprintf(&stmt,"select event from events "
-                       "where jobid='%|Ss' "
-                       "order by event", dbjob);
-
-/* check for events repeatedly -- new one may have arrived in the meantime */
-       while ((ret = edg_wll_ExecSQL(ctx,stmt,&q)) > 0) {
-               char    *res[9];
 
                dumped = 1;
                while ((ret = edg_wll_FetchRow(ctx,q,sizofa(res),NULL,res)) > 0) {
                        int     event;
 
+                       
+                       assert(ret == 9);
                        event = atoi(res[0]);
-                       free(res[0]); res[0] = NULL;
 
-                       if (dump >= 0) {
-                               edg_wll_Event   e;
+                       if (dump >= 0) 
+                               if (dump_events( ctx, job, dump, (char **) &res)) goto rollback;
 
-                               assert(ret == 9);
-                               res[0] = edg_wlc_JobIdUnparse(job);
-                               if (convert_event_head(ctx,res,&e) || edg_wll_get_event_flesh(ctx,event,&e))
-                               {
-                                       char    *et,*ed;
-                                       int     i;
-
-                               /* Most likely sort of internal inconsistency. 
-                                * Must not be fatal -- just complain
-                                */
-                                       edg_wll_Error(ctx,&et,&ed);
-                                       fprintf(stderr,"%s event %d: %s (%s)\n",dbjob,event,et,ed);
-                                       syslog(LOG_WARNING,"%s event %d: %s (%s)",dbjob,event,et,ed);
-                                       free(et); free(ed);
-                                       for (i=0; i<sizofa(res); i++) free(res[i]);
-                                       edg_wll_ResetError(ctx);
-                               }
-                               else {
-                                       char    *event_s = edg_wll_UnparseEvent(ctx,&e);
-                                       char    arr_s[100];
-                                       int     len, written, total;
-
-                                       strcpy(arr_s, "DG.ARRIVED=");
-                                       edg_wll_ULMTimevalToDate(e.any.arrived.tv_sec,
-                                                                       e.any.arrived.tv_usec,
-                                                                       arr_s+strlen("DG.ARRIVED="));
-
-                                       len = strlen(arr_s);
-                                       total = 0;
-                                       while (total != len) {
-                                               written = write(dump,arr_s+total,len-total);
-                                               if (written < 0 && errno != EAGAIN) {
-                                                       edg_wll_SetError(ctx,errno,"writing dump file");
-                                                       free(event_s);
-                                                       goto clean;
-                                               }
-                                               total += written;
-                                       }
-                                       write(dump, " ", 1);
-                                       
-                                       len = strlen(event_s);
-                                       total = 0;
-                                       while (total != len) {
-                                               written = write(dump,event_s+total,len-total);
-                                               if (written < 0 && errno != EAGAIN) {
-                                                       perror("dump to file");
-                                                       syslog(LOG_ERR,"dump to file: %m");
-                                                       dump = -1; /* XXX: likely to be a permanent error
-                                                                   * give up writing but do purge */
-                                                       break;
-                                               }
-                                               total += written;
-                                       }
-                                       /* write(dump,"\n",1); edg_wll_UnparseEvent does so */
-                                       free(event_s);
-                               }
-                               edg_wll_FreeEvent(&e);
-                       }
-
-                       if ( purge ) {
-                               if (edg_wll_delete_event(ctx,dbjob,event)) {
-                                       char    *et,*ed;
-
-                               /* XXX: just complain and carry on. Is it OK? */
-                                       edg_wll_Error(ctx,&et,&ed);
-                                       fprintf(stderr,"%s event %d: %s (%s)\n",dbjob,event,et,ed);
-                                       syslog(LOG_WARNING,"%s event %d: %s (%s)",dbjob,event,et,ed);
-                                       free(et); free(ed);
-                                       edg_wll_ResetError(ctx);
-                               }
-                       }
+                       if ( purge ) 
+                               if (edg_wll_delete_event(ctx,dbjob,event)) goto rollback;
                }
                glite_lbu_FreeStmt(&q);
-               if (ret < 0 || !purge) break;
-       }
+               if (ret < 0) goto rollback;
 
-       glite_lbu_FreeStmt(&q);
+commit:
+rollback:;
+       } while (edg_wll_TransNeedRetry(ctx));
 
-unlock:
-       if (ctx->strict_locking) unlock_and_check(ctx,job);
 
-clean:
+err:
        free(dbjob);
        free(stmt);
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
 
-int unset_proxy_flag(edg_wll_Context ctx, edg_wlc_JobId job)
+int unset_proxy_flag(edg_wll_Context ctx, glite_jobid_const_t job)
 {
        char    *stmt = NULL;
        char            *dbjob;
@@ -646,7 +646,7 @@ int unset_proxy_flag(edg_wll_Context ctx, edg_wlc_JobId job)
 }
 
 
-int unset_server_flag(edg_wll_Context ctx, edg_wlc_JobId job)
+int unset_server_flag(edg_wll_Context ctx, glite_jobid_const_t job)
 {
        char    *stmt = NULL;
        char            *dbjob;
index 1bb3623..5610b8e 100644 (file)
@@ -16,6 +16,7 @@
 #include <assert.h>
 #include <errno.h>
 #include <syslog.h>
+#include <ctype.h>
 
 #include "glite/jobid/strmd5.h"
 #include "glite/lbu/trio.h"
 #include "jobstat.h"
 #include "db_calls.h"
 #include "db_supp.h"
+#include "index.h"
 
 static int store_user(edg_wll_Context,const char *,const char *); 
-static int store_job(edg_wll_Context,glite_jobid_const_t,const char *, int, int);
+static int store_job(edg_wll_Context,glite_jobid_const_t,const char *, int, int, int, int);
 #ifdef LB_BUF
 static int store_job_block(edg_wll_Context, glite_jobid_const_t, const char *, glite_lbu_bufInsert *, int, int);
 #endif
-static int store_job_grey(edg_wll_Context,glite_jobid_const_t,time_t);
+static int set_job_grey(edg_wll_Context ctx, char *jobid);
 static int store_flesh(edg_wll_Context,edg_wll_Event *,const char *ulm, char *,int);
-static int store_seq(edg_wll_Context,edg_wll_Event *,int);
 static int check_dup(edg_wll_Context,edg_wll_Event *);
 static int check_auth(edg_wll_Context,edg_wll_Event *e); 
-#ifndef LB_DAG_EMBRIONIC
-static int register_subjobs(edg_wll_Context,const edg_wll_RegJobEvent *);
-#endif
-static int register_subjobs_embryonic(edg_wll_Context,const edg_wll_RegJobEvent *, const char *);
+static void lowercase_usertag(edg_wll_Event *ev);
 
 void edg_wll_StoreAnonymous(edg_wll_Context ctx,int anon) {
        ctx->allowAnonymous = anon;
 }
 
+
+/* !!! to be called from OPEN TRANSACTION only !!! 
+ */
 int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,const char *ulm,int *seq)
 {
-       edg_wll_ErrorCode       err = 0;
-       char            *userid = NULL,*jobid,*stmt;
-       char            *select_max,*ssrc;
+       char                    *userid, *jobid, *stmt, *ssrc, *now_s, *stamp, *dummy, *max;
        glite_lbu_Statement     sh = NULL;
-       int             next = 0xDEAD;
-       int             lbproxy_notreg = 0;
-       char            *now_s = NULL;
+       int                     next = 0xDEAD, nr;
 
-       ssrc = jobid = stmt = select_max = NULL;
+
+       userid = ssrc = jobid = stmt = now_s = stamp = dummy = max = NULL;
+
+       lowercase_usertag(e);
+       jobid = edg_wlc_JobIdGetUnique(e->any.jobId);
+       glite_lbu_TimeToDB(e->any.timestamp.tv_sec, &stamp);
+       ssrc = edg_wll_SourceToString(e->any.source);
 
        if ( ctx->event_load )
                glite_lbu_TimeToDB(e->any.arrived.tv_sec, &now_s);
@@ -69,21 +72,12 @@ int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,const char *ulm,int
                glite_lbu_TimeToDB(time(NULL), &now_s);
 
        edg_wll_ResetError(ctx);
-       switch (err = check_auth(ctx,e)) {
+       switch (check_auth(ctx,e)) {
                case 0: break;
                case ENOENT: 
-                       if ( !ctx->isProxy ) {
-                               if (ctx->greyjobs) {
-                                       edg_wll_ResetError(ctx);
-                                       if (store_job_grey(ctx,e->any.jobId,e->any.timestamp.tv_sec))
-                                               goto clean;
-                                       break;
-                               } 
-                               else goto clean;
-                       }
-
-                       edg_wll_ResetError(ctx);
-                       lbproxy_notreg = 1;
+                       /* job not registered */
+                       // should not happen, store_job_server_proxy() miscoded or going thu load?
+                       goto clean;
                        break;
                case EPERM:
                        if (!ctx->noAuth) goto clean;
@@ -92,11 +86,6 @@ int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,const char *ulm,int
                default: goto clean;
        }
 
-/* FIXME: does not work for grey jobs due to "select from jobs" -- I don't care for the time being */
-       if ((err = check_dup(ctx,e))) goto clean;
-
-       jobid = edg_wlc_JobIdGetUnique(e->any.jobId);
-
        trio_asprintf(&stmt,"select userid from jobs where jobid='%|Ss'", jobid);
        
        if (edg_wll_ExecSQL(ctx,stmt,&sh) < 0 || edg_wll_FetchRow(ctx,sh,1,NULL,&userid) < 0) goto clean;
@@ -104,131 +93,69 @@ int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,const char *ulm,int
        free(stmt); stmt = NULL;
 
 
-/* obtain next event sequence number */
-       trio_asprintf(&select_max,
-               "select max(event) from events "
-               "where jobid = '%|Ss'",jobid);
+/* check duplicity */
+       trio_asprintf(&stmt,
+               "select arrived from events where jobid='%|Ss' and code='%d'"
+               " and prog='%|Ss' and host='%|Ss' and time_stamp='%s' and usec='%d'"
+               " and level='%d' and userid='%|Ss' and seqcode='%|Ss') ",
+               jobid, (int) e->any.type,
+               ssrc,e->any.host,
+               stamp,e->any.timestamp.tv_usec,
+               e->any.level,userid, e->any.seqcode);
+
+       if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto clean;
+       nr = edg_wll_FetchRow(ctx,sh,1,NULL,&dummy);
+       if (sh) glite_lbu_FreeStmt(&sh);
+       free(stmt); stmt = NULL;
+       free(dummy);
 
-       ssrc = edg_wll_SourceToString(e->any.source);
+       if (nr < 0) goto clean;
+       if (nr > 0) {
+               /* possible duplicity (99%) */
+               // XXX: check event flesh to be 100% sure
+               edg_wll_SetError(ctx,EEXIST,"duplicate event");
+               goto clean;
+       }
+       /* else (nr == 0) -> unique event, continue */
+       
 
-/* try to insert (someone else may be doing the same) */
-       {
-               char    *max = NULL;
+/* obtain number of stored events */
+       trio_asprintf(&stmt,
+               "select nevents from jobs "
+               "where jobid = '%|Ss'",jobid);
+
+       if (edg_wll_ExecSQL(ctx,stmt,&sh) < 0 ||
+           edg_wll_FetchRow(ctx,sh,1,NULL,&max) < 0) goto clean;
+       glite_lbu_FreeStmt(&sh); 
+       
+       next = (max && *max) ? atoi(max)+1 : 0;
+       free(max);
 
-               if (edg_wll_ExecSQL(ctx,select_max,&sh) < 0 ||
-                   edg_wll_FetchRow(ctx,sh,1,NULL,&max) < 0)
-               {
-                       err = edg_wll_Error(ctx,NULL,NULL);
-                       goto clean;
-               }
-               glite_lbu_FreeStmt(&sh); 
-               
-               next = max && *max ? atoi(max)+1 : 0;
-               free(max);
-       }
-               
-       while (1) {
-       /* 
-        * 1) when using transactions:
-        *   Store the whole event right now.
-        *
-        * 2) when not using transactions:
-        *   Store an UNDEF event first in order to prevent race condition
-        *   with readers and update event code later.
-        */
-               char    *stamp = NULL;
-
-               glite_lbu_TimeToDB(e->any.timestamp.tv_sec, &stamp);
-               trio_asprintf(&stmt,
-                       "insert into events(jobid,event,code,prog,host,time_stamp,usec,arrived,level,userid) "
-                       "values ('%|Ss',%d,%d,'%|Ss','%|Ss',%s,%d,%s,%d,'%|Ss')",
-                       jobid,next,
-                       ctx->dbcaps & GLITE_LBU_DB_CAP_TRANSACTIONS ? (int) e->any.type : EDG_WLL_EVENT_UNDEF,
-                       ssrc,e->any.host,
-                       stamp,e->any.timestamp.tv_usec,
-                       now_s, e->any.level,userid);
-               free(stamp);
-
-               if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) {
-                       if ((err = edg_wll_Error(ctx,NULL,NULL)) != EEXIST)
-                               goto clean;
-               } else break; /* successful insert */
-
-       /* we were late -- try once again */
-               next++;
-               free(stmt);
-       }
 
+/* store event */              
+       trio_asprintf(&stmt,
+               "insert into events(jobid,event,code,prog,host,time_stamp,usec,arrived,level,userid,seqcode) "
+               "values ('%|Ss',%d,%d,'%|Ss','%|Ss',%s,%d,%s,%d,'%|Ss','%|Ss')",
+               jobid,next,
+               (int) e->any.type,
+               ssrc,e->any.host,
+               stamp,e->any.timestamp.tv_usec,
+               now_s, e->any.level,userid, e->any.seqcode);
+
+       if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto clean;
        free(stmt); stmt = NULL;
-       if ((err = store_seq(ctx,e,next)) ||
-               (err = store_flesh(ctx,e,ulm,jobid,next))) {
-       /* attempt to cleanup, ignore new errors */
-               char            *desc;
-               edg_wll_ErrorCode       oerr = edg_wll_Error(ctx,NULL,&desc);
-
-               edg_wll_delete_event(ctx,jobid,next);
-               edg_wll_SetError(ctx,oerr,desc);
-               free(desc);
-       } else
-       if (!(ctx->dbcaps & GLITE_LBU_DB_CAP_TRANSACTIONS)) {
-       /* emulate commit, i.e. swith to a real event type to make
-        * the record valid */
-               trio_asprintf(&stmt,
-                       "update events set code=%d "
-                       "where jobid='%|Ss' and event=%d",
-                       (int) e->any.type,jobid,next);
-               switch (edg_wll_ExecSQL(ctx,stmt,NULL)) {
-                       case 0: if (ctx->strict_locking)
-                                       err = edg_wll_SetError(ctx,ENOENT,"event disappeared on store while strict locking");
-                       /* purge in progres: drop the garbage, ignore errors */
-                               else {
-                                       edg_wll_delete_event(ctx,jobid,next);
-                                       err = edg_wll_SetError(ctx,ENOENT,"job being purged");
-                               }
-                               break;
-                       case 1: if (ctx->strict_locking) err = 0;
-                               else {
-                       /* check whether the job is still there to prevent garbage
-                        * left while there is a concurrent purge 
-                        */
-                                       free(stmt);
-                                       trio_asprintf(&stmt,
-                                               "select 'x' from jobs where jobid='%|Ss'",
-                                               jobid);
-                                       switch (edg_wll_ExecSQL(ctx,stmt,NULL)) {
-                                               case 1: break;
-                                               case 0: /* purge in progres */
-                                                       edg_wll_delete_event(ctx,jobid,next);
-                                                       err = edg_wll_SetError(ctx,ENOENT,"job being purged");
-                                                       break;
-                                               default: err = edg_wll_SetError(ctx,EDG_WLL_ERROR_DB_CALL,
-                                                               "more job records, what is that?");
-                                                       break;
-                                       }
-                               }
-                               break;
-                       case -1: err = edg_wll_Error(ctx,NULL,NULL);
-                               break;
-
-                       default: err = edg_wll_SetError(ctx,EDG_WLL_ERROR_DB_CALL,
-                               "more event records, what is that?");
-                               break;
-               }
-       } /* if !transactions */
 
-       if (err == 0 && 
-               e->any.type == EDG_WLL_EVENT_REGJOB &&
-               (e->regJob.jobtype == EDG_WLL_REGJOB_DAG ||
-                e->regJob.jobtype == EDG_WLL_REGJOB_PARTITIONED ||
-                e->regJob.jobtype == EDG_WLL_REGJOB_COLLECTION) &&
-               e->regJob.nsubjobs > 0)  
+/* increase number of stored events */
+       trio_asprintf(&stmt,
+               "update jobs set nevents='%d'"
+               "where jobid = '%|Ss'", next, jobid);
 
-#ifdef LB_DAG_EMBRIONIC
-                       err = register_subjobs_embryonic(ctx,&e->regJob,userid);
-#else
-                       err = register_subjobs(ctx,&e->regJob);
-#endif
+       if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto clean;
+       free(stmt); stmt = NULL;
+       
 
+/* store event record */
+       if (store_flesh(ctx,e,ulm,jobid,next)) goto clean;
 
 clean:
        free(now_s);
@@ -236,10 +163,10 @@ clean:
        free(jobid);
        free(stmt);
        free(ssrc);
-       free(select_max);
        if (sh) glite_lbu_FreeStmt(&sh);
-       if (!err && seq) *seq = next;
-       return err;
+       if (!edg_wll_Error(ctx,NULL,NULL) && seq) *seq = next;
+       free(stamp);
+       return edg_wll_Error(ctx,NULL,NULL);
 }
 
 static int store_user(edg_wll_Context ctx,const char *userid,const char *subj)
@@ -258,7 +185,7 @@ static int store_user(edg_wll_Context ctx,const char *userid,const char *subj)
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
-static int store_job(edg_wll_Context ctx,glite_jobid_const_t job,const char *userid, int proxy, int server)
+static int store_job(edg_wll_Context ctx,glite_jobid_const_t job,const char *userid, int proxy, int server,int grey, int update)
 {
        char *jobstr = edg_wlc_JobIdUnparse(job);
        char *jobid = edg_wlc_JobIdGetUnique(job);
@@ -288,22 +215,26 @@ static int store_job(edg_wll_Context ctx,glite_jobid_const_t job,const char *use
        else {
                server = 1;
        }
-       
-       trio_asprintf(&stmt,"insert into jobs(jobid,dg_jobid,userid,proxy,server) "
-               "values ('%|Ss','%|Ss','%|Ss', '%|Sd', '%|Sd')",jobid,jobstr,userid,proxy,server);
 
-       if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) {
-               if (edg_wll_Error(ctx,NULL,NULL) == EEXIST) 
-                       edg_wll_ResetError(ctx);
+       if (update) {
+               trio_asprintf(&stmt,"update jobs set userid='%|Ss', proxy='%|Sd', server='%|Sd', grey='%|Sd' where jobid='%|Ss'", 
+                       userid,proxy,server,grey,jobid);
+       }
+       else {
+               trio_asprintf(&stmt,"insert into jobs(jobid,dg_jobid,userid,proxy,server,grey) "
+                       "values ('%|Ss','%|Ss','%|Ss', '%|Sd', '%|Sd', '%|Sd')",jobid,jobstr,userid,proxy,server,grey);
        }
-       free(stmt);
 
-       if (ctx->greyjobs) {
-               trio_asprintf(&stmt,"delete from grey_jobs where jobid = '%|Ss'",jobid);
-               edg_wll_ExecSQL(ctx,stmt,NULL); /* XXX: error propagates */
-               free(stmt);
+       if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) {
+               if (edg_wll_Error(ctx,NULL,NULL) == EEXIST && !update) 
+                       edg_wll_ResetError(ctx);
+               else
+                       goto err;
        }
+       free(stmt); stmt = NULL;
 
+err:
+       free(stmt);
        free(jobstr);
        free(jobid);
        return edg_wll_Error(ctx,NULL,NULL);
@@ -335,35 +266,19 @@ static int store_job_block(edg_wll_Context ctx,glite_jobid_const_t job,const cha
 }
 #endif
 
-static int store_job_grey(edg_wll_Context ctx,glite_jobid_const_t job,time_t etime)
+static int set_job_grey(edg_wll_Context ctx, char *jobid)
 {
-       char *jobstr = edg_wlc_JobIdUnparse(job);
-       char *jobid = edg_wlc_JobIdGetUnique(job);
-       char *stmt, *s_etime;
-
-       if (jobid == NULL || jobstr == NULL)
-               return edg_wll_SetError(ctx,EINVAL,"store_job_grey()");
-
-       edg_wll_ResetError(ctx);
-       glite_lbu_TimeToDB(etime, &s_etime);
-       trio_asprintf(&stmt,"insert into grey_jobs(jobid,dg_jobid,time_stamp) "
-                       "values ('%|Ss','%|Ss',%s)",
-                       jobid,jobstr,s_etime);
-       free(s_etime);
+       char *stmt;
 
-       if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) {
-               if (edg_wll_Error(ctx,NULL,NULL) == EEXIST)
-                       edg_wll_ResetError(ctx);
-       }
+       trio_asprintf(&stmt,"update jobs set grey='1' where jobid='%|Ss'", jobid);
+       edg_wll_ExecSQL(ctx,stmt,NULL); 
 
        free(stmt);
-       free(jobstr);
-       free(jobid);
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
 /* test whether job shares LB proxy and server DB or not */
-int is_job_local(edg_wll_Context ctx, edg_wlc_JobId jobId) 
+int is_job_local(edg_wll_Context ctx, glite_jobid_const_t jobId) 
 {
        char            *srvName = NULL;
        unsigned int    srvPort; 
@@ -377,169 +292,88 @@ int is_job_local(edg_wll_Context ctx, edg_wlc_JobId jobId)
        return(ret);
 }
 
-int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event)
-{      
+int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, int *register_to_JP)
+{
        char            *unique = edg_wlc_JobIdGetUnique(event->any.jobId);
-       char            *q = NULL, *owner = NULL, *userid = NULL;
+       char            *q = NULL, *userid = NULL, *subj;
        glite_lbu_Statement    stmt = NULL;
-       int             nar;
+       int             nar, grey = 0;
        char            *can_peername = NULL;
-       int             local_job = is_job_local(ctx, event->any.jobId);
+       int             local_job = is_job_local(ctx, event->any.jobId);
+       char            *res[3] = {NULL, NULL, NULL};
+       
+
+       /* check auth */
+       if (!ctx->isProxy && !ctx->peerName)
+               return edg_wll_SetError(ctx,EPERM,"LB server can't store using unauthenticated connection");
+       if (ctx->isProxy && (!event->any.user || !strcmp(event->any.user,EDG_WLL_LOG_USER_DEFAULT)) )
+               return edg_wll_SetError(ctx,EPERM,"LB proxy can't store using unauthenticated connection");
 
 
-       edg_wll_ResetError(ctx);
-       
        can_peername = edg_wll_gss_normalize_subj(ctx->peerName, 0);
 
-       if (ctx->isProxy) {
-               /* event arrived on proxy socket */
-               if (event->any.type == EDG_WLL_EVENT_REGJOB) {
-                       if (event->any.priority & EDG_WLL_LOGFLAG_DIRECT) {
-                        /* first synchronous registration */
-                               if (local_job) {
-                                       /* we are both server and proxy for this job */
-                                       trio_asprintf(&q,"update jobs set proxy=1 where jobid='%|Ss'",
-                                               unique);
-
-                                       nar = edg_wll_ExecSQL(ctx, q, NULL);
-
-                                       if (nar == 0) {
-                                               /* job isn't registered yet */
-                                               userid = strdup(strmd5("unknown_to_proxy", NULL));
-                                               if (store_user(ctx,userid,"unknown_to_proxy")) goto err;
-
-                                               if (store_job(ctx,(glite_jobid_const_t) event->any.jobId,
-                                                       userid, 1, ctx->serverRunning)) goto err;
-                                                       
-                                       }
-                                       else {} /* job was registered thru GSI, no further action needed */
-                                               /* or error occured - and will go out via return() */
-                               }
-                               else {
-                                       /* we are only proxy for this job, forward it to server */
-
-                                       /* XXX - does it have any sence ??
-                                       if (!strcmp(e->any.user,EDG_WLL_LOG_USER_DEFAULT)) {
-                                               edg_wll_SetError(ctx,EPERM,"can't register jobs anonymously");
-                                               goto err;
-                                       }
-                                       */
-
-                                       userid = strdup(strmd5(event->any.user, NULL));
-                                       if (store_user(ctx,userid,event->any.user)) goto err;
-
-                                       if (store_job(ctx,(glite_jobid_const_t) event->any.jobId,
-                                                        userid, 1, 0)) goto err;
-                               }
-                       }
+       trio_asprintf(&q,"select proxy,server,grey from jobs where jobid='%|Ss' for update", event->any.jobId);
+
+       if ( (nar = edg_wll_ExecSQL(ctx,q,&stmt)) < 0 ) {
+               /* Job not registered yet */
+
+               if (!( (event->any.type == EDG_WLL_EVENT_REGJOB) && 
+                       (event->any.priority & EDG_WLL_LOGFLAG_DIRECT) )) 
+               {
+                       if (ctx->greyjobs) grey = 1;
                        else {
-                               /* supplementary re-registration (JDL of subjob, etc.) */
-                               if (local_job) {
-                                       /* previous registration via GSI required */
-                                       trio_asprintf(&q,"update jobs set jobid='%|Ss', proxy='1' where jobid='%|Ss'",
-                                               unique, unique);
-                                       /* does the job exists ? */
-                                       if (edg_wll_ExecSQL(ctx,q,NULL) < 0) {
-                                               edg_wll_SetError(ctx, ENOENT, "job not registered");
-                                               goto err;
-                                       }
-                               }
-                               else {
-                                       /* try to register job in case that first reistration   */
-                                       /* was sent to server only; ignore errors (EEXIST)      */
-                                       userid = strdup(strmd5(event->any.user, NULL));
-                                       if (store_user(ctx,userid,event->any.user)) goto err;
-
-                                       store_job(ctx,(glite_jobid_const_t) event->any.jobId,
-                                                        userid, 1, 0);
-                                       edg_wll_ResetError(ctx);
-                                       
-                               }
-                       }
-               }
-               else {
-                       /* any other event than JobReg */
-                       trio_asprintf(&q,"update jobs set jobid='%|Ss', proxy='1' where jobid='%|Ss'",
-                               unique, unique);
-                       /* does the job exists ? now we require registration on proxy too */
-                       if (edg_wll_ExecSQL(ctx,q,NULL) < 0) {
                                edg_wll_SetError(ctx, ENOENT, "job not registered");
-                                goto err;
+                               goto err;
                        }
                }
+
+               subj = strdup( ctx->isProxy ? event->any.user : can_peername);
+               userid = strdup(strmd5(subj, NULL));
+               if (store_user(ctx,userid,subj)) goto err;
+               if (store_job(ctx,(glite_jobid_const_t) event->any.jobId,
+                       userid, ctx->isProxy, local_job ? ctx->serverRunning : 0, grey, 0 )) goto err;
+               *register_to_JP = 1;
        }
        else {
-               /* event arrived on LB port */
-               if (event->any.type == EDG_WLL_EVENT_REGJOB) {
-                       trio_asprintf(&q,"select cert_subj from jobs,users where jobs.jobid='%|Ss'"
-                                       " AND jobs.userid=users.userid",unique);
-                       if ( (nar = edg_wll_ExecSQL(ctx,q,&stmt)) < 0 || edg_wll_FetchRow(ctx,stmt,1,NULL,&owner) < 0 ) { 
-                               goto err;
-                       }
-                       free(q); q = NULL;
-                       
-                       if (nar) {
-                               /* job is already registered */
-                               if (!strcmp(owner,"unknown_to_proxy")) {
-                                       /* proxy registration was already done */
-                                       userid = strdup(strmd5(can_peername, NULL));
-                                       if (store_user(ctx,userid,can_peername)) goto err;
-
-                                       trio_asprintf(&q,"update jobs set server=1, userid='%|Ss' where jobid='%|Ss'",
-                                               userid, unique);
-                                       
-                                       edg_wll_ExecSQL(ctx, q, NULL);
-                               }
-                               else { } /* re-registration, no action needed */
-                       }
-                       else {
-                               userid = strdup(strmd5(can_peername, NULL));
-                               if (store_user(ctx,userid,can_peername)) goto err;
+               /* Job already registered */
 
-                               if (store_job(ctx,(glite_jobid_const_t) event->any.jobId,
-                                               userid, 0, 1)) goto err;
-                       }
+               if (edg_wll_FetchRow(ctx,stmt,sizeof(res)/sizeof(res[0]),NULL,res) < 0) goto err;
+               if (ctx->greyjobs && !strcmp(res[2],"1") && 
+                       (event->any.type == EDG_WLL_EVENT_REGJOB) && 
+                       (event->any.priority & EDG_WLL_LOGFLAG_DIRECT)) 
+               {
+                       subj = strdup(ctx->isProxy ? event->any.user : can_peername);
+                       userid = strdup(strmd5(subj, NULL));
+                       if (store_user(ctx,userid,subj)) goto err;
+                       if (store_job(ctx,(glite_jobid_const_t) event->any.jobId,
+                               userid, (ctx->isProxy || !strcmp(res[0],"1")), 
+                               !strcmp(res[1],"1") || (local_job ? ctx->serverRunning : 0), 0, 1)) goto err;
+                       *register_to_JP = 1;
+                               
                }
                else {
-                       /* any other event than JobReg  */
-                       /* no action needed             */
+                       // if (!strcmp(res[0],"1") && !strcmp(res[1],"1") ) /*nothing to do */;
+                       if ( (!strcmp(res[0],"0") && ctx->isProxy) || (!strcmp(res[1],"0") && !ctx->isProxy) ) {
+                               trio_asprintf(&q,"update jobs set server='1', proxy='1' where jobid='%|Ss'",
+                                       unique);
+                       }
                }
+
+               /* ??? test whether user from proxy is the same as user from server ??? */
        }
-       
+
 err:
+       free(res[0]); free(res[1]); free(res[3]);
        if (stmt) glite_lbu_FreeStmt(&stmt);
-       free(unique);
+       free(subj);
        free(userid);
        free(q);
 
        return edg_wll_Error(ctx,NULL,NULL);
-}
 
+}
 
-/*
- * XXX: store it in SHORT_FIELDS for now despite it should go to dedicated
- *     column in EVENTS.
- *
- *     don't want to change the database structure now, will be done anyway
- *     soon
- */
-static int store_seq(edg_wll_Context ctx,edg_wll_Event *e,int no)
-{
-       int     ret;
-       char    *stmt;
-       char    *jobid = edg_wlc_JobIdGetUnique(e->any.jobId);
-
-       edg_wll_ResetError(ctx);
-       trio_asprintf(&stmt,"insert into short_fields(jobid,event,name,value) "
-                       "values ('%|Ss',%d,'SEQCODE','%|Ss')",
-                       jobid,no,e->any.seqcode);
-
-       ret = edg_wll_ExecSQL(ctx,stmt,NULL);
-       free(stmt);
-       free(jobid);
 
-       return ret>=0 ? 0 : edg_wll_Error(ctx,NULL,NULL);
-}
 
 #define SHORT_LEN      255     /* short_fiels.value db column lenght */
 
@@ -705,19 +539,12 @@ static int check_auth(edg_wll_Context ctx,edg_wll_Event *e)
        char    *jobid = edg_wlc_JobIdGetUnique(e->any.jobId);
        char    *q = NULL,*owner = NULL;
        glite_lbu_Statement     stmt = NULL;
-       char    *user;
 
        edg_wll_ResetError(ctx);
 
        if (!ctx->isProxy && !ctx->peerName) 
                return edg_wll_SetError(ctx,EPERM,"can't store using unauthenticated connection");
 
-#if 0
-       if (e->type == EDG_WLL_EVENT_REGJOB) 
-               return strcmp(e->any.user,EDG_WLL_LOG_USER_DEFAULT) ?
-                       0 : edg_wll_SetError(ctx,EPERM,"can't register jobs anonymously");
-#endif
-
        trio_asprintf(&q,"select u.cert_subj from jobs j, users u "
                                "where j.jobid='%|Ss' and u.userid=j.userid",jobid);
 
@@ -726,14 +553,7 @@ static int check_auth(edg_wll_Context ctx,edg_wll_Event *e)
        ) goto clean;
 
        if (!owner) {
-               if ( ctx->isProxy )
-                       edg_wll_SetError(ctx, EINVAL, "Job not registered");
-               else
-                       /* We have to let the calling function know what happened here
-                        * even if it hapens inside the LB Proxy which shouldn't consider
-                        * this as an error
-                        */
-                       edg_wll_SetError(ctx, ENOENT, "job not registered");
+               edg_wll_SetError(ctx, ENOENT, "job not registered");
                goto clean;
        }
 
@@ -760,7 +580,7 @@ clean:
 }
 
 #ifndef LB_DAG_EMBRIONIC
-static int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
+int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
 {
        int     i,err;
        edg_wlc_JobId   *subjobs;
@@ -856,7 +676,7 @@ static int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
 
 static edg_wll_ErrorCode states_values_embryonic(
        edg_wll_Context ctx,
-       edg_wlc_JobId jobid,
+       glite_jobid_const_t jobid,
        const edg_wll_RegJobEvent *e,
        char **icnames,
        char **values)
@@ -898,21 +718,22 @@ err:
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
-static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEvent *e, const char *userid)
+int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
 {
-       int                     i, err = 0;
-       edg_wlc_JobId           *subjobs;
+       int                     i, j,  err = 0;
+       edg_wlc_JobId           *subjobs = NULL;
        struct timeval          now;
-       char                    *jobid_md5, *jobid_md5_old;
+       char                    *jobid = NULL, *jobid_md5 = NULL, *jobid_md5_old = NULL;
        size_t                  jobid_len;
 #ifdef LB_BUF
        glite_lbu_bufInsert     bi_j;
        glite_lbu_bufInsert     *bi_jobs = &bi_j;
-               char                    *states_cols;
+               char                    *states_cols = NULL;
 #endif
        glite_lbu_bufInsert     bi_s, *bi_states = &bi_s;
-       char                    *icnames, *values;
+       char                    *icnames = NULL, *values = NULL, *userid = NULL, *stmt = NULL;
        int                     server, proxy, membership = 0;
+       glite_lbu_Statement     sh = NULL;
 
 
        edg_wll_ResetError(ctx);
@@ -952,10 +773,16 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv
        ctx->p_tmp_timeout.tv_sec += e->nsubjobs/10;
        if (ctx->p_tmp_timeout.tv_sec > 86400) ctx->p_tmp_timeout.tv_sec = 86400;
 
-       membership = edg_wll_jobMembership(ctx, e->jobId);
+       if ((membership = edg_wll_jobMembership(ctx, e->jobId)) < 0) goto err;
+
        proxy = membership & DB_PROXY_JOB;
        server = membership & DB_SERVER_JOB;
 
+       /* get userid of parent job */
+       jobid = edg_wlc_JobIdGetUnique(e->jobId);
+       trio_asprintf(&stmt,"select userid from jobs where jobid='%|Ss'", jobid);
+       if (edg_wll_ExecSQL(ctx,stmt,&sh) < 0 || edg_wll_FetchRow(ctx,sh,1,NULL,&userid) < 0) goto err;
+
        for (i=0; i<e->nsubjobs; i++) {
                char            *et,*ed,*job_s,*p,*p1;
 
@@ -963,9 +790,9 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv
 #ifdef LB_BUF
                if ((err = store_job_block(ctx, subjobs[i], userid, bi_jobs, proxy, server)))
 #else
-               if ((err = store_job(ctx, subjobs[i], userid, proxy, server)))
+               if ((err = store_job(ctx, subjobs[i], userid, proxy, server, 0, 0)))
 #endif
-                       edg_wll_Error(ctx,&et,&ed);
+                       if (edg_wll_Error(ctx,&et,&ed) == EDEADLOCK) goto err;
 
                /* interchange variable parts (jobids) in values */
                /* there are only two occurences of subjob jobid */
@@ -985,10 +812,6 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv
                if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, subjobs[i], icnames, values, bi_states)))
                        edg_wll_Error(ctx,&et,&ed);
 
-//job_s = edg_wlc_JobIdUnparse(subjobs[i]);
-//printf("%s\n", job_s);
-//free(job_s);
-
                if (err) {
                        job_s = edg_wlc_JobIdUnparse(subjobs[i]);
                        fprintf(stderr,"%s: %s (%s)\n",job_s,et,ed);
@@ -999,10 +822,15 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv
                edg_wlc_JobIdFree(subjobs[i]);
        }
 
+err:
        free(jobid_md5_old);    //free the last one
        free(icnames);
        free(values);
+       /* free the rest of subjobs if DEADLOCK occurs */
+       for (j=i; j<e->nsubjobs; j++) edg_wlc_JobIdFree(subjobs[i]);
        free(subjobs);
+       if (sh) glite_lbu_FreeStmt(&sh);
+       free(stmt);
 
 #ifdef LB_BUF
        /* commit the rest of multirows insert and clean structures */
@@ -1050,3 +878,19 @@ int edg_wll_delete_event(edg_wll_Context ctx,const char *jobid,int event)
 
        return edg_wll_Error(ctx,NULL,NULL);
 }
+
+
+/* XXX: if event type is user tag, convert the tag name to lowercase!
+ *       (not sure whether to convert a value too is reasonable
+ *       or keep it 'case sensitive')
+ */
+static void lowercase_usertag(edg_wll_Event *ev)
+{
+       int i;
+
+       if ( ev->any.type == EDG_WLL_EVENT_USERTAG ) {
+
+               for ( i = 0; ev->userTag.name[i] != '\0'; i++ )
+               ev->userTag.name[i] = tolower(ev->userTag.name[i]);
+       }
+}
index 61063a3..799b0de 100644 (file)
@@ -21,7 +21,7 @@ int edg_wll_UserJobsServer(
        char    *userid, *stmt = NULL,
                *res = NULL;
        char    *can_peername;
-       int     njobs = 0,ret,i,j;
+       int     njobs = 0,ret,i,j,idx;
        edg_wlc_JobId   *out = NULL;
        glite_lbu_Statement     sth = NULL;
        edg_wll_ErrorCode       err = 0;
@@ -52,7 +52,7 @@ int edg_wll_UserJobsServer(
        free(stmt); stmt = NULL;
        free(res); res = NULL;
 
-       trio_asprintf(&stmt,"select dg_jobid from jobs where userid = '%|Ss'",userid);
+       trio_asprintf(&stmt,"select dg_jobid from jobs where userid = '%|Ss' and grey='0'",userid);
        switch (njobs = edg_wll_ExecSQL(ctx,stmt,&sth)) {
                case 0: edg_wll_SetError(ctx,ENOENT,ctx->peerName);
                case -1: goto err;
@@ -71,12 +71,18 @@ int edg_wll_UserJobsServer(
        }
 
        *states = calloc(njobs, sizeof(**states));
+       idx = 0;
        for (i = 0; i < njobs; i++) {
-               if (edg_wll_JobStatusServer(ctx, out[i], -1, &(*states)[i]) != 0) {
+               if (edg_wll_JobStatusServer(ctx, out[idx], -1, &(*states)[idx]) != 0) {
                        for (j = 0; j < i; j++) edg_wll_FreeStatus(&(*states)[j]);
                        *states = NULL;
-                       break;
+                       if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) {
+                               /* some jobs may be purged meanwhile, ignore */
+                               continue;
+                       }
+                       else break;
                }
+               idx++;
        }
 err:
        free(res);