reshuffle of db_store_finalize
authorMiloš Mulač <mulac@civ.zcu.cz>
Tue, 18 Dec 2007 10:40:27 +0000 (10:40 +0000)
committerMiloš Mulač <mulac@civ.zcu.cz>
Tue, 18 Dec 2007 10:40:27 +0000 (10:40 +0000)
- enable registration of local subjobs
- propagate regitrations of subjobs to JP
- correct trigerring of notifications

org.glite.lb.server/interface/store.h
org.glite.lb.server/src/bkserverd.c
org.glite.lb.server/src/db_store.c
org.glite.lb.server/src/store.c.T

index a9b50c8..bb0bb38 100644 (file)
@@ -45,7 +45,8 @@ int db_parent_store(edg_wll_Context, edg_wll_Event *, intJobStat *);
 int handle_request(edg_wll_Context,char *);
 int create_reply(const edg_wll_Context,char **);
 int trans_db_store(edg_wll_Context,char *,edg_wll_Event *,intJobStat *);
-int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, char *jobIdHost, unsigned int jobIdPort);
+int is_job_local(edg_wll_Context, edg_wlc_JobId jobId);
+int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event);
 
 int edg_wll_delete_event(edg_wll_Context,const char *, int);
 
index cc4bf3b..8af2ed8 100644 (file)
@@ -1214,6 +1214,8 @@ int bk_handle_connection_proxy(int conn, struct timeval *timeout, void *data)
        
        /*      set globals
         */
+       ctx->notifDuration = notif_duration;
+       if ( jpreg ) ctx->jpreg_dir = strdup(jpregDir); else ctx->jpreg_dir = NULL;
        ctx->allowAnonymous = 1;
        ctx->isProxy = 1;
        ctx->noAuth = 1;
@@ -1221,6 +1223,7 @@ int bk_handle_connection_proxy(int conn, struct timeval *timeout, void *data)
        ctx->semset = semset;
        ctx->semaphores = semaphores;
 
