From: Jan Pospíšil Date: Fri, 12 Mar 2010 15:43:19 +0000 (+0000) Subject: huge client library cleanup X-Git-Tag: merge_20_2_dst~11 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=963fd1262cfe0fb945e7860f781ef64c8b40769e;p=jra1mw.git huge client library cleanup - unified calling of LogEvent*, RegisterJob* - no duplicity in the code anymore - build and elementary tests o.k., still needs testing --- diff --git a/org.glite.lb.client/examples/stresslog.c b/org.glite.lb.client/examples/stresslog.c index 67d7c07..7ec2acd 100644 --- a/org.glite.lb.client/examples/stresslog.c +++ b/org.glite.lb.client/examples/stresslog.c @@ -141,17 +141,17 @@ for (i = 0; i 0) { + fd_n=0; + if (flags & EDG_WLL_LOGFLAG_DIRECT) { + FD_SET(con_bkserver.sock,&fdset); + if (con_bkserver.sock > fd_n) fd_n = con_bkserver.sock; + } + if (flags & EDG_WLL_LOGFLAG_PROXY) { + FD_SET(con_lbproxy.sock,&fdset); + if (con_lbproxy.sock > fd_n) fd_n = con_lbproxy.sock; + } + fd_n += 1; - /* connect to bkserver */ - if ((ret = edg_wll_log_direct_connect(ctx,&conn))) { - edg_wll_UpdateError(ctx,EDG_WLL_IL_PROTO,"edg_wll_DoLogEventDirect(): edg_wll_log_direct_connect error"); - goto edg_wll_DoLogEventDirect_end; +#ifdef EDG_WLL_LOG_STUB + fprintf(stderr,"edg_wll_DoLogEventServer(): calling select (remaining timeout %d.%06d sec)\n", + (int) ctx->p_tmp_timeout.tv_sec, (int) ctx->p_tmp_timeout.tv_usec); +#endif + fd = select(fd_n,&fdset,NULL,NULL,&ctx->p_tmp_timeout); + switch (fd) { + case 0: /* timeout */ + edg_wll_UpdateError(ctx,EAGAIN,"edg_wll_DoLogEventServer(): select() timeouted"); + count = 0; + goto inc_seq_code; + break; + case -1: /* error */ + switch(errno) { + case EINTR: + continue; + default: + edg_wll_UpdateError(ctx,errno,"edg_wll_DoLogEventServer(): select() error"); + goto inc_seq_code; + } + default: + break; + } + /* XXX: read only from an apropriate descriptor + FD_ISSET can't be true unless mathching bit in flags was set + */ + if (FD_ISSET(con_lbproxy.sock,&fdset)) { + /* read answer from lbproxy */ + if ((ret = edg_wll_log_proxy_read(ctx,&con_lbproxy)) == -1) { + edg_wll_UpdateError(ctx,EAGAIN,"edg_wll_DoLogEventServer(): edg_wll_log_proxy_read error"); + goto inc_seq_code; + } + count -= 1; + } + if (FD_ISSET(con_bkserver.sock,&fdset)) { + /* read answer from bkserver */ + if ((ret = edg_wll_log_direct_read(ctx,&con_bkserver)) == -1) { + edg_wll_UpdateError(ctx,EAGAIN,"edg_wll_DoLogEventServer(): edg_wll_log_direct_read error"); + goto inc_seq_code; + } + count -= 1; + } } - /* send message */ - if ((ret = edg_wll_log_direct_write(ctx,&conn,logline)) == -1) { - edg_wll_UpdateError(ctx,EDG_WLL_IL_PROTO,"edg_wll_DoLogEventDirect(): edg_wll_log_direct_write error"); - goto edg_wll_DoLogEventDirect_end; - } +inc_seq_code: + edg_wll_IncSequenceCode(ctx); /* XXX: should not fail, called second time */ - /* get answer */ - if ((ret = edg_wll_log_direct_read(ctx,&conn)) == -1) { - edg_wll_UpdateError(ctx,EDG_WLL_IL_PROTO,"edg_wll_DoLogEventDirect(): edg_wll_log_direct_read error"); - } else { - answer = edg_wll_Error(ctx, NULL, NULL); - } +edg_wll_DoLogEventServer_end: + edg_wll_log_proxy_close(ctx,&con_lbproxy); + edg_wll_log_direct_close(ctx,&con_bkserver); -edg_wll_DoLogEventDirect_end: - edg_wll_log_direct_close(ctx,&conn); + return handle_errors(ctx,answer,"edg_wll_DoLogEventServer()"); - return handle_errors(ctx,answer,"edg_wll_DoLogEventDirect()"); } #endif /* FAKE_VERSION */ @@ -295,7 +340,7 @@ static int edg_wll_FormatLogLine( } size = strlen(out); - if (priority && (size > EDG_WLL_LOG_SYNC_MAXMSGSIZE)) { + if ((flags & (EDG_WLL_LOGFLAG_DIRECT|EDG_WLL_LOGFLAG_SYNC)) && (size > EDG_WLL_LOG_SYNC_MAXMSGSIZE)) { edg_wll_SetError(ctx,ret = ENOSPC,"edg_wll_FormatLogLine(): Message size too large for synchronous transfer"); goto edg_wll_formatlogline_end; } @@ -325,32 +370,34 @@ edg_wll_formatlogline_end: /** *---------------------------------------------------------------------- - * Formats a logging message and sends it to local-logger + * Formats a logging message and sends it to a correct destination * \brief master logging event function * \param[in,out] ctx context to work with, - * \param[in] flags as defined by EDG_WLL_LOGFLAG_* + * \param[in] flags logging flags indicating the destination + * EDG_WLL_LOGFLAG_LOCAL - local-logger + * EDG_WLL_LOGFLAG_PROXY - lbproxy + * EDG_WLL_LOGFLAG_DIRECT - bkserver * \param[in] event type of the event, * \param[in] fmt printf()-like format string, - * \param[in] ... event specific values/data according to fmt. + * \param[in] fmt_args event specific values/data according to fmt. *---------------------------------------------------------------------- */ -static int edg_wll_LogEventMaster( +static int edg_wll_LogEventMasterVa( edg_wll_Context ctx, int flags, edg_wll_EventCode event, - char *fmt, ...) + char *fmt, va_list fmt_args) { - va_list fmt_args; - int priority; - int ret; +// va_list fmt_args; + int ret = 0; edg_wll_LogLine in = NULL, out = NULL; - priority = flags; + if ((flags & (EDG_WLL_LOGFLAG_LOCAL|EDG_WLL_LOGFLAG_PROXY|EDG_WLL_LOGFLAG_DIRECT)) == 0) { + return edg_wll_SetError(ctx,ret = EINVAL,"edg_wll_LogEventMaster(): no known flag specified"); + } - edg_wll_ResetError(ctx); - - /* format the message: */ - va_start(fmt_args,fmt); + /* format the message */ + //va_start(fmt_args,fmt); if (trio_vasprintf(&in,fmt,fmt_args) == -1) { edg_wll_UpdateError(ctx,ret = ENOMEM,"edg_wll_LogEventMaster(): trio_vasprintf() error"); @@ -366,34 +413,23 @@ static int edg_wll_LogEventMaster( // fprintf(stderr,"edg_wll_LogEventMaster (%d chars): %s",strlen(out),out); #endif - /* and send the message */ #ifndef LB_PERF_DROP - if (flags & EDG_WLL_LOGFLAG_LOCAL) { - /* to the local-logger: */ - ctx->p_tmp_timeout = priority ? ctx->p_sync_timeout : ctx->p_log_timeout; - ret = edg_wll_DoLogEvent(ctx, out); - if (ret) goto edg_wll_logeventmaster_end; - } - if (flags & EDG_WLL_LOGFLAG_PROXY) { - /* to the L&B Proxy: */ - ctx->p_tmp_timeout = priority ? ctx->p_sync_timeout : ctx->p_log_timeout; - ret = edg_wll_DoLogEventProxy(ctx, out); - if (ret) goto edg_wll_logeventmaster_end; - } - if (flags & EDG_WLL_LOGFLAG_DIRECT) { - /* directly to the bkserver: */ - ctx->p_tmp_timeout = priority ? ctx->p_sync_timeout : ctx->p_log_timeout; - ret = edg_wll_DoLogEventDirect(ctx, out); - if (ret) goto edg_wll_logeventmaster_end; - } + ctx->p_tmp_timeout = (flags & (EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_SYNC)) + ? ctx->p_sync_timeout : ctx->p_log_timeout; - if ((flags & (EDG_WLL_LOGFLAG_LOCAL|EDG_WLL_LOGFLAG_PROXY|EDG_WLL_LOGFLAG_DIRECT)) == 0) { - edg_wll_SetError(ctx,ret = EINVAL,"edg_wll_LogEventMaster(): no known flag specified"); - } + /* send the message and read answer back */ + if (flags & EDG_WLL_LOGFLAG_LOCAL) { + ret = edg_wll_DoLogEvent(ctx, out); + if (ret) goto edg_wll_logeventmaster_end; + } + if (flags & (EDG_WLL_LOGFLAG_PROXY | EDG_WLL_LOGFLAG_DIRECT)) { + ret = edg_wll_DoLogEventServer(ctx, flags, out); + if (ret) goto edg_wll_logeventmaster_end; + } #endif edg_wll_logeventmaster_end: - va_end(fmt_args); +// va_end(fmt_args); if (in) free(in); if (out) free(out); @@ -406,6 +442,35 @@ edg_wll_logeventmaster_end: return edg_wll_Error(ctx,NULL,NULL); } +/** + *---------------------------------------------------------------------- + * Formats a logging message and sends it to a correct destination + * \brief master logging event function + * \note simple wrapper around edg_wll_LogEventMasterVa() + * \brief master logging event function + * \param[in,out] ctx context to work with, + * \param[in] flags logging flags indicating the destination + * EDG_WLL_LOGFLAG_LOCAL - local-logger + * EDG_WLL_LOGFLAG_PROXY - lbproxy + * EDG_WLL_LOGFLAG_DIRECT - bkserver + * \param[in] event type of the event, + * \param[in] fmt printf()-like format string, + * \param[in] ... event specific values/data according to fmt. + *---------------------------------------------------------------------- + */ +static int edg_wll_LogEventMaster( + edg_wll_Context ctx, + int flags, + edg_wll_EventCode event, + char *fmt, ...) +{ + int ret; + va_list fmt_args; + va_start(fmt_args,fmt); + ret = edg_wll_LogEventMasterVa(ctx,flags,event,fmt,fmt_args); + va_end(fmt_args); + return ret; +} /** *---------------------------------------------------------------------- @@ -419,24 +484,15 @@ int edg_wll_LogEvent( char *fmt, ...) { int ret=0; - char *list=NULL; va_list fmt_args; edg_wll_ResetError(ctx); va_start(fmt_args,fmt); - if (trio_vasprintf(&list,fmt,fmt_args) == -1) { - edg_wll_SetError(ctx,ret = ENOMEM,"edg_wll_LogEvent(): trio_vasprintf() error"); - goto edg_wll_logevent_end; - } - - ret=edg_wll_LogEventMaster(ctx,EDG_WLL_LOGFLAG_LOCAL | EDG_WLL_LOGFLAG_ASYNC,event,"%s",list); - -edg_wll_logevent_end: - va_end(fmt_args); - if (list) free(list); - + ret=edg_wll_LogEventMasterVa(ctx,EDG_WLL_LOGFLAG_LOCAL, + event,fmt,fmt_args); if (ret) edg_wll_UpdateError(ctx,0,"edg_wll_LogEvent(): "); + va_end(fmt_args); return edg_wll_Error(ctx,NULL,NULL); } @@ -454,24 +510,15 @@ int edg_wll_LogEventSync( char *fmt, ...) { int ret=0; - char *list=NULL; va_list fmt_args; edg_wll_ResetError(ctx); va_start(fmt_args,fmt); - if (trio_vasprintf(&list,fmt,fmt_args) == -1) { - edg_wll_SetError(ctx,ret = ENOMEM,"edg_wll_LogEventSync(): trio_vasprintf() error"); - goto edg_wll_logeventsync_end; - } - - ret=edg_wll_LogEventMaster(ctx,EDG_WLL_LOGFLAG_LOCAL | EDG_WLL_LOGFLAG_SYNC,event,"%s",list); - -edg_wll_logeventsync_end: - va_end(fmt_args); - if (list) free(list); - + ret=edg_wll_LogEventMasterVa(ctx,EDG_WLL_LOGFLAG_LOCAL | EDG_WLL_LOGFLAG_SYNC, + event,fmt,fmt_args); if (ret) edg_wll_UpdateError(ctx,0,"edg_wll_LogEventSync(): "); + va_end(fmt_args); return edg_wll_Error(ctx,NULL,NULL); } @@ -489,24 +536,15 @@ int edg_wll_LogEventProxy( char *fmt, ...) { int ret=0; - char *list=NULL; va_list fmt_args; edg_wll_ResetError(ctx); va_start(fmt_args,fmt); - if (trio_vasprintf(&list,fmt,fmt_args) == -1) { - edg_wll_SetError(ctx,ret = ENOMEM,"edg_wll_LogEventProxy(): trio_vasprintf() error"); - goto edg_wll_logevent_end; - } - - ret=edg_wll_LogEventMaster(ctx,EDG_WLL_LOGFLAG_PROXY | EDG_WLL_LOGFLAG_SYNC, event,"%s",list); - -edg_wll_logevent_end: - va_end(fmt_args); - if (list) free(list); - + ret=edg_wll_LogEventMasterVa(ctx,EDG_WLL_LOGFLAG_PROXY, + event,fmt,fmt_args); if (ret) edg_wll_UpdateError(ctx,0,"edg_wll_LogEventProxy(): "); + va_end(fmt_args); return edg_wll_Error(ctx,NULL,NULL); } @@ -611,86 +649,39 @@ edg_wll_logflushall_end: /** *----------------------------------------------------------------------- - * Set a current job for given context. + * Master function for setting a current job for given context. * \note Should be called before any logging call. + * \param[in,out] context context to work with + * \param[in] job further logging calls are related to this job + * \param[in] code sequence code as obtained from previous component + * \param[in] user user credentials + * \param[in] seq_code_flags flags on code handling (\see API documentation) + * \param[in] logging flags as defined by EDG_WLL_LOGFLAG_* *----------------------------------------------------------------------- */ -int edg_wll_SetLoggingJob( - edg_wll_Context ctx, - glite_jobid_const_t job, - const char *code, - int flags) -{ - int err; - - edg_wll_ResetError(ctx); - - if (!job) return edg_wll_SetError(ctx,EINVAL,"edg_wll_SetLoggingJob(): jobid is null"); - - edg_wlc_JobIdFree(ctx->p_jobid); - if ((err = edg_wlc_JobIdDup(job,&ctx->p_jobid))) { - edg_wll_SetError(ctx,err,"edg_wll_SetLoggingJob(): edg_wlc_JobIdDup() error"); - } else if (!edg_wll_SetSequenceCode(ctx,code,flags)) { - edg_wll_IncSequenceCode(ctx); - } - - /* add user credentials to context */ - { - edg_wll_GssStatus gss_stat; - edg_wll_GssCred cred = NULL; - - /* acquire gss credentials */ - err = edg_wll_gss_acquire_cred_gsi( - ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_cert_filename, - ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_key_filename, - &cred, &gss_stat); - /* give up if unable to acquire prescribed credentials */ - if (err) { - edg_wll_SetErrorGss(ctx, "failed to load GSI credentials", &gss_stat); - - // XXX: stop here - further changes need to be done in - // edg_wll_gss_connect() to support annonymous connetion - return edg_wll_Error(ctx,NULL,NULL); - - edg_wll_SetParamString(ctx, EDG_WLL_PARAM_LBPROXY_USER, EDG_WLL_LOG_USER_DEFAULT); - } else { - edg_wll_SetParamString(ctx, EDG_WLL_PARAM_LBPROXY_USER, cred->name); - } - if (cred != NULL) - edg_wll_gss_release_cred(&cred, NULL); - } - - return edg_wll_Error(ctx,NULL,NULL); -} - -/** - *----------------------------------------------------------------------- - * Set a current job for given context. - * \note Should be called before any logging call. - *----------------------------------------------------------------------- - */ -int edg_wll_SetLoggingJobProxy( +static int edg_wll_SetLoggingJobMaster( edg_wll_Context ctx, glite_jobid_const_t job, const char *code, const char *user, - int flags) + int seq_code_flags, + int logging_flags) { int err; char *code_loc = NULL; edg_wll_ResetError(ctx); - if (!job) return edg_wll_SetError(ctx,EINVAL,"edg_wll_SetLoggingJobProxy(): jobid is null"); + if (!job) return edg_wll_SetError(ctx,EINVAL,"edg_wll_SetLoggingJobMaster(): jobid is null"); edg_wlc_JobIdFree(ctx->p_jobid); if ((err = edg_wlc_JobIdDup(job,&ctx->p_jobid))) { - edg_wll_SetError(ctx,err,"edg_wll_SetLoggingJobProxy(): edg_wlc_JobIdDup() error"); - goto edg_wll_setloggingjobproxy_end; + edg_wll_SetError(ctx,err,"edg_wll_SetLoggingJobMaster(): edg_wlc_JobIdDup() error"); + goto edg_wll_setloggingjobmaster_end; } /* add user credentials to context */ - if (user) { + if ((logging_flags & EDG_WLL_LOGFLAG_PROXY) && user) { edg_wll_SetParamString(ctx, EDG_WLL_PARAM_LBPROXY_USER, user); } else { edg_wll_GssStatus gss_stat; @@ -719,19 +710,21 @@ int edg_wll_SetLoggingJobProxy( } /* query LBProxyServer for sequence code if not user-suplied */ -/* TODO: merge - check if it is really working properly after the unification of proxy and server */ - if (!code) { - if (edg_wll_QuerySequenceCodeProxy(ctx, job, &code_loc)) - goto edg_wll_setloggingjobproxy_end; - } else { - code_loc = strdup(code); - } - - if (!edg_wll_SetSequenceCode(ctx,code_loc,flags)) { - edg_wll_IncSequenceCode(ctx); + /* TODO: merge - check if it is really working properly after the unification of proxy and server */ + if (logging_flags & EDG_WLL_LOGFLAG_PROXY) { + if (!code) { + if (edg_wll_QuerySequenceCodeProxy(ctx, job, &code_loc)) + goto edg_wll_setloggingjobmaster_end; + } else { + code_loc = strdup(code); + } + + if (!edg_wll_SetSequenceCode(ctx,code_loc,seq_code_flags)) { + edg_wll_IncSequenceCode(ctx); + } } -edg_wll_setloggingjobproxy_end: +edg_wll_setloggingjobmaster_end: if (code_loc) free(code_loc); return edg_wll_Error(ctx,NULL,NULL); @@ -739,7 +732,45 @@ edg_wll_setloggingjobproxy_end: /** *----------------------------------------------------------------------- - * Register job with L&B service. + * Set a current job for given context. + * \note simple wrappers around edg_wll_SetLoggingJobMaster() + *----------------------------------------------------------------------- + */ +int edg_wll_SetLoggingJob( + edg_wll_Context ctx, + glite_jobid_const_t job, + const char *code, + int seq_code_flags) +{ + return edg_wll_SetLoggingJobMaster(ctx,job,NULL,code,seq_code_flags,/* XXX */ 0); +} + +int edg_wll_SetLoggingJobProxy( + edg_wll_Context ctx, + glite_jobid_const_t job, + const char *code, + const char *user, + int seq_code_flags) +{ + return edg_wll_SetLoggingJobMaster(ctx,job,user,code,seq_code_flags,EDG_WLL_LOGFLAG_PROXY); +} + +/** + *----------------------------------------------------------------------- + * Master function for registering a job with L&B service. + * \brief generic job registration + * \param[in,out] context context to work with + * \param[in] flags as defined by EDG_WLL_LOGFLAG_* + * \param[in] job jobId + * \param[in] type EDG_WLL_JOB_SIMPLE, EDG_WLL_JOB_DAG, or EDG_WLL_JOB_PARTITIONABLE + * \param[in] jdl user-specified JDL + * \param[in] ns network server contact + * \param[in] num_subjobs number of subjobs to create + * \param[in] seed seed used for subjob id's generator. + * Use non-NULL value to be able to regenerate the set of jobid's + * \param[out] subjobs returned subjob id's + * \param[in] wms_dn DN of WMS handling the job + * *----------------------------------------------------------------------- */ static int edg_wll_RegisterJobMaster( @@ -764,6 +795,15 @@ static int edg_wll_RegisterJobMaster( edg_wll_ResetError(ctx); memcpy(&sync_to, &ctx->p_sync_timeout, sizeof sync_to); + if ( ((flags & (EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_LOCAL)) == + (EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_LOCAL)) || + ((flags & (EDG_WLL_LOGFLAG_PROXY | EDG_WLL_LOGFLAG_LOCAL)) == + (EDG_WLL_LOGFLAG_PROXY | EDG_WLL_LOGFLAG_LOCAL)) + ) { + edg_wll_SetError(ctx,EINVAL,"edg_wll_RegisterJobMaster(): wrong flag specified"); + goto edg_wll_registerjobmaster_end; + } + type_s = edg_wll_RegJobJobtypeToString(type); if (!type_s) { edg_wll_SetError(ctx,EINVAL,"edg_wll_RegisterJobMaster(): no jobtype specified"); @@ -783,6 +823,7 @@ static int edg_wll_RegisterJobMaster( edg_wll_UpdateError(ctx,EINVAL,"edg_wll_RegisterJobMaster(): edg_wll_GenerateSubjobIds() error"); goto edg_wll_registerjobmaster_end; } + parent_s = parent ? edg_wlc_JobIdUnparse(parent) : strdup(""); if (wms_dn) { char *aux,*aux2; @@ -795,28 +836,31 @@ static int edg_wll_RegisterJobMaster( free(aux2); aux2 = NULL; } - if ( ((flags & (EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_LOCAL)) == - (EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_LOCAL)) || - ((flags & (EDG_WLL_LOGFLAG_PROXY | EDG_WLL_LOGFLAG_LOCAL)) == - (EDG_WLL_LOGFLAG_PROXY | EDG_WLL_LOGFLAG_LOCAL)) - ) { - edg_wll_SetError(ctx,EINVAL,"edg_wll_RegisterJobMaster(): wrong flag specified"); - goto edg_wll_registerjobmaster_end; + if (flags & EDG_WLL_LOGFLAG_PROXY) { + /* XXX: it was here but we don't know why + edg_wll_SetSequenceCode(ctx, NULL, EDG_WLL_SEQ_NORMAL); + seq = edg_wll_GetSequenceCode(ctx); + */ + err=edg_wll_SetLoggingJobProxy(ctx,job,/* seq */ NULL,NULL,EDG_WLL_SEQ_NORMAL); + } else { + err=edg_wll_SetLoggingJob(ctx,job,NULL,EDG_WLL_SEQ_NORMAL); } + // ret=edg_wll_SetLoggingJobMaster(ctx,job,seq,NULL,EDG_WLL_SEQ_NORMAL,flags); + + if (err != 0) { + edg_wll_UpdateError(ctx,EINVAL,"edg_wll_RegisterJobMaster(): unable to set logging job"); + goto edg_wll_registerjobmaster_end; - if (edg_wll_SetLoggingJob(ctx,job,NULL,EDG_WLL_SEQ_NORMAL) == 0) { - err = edg_wll_LogEventMaster(ctx, flags, - EDG_WLL_EVENT_REGJOB, EDG_WLL_FORMAT_REGJOB, - (char *)jdl, ns, parent_s, type_s, num_subjobs, seed, wms_dn_s); - if (err) { - edg_wll_UpdateError(ctx,0,"edg_wll_RegisterJobMaster(): unable to register with bkserver"); - goto edg_wll_registerjobmaster_end; - } - } else { - edg_wll_UpdateError(ctx,0,"edg_wll_RegisterJobMaster(): unable to set logging job (direct)"); - goto edg_wll_registerjobmaster_end; } + /* send the RegJob event message */ + if ((err = edg_wll_LogEventMaster(ctx,flags, + EDG_WLL_EVENT_REGJOB, EDG_WLL_FORMAT_REGJOB, + (char *)jdl,ns,parent,type_s,num_subjobs,seed,wms_dn_s)) != 0 ) { + edg_wll_UpdateError(ctx,EINVAL,"edg_wll_RegisterJobMaster(): unable to register job"); + goto edg_wll_registerjobmaster_end; + } + edg_wll_registerjobmaster_end: memcpy(&ctx->p_sync_timeout, &sync_to, sizeof sync_to); if (seq) free(seq); @@ -827,25 +871,6 @@ edg_wll_registerjobmaster_end: /** *----------------------------------------------------------------------- - * Register synchronously one job with L&B service - * \note simple wrapper around edg_wll_RegisterJobMaster() - *----------------------------------------------------------------------- - */ -int edg_wll_RegisterJobSync( - edg_wll_Context ctx, - glite_jobid_const_t job, - enum edg_wll_RegJobJobtype type, - const char * jdl, - const char * ns, - int num_subjobs, - const char * seed, - edg_wlc_JobId ** subjobs) -{ - return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_SYNC,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs,NULL); -} - -/** - *----------------------------------------------------------------------- * Register (asynchronously) one job with L&B service * \note simple wrapper around edg_wll_RegisterJobMaster() *----------------------------------------------------------------------- @@ -860,7 +885,7 @@ int edg_wll_RegisterJob( const char * seed, edg_wlc_JobId ** subjobs) { - return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_ASYNC,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs,NULL); + return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_DIRECT,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs,NULL); } /** @@ -869,30 +894,7 @@ int edg_wll_RegisterJob( * \note simple wrapper around edg_wll_RegisterJobMaster() *----------------------------------------------------------------------- */ -int edg_wll_RegisterJobACL( - edg_wll_Context ctx, - glite_jobid_const_t job, - enum edg_wll_RegJobJobtype type, - const char * jdl, - const char * ns, - int num_subjobs, - const char * seed, - edg_wlc_JobId ** subjobs, - char ** wms_dn) -{ - return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_ASYNC,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs,wms_dn); -} - - -#ifndef LB_SERIAL_REG - -/** - *----------------------------------------------------------------------- - * Register job with L&B Proxy service. - *----------------------------------------------------------------------- - */ - -int edg_wll_RegisterJobProxyMaster( +int edg_wll_RegisterJobExt( edg_wll_Context ctx, glite_jobid_const_t job, enum edg_wll_RegJobJobtype type, @@ -901,164 +903,14 @@ int edg_wll_RegisterJobProxyMaster( int num_subjobs, const char * seed, edg_wlc_JobId ** subjobs, - char ** wms_dn) + char ** wms_dn, + int logging_flags) { - char *seq,*type_s,*wms_dn_s; - edg_wll_LogLine logline = NULL; - int ret = 0,n,count,fd,i; - struct timeval sync_to; - edg_wll_GssConnection con_bkserver; - edg_wll_PlainConnection con_lbproxy; - fd_set fdset; - - seq = type_s = wms_dn_s = NULL; - - edg_wll_ResetError(ctx); - memcpy(&sync_to, &ctx->p_sync_timeout, sizeof sync_to); - memset(&con_bkserver, 0, sizeof(con_bkserver)); - memset(&con_lbproxy, 0, sizeof(con_lbproxy)); - - FD_ZERO(&fdset); - - - type_s = edg_wll_RegJobJobtypeToString(type); - if (!type_s) { - edg_wll_SetError(ctx,EINVAL,"edg_wll_RegisterJobProxy(): no jobtype specified"); - goto edg_wll_registerjobproxy_end; - } - if (wms_dn) { - char *aux,*aux2; - aux2 = strdup(""); - for (i=0; wms_dn[i]; i++) { - asprintf(&aux,"%s\n%s",aux2,wms_dn[i]); - free(aux2); aux2 = aux; aux = NULL; - } - wms_dn_s = strdup(aux2); - free(aux2); aux2 = NULL; - } - if ((type == EDG_WLL_REGJOB_DAG || - type == EDG_WLL_REGJOB_PARTITIONED || - type == EDG_WLL_REGJOB_COLLECTION) - && num_subjobs > 0) { - ret = edg_wll_GenerateSubjobIds(ctx,job,num_subjobs,seed,subjobs); - /* increase log timeout on client (the same as on BK server) */ - ctx->p_sync_timeout.tv_sec += num_subjobs; - if (ctx->p_sync_timeout.tv_sec > 86400) ctx->p_sync_timeout.tv_sec = 86400; - } - if (ret) { - edg_wll_UpdateError(ctx,EINVAL,"edg_wll_RegisterJobProxy(): edg_wll_GenerateSubjobIds() error"); - goto edg_wll_registerjobproxy_end; - } - - edg_wll_SetSequenceCode(ctx, NULL, EDG_WLL_SEQ_NORMAL); - seq = edg_wll_GetSequenceCode(ctx); - if (edg_wll_SetLoggingJobProxy(ctx,job,seq,NULL,EDG_WLL_SEQ_NORMAL) != 0) { - edg_wll_UpdateError(ctx,EINVAL,"edg_wll_RegisterJobProxy(): edg_wll_SetLoggingJobProxy() error"); - goto edg_wll_registerjobproxy_end; - } + int flags; + flags = logging_flags & EDG_WLL_LOGLFLAG_EXCL; /* the only supported flag */ + flags |= EDG_WLL_LOGFLAG_DIRECT; - /* format the RegJob event message */ - if (edg_wll_FormatLogLine(ctx,EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_PROXY, - EDG_WLL_EVENT_REGJOB,&logline, - EDG_WLL_FORMAT_REGJOB,(char *)jdl,ns,"",type_s,num_subjobs,seed,wms_dn_s) != 0 ) { - edg_wll_UpdateError(ctx,EINVAL,"edg_wll_RegisterJobProxy(): edg_wll_FormatLogLine() error"); - goto edg_wll_registerjobproxy_end; - } - - /* do not forget to set the timeout!!! */ - ctx->p_tmp_timeout = ctx->p_sync_timeout; - - /* and now do the pseudo-parallel registration: */ - -#ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"edg_wll_RegisterJobProxy: start (remaining timeout %d.%06d sec)\n", - (int) ctx->p_tmp_timeout.tv_sec, (int) ctx->p_tmp_timeout.tv_usec); -#endif - /* connect to bkserver */ - if ((ret = edg_wll_log_direct_connect(ctx,&con_bkserver))) { - edg_wll_UpdateError(ctx,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_direct_connect error"); - goto edg_wll_registerjobproxy_end; - } - /* connect to lbproxy */ - if ((ret = edg_wll_log_proxy_connect(ctx,&con_lbproxy))) { - edg_wll_UpdateError(ctx,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_proxy_connect error"); - goto edg_wll_registerjobproxy_end; - } - /* send to bkserver */ - if ((ret = edg_wll_log_direct_write(ctx,&con_bkserver,logline)) == -1) { - edg_wll_UpdateError(ctx,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_direct_write error"); - goto edg_wll_registerjobproxy_end; - } - /* send to lbproxy */ - if ((ret = edg_wll_log_proxy_write(ctx,&con_lbproxy,logline)) == -1) { - edg_wll_UpdateError(ctx,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_proxy_write error"); - goto edg_wll_registerjobproxy_end; - } - /* select and read the answers */ - count = 2; - while (count > 0) { - FD_SET(con_bkserver.sock,&fdset); - n = con_bkserver.sock; - FD_SET(con_lbproxy.sock,&fdset); - if (con_lbproxy.sock > n) n = con_lbproxy.sock; - n += 1; -#ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"edg_wll_RegisterJobProxy: calling select (remaining timeout %d.%06d sec)\n", - (int) ctx->p_tmp_timeout.tv_sec, (int) ctx->p_tmp_timeout.tv_usec); -#endif - fd = select(n,&fdset,NULL,NULL,&ctx->p_tmp_timeout); - switch (fd) { - case 0: /* timeout */ - edg_wll_UpdateError(ctx,EAGAIN,"edg_wll_RegisterJobProxy(): select() timeouted"); - count = 0; - break; - case -1: /* error */ - switch(errno) { - case EINTR: - continue; - default: - edg_wll_UpdateError(ctx,errno,"edg_wll_RegisterJobProxy(): select() error"); - } - default: - break; - } - if (FD_ISSET(con_bkserver.sock,&fdset)) { - /* read answer from bkserver */ - if ((ret = edg_wll_log_direct_read(ctx,&con_bkserver)) == -1) { - edg_wll_UpdateError(ctx,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_direct_read error"); - goto inc_seq_code; - } - count -= 1; - } - if (FD_ISSET(con_lbproxy.sock,&fdset)) { - /* read answer from lbproxy */ - if ((ret = edg_wll_log_proxy_read(ctx,&con_lbproxy)) == -1) { - edg_wll_UpdateError(ctx,EAGAIN,"edg_wll_RegisterJobProxy(): edg_wll_log_proxy_read error"); - goto inc_seq_code; - } - count -= 1; - } - } - - -inc_seq_code: - edg_wll_IncSequenceCode(ctx); /* XXX: should not fail, called second time */ - -edg_wll_registerjobproxy_end: - -#ifdef EDG_WLL_LOG_STUB - fprintf(stderr,"edg_wll_RegisterJobProxy: done (remaining timeout %d.%06d sec)\n", - (int) ctx->p_tmp_timeout.tv_sec, (int) ctx->p_tmp_timeout.tv_usec); -#endif - if (con_bkserver.sock) edg_wll_gss_close(&con_bkserver,&ctx->p_tmp_timeout); - if (con_lbproxy.sock) edg_wll_plain_close(&con_lbproxy); - - memcpy(&ctx->p_sync_timeout, &sync_to, sizeof sync_to); - if (type_s) free(type_s); - if (seq) free(seq); - if (logline) free(logline); - - return edg_wll_Error(ctx,NULL,NULL); + return edg_wll_RegisterJobMaster(ctx,flags,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs,wms_dn); } /** @@ -1078,7 +930,7 @@ int edg_wll_RegisterJobProxy( const char * seed, edg_wlc_JobId ** subjobs) { - return edg_wll_RegisterJobProxyMaster(ctx,job,type,jdl,ns,num_subjobs,seed,subjobs,NULL); + return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_PROXY | EDG_WLL_LOGFLAG_DIRECT,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs,NULL); } /** @@ -1087,7 +939,7 @@ int edg_wll_RegisterJobProxy( * \note simple wrapper around edg_wll_RegisterJobProxyMaster() *----------------------------------------------------------------------- */ -int edg_wll_RegisterJobProxyACL( +int edg_wll_RegisterJobProxyExt( edg_wll_Context ctx, glite_jobid_const_t job, enum edg_wll_RegJobJobtype type, @@ -1096,72 +948,17 @@ int edg_wll_RegisterJobProxyACL( int num_subjobs, const char * seed, edg_wlc_JobId ** subjobs, - char ** wms_dn) -{ - return edg_wll_RegisterJobProxyMaster(ctx,job,type,jdl,ns,num_subjobs,seed,subjobs,wms_dn); -} - -/** - *----------------------------------------------------------------------- - * Register one job with L&B Proxy service - * \note simple wrapper around edg_wll_RegisterJobMaster() - * this is original edg_wll_RegisterJobProxy - *----------------------------------------------------------------------- - */ -int edg_wll_RegisterJobProxyOld( - edg_wll_Context ctx, - glite_jobid_const_t job, - enum edg_wll_RegJobJobtype type, - const char * jdl, - const char * ns, - int num_subjobs, - const char * seed, - edg_wlc_JobId ** subjobs) -{ - /* first register with bkserver and then with L&B Proxy */ - return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_PROXY,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs,NULL); -} - -#else /* LB_SERIAL_REG */ - -/** - *----------------------------------------------------------------------- - * Register one job with L&B Proxy service - * \note simple wrapper around edg_wll_RegisterJobMaster() - * this is original edg_wll_RegisterJobProxy - *----------------------------------------------------------------------- - */ -int edg_wll_RegisterJobProxy( - edg_wll_Context ctx, - glite_jobid_const_t job, - enum edg_wll_RegJobJobtype type, - const char * jdl, - const char * ns, - int num_subjobs, - const char * seed, - edg_wlc_JobId ** subjobs) + char ** wms_dn, + int logging_flags) { - /* first register with bkserver and then with L&B Proxy */ - return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_PROXY,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs,NULL); -} + int flags; + flags = logging_flags & EDG_WLL_LOGLFLAG_EXCL; /* the only supported flag */ + flags |= (EDG_WLL_LOGFLAG_PROXY | EDG_WLL_LOGFLAG_DIRECT); -int edg_wll_RegisterJobProxyACL( - edg_wll_Context ctx, - glite_jobid_const_t job, - enum edg_wll_RegJobJobtype type, - const char * jdl, - const char * ns, - int num_subjobs, - const char * seed, - edg_wlc_JobId ** subjobs, - char ** wms_dn) -{ - /* first register with bkserver and then with L&B Proxy */ - return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_DIRECT | EDG_WLL_LOGFLAG_PROXY,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs,wms_dn); + return edg_wll_RegisterJobMaster(ctx,flags,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs,wms_dn); } -#endif /* LB_SERIAL_REG */ - +#ifdef LB_PERF /** *----------------------------------------------------------------------- * Register one job with L&B Proxy service ONLY @@ -1181,6 +978,7 @@ int edg_wll_RegisterJobProxyOnly( { return edg_wll_RegisterJobMaster(ctx,EDG_WLL_LOGFLAG_PROXY,job,type,jdl,ns,NULL,num_subjobs,seed,subjobs,NULL); } +#endif /** *----------------------------------------------------------------------- diff --git a/org.glite.lb.common/interface/events.h.T b/org.glite.lb.common/interface/events.h.T index 030c1a2..1995e4c 100644 --- a/org.glite.lb.common/interface/events.h.T +++ b/org.glite.lb.common/interface/events.h.T @@ -441,13 +441,14 @@ for my $t (sort { $event->{order}->{$a} <=> $event->{order}->{$b} } /* Flags (priority bits) defining the type of logging */ -#define EDG_WLL_LOGFLAG_ASYNC 0 /**< asynchronous logging */ -#define EDG_WLL_LOGFLAG_SYNC_COMPAT 1 /**< synchronous logging in previous versions*/ -#define EDG_WLL_LOGFLAG_SYNC 2 /**< synchronous logging */ -#define EDG_WLL_LOGFLAG_LOCAL 4 /**< logging to local logger */ -#define EDG_WLL_LOGFLAG_PROXY 8 /**< logging to L&B Proxy */ -#define EDG_WLL_LOGFLAG_DIRECT 16 /**< logging directly to bkserver */ -#define EDG_WLL_LOGFLAG_INTERNAL 32 /**< internally generated event, don't forward */ +#define EDG_WLL_LOGFLAG_ASYNC 0 /**< asynchronous logging */ +#define EDG_WLL_LOGFLAG_SYNC_COMPAT 1 /**< synchronous logging in previous versions*/ +#define EDG_WLL_LOGFLAG_SYNC 2 /**< synchronous logging */ +#define EDG_WLL_LOGFLAG_LOCAL 4 /**< logging to local logger */ +#define EDG_WLL_LOGFLAG_PROXY 8 /**< logging to L&B Proxy */ +#define EDG_WLL_LOGFLAG_DIRECT 16 /**< logging directly to bkserver */ +#define EDG_WLL_LOGFLAG_INTERNAL 32 /**< internally generated event, don't forward */ +#define EDG_WLL_LOGLFLAG_EXCL 64 /**< exclusive flag, to be used with RegisterJob only (if job already exists, than registration fails) */ /* *@} end of group