lb.dualreg stuff:
authorJan Pospíšil <honik@ntc.zcu.cz>
Thu, 26 Jul 2007 09:25:52 +0000 (09:25 +0000)
committerJan Pospíšil <honik@ntc.zcu.cz>
Thu, 26 Jul 2007 09:25:52 +0000 (09:25 +0000)
- LOGFLAG_*defines  renamed to EDG_WLL_LOGFLAG_* and moved to events.h
- Set proper priority flag on client side (dualregjob event has both EDG_WLL_LOGFLAG_PROXY and EDG_WLL_LOGFLAG_DIRECT)
- on LB proxy, do not send to bkserver the events that has already the flag EDG_WLL_LOGFLAG_DIRECT, i.e. those already sent
- still missing: storing priority flags to DB, requires addition to DB schema

org.glite.lb.client/src/producer.c
org.glite.lb.common/interface/events.h.T
org.glite.lb.server/src/db_store.c

index b3617a9..3d40357 100644 (file)
 #include "glite/lb/prod_proto.h"
 #include "glite/lb/consumer.h" // for QuerySequenceCode
 
-/* XXX: paralel registration is disabled until the race condition (via proxy first)
- * job owner assignment is solved */
-
-#define LB_SERIAL_REG
-
 #ifdef FAKE_VERSION
 int edg_wll_DoLogEvent(edg_wll_Context ctx, edg_wll_LogLine logline);
 int edg_wll_DoLogEventProxy(edg_wll_Context ctx, edg_wll_LogLine logline);
@@ -210,18 +205,12 @@ edg_wll_DoLogEventDirect_end:
 
 #endif /* FAKE_VERSION */
 
-#define        LOGFLAG_ASYNC   0 /**< asynchronous logging */
-#define        LOGFLAG_SYNC    1 /**< synchronous logging */
-#define        LOGFLAG_NORMAL  2 /**< logging to local logger */
-#define        LOGFLAG_PROXY   4 /**< logging to L&B Proxy */
-#define        LOGFLAG_DIRECT  8 /**< logging directly to bkserver */
-
 /**
  *----------------------------------------------------------------------
  * Formats a logging message 
  * \brief formats a logging message
  * \param[in,out] ctx  context to work with,
- * \param[in] flags            as defined by LOGFLAG_*
+ * \param[in] flags            as defined by EDG_WLL_LOGFLAG_*
  * \param[in] event            type of the event,
  * \param[out] logline         formated logging message
  * \param[in] fmt              printf()-like format string,
@@ -248,7 +237,7 @@ static int edg_wll_FormatLogLine(
 
        i = errno = size = ret = 0;
        seq = fix = var = dguser = out = source = eventName = lvl = fullid = NULL;
-       priority = flags & LOGFLAG_SYNC;
+       priority = flags;
 
        edg_wll_ResetError(ctx);
 
@@ -281,7 +270,7 @@ static int edg_wll_FormatLogLine(
                goto edg_wll_formatlogline_end; 
        }
        /* TODO: add always, probably new ctx->p_user */
