separate function for storing CollectionState events
authorMiloš Mulač <mulac@civ.zcu.cz>
Thu, 26 Oct 2006 11:44:11 +0000 (11:44 +0000)
committerMiloš Mulač <mulac@civ.zcu.cz>
Thu, 26 Oct 2006 11:44:11 +0000 (11:44 +0000)
- needs some optimalizations

org.glite.lb.server/interface/store.h
org.glite.lb.server/src/db_store.c
org.glite.lb.server/src/request.c

index f2d7dbf..1a66bf0 100644 (file)
@@ -40,7 +40,8 @@ edg_wll_ErrorCode edg_wll_StepIntStateEmbriotic(
         edg_wll_Event *e       /* IN */
 );
 
-int db_store(edg_wll_Context,char *,char *);
+int db_store(edg_wll_Context,char *, char *);
+int db_parent_store(edg_wll_Context, edg_wll_Event *);
 int handle_request(edg_wll_Context,char *);
 int create_reply(const edg_wll_Context,char **);
 
index 2a085f8..2e3769d 100644 (file)
@@ -175,3 +175,141 @@ db_store(edg_wll_Context ctx,char *ucs, char *event)
   return edg_wll_Error(ctx,NULL,NULL);
 }
 
+
+int
+db_parent_store(edg_wll_Context ctx, edg_wll_Event *ev)
+{
+  char  *event = NULL;
+  int  seq;
+  int   err;
+  edg_wll_JobStat      newstat;
+
+
+  edg_wll_ResetError(ctx);
+  memset(&newstat,0,sizeof newstat);
+
+
+#ifdef LB_PERF
+  if (sink_mode == GLITE_LB_SINK_STORE) {
+         glite_wll_perftest_consumeEvent(ev);
+         edg_wll_FreeEvent(ev);
+         free(ev);
+         return 0;
+  }
+#endif
+
+
+  /* XXX: if event type is user tag, convert the tag name to lowercase!
+   *     (not sure whether to convert a value too is reasonable
+   *     or keep it 'case sensitive')
+   */
+  if ( ev->any.type == EDG_WLL_EVENT_USERTAG )
+  {
+       int i;
+       for ( i = 0; ev->userTag.name[i] != '\0'; i++ )
+         ev->userTag.name[i] = tolower(ev->userTag.name[i]);
+  }
+  
+  assert(ev->any.user);
+
+  if(use_db) {
+    if(edg_wll_StoreEvent(ctx, ev,&seq))
+      goto err;
+  }
+
+  if ( ev->any.type == EDG_WLL_EVENT_CHANGEACL )
+    err = edg_wll_UpdateACL(ctx, ev->any.jobId,
+                       ev->changeACL.user_id, ev->changeACL.user_id_type,
+                       ev->changeACL.permission, ev->changeACL.permission_type,
+                       ev->changeACL.operation);
+  else {
+#ifdef LB_PERF
+    if(sink_mode == GLITE_LB_SINK_STATE) {
+            glite_wll_perftest_consumeEvent(ev);
+            goto err;
+    }
+#endif
+
+    err = edg_wll_StepIntState(ctx,ev->any.jobId, ev, seq, ctx->isProxy? NULL: &newstat);
+  }
+
+  if (err) goto err;
+
+  if ( ctx->isProxy ) {
+       /*
+        *      send event to the proper BK server
+        */
+       /* XXX: RegJob events, which were logged also directly, are duplicated at server,
+               but it should not harm */
+
+#ifdef LB_PERF
+       if( sink_mode == GLITE_LB_SINK_SEND ) {
+               glite_wll_perftest_consumeEvent(ev);
+       } else
+#endif
+       event = edg_wll_UnparseEvent(ctx, ev);
+       assert(event);
+
+       if (edg_wll_EventSendProxy(ctx, ev->any.jobId, event) )  {
+               edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "edg_wll_EventSendProxy() error.");
+               goto err;
+       }
+
+       /* LB proxy purge
+        * XXX: Set propper set of states!
+        * TODO: Do the set of states configurable? 
+        */
+       switch ( ev->any.type ) {
+       case EDG_WLL_EVENT_CLEAR:
+       case EDG_WLL_EVENT_ABORT:
+               edg_wll_PurgeServerProxy(ctx, ev->any.jobId);
+               break;
+       case EDG_WLL_EVENT_CANCEL:
+               if (ev->cancel.status_code == EDG_WLL_CANCEL_DONE) 
+                       edg_wll_PurgeServerProxy(ctx, ev->any.jobId);
+               break;
+       default: break;
+       }
+  } else 
+#ifdef LB_PERF
+       if( sink_mode == GLITE_LB_SINK_SEND ) {
+               glite_wll_perftest_consumeEvent(ev);
+       } else 
+#endif
+  {
+       if ( newstat.state ) {
+               edg_wll_NotifMatch(ctx, &newstat);
+               edg_wll_FreeStatus(&newstat);
+       }
+       if ( ctx->jpreg_dir && ev->any.type == EDG_WLL_EVENT_REGJOB ) {
+               char *jids, *msg;
+               
+               if ( !(jids = edg_wlc_JobIdUnparse(ev->any.jobId)) ) {
+                       edg_wll_SetError(ctx, errno, "Can't unparse jobid when registering to JP");
+                       goto err;
+               }
+               if ( !(msg = realloc(jids, strlen(jids)+strlen(ev->any.user)+2)) ) {
+                       free(jids);
+                       edg_wll_SetError(ctx, errno, "Can't allocate buffer when registering to JP");
+                       goto err;
+               }
+               strcat(msg, "\n");
+               strcat(msg, ev->any.user);
+               if ( edg_wll_MaildirStoreMsg(ctx->jpreg_dir, ctx->srvName, msg) ) {
+                       free(msg);
+                       edg_wll_SetError(ctx, errno, lbm_errdesc);
+                       goto err;
+               }
+               free(msg);
+       }
+  }
+
+  free(event);
+
+  return(0);
+
+ err:
+  free(event);
+  
+  return edg_wll_Error(ctx,NULL,NULL);
+}
index 55ce6df..c2b4e1e 100644 (file)
 #else
 #define UNUSED_VAR
 #endif
+
+
+int
+trans_db_store(edg_wll_Context ctx, char *event_data, edg_wll_Event *e)
+{
+  int ret;
+
+  if ((ret = edg_wll_Transaction(ctx) != 0)) goto err;
+
+  if (e) ret = db_parent_store(ctx, e);
+  else ret = db_store(ctx, "NOT USED", event_data);
+
+  if (ret == 0) {
+    if ((ret = edg_wll_Commit(ctx)) != 0) goto err;
+  } else {
+    edg_wll_Rollback(ctx);
+  }
+
+err:
+  return(ret);
+}
     
 int 
 handle_request(edg_wll_Context ctx,char *buf)
@@ -31,15 +52,8 @@ handle_request(edg_wll_Context ctx,char *buf)
     return EDG_WLL_IL_PROTO;
   }
 
-  if ((ret = edg_wll_Transaction(ctx) != 0)) goto err;
-  ret = db_store(ctx, "NOT USED", event.data);
-  if (ret == 0) {
-    if ((ret = edg_wll_Commit(ctx)) != 0) goto err;
-  } else {
-    edg_wll_Rollback(ctx);
-  }
+  ret = trans_db_store(ctx, event.data, NULL);
 
-err:
   if(event.data)
     free(event.data);