From 9a1e007234cc6586eaeaae824e329e21e300942d Mon Sep 17 00:00:00 2001 From: =?utf8?q?Jan=20Posp=C3=AD=C5=A1il?= Date: Thu, 26 Jul 2007 09:25:52 +0000 Subject: [PATCH] lb.dualreg stuff: - LOGFLAG_*defines renamed to EDG_WLL_LOGFLAG_* and moved to events.h - Set proper priority flag on client side (dualregjob event has both EDG_WLL_LOGFLAG_PROXY and EDG_WLL_LOGFLAG_DIRECT) - on LB proxy, do not send to bkserver the events that has already the flag EDG_WLL_LOGFLAG_DIRECT, i.e. those already sent - still missing: storing priority flags to DB, requires addition to DB schema --- org.glite.lb.client/src/producer.c | 137 +++++++++++++++---------------- org.glite.lb.common/interface/events.h.T | 9 ++ org.glite.lb.server/src/db_store.c | 16 +++- 3 files changed, 89 insertions(+), 73 deletions(-) diff --git a/org.glite.lb.client/src/producer.c b/org.glite.lb.client/src/producer.c index b3617a9..3d40357 100644 --- a/org.glite.lb.client/src/producer.c +++ b/org.glite.lb.client/src/producer.c @@ -19,11 +19,6 @@ #include "glite/lb/prod_proto.h" #include "glite/lb/consumer.h" // for QuerySequenceCode -/* XXX: paralel registration is disabled until the race condition (via proxy first) - * job owner assignment is solved */ - -#define LB_SERIAL_REG - #ifdef FAKE_VERSION int edg_wll_DoLogEvent(edg_wll_Context ctx, edg_wll_LogLine logline); int edg_wll_DoLogEventProxy(edg_wll_Context ctx, edg_wll_LogLine logline); @@ -210,18 +205,12 @@ edg_wll_DoLogEventDirect_end: #endif /* FAKE_VERSION */ -#define LOGFLAG_ASYNC 0 /**< asynchronous logging */ -#define LOGFLAG_SYNC 1 /**< synchronous logging */ -#define LOGFLAG_NORMAL 2 /**< logging to local logger */ -#define LOGFLAG_PROXY 4 /**< logging to L&B Proxy */ -#define LOGFLAG_DIRECT 8 /**< logging directly to bkserver */ - /** *---------------------------------------------------------------------- * Formats a logging message * \brief formats a logging message * \param[in,out] ctx context to work with, - * \param[in] flags as defined by LOGFLAG_* + * \param[in] flags as defined by EDG_WLL_LOGFLAG_* * \param[in] event type of the event, * \param[out] logline formated logging message * \param[in] fmt printf()-like format string, @@ -248,7 +237,7 @@ static int edg_wll_FormatLogLine( i = errno = size = ret = 0; seq = fix = var = dguser = out = source = eventName = lvl = fullid = NULL; - priority = flags & LOGFLAG_SYNC; + priority = flags; edg_wll_ResetError(ctx); @@ -281,7 +270,7 @@ static int edg_wll_FormatLogLine( goto edg_wll_formatlogline_end; } /* TODO: add always, probably new ctx->p_user */ - if ( ( (flags & LOGFLAG_PROXY) || (flags & LOGFLAG_DIRECT) ) && + if ( ( (flags & EDG_WLL_LOGFLAG_PROXY) || (flags & EDG_WLL_LOGFLAG_DIRECT) ) && (ctx->p_user_lbproxy) ) { if (trio_asprintf(&dguser,EDG_WLL_FORMAT_USER,ctx->p_user_lbproxy) == -1) { edg_wll_SetError(ctx,ret = ENOMEM,"edg_wll_FormatLogLine(): trio_asprintf() error"); @@ -306,7 +295,7 @@ static int edg_wll_FormatLogLine( } #ifdef EDG_WLL_LOG_STUB -// fprintf(stderr,"edg_wll_FormatLogLine (%d chars): %s",size,out); + fprintf(stderr,"edg_wll_FormatLogLine (%d chars): %s",size,out); #endif if (out) { *logline = out; @@ -333,7 +322,7 @@ edg_wll_formatlogline_end: * Formats a logging message and sends it to local-logger * \brief master logging event function * \param[in,out] ctx context to work with, - * \param[in] flags as defined by LOGFLAG_* + * \param[in] flags as defined by EDG_WLL_LOGFLAG_* * \param[in] event type of the event, * \param[in] fmt printf()-like format string, * \param[in] ... event specific values/data according to fmt. @@ -350,7 +339,7 @@ static int edg_wll_LogEventMaster( int ret; edg_wll_LogLine in = NULL, out = NULL; - priority = flags & LOGFLAG_SYNC; + priority = flags; edg_wll_ResetError(ctx); @@ -385,13 +374,13 @@ static int edg_wll_LogEventMaster( /* and send the message */ #ifndef LB_PERF_DROP - if (flags & LOGFLAG_NORMAL) { + if (flags & EDG_WLL_LOGFLAG_LOCAL) { /* to the local-logger: */ ret = edg_wll_DoLogEvent(ctx, out); - } else if (flags & LOGFLAG_PROXY) { + } else if (flags & EDG_WLL_LOGFLAG_PROXY) { /* to the L&B Proxy: */ ret = edg_wll_DoLogEventProxy(ctx, out); - } else if (flags & LOGFLAG_DIRECT) { + } else if (flags & EDG_WLL_LOGFLAG_DIRECT) { /* directly to the bkserver: */ ret = edg_wll_DoLogEventDirect(ctx, out); } else { @@ -437,7 +426,7 @@ int edg_wll_LogEvent( goto edg_wll_logevent_end; } - ret=edg_wll_LogEventMaster(ctx,LOGFLAG_NORMAL | LOGFLAG_ASYNC,event,"%s",list); + ret=edg_wll_LogEventMaster(ctx,EDG_WLL_LOGFLAG_LOCAL | EDG_WLL_LOGFLAG_ASYNC,event,"%s",list); edg_wll_logevent_end: va_end(fmt_args); @@ -472,7 +461,7 @@ int edg_wll_LogEventSync( goto edg_wll_logeventsync_end; } - ret=edg_wll_LogEventMaster(ctx,LOGFLAG_NORMAL | LOGFLAG_SYNC,event,"%s",list); + ret=edg_wll_LogEventMaster(ctx,EDG_WLL_LOGFLAG_LOCAL | EDG_WLL_LOGFLAG_SYNC,event,"%s",list); edg_wll_logeventsync_end: va_end(fmt_args); @@ -507,7 +496,7 @@ int edg_wll_LogEventProxy( goto edg_wll_logevent_end; } - ret=edg_wll_LogEventMaster(ctx,LOGFLAG_PROXY | LOGFLAG_SYNC, event,"%s",list); + ret=edg_wll_LogEventMaster(ctx,EDG_WLL_LOGFLAG_PROXY | EDG_WLL_LOGFLAG_SYNC, event,"%s",list); edg_wll_logevent_end: va_end(fmt_args); @@ -787,32 +776,59 @@ static int edg_wll_RegisterJobMaster( } parent_s = parent ? edg_wlc_JobIdUnparse(parent) : strdup(""); - if (flags & LOGFLAG_DIRECT) { - /* SetLoggingJob and log directly the message */ + if ( (flags & EDG_WLL_LOGFLAG_DIRECT & EDG_WLL_LOGFLAG_LOCAL) || + (flags & EDG_WLL_LOGFLAG_PROXY & EDG_WLL_LOGFLAG_LOCAL) || + !(flags | EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_PROXY | EDG_WLL_LOGFLAG_LOCAL) ) { + edg_wll_SetError(ctx,EINVAL,"edg_wll_RegisterJobMaster(): wrong flag specified"); + goto edg_wll_registerjobmaster_end; + } + if (flags & EDG_WLL_LOGFLAG_DIRECT) { + /* SetLoggingJob and log the message directly to bkserver */ if (edg_wll_SetLoggingJob(ctx,job,NULL,EDG_WLL_SEQ_NORMAL) == 0) { - edg_wll_LogEventMaster(ctx,LOGFLAG_DIRECT | LOGFLAG_SYNC, - EDG_WLL_EVENT_REGJOB,EDG_WLL_FORMAT_REGJOB, - (char *)jdl,ns,parent_s,type_s,num_subjobs,seed); + err = edg_wll_LogEventMaster(ctx, flags, + EDG_WLL_EVENT_REGJOB, EDG_WLL_FORMAT_REGJOB, + (char *)jdl, ns, parent_s, type_s, num_subjobs, seed); + if (err) { + edg_wll_UpdateError(ctx,0,"edg_wll_RegisterJobMaster(): unable to register with bkserver"); + goto edg_wll_registerjobmaster_end; + } + } else { + edg_wll_UpdateError(ctx,0,"edg_wll_RegisterJobMaster(): unable to set logging job (direct)"); + goto edg_wll_registerjobmaster_end; } - } else if (flags & LOGFLAG_PROXY) { - /* SetLoggingJobProxy and and log to proxy */ + } + if (flags & EDG_WLL_LOGFLAG_PROXY) { + /* SetLoggingJobProxy and log the message to lbproxy */ edg_wll_SetSequenceCode(ctx, NULL, EDG_WLL_SEQ_NORMAL); if (seq) free(seq); seq = edg_wll_GetSequenceCode(ctx); if (edg_wll_SetLoggingJobProxy(ctx,job,seq,NULL,EDG_WLL_SEQ_NORMAL) == 0) { - edg_wll_LogEventMaster(ctx,LOGFLAG_PROXY | LOGFLAG_SYNC, - EDG_WLL_EVENT_REGJOB,EDG_WLL_FORMAT_REGJOB, - (char *)jdl,ns,parent_s,type_s,num_subjobs,seed); + err = edg_wll_LogEventMaster(ctx, flags, + EDG_WLL_EVENT_REGJOB, EDG_WLL_FORMAT_REGJOB, + (char *)jdl, ns, parent_s, type_s, num_subjobs, seed); + if (err) { + edg_wll_UpdateError(ctx,0,"edg_wll_RegisterJobMaster(): unable to register with lbproxy"); + goto edg_wll_registerjobmaster_end; + } + } else { + edg_wll_UpdateError(ctx,0,"edg_wll_RegisterJobMaster(): unable to set logging job (proxy)"); + goto edg_wll_registerjobmaster_end; } - } else if (flags & LOGFLAG_NORMAL) { - /* SetLoggingJob and log normally the message through the local-logger */ + } + if (flags & EDG_WLL_LOGFLAG_LOCAL) { + /* SetLoggingJob and log the message through the local-logger */ if (edg_wll_SetLoggingJob(ctx,job,NULL,EDG_WLL_SEQ_NORMAL) == 0) { - edg_wll_LogEventMaster(ctx, LOGFLAG_NORMAL, - EDG_WLL_EVENT_REGJOB,EDG_WLL_FORMAT_REGJOB, - (char *)jdl,ns,parent_s,type_s,num_subjobs,seed); + err = edg_wll_LogEventMaster(ctx, flags, + EDG_WLL_EVENT_REGJOB, EDG_WLL_FORMAT_REGJOB, + (char *)jdl, ns, parent_s, type_s, num_subjobs, seed); + if (err) { + edg_wll_UpdateError(ctx,0,"edg_wll_RegisterJobMaster(): unable to register through local-logger"); + goto edg_wll_registerjobmaster_end; + } + } else { + edg_wll_UpdateError(ctx,0,"edg_wll_RegisterJobMaster(): unable to set logging job (local)"); + goto edg_wll_registerjobmaster_end; } - } else { - edg_wll_SetError(ctx,EINVAL,"edg_wll_RegisterJobMaster(): wrong flag specified"); } edg_wll_registerjobmaster_end: @@ -839,7 +855,7 @@ int edg_wll_RegisterJobSync( const char * seed, edg_wlc_JobId ** subjobs) { - return edg_wll_RegisterJobMaster(ctx,LOGFLAG_DIRECT,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs); + return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_SYNC,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs); } /** @@ -858,7 +874,7 @@ int edg_wll_RegisterJob( const char * seed, edg_wlc_JobId ** subjobs) { - return edg_wll_RegisterJobMaster(ctx,LOGFLAG_DIRECT,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs); + return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_ASYNC,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs); } #ifndef LB_SERIAL_REG @@ -926,7 +942,7 @@ int edg_wll_RegisterJobProxy( } /* format the RegJob event message */ - if (edg_wll_FormatLogLine(ctx,LOGFLAG_SYNC | LOGFLAG_PROXY | LOGFLAG_PROXY, + if (edg_wll_FormatLogLine(ctx,EDG_WLL_LOGFLAG_SYNC | EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_PROXY, EDG_WLL_EVENT_REGJOB,&logline, EDG_WLL_FORMAT_REGJOB,(char *)jdl,ns,"",type_s,num_subjobs,seed) != 0 ) { edg_wll_UpdateError(ctx,EINVAL,"edg_wll_RegisterJobProxy(): edg_wll_FormatLogLine() error"); @@ -1041,14 +1057,8 @@ int edg_wll_RegisterJobProxyOld( const char * seed, edg_wlc_JobId ** subjobs) { - /* first register with bkserver */ - int ret = edg_wll_RegisterJobMaster(ctx,LOGFLAG_DIRECT,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs); - if (ret) { - edg_wll_UpdateError(ctx,0,"edg_wll_RegisterJobProxyOld(): unable to register with bkserver"); - return edg_wll_Error(ctx,NULL,NULL); - } - /* and then with L&B Proxy */ - return edg_wll_RegisterJobMaster(ctx,LOGFLAG_PROXY,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs); + /* first register with bkserver and then with L&B Proxy */ + return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_PROXY,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs); } #else /* LB_SERIAL_REG */ @@ -1070,21 +1080,12 @@ int edg_wll_RegisterJobProxy( const char * seed, edg_wlc_JobId ** subjobs) { - /* first register with bkserver */ - int ret = edg_wll_RegisterJobMaster(ctx,LOGFLAG_DIRECT,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs); - if (ret) { - edg_wll_UpdateError(ctx,0,"edg_wll_RegisterJobProxy(): unable to register with bkserver"); - return edg_wll_Error(ctx,NULL,NULL); - } - /* and then with L&B Proxy */ - return edg_wll_RegisterJobMaster(ctx,LOGFLAG_PROXY,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs); + /* first register with bkserver and then with L&B Proxy */ + return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_PROXY,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs); } #endif /* LB_SERIAL_REG */ - -#ifndef LB_SERIAL_REG - /** *----------------------------------------------------------------------- * Register one job with L&B Proxy service ONLY @@ -1102,11 +1103,9 @@ int edg_wll_RegisterJobProxyOnly( const char * seed, edg_wlc_JobId ** subjobs) { - return edg_wll_RegisterJobMaster(ctx,LOGFLAG_PROXY,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs); + return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_PROXY,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs); } -#endif /* LB_SERIAL_REG */ - /** *----------------------------------------------------------------------- * Register one subjob with L&B service @@ -1127,9 +1126,9 @@ int edg_wll_RegisterSubjob( { /* XXX: what is that ? */ #ifdef LB_PERF - return edg_wll_RegisterJobMaster(ctx,LOGFLAG_DIRECT,job,type,jdl,ns,parent,num_subjobs,seed,subjobs); + return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_DIRECT,job,type,jdl,ns,parent,num_subjobs,seed,subjobs); #else - return edg_wll_RegisterJobMaster(ctx,LOGFLAG_NORMAL,job,type,jdl,ns,parent,num_subjobs,seed,subjobs); + return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_LOCAL,job,type,jdl,ns,parent,num_subjobs,seed,subjobs); #endif } @@ -1151,7 +1150,7 @@ int edg_wll_RegisterSubjobProxy( const char * seed, edg_wlc_JobId ** subjobs) { - return edg_wll_RegisterJobMaster(ctx,LOGFLAG_PROXY,job,type,jdl,ns,parent,num_subjobs,seed,subjobs); + return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_PROXY,job,type,jdl,ns,parent,num_subjobs,seed,subjobs); } /** @@ -1259,7 +1258,7 @@ int edg_wll_ChangeACL( enum edg_wll_ACLOperation operation) { if ( edg_wll_SetLoggingJob(ctx, jobid, NULL, EDG_WLL_SEQ_NORMAL) == 0 ) { - edg_wll_LogEventMaster(ctx, LOGFLAG_NORMAL | LOGFLAG_SYNC, + edg_wll_LogEventMaster(ctx, EDG_WLL_LOGFLAG_LOCAL | EDG_WLL_LOGFLAG_SYNC, EDG_WLL_EVENT_CHANGEACL, EDG_WLL_FORMAT_CHANGEACL, user_id, user_id_type, permission, permission_type, operation); } diff --git a/org.glite.lb.common/interface/events.h.T b/org.glite.lb.common/interface/events.h.T index ffdb97f..65afcc8 100644 --- a/org.glite.lb.common/interface/events.h.T +++ b/org.glite.lb.common/interface/events.h.T @@ -385,6 +385,15 @@ for my $t (sort { $event->{order}->{$a} <=> $event->{order}->{$b} } #define EDG_WLL_FORMAT_SYSCMPSTAT "DG.SCHED.STATUS=\"%|Us\" " #define EDG_WLL_FORMAT_SYSCLSTAT "DG.SCHED.NODE=\"%|Us\" DG.SCHED.STATUS=\"%|Us\" " + +/* Flags (priority bits) defining the type of logging */ + +#define EDG_WLL_LOGFLAG_ASYNC 0 /**< asynchronous logging */ +#define EDG_WLL_LOGFLAG_SYNC 1 /**< synchronous logging */ +#define EDG_WLL_LOGFLAG_LOCAL 2 /**< logging to local logger */ +#define EDG_WLL_LOGFLAG_PROXY 4 /**< logging to L&B Proxy */ +#define EDG_WLL_LOGFLAG_DIRECT 8 /**< logging directly to bkserver */ + /* enums for the ChangeACL event */ enum edg_wll_Permission { diff --git a/org.glite.lb.server/src/db_store.c b/org.glite.lb.server/src/db_store.c index 4a0ea51..83a88cb 100644 --- a/org.glite.lb.server/src/db_store.c +++ b/org.glite.lb.server/src/db_store.c @@ -6,6 +6,7 @@ #include #include "glite/lb/context-int.h" +#include "glite/lb/events.h" #include "glite/lb/events_parse.h" #include "glite/lb/lb_maildir.h" #include "purge.h" @@ -190,9 +191,8 @@ static int db_actual_store(edg_wll_Context ctx, char *event, edg_wll_Event *ev, if ( ctx->isProxy ) { /* * send event to the proper BK server + * event with priority flag EDG_WLL_LOGFLAG_DIRECT (typically RegJob) is not sent */ - /* XXX: RegJob events, which were logged also directly, are duplicated at server, - but it should not harm */ #ifdef LB_PERF if( sink_mode == GLITE_LB_SINK_SEND ) { @@ -200,8 +200,16 @@ static int db_actual_store(edg_wll_Context ctx, char *event, edg_wll_Event *ev, } else #endif - if (edg_wll_EventSendProxy(ctx, ev->any.jobId, event) ) { - return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "edg_wll_EventSendProxy() error."); + /* XXX: ending here may break the backward compatibility */ + if (!(ev->any.priority & EDG_WLL_LOGFLAG_PROXY)) { + edg_wll_UpdateError(ctx, 0, "db_actual_store() WARNING: the event is not PROXY"); + //return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "db_actual_store() ERROR: the event is not PROXY"); + } + + if (!(ev->any.priority & EDG_WLL_LOGFLAG_DIRECT)) { + if (edg_wll_EventSendProxy(ctx, ev->any.jobId, event) ) { + return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "edg_wll_EventSendProxy() error."); + } } /* LB proxy purge -- 1.8.2.3