-       if ( ( (flags & LOGFLAG_PROXY) || (flags & LOGFLAG_DIRECT) ) && 
+       if ( ( (flags & EDG_WLL_LOGFLAG_PROXY) || (flags & EDG_WLL_LOGFLAG_DIRECT) ) && 
           (ctx->p_user_lbproxy) ) {
                if (trio_asprintf(&dguser,EDG_WLL_FORMAT_USER,ctx->p_user_lbproxy) == -1) {
                        edg_wll_SetError(ctx,ret = ENOMEM,"edg_wll_FormatLogLine(): trio_asprintf() error"); 
@@ -306,7 +295,7 @@ static int edg_wll_FormatLogLine(
        }
 
 #ifdef EDG_WLL_LOG_STUB
-//     fprintf(stderr,"edg_wll_FormatLogLine (%d chars): %s",size,out);
+       fprintf(stderr,"edg_wll_FormatLogLine (%d chars): %s",size,out);
 #endif
        if (out) {
                *logline = out;
@@ -333,7 +322,7 @@ edg_wll_formatlogline_end:
  * Formats a logging message and sends it to local-logger
  * \brief master logging event function
  * \param[in,out] ctx          context to work with,
- * \param[in] flags            as defined by LOGFLAG_*
+ * \param[in] flags            as defined by EDG_WLL_LOGFLAG_*
  * \param[in] event            type of the event,
  * \param[in] fmt              printf()-like format string,
  * \param[in] ...              event specific values/data according to fmt.
@@ -350,7 +339,7 @@ static int edg_wll_LogEventMaster(
        int     ret;
        edg_wll_LogLine in = NULL, out = NULL;
 
-       priority = flags & LOGFLAG_SYNC;
+       priority = flags;
 
        edg_wll_ResetError(ctx);
 
@@ -385,13 +374,13 @@ static int edg_wll_LogEventMaster(
 
    /* and send the message */ 
 #ifndef LB_PERF_DROP
-       if (flags & LOGFLAG_NORMAL) {
+       if (flags & EDG_WLL_LOGFLAG_LOCAL) {
                /* to the local-logger: */
                ret = edg_wll_DoLogEvent(ctx, out);
-       } else if (flags & LOGFLAG_PROXY) {
+       } else if (flags & EDG_WLL_LOGFLAG_PROXY) {
                /* to the L&B Proxy: */
                ret = edg_wll_DoLogEventProxy(ctx, out);
-       } else if (flags & LOGFLAG_DIRECT) {
+       } else if (flags & EDG_WLL_LOGFLAG_DIRECT) {
                /* directly to the bkserver: */
                ret = edg_wll_DoLogEventDirect(ctx, out);
        } else {
@@ -437,7 +426,7 @@ int edg_wll_LogEvent(
                goto edg_wll_logevent_end; 
        }
 
-       ret=edg_wll_LogEventMaster(ctx,LOGFLAG_NORMAL | LOGFLAG_ASYNC,event,"%s",list);
+       ret=edg_wll_LogEventMaster(ctx,EDG_WLL_LOGFLAG_LOCAL | EDG_WLL_LOGFLAG_ASYNC,event,"%s",list);
 
 edg_wll_logevent_end:
        va_end(fmt_args);
@@ -472,7 +461,7 @@ int edg_wll_LogEventSync(
                goto edg_wll_logeventsync_end; 
        }
 
-       ret=edg_wll_LogEventMaster(ctx,LOGFLAG_NORMAL | LOGFLAG_SYNC,event,"%s",list);
+       ret=edg_wll_LogEventMaster(ctx,EDG_WLL_LOGFLAG_LOCAL | EDG_WLL_LOGFLAG_SYNC,event,"%s",list);
 
 edg_wll_logeventsync_end:
        va_end(fmt_args);
@@ -507,7 +496,7 @@ int edg_wll_LogEventProxy(
                 goto edg_wll_logevent_end;
         }
 
-        ret=edg_wll_LogEventMaster(ctx,LOGFLAG_PROXY | LOGFLAG_SYNC, event,"%s",list);
+        ret=edg_wll_LogEventMaster(ctx,EDG_WLL_LOGFLAG_PROXY | EDG_WLL_LOGFLAG_SYNC, event,"%s",list);
 
 edg_wll_logevent_end:
         va_end(fmt_args);
@@ -787,32 +776,59 @@ static int edg_wll_RegisterJobMaster(
        }
        parent_s = parent ? edg_wlc_JobIdUnparse(parent) : strdup("");
 
-       if (flags & LOGFLAG_DIRECT) {
-               /* SetLoggingJob and log directly the message */
+       if ( (flags & EDG_WLL_LOGFLAG_DIRECT & EDG_WLL_LOGFLAG_LOCAL) ||
+            (flags & EDG_WLL_LOGFLAG_PROXY & EDG_WLL_LOGFLAG_LOCAL) ||
+            !(flags | EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_PROXY | EDG_WLL_LOGFLAG_LOCAL) ) {
+               edg_wll_SetError(ctx,EINVAL,"edg_wll_RegisterJobMaster(): wrong flag specified");
+               goto edg_wll_registerjobmaster_end;
+       }
+       if (flags & EDG_WLL_LOGFLAG_DIRECT) {
+               /* SetLoggingJob and log the message directly to bkserver */
                if (edg_wll_SetLoggingJob(ctx,job,NULL,EDG_WLL_SEQ_NORMAL) == 0) {
-                       edg_wll_LogEventMaster(ctx,LOGFLAG_DIRECT | LOGFLAG_SYNC,
-                               EDG_WLL_EVENT_REGJOB,EDG_WLL_FORMAT_REGJOB,
-                               (char *)jdl,ns,parent_s,type_s,num_subjobs,seed);
+                       err = edg_wll_LogEventMaster(ctx, flags,
+                               EDG_WLL_EVENT_REGJOB, EDG_WLL_FORMAT_REGJOB,
+                               (char *)jdl, ns, parent_s, type_s, num_subjobs, seed);
+                       if (err) {
+                               edg_wll_UpdateError(ctx,0,"edg_wll_RegisterJobMaster(): unable to register with bkserver");
+                               goto edg_wll_registerjobmaster_end;
+                       }
+               } else {
+                       edg_wll_UpdateError(ctx,0,"edg_wll_RegisterJobMaster(): unable to set logging job (direct)");
+                       goto edg_wll_registerjobmaster_end;
                }
-       } else if (flags & LOGFLAG_PROXY) {
-               /* SetLoggingJobProxy and and log to proxy */
+       } 
+       if (flags & EDG_WLL_LOGFLAG_PROXY) {
+               /* SetLoggingJobProxy and log the message to lbproxy */
                edg_wll_SetSequenceCode(ctx, NULL, EDG_WLL_SEQ_NORMAL);
                if (seq) free(seq);
                seq = edg_wll_GetSequenceCode(ctx);
                if (edg_wll_SetLoggingJobProxy(ctx,job,seq,NULL,EDG_WLL_SEQ_NORMAL) == 0) {
-                       edg_wll_LogEventMaster(ctx,LOGFLAG_PROXY | LOGFLAG_SYNC,
-                               EDG_WLL_EVENT_REGJOB,EDG_WLL_FORMAT_REGJOB,
-                               (char *)jdl,ns,parent_s,type_s,num_subjobs,seed);
+                       err = edg_wll_LogEventMaster(ctx, flags,
+                               EDG_WLL_EVENT_REGJOB, EDG_WLL_FORMAT_REGJOB,
+                               (char *)jdl, ns, parent_s, type_s, num_subjobs, seed);
+                       if (err) {
+                               edg_wll_UpdateError(ctx,0,"edg_wll_RegisterJobMaster(): unable to register with lbproxy");
+                               goto edg_wll_registerjobmaster_end;
+                       }
+               } else {
+                       edg_wll_UpdateError(ctx,0,"edg_wll_RegisterJobMaster(): unable to set logging job (proxy)");
+                       goto edg_wll_registerjobmaster_end;
                }
-       } else if (flags & LOGFLAG_NORMAL) {
-               /* SetLoggingJob and log normally the message through the local-logger */
+       } 
+       if (flags & EDG_WLL_LOGFLAG_LOCAL) {
+               /* SetLoggingJob and log the message through the local-logger */
                if (edg_wll_SetLoggingJob(ctx,job,NULL,EDG_WLL_SEQ_NORMAL) == 0) {
-                       edg_wll_LogEventMaster(ctx, LOGFLAG_NORMAL,
-                               EDG_WLL_EVENT_REGJOB,EDG_WLL_FORMAT_REGJOB,
-                               (char *)jdl,ns,parent_s,type_s,num_subjobs,seed);
+                       err = edg_wll_LogEventMaster(ctx, flags,
+                               EDG_WLL_EVENT_REGJOB, EDG_WLL_FORMAT_REGJOB,
+                               (char *)jdl, ns, parent_s, type_s, num_subjobs, seed);
+                       if (err) {
+                               edg_wll_UpdateError(ctx,0,"edg_wll_RegisterJobMaster(): unable to register through local-logger");
+                               goto edg_wll_registerjobmaster_end;
+                       }
+               } else {
+                       edg_wll_UpdateError(ctx,0,"edg_wll_RegisterJobMaster(): unable to set logging job (local)");
+                       goto edg_wll_registerjobmaster_end;
                }
-       } else {
-               edg_wll_SetError(ctx,EINVAL,"edg_wll_RegisterJobMaster(): wrong flag specified");
        }
 
 edg_wll_registerjobmaster_end:
@@ -839,7 +855,7 @@ int edg_wll_RegisterJobSync(
         const char *            seed,
         edg_wlc_JobId **        subjobs)
 {
-       return edg_wll_RegisterJobMaster(ctx,LOGFLAG_DIRECT,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs);
+       return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_SYNC,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs);
 }
 
 /**
@@ -858,7 +874,7 @@ int edg_wll_RegisterJob(
         const char *            seed,
         edg_wlc_JobId **        subjobs)
 {
-       return edg_wll_RegisterJobMaster(ctx,LOGFLAG_DIRECT,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs);
+       return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_ASYNC,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs);
 }
 
 #ifndef LB_SERIAL_REG
@@ -926,7 +942,7 @@ int edg_wll_RegisterJobProxy(
        }
 
        /* format the RegJob event message */
-       if (edg_wll_FormatLogLine(ctx,LOGFLAG_SYNC | LOGFLAG_PROXY | LOGFLAG_PROXY,
+       if (edg_wll_FormatLogLine(ctx,EDG_WLL_LOGFLAG_SYNC | EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_PROXY,
                EDG_WLL_EVENT_REGJOB,&logline,
                EDG_WLL_FORMAT_REGJOB,(char *)jdl,ns,"",type_s,num_subjobs,seed) != 0 ) {
                edg_wll_UpdateError(ctx,EINVAL,"edg_wll_RegisterJobProxy(): edg_wll_FormatLogLine() error");
@@ -1041,14 +1057,8 @@ int edg_wll_RegisterJobProxyOld(
         const char *            seed,
         edg_wlc_JobId **        subjobs)
 {
-       /* first register with bkserver */
-       int ret = edg_wll_RegisterJobMaster(ctx,LOGFLAG_DIRECT,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs);
-       if (ret) {
-               edg_wll_UpdateError(ctx,0,"edg_wll_RegisterJobProxyOld(): unable to register with bkserver");
-               return edg_wll_Error(ctx,NULL,NULL);
-       }
-       /* and then with L&B Proxy */
-       return edg_wll_RegisterJobMaster(ctx,LOGFLAG_PROXY,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs);
+       /* first register with bkserver and then with L&B Proxy */
+       return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_PROXY,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs);
 }
 
 #else /* LB_SERIAL_REG */
@@ -1070,21 +1080,12 @@ int edg_wll_RegisterJobProxy(
         const char *            seed,
         edg_wlc_JobId **        subjobs)
 {
-       /* first register with bkserver */
-       int ret = edg_wll_RegisterJobMaster(ctx,LOGFLAG_DIRECT,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs);
-       if (ret) {
-               edg_wll_UpdateError(ctx,0,"edg_wll_RegisterJobProxy(): unable to register with bkserver");
-               return edg_wll_Error(ctx,NULL,NULL);
-       }
-       /* and then with L&B Proxy */
-       return edg_wll_RegisterJobMaster(ctx,LOGFLAG_PROXY,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs);
+       /* first register with bkserver and then with L&B Proxy */
+       return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_PROXY,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs);
 }
 
 #endif /* LB_SERIAL_REG */
 
-
-#ifndef LB_SERIAL_REG
-
 /**
  *-----------------------------------------------------------------------
  * Register one job with L&B Proxy service ONLY
@@ -1102,11 +1103,9 @@ int edg_wll_RegisterJobProxyOnly(
         const char *            seed,
         edg_wlc_JobId **        subjobs)
 {
-       return edg_wll_RegisterJobMaster(ctx,LOGFLAG_PROXY,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs);
+       return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_PROXY,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs);
 }
 
-#endif /* LB_SERIAL_REG */
-
 /**
  *-----------------------------------------------------------------------
  * Register one subjob with L&B service
@@ -1127,9 +1126,9 @@ int edg_wll_RegisterSubjob(
 {
 /* XXX: what is that ? */
 #ifdef LB_PERF
-       return edg_wll_RegisterJobMaster(ctx,LOGFLAG_DIRECT,job,type,jdl,ns,parent,num_subjobs,seed,subjobs);
+       return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_DIRECT,job,type,jdl,ns,parent,num_subjobs,seed,subjobs);
 #else
-       return edg_wll_RegisterJobMaster(ctx,LOGFLAG_NORMAL,job,type,jdl,ns,parent,num_subjobs,seed,subjobs);
+       return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_LOCAL,job,type,jdl,ns,parent,num_subjobs,seed,subjobs);
 #endif
 }
 
@@ -1151,7 +1150,7 @@ int edg_wll_RegisterSubjobProxy(
         const char *            seed,
         edg_wlc_JobId **        subjobs)
 {
-       return edg_wll_RegisterJobMaster(ctx,LOGFLAG_PROXY,job,type,jdl,ns,parent,num_subjobs,seed,subjobs);
+       return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_PROXY,job,type,jdl,ns,parent,num_subjobs,seed,subjobs);
 }
 
 /**
@@ -1259,7 +1258,7 @@ int edg_wll_ChangeACL(
                enum edg_wll_ACLOperation       operation)
 {
        if ( edg_wll_SetLoggingJob(ctx, jobid, NULL, EDG_WLL_SEQ_NORMAL) == 0 ) {
-               edg_wll_LogEventMaster(ctx, LOGFLAG_NORMAL | LOGFLAG_SYNC, 
+               edg_wll_LogEventMaster(ctx, EDG_WLL_LOGFLAG_LOCAL | EDG_WLL_LOGFLAG_SYNC, 
                        EDG_WLL_EVENT_CHANGEACL, EDG_WLL_FORMAT_CHANGEACL,
                        user_id, user_id_type, permission, permission_type, operation);
        }
index ffdb97f..65afcc8 100644 (file)
@@ -385,6 +385,15 @@ for my $t (sort { $event->{order}->{$a} <=> $event->{order}->{$b} }
 #define EDG_WLL_FORMAT_SYSCMPSTAT      "DG.SCHED.STATUS=\"%|Us\" "
 #define EDG_WLL_FORMAT_SYSCLSTAT       "DG.SCHED.NODE=\"%|Us\" DG.SCHED.STATUS=\"%|Us\" "
 
+
+/* Flags (priority bits) defining the type of logging */
+
+#define EDG_WLL_LOGFLAG_ASYNC   0 /**< asynchronous logging */
+#define EDG_WLL_LOGFLAG_SYNC    1 /**< synchronous logging */
+#define EDG_WLL_LOGFLAG_LOCAL   2 /**< logging to local logger */
+#define EDG_WLL_LOGFLAG_PROXY   4 /**< logging to L&B Proxy */
+#define EDG_WLL_LOGFLAG_DIRECT  8 /**< logging directly to bkserver */
+
 /* enums for the ChangeACL event */
 
 enum edg_wll_Permission {
index 4a0ea51..83a88cb 100644 (file)
@@ -6,6 +6,7 @@
 #include <stdlib.h>
 
 #include "glite/lb/context-int.h"
+#include "glite/lb/events.h"
 #include "glite/lb/events_parse.h"
 #include "glite/lb/lb_maildir.h"
 #include "purge.h"
@@ -190,9 +191,8 @@ static int db_actual_store(edg_wll_Context ctx, char *event, edg_wll_Event *ev,
   if ( ctx->isProxy ) {
        /*
         *      send event to the proper BK server
+        *      event with priority flag EDG_WLL_LOGFLAG_DIRECT (typically RegJob) is not sent
         */
-       /* XXX: RegJob events, which were logged also directly, are duplicated at server,
-               but it should not harm */
 
 #ifdef LB_PERF
        if( sink_mode == GLITE_LB_SINK_SEND ) {
@@ -200,8 +200,16 @@ static int db_actual_store(edg_wll_Context ctx, char *event, edg_wll_Event *ev,
        } else
 #endif
 
-       if (edg_wll_EventSendProxy(ctx, ev->any.jobId, event) )  {
-               return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "edg_wll_EventSendProxy() error.");
+       /* XXX: ending here may break the backward compatibility */
+       if (!(ev->any.priority & EDG_WLL_LOGFLAG_PROXY)) {
+               edg_wll_UpdateError(ctx, 0, "db_actual_store() WARNING: the event is not PROXY");
+               //return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "db_actual_store() ERROR: the event is not PROXY");
+       }
+
+       if (!(ev->any.priority & EDG_WLL_LOGFLAG_DIRECT)) {
+               if (edg_wll_EventSendProxy(ctx, ev->any.jobId, event) )  {
+                       return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "edg_wll_EventSendProxy() error.");
+               }
        }
 
        /* LB proxy purge