+
        if (fake_host)
        {
                ctx->srvName = strdup(fake_host);
index 2032bfa..6102b4f 100644 (file)
 extern int unset_proxy_flag(edg_wll_Context, edg_wlc_JobId);
 extern int edg_wll_NotifMatch(edg_wll_Context, const edg_wll_JobStat *);
 
-static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int seq);
+static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int seq, int reg_to_JP);
 
 
 int
 db_store(edg_wll_Context ctx,char *ucs, char *event)
 {
   edg_wll_Event *ev;
-  int  seq;
+  int  seq, reg_to_JP = 0;
   int   err;
+  int  local_job;
   edg_wll_JobStat      newstat;
-  char                         *srvName = NULL;
-  unsigned int         srvPort;
 
 
   ev = NULL;
@@ -49,6 +48,7 @@ db_store(edg_wll_Context ctx,char *ucs, char *event)
 
   if(edg_wll_ParseEvent(ctx, event, &ev))
     goto err;
+  local_job = is_job_local(ctx, ev->any.jobId);
 
 #ifdef LB_PERF
   if (sink_mode == GLITE_LB_SINK_STORE) {
@@ -59,14 +59,12 @@ db_store(edg_wll_Context ctx,char *ucs, char *event)
   }
 #endif
 
-  edg_wlc_JobIdGetServerParts(ev->any.jobId, &srvName, &srvPort);
-
   if(use_db) {
     char       *ed;
     int                code;
 
     if (edg_wll_LockJob(ctx,ev->any.jobId)) goto err;
-    store_job_server_proxy(ctx, ev, srvName, srvPort);
+    store_job_server_proxy(ctx, ev);
     code = edg_wll_Error(ctx,NULL,&ed);
     edg_wll_UnlockJob(ctx,ev->any.jobId);      /* XXX: ignore error */
     if (code) {
@@ -81,12 +79,15 @@ db_store(edg_wll_Context ctx,char *ucs, char *event)
    * if jobid prefix hostname matches server hostname -> they will
    * sooner or later arrive to server too and are stored in common DB 
    */
-  if (ctx->isProxy && ctx->serverRunning && (ev->any.priority & EDG_WLL_LOGFLAG_DIRECT) ) {
-       if (!strcmp(ctx->srvName, srvName)) {
-               free(srvName);
+  if (ctx->isProxy && local_job) {
+       if  (ev->any.priority & EDG_WLL_LOGFLAG_DIRECT) {
                return 0;
        }
-
+       else {
+               /* these are re-registrations of subjobs on proxy               */
+               /* embryonic registrations does not trigger registration in JP  */
+               reg_to_JP = 1;
+       }
   }
 
   /* XXX: if event type is user tag, convert the tag name to lowercase!
@@ -138,7 +139,7 @@ db_store(edg_wll_Context ctx,char *ucs, char *event)
    */
   if (err) goto err;
 
-  db_store_finalize(ctx, event, ev, &newstat, seq);
+  db_store_finalize(ctx, event, ev, &newstat, seq, reg_to_JP);
 
 err:
 
@@ -147,8 +148,6 @@ err:
     free(ev);
   }
 
-  if (srvName) free(srvName);
-
   if ( newstat.state ) edg_wll_FreeStatus(&newstat);
 
 
@@ -204,7 +203,7 @@ db_parent_store(edg_wll_Context ctx, edg_wll_Event *ev, intJobStat *is)
     assert(event);
   }
 
-  db_store_finalize(ctx, event, ev, &newstat, seq);
+  db_store_finalize(ctx, event, ev, &newstat, seq, 0);
 
 err:
 
@@ -216,63 +215,22 @@ err:
 
 
 
-static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int seq) {
-printf("%d\n", seq);
-  if ( ctx->isProxy ) {
-       /*
-        *      send event to the proper BK server
-        *      event with priority flag EDG_WLL_LOGFLAG_DIRECT (typically RegJob) is not sent
-        */
-
-#ifdef LB_PERF
-       if( sink_mode == GLITE_LB_SINK_SEND ) {
-               glite_wll_perftest_consumeEvent(ev);
-       } else
-#endif
-
-       /* XXX: ending here may break the backward compatibility */
-       if (!(ev->any.priority & EDG_WLL_LOGFLAG_PROXY)) {
-               edg_wll_UpdateError(ctx, 0, "db_actual_store() WARNING: the event is not PROXY");
-               //return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "db_actual_store() ERROR: the event is not PROXY");
-       }
+static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int seq, int reg_to_JP) 
+{
+       int     local_job = is_job_local(ctx, ev->any.jobId);
 
-       if (!(ev->any.priority & EDG_WLL_LOGFLAG_DIRECT)) {
-               if (edg_wll_EventSendProxy(ctx, ev->any.jobId, event) )  {
-                       return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "edg_wll_EventSendProxy() error.");
-               }
-       }
 
-       /* LB proxy purge */
-       if (newstat->remove_from_proxy) {
-                       edg_wll_PurgeServerProxy(ctx, ev->any.jobId);
-       }
-  } else 
 #ifdef LB_PERF
        if( sink_mode == GLITE_LB_SINK_SEND ) {
                glite_wll_perftest_consumeEvent(ev);
-       } else 
-#endif
-  {
-       char            *jobIdHost = NULL;
-       unsigned int    jobIdPort;
-
-
-       /* Purge proxy flag */
-       edg_wlc_JobIdGetServerParts(ev->any.jobId, &jobIdHost, &jobIdPort);
-       if ( newstat->remove_from_proxy && (ctx->srvPort == jobIdPort) && 
-               !strcmp(jobIdHost,ctx->srvName) )
-       {
-                       if (unset_proxy_flag(ctx, ev->any.jobId) < 0) {
-                                               free(jobIdHost);
-                                               return(edg_wll_Error(ctx,NULL,NULL));
-                       }
+               return edg_wll_Error(ctx,NULL,NULL);
        }
-       free(jobIdHost);
-
-       if ( newstat->state ) {
-               edg_wll_NotifMatch(ctx, newstat);
-       }
-       if ( ctx->jpreg_dir && ev->any.type == EDG_WLL_EVENT_REGJOB && seq == 0) {
+#endif
+       
+       /* Send regitration to JP 
+        */
+       if ( ctx->jpreg_dir && ev->any.type == EDG_WLL_EVENT_REGJOB && seq == 0 &&
+            (!ctx->isProxy || reg_to_JP) ) {
                char *jids, *msg;
                
                if ( !(jids = edg_wlc_JobIdUnparse(ev->any.jobId)) ) {
@@ -290,6 +248,51 @@ printf("%d\n", seq);
                }
                free(msg);
        }
-  }
-  return edg_wll_Error(ctx,NULL,NULL);
+
+
+       if ( ctx->isProxy ) {
+               /*
+                *      send event to the proper BK server
+                *      event with priority flag EDG_WLL_LOGFLAG_DIRECT (typically RegJob) is not sent
+                */
+
+               /* XXX: ending here may break the backward compatibility */
+               if (!(ev->any.priority & EDG_WLL_LOGFLAG_PROXY)) {
+                       edg_wll_UpdateError(ctx, 0, "db_actual_store() WARNING: the event is not PROXY");
+                       //return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "db_actual_store() ERROR: the event is not PROXY");
+               }
+
+               if (!(ev->any.priority & EDG_WLL_LOGFLAG_DIRECT) && !local_job) {
+                       if (edg_wll_EventSendProxy(ctx, ev->any.jobId, event) )  {
+                               return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "edg_wll_EventSendProxy() error.");
+                       }
+               }
+               else {
+                       /* event will not arrive to server, only flag was set           */
+                       /* check whether some pending notifications are not triggered   */
+                       if ( newstat->state ) {
+                               edg_wll_NotifMatch(ctx, newstat);
+                       }
+               }
+                       
+               /* LB proxy purge */
+               if (newstat->remove_from_proxy) {
+                               edg_wll_PurgeServerProxy(ctx, ev->any.jobId);
+               }
+       } else 
+       {
+               /* Purge proxy flag */
+               if ( newstat->remove_from_proxy && local_job ) {
+                       if (unset_proxy_flag(ctx, ev->any.jobId) < 0) {
+                               return(edg_wll_Error(ctx,NULL,NULL));
+                       }
+               }
+
+               if ( newstat->state ) {
+                       edg_wll_NotifMatch(ctx, newstat);
+               }
+       }
+       
+
+       return edg_wll_Error(ctx,NULL,NULL);
 }
index c409918..2be77e3 100644 (file)
@@ -355,14 +355,29 @@ static int store_job_grey(edg_wll_Context ctx,const edg_wlc_JobId job,time_t eti
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
+/* test whether job shares LB proxy and server DB or not */
+int is_job_local(edg_wll_Context ctx, edg_wlc_JobId jobId) 
+{
+       char            *srvName = NULL;
+       unsigned int    srvPort; 
+       int             ret;    
+
+
+       edg_wlc_JobIdGetServerParts(jobId, &srvName, &srvPort);
+       ret = ((ctx->srvPort == srvPort) && !strcmp(srvName,ctx->srvName));
+       free(srvName);
+
+       return(ret);
+}
 
-int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, char *jobIdHost, unsigned int jobIdPort)
+int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event)
 {      
        char            *unique = edg_wlc_JobIdGetUnique(event->any.jobId);
        char            *q = NULL, *owner = NULL, *userid = NULL;
        glite_lbu_Statement    stmt = NULL;
        int             nar;
        char            *can_peername = NULL;
+       int             local_job = is_job_local(ctx, event->any.jobId);
 
 
        edg_wll_ResetError(ctx);
@@ -374,7 +389,7 @@ int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, char *jobI
                if (event->any.type == EDG_WLL_EVENT_REGJOB) {
                        if (event->any.priority & EDG_WLL_LOGFLAG_DIRECT) {
                         /* first synchronous registration */
-                               if ((ctx->srvPort == jobIdPort) && !strcmp(jobIdHost,ctx->srvName)) {
+                               if (local_job) {
                                        /* we are both server and proxy for this job */
                                        trio_asprintf(&q,"update jobs set proxy=1 where jobid='%|Ss'",
                                                unique);
@@ -412,7 +427,7 @@ int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, char *jobI
                        }
                        else {
                                /* supplementary re-registration (JDL of subjob, etc.) */
-                               if ((ctx->srvPort == jobIdPort) && !strcmp(jobIdHost,ctx->srvName)) {
+                               if (local_job) {
                                        /* previous registration via GSI required */
                                        trio_asprintf(&q,"update jobs set jobid='%|Ss', proxy='1' where jobid='%|Ss'",
                                                unique, unique);