From 943dad3e5ca2e888d8faf2d4326fde7d347b82de Mon Sep 17 00:00:00 2001 From: cvs2svn Date: Tue, 11 Mar 2008 12:31:44 +0000 Subject: [PATCH] This commit was manufactured by cvs2svn to create tag 'merge_transactions_src'. MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Sprout from lb_transactions_only 2008-03-11 12:31:43 UTC Miloš Mulač 'prevent job_reg coredumping if no credentials found' Cherrypick from master 2008-02-29 15:36:01 UTC Jan Pospíšil 'stage and use job_attrs.h': org.glite.lb.state-machine/LICENSE org.glite.lb.state-machine/Makefile org.glite.lb.state-machine/interface/intjobstat.h org.glite.lb.state-machine/interface/lb-job-attrs.xsd org.glite.lb.state-machine/interface/lb-job-record.xsd org.glite.lb.state-machine/interface/seqcode_aux.h org.glite.lb.state-machine/src/job_attrs.xsl org.glite.lb.state-machine/src/lb_plugin.c org.glite.lb.state-machine/src/process_event.c org.glite.lb.state-machine/src/process_event_condor.c org.glite.lb.state-machine/src/process_event_pbs.c org.glite.lb.state-machine/src/seqcode_aux.c --- org.glite.lb.state-machine/LICENSE | 69 ++ org.glite.lb.state-machine/Makefile | 102 ++ org.glite.lb.state-machine/interface/intjobstat.h | 104 ++ .../interface/lb-job-attrs.xsd | 132 +++ .../interface/lb-job-record.xsd | 90 ++ org.glite.lb.state-machine/interface/seqcode_aux.h | 22 + org.glite.lb.state-machine/src/job_attrs.xsl | 25 + org.glite.lb.state-machine/src/lb_plugin.c | 1045 ++++++++++++++++++++ org.glite.lb.state-machine/src/process_event.c | 1011 +++++++++++++++++++ .../src/process_event_condor.c | 207 ++++ org.glite.lb.state-machine/src/process_event_pbs.c | 230 +++++ org.glite.lb.state-machine/src/seqcode_aux.c | 281 ++++++ 12 files changed, 3318 insertions(+) create mode 100644 org.glite.lb.state-machine/LICENSE create mode 100644 org.glite.lb.state-machine/Makefile create mode 100644 org.glite.lb.state-machine/interface/intjobstat.h create mode 100644 org.glite.lb.state-machine/interface/lb-job-attrs.xsd create mode 100644 org.glite.lb.state-machine/interface/lb-job-record.xsd create mode 100644 org.glite.lb.state-machine/interface/seqcode_aux.h create mode 100644 org.glite.lb.state-machine/src/job_attrs.xsl create mode 100644 org.glite.lb.state-machine/src/lb_plugin.c create mode 100644 org.glite.lb.state-machine/src/process_event.c create mode 100644 org.glite.lb.state-machine/src/process_event_condor.c create mode 100644 org.glite.lb.state-machine/src/process_event_pbs.c create mode 100644 org.glite.lb.state-machine/src/seqcode_aux.c diff --git a/org.glite.lb.state-machine/LICENSE b/org.glite.lb.state-machine/LICENSE new file mode 100644 index 0000000..01b973b --- /dev/null +++ b/org.glite.lb.state-machine/LICENSE @@ -0,0 +1,69 @@ +LICENSE file for EGEE Middleware +================================ + +Copyright (c) 2004 on behalf of the EU EGEE Project: +The European Organization for Nuclear Research (CERN), +Istituto Nazionale di Fisica Nucleare (INFN), Italy +Datamat Spa, Italy +Centre National de la Recherche Scientifique (CNRS), France +CS Systeme d'Information (CSSI), France +Royal Institute of Technology, Center for Parallel Computers (KTH-PDC), Sweden +Universiteit van Amsterdam (UvA), Netherlands +University of Helsinki (UH.HIP), Finlan +University of Bergen (UiB), Norway +Council for the Central Laboratory of the Research Councils (CCLRC), United Kingdom + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + +1. Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright +notice, this list of conditions and the following disclaimer in the +documentation and/or other materials provided with the distribution. + +3. The end-user documentation included with the redistribution, if +any, must include the following acknowledgment: "This product includes +software developed by The EU EGEE Project (http://cern.ch/eu-egee/)." +Alternatively, this acknowledgment may appear in the software itself, if +and wherever such third-party acknowledgments normally appear. + +4. The names EGEE and the EU EGEE Project must not be +used to endorse or promote products derived from this software without +prior written permission. For written permission, please contact +. + +5. You are under no obligation whatsoever to provide anyone with any +bug fixes, patches, or upgrades to the features, functionality or +performance of the Software ("Enhancements") that you may develop over +time; however, if you choose to provide your Enhancements to The EU +EGEE Project, or if you choose to otherwise publish or distribute your +Enhancements, in source code form without contemporaneously requiring +end users of The EU EGEE Proejct to enter into a separate written license +agreement for such Enhancements, then you hereby grant The EU EGEE Project +a non-exclusive, royalty-free perpetual license to install, use, copy, +modify, prepare derivative works, incorporate into the EGEE Middleware +or any other computer software, distribute, and sublicense your +Enhancements or derivative works thereof, in binary and source code +form (if any), whether developed by The EU EGEE Project or third parties. + +THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED +WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL PROJECT OR ITS CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE +OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN +IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +This software consists of voluntary contributions made by many +individuals on behalf of the EU EGEE Prject. For more information on The +EU EGEE Project, please see http://cern.ch/eu-egee/. For more information on +EGEE Middleware, please see http://egee-jra1.web.cern.ch/egee-jra1/ + + diff --git a/org.glite.lb.state-machine/Makefile b/org.glite.lb.state-machine/Makefile new file mode 100644 index 0000000..7803ade --- /dev/null +++ b/org.glite.lb.state-machine/Makefile @@ -0,0 +1,102 @@ +# defaults +top_srcdir=. +stagedir=. +distdir=. +globalprefix=glite +lbprefix=lb +package=glite-lb-state-machine +version=0.2.0 +PREFIX=/opt/glite + +-include Makefile.inc + +VPATH = ../src:../interface + +default all: compile + +CC=gcc +CXX=g++ +AT3=${stagedir}/sbin/glite-lb-at3 + +SUFFIXES = .T + +DEBUG:=-g -O0 -Wall + +CFLAGS:= \ + ${DEBUG} \ + -I${classads_prefix}/include \ + -I${stagedir}/include -I${top_srcdir}/src -I. \ + -I${top_srcdir}/interface + +COMPILE:=libtool --mode=compile ${CC} ${CFLAGS} +LINK:=libtool --mode=link ${CC} -rpath ${stagedir}/lib ${LDFLAGS} +SOLINK:=libtool --mode=link ${CC} -module ${LDFLAGS} -rpath ${stagedir}/lib +LINKXX:=libtool --mode=link ${CXX} ${LDFLAGS} +INSTALL:=libtool --mode=install install +LINKXX:=libtool --mode=link ${CXX} -rpath ${stagedir}/lib ${LDFLAGS} +XSLTPROC:=xsltproc --novalid + +EXT_LIBS:= +COMMON_LIBS:= -L${stagedir}/lib -lglite_lb_common_${nothrflavour} -lglite_security_gss_${nothrflavour} +PLUGIN_LIBS:= -L${stagedir}/lib -lglite_lb_common_${nothrflavour}\ + ${classadslib} -lstdc++ ${expatlib} -lexpat\ + +PLUGIN_LOBJS:=seqcode_aux.lo process_event.lo process_event_pbs.lo process_event_condor.lo lb_plugin.lo +MACHINE_OBJS:=seqcode_aux.o process_event.o process_event_pbs.o process_event_condor.o + +PLUGIN_LIB=glite_lb_plugin.la +MACHINE_LIB=libglite_lb_statemachine.a + +default all: compile + +compile: ${MACHINE_LIB} ${PLUGIN_LIB} + +${PLUGIN_LIB}: ${PLUGIN_LOBJS} + ${SOLINK} -o $@ ${PLUGIN_LOBJS} ${PLUGIN_LIBS} + +${MACHINE_LIB}: ${MACHINE_OBJS} + ar crv $@ ${MACHINE_OBJS} + -ranlib $@ + + +doc: + +stage: compile + $(MAKE) install PREFIX=${stagedir} + +install: + -mkdir -p ${PREFIX}/lib ${PREFIX}/interface ${PREFIX}/include/glite/lb + install -m 644 ${MACHINE_LIB} ${PREFIX}/lib + ${INSTALL} -m 755 ${PLUGIN_LIB} ${PREFIX}/lib + install -m 644 ${top_srcdir}/interface/lb-job-attrs.xsd ${PREFIX}/interface + install -m 644 ${top_srcdir}/interface/lb-job-record.xsd ${PREFIX}/interface + install -m 644 ${top_srcdir}/interface/intjobstat.h ${PREFIX}/include/glite/lb + install -m 644 ${top_srcdir}/interface/seqcode_aux.h ${PREFIX}/include/glite/lb + install -m 644 job_attrs.h ${PREFIX}/include/glite/lb + +clean: + rm -rvf .libs *.o *.lo ${PLUGIN_LIB} ${MACHINE_LIB} job_attrs.h + rm -rvf log.xml project/ rpmbuild/ RPMS/ tgz/ + +%.c: %.c.T + rm -f $@ + ${AT3} $< >$@ || rm -f $@ + chmod -w $@ >/dev/null + +%.cpp: %.cpp.T + rm -f $@ + ${AT3} $< >$@ || rm -f $@ + chmod -w $@ >/dev/null + +%.o %.lo: %.c + ${COMPILE} -c $< + +%.lo: %.c + ${COMPILE} -o $@ -c $< + +lb_plugin.lo: lb_plugin.c job_attrs.h + ${COMPILE} -DPLUGIN_DEBUG -o $@ -c $< + +job_attrs.h: lb-job-attrs.xsd job_attrs.xsl + ${XSLTPROC} ../src/job_attrs.xsl $< >$@ + diff --git a/org.glite.lb.state-machine/interface/intjobstat.h b/org.glite.lb.state-machine/interface/intjobstat.h new file mode 100644 index 0000000..45ad4a6 --- /dev/null +++ b/org.glite.lb.state-machine/interface/intjobstat.h @@ -0,0 +1,104 @@ +#ifndef GLITE_LB_INTJOBSTAT_H +#define GLITE_LB_INTJOBSTAT_H + +#ident "$Header$" + +#include "glite/lb/jobstat.h" + +/* + * Internal representation of job state + * (includes edg_wll_JobStat API structure) + */ + +/* convention: revision X.XX - DESCRIPTION */ +/* where X.XX is version from indent + 1 (version after commit) */ +/* and DESCRIPTION is short hit why version changed */ + +#define INTSTAT_VERSION "revision 1.31 - proxy merge" + + +// Internal error codes + +#define RET_FAIL 0 +#define RET_OK 1 +#define RET_FATAL RET_FAIL +#define RET_SOON 2 +#define RET_LATE 3 +#define RET_BADSEQ 4 +#define RET_SUSPECT 5 +#define RET_IGNORE 6 +#define RET_BADBRANCH 7 +#define RET_GOODBRANCH 8 +#define RET_TOOOLD 9 +#define RET_UNREG 10 +#define RET_INTERNAL 100 + + +// shallow resubmission container - holds state of each branch +// (useful when state restore is needed after ReallyRunning event) +// +typedef struct _branch_state { + int branch; + char *destination; + char *ce_node; + char *jdl; + /*!! if adding new field, modify also free_branch_state() */ +} branch_state; + + +typedef struct _intJobStat { + edg_wll_JobStat pub; + int resubmit_type; + char *last_seqcode; + char *last_cancel_seqcode; + char *branch_tag_seqcode; + char *last_branch_seqcode; + char *deep_resubmit_seqcode; + branch_state *branch_states; // branch zero terminated array + + struct timeval last_pbs_event_timestamp; + int pbs_reruning; // true if rerun event arrived + + /*!! if adding new field, modify also destroy_intJobStat_extension() */ + } intJobStat; + +typedef enum _edg_wll_PBSEventSource { + EDG_WLL_PBS_EVENT_SOURCE_UNDEF = 0, + EDG_WLL_PBS_EVENT_SOURCE_SCHEDULER, + EDG_WLL_PBS_EVENT_SOURCE_SERVER, + EDG_WLL_PBS_EVENT_SOURCE_MOM, + EDG_WLL_PBS_EVENT_SOURCE_ACCOUNTING, + EDG_WLL_PBS_EVENT_SOURCE__LAST +} edg_wll_PBSEventSource; + +typedef enum _edg_wll_CondorEventSource { + EDG_WLL_CONDOR_EVENT_SOURCE_UNDEF = 0, + EDG_WLL_CONDOR_EVENT_SOURCE_COLLECTOR, + EDG_WLL_CONDOR_EVENT_SOURCE_MASTER, + EDG_WLL_CONDOR_EVENT_SOURCE_MATCH, + EDG_WLL_CONDOR_EVENT_SOURCE_NEGOTIATOR, + EDG_WLL_CONDOR_EVENT_SOURCE_SCHED, + EDG_WLL_CONDOR_EVENT_SOURCE_SHADOW, + EDG_WLL_CONDOR_EVENT_SOURCE_STARTER, + EDG_WLL_CONDOR_EVENT_SOURCE_START, + EDG_WLL_CONDOR_EVENT_SOURCE_JOBQUEUE, + EDG_WLL_CONDOR_EVENT_SOURCE__LAST +} edg_wll_CondorEventSource; + +typedef enum _subjobClassCodes { + SUBJOB_CLASS_UNDEF = 0, + SUBJOB_CLASS_RUNNING, + SUBJOB_CLASS_DONE, + SUBJOB_CLASS_ABORTED, + SUBJOB_CLASS_CLEARED, + SUBJOB_CLASS_REST +} subjobClassCodes; + +void destroy_intJobStat(intJobStat *); +void destroy_intJobStat_extension(intJobStat *p); + + +void init_intJobStat(intJobStat *p); + + +#endif /* GLITE_LB_INTJOBSTAT_H */ diff --git a/org.glite.lb.state-machine/interface/lb-job-attrs.xsd b/org.glite.lb.state-machine/interface/lb-job-attrs.xsd new file mode 100644 index 0000000..a2a68bb --- /dev/null +++ b/org.glite.lb.state-machine/interface/lb-job-attrs.xsd @@ -0,0 +1,132 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Job owner according to LB + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/org.glite.lb.state-machine/interface/lb-job-record.xsd b/org.glite.lb.state-machine/interface/lb-job-record.xsd new file mode 100644 index 0000000..2ad2683 --- /dev/null +++ b/org.glite.lb.state-machine/interface/lb-job-record.xsd @@ -0,0 +1,90 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/org.glite.lb.state-machine/interface/seqcode_aux.h b/org.glite.lb.state-machine/interface/seqcode_aux.h new file mode 100644 index 0000000..5fd1b3a --- /dev/null +++ b/org.glite.lb.state-machine/interface/seqcode_aux.h @@ -0,0 +1,22 @@ +#ident "$Header$" + +int component_seqcode(const char *a, edg_wll_Source index); + +char * set_component_seqcode(char *a,edg_wll_Source index,int val); + +int before_deep_resubmission(const char *a, const char *b); + +int same_branch(const char *a, const char *b) ; + +int edg_wll_compare_pbs_seq(const char *a,const char *b); +#define edg_wll_compare_condor_seq edg_wll_compare_pbs_seq + +edg_wll_PBSEventSource get_pbs_event_source(const char *pbs_seq_num) ; + +edg_wll_CondorEventSource get_condor_event_source(const char *condor_seq_num) ; + +int edg_wll_compare_seq(const char *a, const char *b); + +int compare_events_by_seq(const void *a, const void *b); + + diff --git a/org.glite.lb.state-machine/src/job_attrs.xsl b/org.glite.lb.state-machine/src/job_attrs.xsl new file mode 100644 index 0000000..0df2f3d --- /dev/null +++ b/org.glite.lb.state-machine/src/job_attrs.xsl @@ -0,0 +1,25 @@ + + + + + + + +#ifndef GLITE_LB_JP_JOB_ATTR_H +#define GLITE_LB_JP_JOB_ATTR_H +#define GLITE_JP_LB_NS "http://egee.cesnet.cz/en/Schema/LB/Attributes" +#define GLITE_JP_LB_JDL_NS "http://egee.cesnet.cz/en/Schema/LB/Attributes:JDL" + +#define GLITE_JP_LB_CLASSAD_NS "http://jdl" +#endif + + + +/** */ +#define GLITE_JP_LB_ GLITE_JP_LB_NS ":" + + + diff --git a/org.glite.lb.state-machine/src/lb_plugin.c b/org.glite.lb.state-machine/src/lb_plugin.c new file mode 100644 index 0000000..afc6e34 --- /dev/null +++ b/org.glite.lb.state-machine/src/lb_plugin.c @@ -0,0 +1,1045 @@ +#ident "$Header$" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "glite/lbu/trio.h" + +#include "glite/lb/jobstat.h" +#include "glite/lb/events.h" +#include "glite/lb/producer.h" +#include "glite/lb/events_parse.h" + + +#include "intjobstat.h" +#include "seqcode_aux.h" + +/* +#include "glite/lb/context.h" +#include "glite/lb/jobstat.h" +#include "glite/lb/events.h" +#include "glite/lb/events_parse.h" +#include "glite/lb/producer.h" + +#include "jobstat.h" +#include "get_events.h" +*/ + +#include "glite/jp/types.h" +#include "glite/jp/context.h" +#include "glite/jp/file_plugin.h" +#include "glite/jp/builtin_plugins.h" +#include "glite/jp/backend.h" +#include "glite/jp/attr.h" +#include "glite/jp/utils.h" +#include "glite/jp/known_attr.h" +#include "job_attrs.h" + +#define INITIAL_NUMBER_EVENTS 100 +#define INITIAL_NUMBER_STATES EDG_WLL_NUMBER_OF_STATCODES + +/*typedef struct _lb_buffer_t { + char *buf; + size_t pos, size; + off_t offset; +} lb_buffer_t;*/ + +typedef struct _lb_historyStatus { + edg_wll_JobStatCode state; + struct timeval timestamp; + char *reason; +} lb_historyStatus; + +typedef struct _lb_handle { + edg_wll_Event **events; + edg_wll_JobStat status; + lb_historyStatus **fullStatusHistory, **lastStatusHistory, *finalStatus; + glite_jpps_fplug_data_t* classad_plugin; +} lb_handle; + +#define check_strdup(s) ((s) ? strdup(s) : NULL) + +extern int processEvent(intJobStat *, edg_wll_Event *, int, int, char **); +static void edg_wll_SortPEvents(edg_wll_Event **); + +static int lb_query(void *fpctx, void *handle, const char *attr, glite_jp_attrval_t **attrval); +static int lb_open(void *fpctx, void *bhandle, const char *uri, void **handle); +static int lb_close(void *fpctx, void *handle); +static int lb_filecom(void *fpctx, void *handle); +static int lb_status(void *handle); +//static int read_line(glite_jp_context_t ctx, void *handle, lb_buffer_t *buffer, char **line); + +static int lb_dummy(void *fpctx, void *handle, int oper, ...) { + puts("lb_dummy() - generic call not used; for testing purposes only..."); + return -1; +} + +int init(glite_jp_context_t ctx, glite_jpps_fplug_data_t *data) { + + data->fpctx = ctx; + + data->uris = calloc(2,sizeof *data->uris); + data->uris[0] = strdup(GLITE_JP_FILETYPE_LB); + + data->classes = calloc(2,sizeof *data->classes); + data->classes[0] = strdup("lb"); + + data->namespaces = calloc(4, sizeof *data->namespaces); + data->namespaces[0] = strdup(GLITE_JP_LB_NS); + data->namespaces[1] = strdup(GLITE_JP_LB_JDL_NS); + data->namespaces[2] = strdup(GLITE_JP_LBTAG_NS); + + data->ops.open = lb_open; + data->ops.close = lb_close; + data->ops.filecom = lb_filecom; + data->ops.attr = lb_query; + data->ops.generic = lb_dummy; + +#ifdef PLUGIN_DEBUG + fprintf(stderr,"lb_plugin: init OK\n"); +#endif + return 0; +} + + +void done(glite_jp_context_t ctx, glite_jpps_fplug_data_t *data) { + free(data->uris[0]); + free(data->classes[0]); + free(data->namespaces[0]); + free(data->namespaces[1]); + free(data->namespaces[2]); + free(data->uris); + free(data->classes); + free(data->namespaces); + memset(data, 0, sizeof(*data)); +} + + +static int lb_open(void *fpctx, void *bhandle, const char *uri, void **handle) { + + lb_handle *h; + rl_buffer_t buffer; + glite_jp_context_t ctx = (glite_jp_context_t) fpctx; + char *line; + int retval; + edg_wll_Context context; + int nevents, maxnevents, i; + glite_jp_error_t err; + char *id0 = NULL,*id = NULL; + + glite_jp_clear_error(ctx); + h = calloc(1, sizeof(lb_handle)); + + if ((retval = edg_wll_InitContext(&context)) != 0) return retval; + + // read the file given by bhandle + // parse events into h->events array + memset(&buffer, 0, sizeof(buffer)); + buffer.buf = malloc(BUFSIZ); + maxnevents = INITIAL_NUMBER_EVENTS; + nevents = 0; + h->events = malloc(maxnevents * sizeof(edg_wll_Event *)); + + if ((retval = glite_jppsbe_readline(ctx, bhandle, &buffer, &line)) != 0) { + err.code = retval; + err.desc = "reading LB logline"; + err.source = "lb_plugin.c:read_line()"; + glite_jp_stack_error(ctx,&err); + goto fail; + } + while (line) { +#ifdef PLUGIN_DEBUG + //fprintf(stderr,"lb_plugin opened\n", line); +#endif + + if (line[0]) { + if (nevents >= maxnevents) { + maxnevents <<= 1; + h->events = realloc(h->events, maxnevents * sizeof(edg_wll_Event *)); + } + if ((retval = edg_wll_ParseEvent(context, line, &h->events[nevents])) != 0) { + char *ed; + free(line); + err.code = retval; + edg_wll_Error(context,NULL,&ed); + err.desc = ed; + err.source = "edg_wll_ParseEvent()"; + glite_jp_stack_error(ctx,&err); + free(ed); + goto fail; + } + if (nevents == 0) { + id0 = edg_wlc_JobIdGetUnique(h->events[nevents]->any.jobId ); + } else { + id = edg_wlc_JobIdGetUnique(h->events[nevents]->any.jobId ); + if (strcmp(id0,id) != 0) { + char et[BUFSIZ]; + retval = EINVAL; + err.code = retval; + snprintf(et,sizeof et,"Attempt to process different jobs. Id '%s' (event n.%d) differs from '%s'",id,nevents+1,id0); + et[BUFSIZ-1] = 0; + err.desc = et; + err.source = "lb_plugin.c:edg_wlc_JobIdGetUnique()"; + glite_jp_stack_error(ctx,&err); + goto fail; + } + } + + if (id) free(id); id = NULL; + nevents++; + } + free(line); + + if ((retval = glite_jppsbe_readline(ctx, bhandle, &buffer, &line)) != 0) { + err.code = retval; + err.desc = "reading LB logline"; + err.source = "lb_plugin.c:read_line()"; + glite_jp_stack_error(ctx,&err); + goto fail; + } + } + free(line); + + free(buffer.buf); + edg_wll_FreeContext(context); + + if (nevents >= maxnevents) { + maxnevents <<= 1; + h->events = realloc(h->events, maxnevents * sizeof(edg_wll_Event *)); + } + h->events[nevents] = NULL; + +#ifdef PLUGIN_DEBUG + fprintf(stderr,"lb_plugin: opened %d events\n", nevents); +#endif + + // find classad plugin, if it is loaded + int j; + h->classad_plugin = NULL; + for (i=0; ctx->plugins[i]; i++){ + glite_jpps_fplug_data_t *pd = ctx->plugins[i]; + if (pd->namespaces) + for (j=0; pd->classes[j]; j++) + if (! strcmp(pd->classes[j], "classad")){ + h->classad_plugin = pd; + goto cont; + } + } +cont: + + /* count state and status history of the job given by the loaded events */ + if ((retval = lb_status(h)) != 0) goto fail; + + *handle = (void *)h; + + return 0; + +fail: +#ifdef PLUGIN_DEBUG + fprintf(stderr,"lb_plugin: open ERROR\n"); +#endif + for (i = 0; i < nevents; i++) { + edg_wll_FreeEvent(h->events[i]); + free(h->events[i]); + } + free(h->events); + free(buffer.buf); + if (id0) free(id0); + if (id) free(id); + edg_wll_FreeContext(context); + free(h); + *handle = NULL; + err.code = EIO; + err.desc = NULL; + err.source = __FUNCTION__; + glite_jp_stack_error(ctx,&err); + + return retval; +} + + +static int lb_close(void *fpctx,void *handle) { + + lb_handle *h = (lb_handle *) handle; + int i; + + // Free allocated stuctures + if (h->events) { + i = 0; + while (h->events[i]) { + edg_wll_FreeEvent(h->events[i]); + free(h->events[i]); + i++; + } + free(h->events); + } + + if (h->status.state != EDG_WLL_JOB_UNDEF) + edg_wll_FreeStatus(&h->status); + + if (h->fullStatusHistory) { + i = 0; + while (h->fullStatusHistory[i]) { + if (h->fullStatusHistory[i]->reason) free(h->fullStatusHistory[i]->reason); + free (h->fullStatusHistory[i]); + i++; + } + h->fullStatusHistory = NULL; + h->lastStatusHistory = NULL; + h->finalStatus = NULL; + } + + free(h); + +#ifdef PLUGIN_DEBUG + fprintf(stderr,"lb_plugin: close OK\n"); +#endif + return 0; +} + +static int get_classad_attr(const char* attr, glite_jp_context_t ctx, lb_handle *h, glite_jp_attrval_t **av){ + printf("attr = %s\n", attr); + glite_jp_error_t err; + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if (! h->classad_plugin){ + err.code = ENOENT; + err.desc = strdup("Classad plugin has not been loaded."); + return glite_jp_stack_error(ctx,&err); + } + // Get the attribute from JDL + int i = 0; + while (h->events[i]){ + if (h->events[i]->type == EDG_WLL_EVENT_REGJOB + && h->events[i]->regJob.jdl + && h->events[i]->regJob.jdl[0]) + { + void *beh; + if (! h->classad_plugin->ops.open_str(h->classad_plugin->fpctx, h->events[i]->regJob.jdl, "", "", &beh)){ + if (! h->classad_plugin->ops.attr(h->classad_plugin->fpctx, beh, attr, av)) + (*av)[0].timestamp = h->events[i]->any.timestamp.tv_sec; + else{ + h->classad_plugin->ops.close(h->classad_plugin->fpctx, beh); + err.code = ENOENT; + err.desc = strdup("Classad attribute not found."); + return glite_jp_stack_error(ctx,&err); + } + h->classad_plugin->ops.close(h->classad_plugin->fpctx, beh); + } + } + i++; + } + return 0; +} + +static int lb_query(void *fpctx,void *handle, const char *attr,glite_jp_attrval_t **attrval) { + lb_handle *h = (lb_handle *) handle; + glite_jp_context_t ctx = (glite_jp_context_t) fpctx; + glite_jp_error_t err; + glite_jp_attrval_t *av = NULL; + int i, j, n_tags; + char *ns = glite_jpps_get_namespace(attr); + char *tag; + + glite_jp_clear_error(ctx); + memset(&err,0,sizeof err); + err.source = __FUNCTION__; + + if ((h->events == NULL) || + (h->status.state == EDG_WLL_JOB_UNDEF) || + (h->fullStatusHistory == NULL) ) { + *attrval = NULL; + err.code = ENOENT; + err.desc = strdup("There is no job information to query."); + return glite_jp_stack_error(ctx,&err); + } + + if (strcmp(ns, GLITE_JP_LB_JDL_NS) == 0){ + if (get_classad_attr(attr, ctx, h, &av)){ + *attrval = NULL; + err.code = ENOENT; + err.desc = strdup("Cannot get attribute from classad."); + free(ns); + return glite_jp_stack_error(ctx,&err); + } + } + else if (strcmp(attr, GLITE_JP_LB_user) == 0) { + if (h->status.owner) { + av = calloc(2, sizeof(glite_jp_attrval_t)); + av[0].name = strdup(attr); + av[0].value = strdup(h->status.owner); + av[0].size = -1; + av[0].timestamp = h->status.lastUpdateTime.tv_sec; + } + } else if (strcmp(attr, GLITE_JP_LB_jobId) == 0) { + if (h->status.jobId) { + av = calloc(2, sizeof(glite_jp_attrval_t)); + av[0].name = strdup(attr); + av[0].value = edg_wlc_JobIdUnparse(h->status.jobId); + av[0].size = -1; + av[0].timestamp = h->status.lastUpdateTime.tv_sec; + } + } else if (strcmp(attr, GLITE_JP_LB_parent) == 0) { + if (h->status.parent_job) { + av = calloc(2, sizeof(glite_jp_attrval_t)); + av[0].name = strdup(attr); + av[0].value = edg_wlc_JobIdUnparse(h->status.parent_job); + av[0].size = -1; + av[0].timestamp = h->status.lastUpdateTime.tv_sec; + } + } else if (strcmp(attr, GLITE_JP_LB_VO) == 0) { + if (get_classad_attr(":VirtualOrganisation", ctx, h, &av)){ + printf("error"); + *attrval = NULL; + err.code = ENOENT; + err.desc = strdup("Cannot get attribute from classad."); + free(ns); + return glite_jp_stack_error(ctx,&err); + } + } else if (strcmp(attr, GLITE_JP_LB_eNodes) == 0) { + if (get_classad_attr(":max_nodes_running", ctx, h, &av)){ + printf("error"); + *attrval = NULL; + err.code = ENOENT; + err.desc = strdup("Cannot get attribute from classad."); + free(ns); + return glite_jp_stack_error(ctx,&err); + } + } else if (strcmp(attr, GLITE_JP_LB_eProc) == 0) { + if (get_classad_attr(":NodeNumber", ctx, h, &av)){ + printf("error"); + *attrval = NULL; + err.code = ENOENT; + err.desc = strdup("Cannot get attribute from classad."); + free(ns); + return glite_jp_stack_error(ctx,&err); + } + } else if (strcmp(attr, GLITE_JP_LB_aTag) == 0 || + strcmp(attr, GLITE_JP_LB_rQType) == 0 || + strcmp(attr, GLITE_JP_LB_eDuration) == 0) { + /* have to be retrieved from JDL, but probably obsolete and not needed at all */ + char et[BUFSIZ]; + *attrval = NULL; + err.code = ENOSYS; + snprintf(et,sizeof et,"Attribute '%s' not implemented yet.",attr); + et[BUFSIZ-1] = 0; + err.desc = et; + return glite_jp_stack_error(ctx,&err); + } else if (strcmp(attr, GLITE_JP_LB_RB) == 0) { + if (h->status.network_server) { + av = calloc(2, sizeof(glite_jp_attrval_t)); + av[0].name = strdup(attr); + av[0].value = strdup(h->status.network_server); + av[0].size = -1; + av[0].timestamp = h->status.lastUpdateTime.tv_sec; + } + } else if (strcmp(attr, GLITE_JP_LB_CE) == 0) { + if (h->status.destination) { + av = calloc(2, sizeof(glite_jp_attrval_t)); + av[0].name = strdup(attr); + av[0].value = strdup(h->status.destination); + av[0].size = -1; + av[0].timestamp = h->status.lastUpdateTime.tv_sec; + } + } else if (strcmp(attr, GLITE_JP_LB_host) == 0) { + if (h->status.ce_node) { + av = calloc(2, sizeof(glite_jp_attrval_t)); + av[0].name = strdup(attr); + av[0].value = strdup(h->status.ce_node); + av[0].size = -1; + av[0].timestamp = h->status.lastUpdateTime.tv_sec; + } + } else if (strcmp(attr, GLITE_JP_LB_UIHost) == 0) { + i = 0; + while (h->events[i]) { + if (h->events[i]->type == EDG_WLL_EVENT_REGJOB) { + if (h->events[i]->any.host) { + av = calloc(2, sizeof(glite_jp_attrval_t)); + av[0].name = strdup(attr); + av[0].value = strdup(h->events[i]->any.host); + av[0].size = -1; + av[0].timestamp = h->events[i]->any.timestamp.tv_sec; + } + break; + } + i++; + } + } else if (strcmp(attr, GLITE_JP_LB_CPUTime) == 0) { + if (h->status.cpuTime) { + av = calloc(2, sizeof(glite_jp_attrval_t)); + av[0].name = strdup(attr); + trio_asprintf(&av[0].value,"%d", h->status.cpuTime); + av[0].size = -1; + av[0].timestamp = h->status.lastUpdateTime.tv_sec; + } + } else if (strcmp(attr, GLITE_JP_LB_NProc) == 0) { + /* currently LB hasn't got the info */ + char et[BUFSIZ]; + *attrval = NULL; + err.code = ENOSYS; + snprintf(et,sizeof et,"Attribute '%s' not implemented yet.",attr); + et[BUFSIZ-1] = 0; + err.desc = et; + return glite_jp_stack_error(ctx,&err); + } else if (strcmp(attr, GLITE_JP_LB_finalStatus) == 0) { + av = calloc(2, sizeof(glite_jp_attrval_t)); + av[0].name = strdup(attr); + if (h->finalStatus) { + av[0].value = edg_wll_StatToString(h->finalStatus->state); + av[0].timestamp = h->finalStatus->timestamp.tv_sec; + } else { + av[0].value = edg_wll_StatToString(h->status.state); + av[0].timestamp = h->status.lastUpdateTime.tv_sec; + } + av[0].size = -1; + } else if (strcmp(attr, GLITE_JP_LB_finalStatusDate) == 0) { + struct tm *t = NULL; + if ( (h->finalStatus) && + ((t = gmtime(&h->finalStatus->timestamp.tv_sec)) != NULL) ) { + av = calloc(2, sizeof(glite_jp_attrval_t)); + av[0].name = strdup(attr); + /* dateTime format: yyyy-mm-ddThh:mm:ss.uuuuuu */ + trio_asprintf(&av[0].value,"%04d-%02d-%02dT%02d:%02d:%02d.%06d", + 1900+t->tm_year, 1+t->tm_mon, t->tm_mday, + t->tm_hour, t->tm_min, t->tm_sec, + h->finalStatus->timestamp.tv_usec); + av[0].size = -1; + av[0].timestamp = h->finalStatus->timestamp.tv_sec; + } else if ((t = gmtime(&h->status.lastUpdateTime.tv_sec)) != NULL) { + av = calloc(2, sizeof(glite_jp_attrval_t)); + av[0].name = strdup(attr); + /* dateTime format: yyyy-mm-ddThh:mm:ss.uuuuuu */ + trio_asprintf(&av[0].value,"%04d-%02d-%02dT%02d:%02d:%02d.%06d", + 1900+t->tm_year, 1+t->tm_mon, t->tm_mday, + t->tm_hour, t->tm_min, t->tm_sec, + h->status.lastUpdateTime.tv_usec); + av[0].size = -1; + av[0].timestamp = h->status.lastUpdateTime.tv_sec; + } + } else if (strcmp(attr, GLITE_JP_LB_finalStatusReason) == 0) { + if (h->finalStatus && h->finalStatus->reason) { + av = calloc(2, sizeof(glite_jp_attrval_t)); + av[0].name = strdup(attr); + av[0].value = strdup(h->finalStatus->reason); + av[0].size = -1; + av[0].timestamp = h->finalStatus->timestamp.tv_sec; + } else if (h->status.reason) { + av = calloc(2, sizeof(glite_jp_attrval_t)); + av[0].name = strdup(attr); + av[0].value = strdup(h->status.reason); + av[0].size = -1; + av[0].timestamp = h->status.lastUpdateTime.tv_sec; + } + } else if (strcmp(attr, GLITE_JP_LB_LRMSDoneStatus) == 0) { + i = 0; + j = -1; + while (h->events[i]) { + if ( (h->events[i]->type == EDG_WLL_EVENT_DONE) && + (h->events[i]->any.source == EDG_WLL_SOURCE_LRMS) ) + j = i; + i++; + } + av = calloc(2, sizeof(glite_jp_attrval_t)); + av[0].name = strdup(attr); + av[0].size = -1; + if ( j != -1) { + av[0].value = edg_wll_DoneStatus_codeToString(h->events[j]->done.status_code); + av[0].timestamp = h->events[j]->any.timestamp.tv_sec; + } else { + av[0].value = edg_wll_DoneStatus_codeToString(h->status.done_code); + av[0].timestamp = h->status.lastUpdateTime.tv_sec; + } + } else if (strcmp(attr, GLITE_JP_LB_LRMSStatusReason) == 0) { + i = 0; + j = -1; + while (h->events[i]) { + if ( (h->events[i]->type == EDG_WLL_EVENT_DONE) && + (h->events[i]->any.source == EDG_WLL_SOURCE_LRMS) ) + j = i; + i++; + } + if ( ( j != -1) && (h->events[j]->done.reason) ) { + av = calloc(2, sizeof(glite_jp_attrval_t)); + av[0].name = strdup(attr); + av[0].value = strdup(h->events[j]->done.reason); + av[0].size = -1; + av[0].timestamp = h->events[j]->any.timestamp.tv_sec; + } + } else if (strcmp(attr, GLITE_JP_LB_retryCount) == 0) { + av = calloc(2, sizeof(glite_jp_attrval_t)); + av[0].name = strdup(attr); + trio_asprintf(&av[0].value,"%d", h->status.resubmitted); + av[0].size = -1; + av[0].timestamp = h->status.lastUpdateTime.tv_sec; + } else if (strcmp(attr, GLITE_JP_LB_additionalReason) == 0) { + /* what is it? */ + char et[BUFSIZ]; + *attrval = NULL; + err.code = ENOSYS; + snprintf(et,sizeof et,"Attribute '%s' not implemented yet.",attr); + et[BUFSIZ-1] = 0; + err.desc = et; + return glite_jp_stack_error(ctx,&err); + } else if (strcmp(attr, GLITE_JP_LB_jobType) == 0) { + av = calloc(2, sizeof(glite_jp_attrval_t)); + av[0].name = strdup(attr); + switch (h->status.jobtype) { + case EDG_WLL_STAT_SIMPLE: + av[0].value = strdup("SIMPLE"); break; + case EDG_WLL_STAT_DAG: + av[0].value = strdup("DAG"); break; + default: + av[0].value = strdup("UNKNOWN"); break; + } + av[0].size = -1; + av[0].timestamp = h->status.lastUpdateTime.tv_sec; + } else if (strcmp(attr, GLITE_JP_LB_nsubjobs) == 0) { + av = calloc(2, sizeof(glite_jp_attrval_t)); + av[0].name = strdup(attr); + trio_asprintf(&av[0].value,"%d", h->status.children_num); + av[0].size = -1; + av[0].timestamp = h->status.lastUpdateTime.tv_sec; + } else if (strcmp(attr, GLITE_JP_LB_subjobs) == 0) { + if (h->status.children_num > 0) { + char *val = NULL, *old_val; + + old_val = strdup (""); + for (i=0; istatus.children_num; i++) { + trio_asprintf(&val,"%s\t\t%s\n", + old_val, h->status.children[i] ? h->status.children[i] : ""); + if (old_val) free(old_val); + old_val = val; val = NULL; + } + av = calloc(2, sizeof(glite_jp_attrval_t)); + av[0].name = strdup(attr); + av[0].value = check_strdup(old_val); + av[0].size = -1; + av[0].timestamp = h->status.lastUpdateTime.tv_sec; + } else { + char et[BUFSIZ]; + *attrval = NULL; + err.code = ENOENT; + snprintf(et,sizeof et,"Value unknown for attribute '%s', there are no subjobs.",attr); + et[BUFSIZ-1] = 0; + err.desc = et; + return glite_jp_stack_error(ctx,&err); + } + } else if (strcmp(attr, GLITE_JP_LB_lastStatusHistory) == 0) { + int i,j; + char *val, *old_val, *s_str, *t_str, *r_str; + struct tm *t; + + val = s_str = t_str = r_str = NULL; + old_val = strdup(""); + t = calloc(1, sizeof(*t)); + /* first record is Submitted - hopefully in fullStatusHistory[0] */ + if ((h->fullStatusHistory[0] && + (h->fullStatusHistory[0]->state == EDG_WLL_JOB_SUBMITTED)) ) { + + s_str = edg_wll_StatToString(h->fullStatusHistory[0]->state); + for (j = 0; s_str[j]; j++) s_str[j] = toupper(s_str[j]); + if (gmtime_r(&h->fullStatusHistory[0]->timestamp.tv_sec,t) != NULL) { + /* dateTime format: yyyy-mm-ddThh:mm:ss.uuuuuu */ + trio_asprintf(&t_str,"timestamp=\"%04d-%02d-%02dT%02d:%02d:%02d.%06d\" ", + 1900+t->tm_year, 1+t->tm_mon, t->tm_mday, + t->tm_hour, t->tm_min, t->tm_sec, + h->fullStatusHistory[0]->timestamp.tv_usec); + } + if (h->fullStatusHistory[0]->reason) { + trio_asprintf(&r_str,"reason=\"%s\" ",h->fullStatusHistory[0]->reason); + } + trio_asprintf(&val,"%s\t\t\n", + old_val, s_str ? s_str : "", t_str ? t_str : "", r_str ? r_str : ""); + if (s_str) free(s_str); + if (t_str) free(t_str); + if (r_str) free(t_str); + if (old_val) free(old_val); + old_val = val; val = NULL; + } + /* and the rest is from last Waiting to the end - i.e. all lastStatusHistory[] */ + if (h->lastStatusHistory) { + i = 0; + while (h->lastStatusHistory[i]) { + s_str = edg_wll_StatToString(h->lastStatusHistory[i]->state); + for (j = 0; s_str[j]; j++) s_str[j] = toupper(s_str[j]); + if (gmtime_r(&h->lastStatusHistory[i]->timestamp.tv_sec,t) != NULL) { + /* dateTime format: yyyy-mm-ddThh:mm:ss.uuuuuu */ + trio_asprintf(&t_str,"timestamp=\"%04d-%02d-%02dT%02d:%02d:%02d.%06d\" ", + 1900+t->tm_year, 1+t->tm_mon, t->tm_mday, + t->tm_hour, t->tm_min, t->tm_sec, + h->lastStatusHistory[i]->timestamp.tv_usec); + } + if (h->lastStatusHistory[i]->reason) { + trio_asprintf(&r_str,"reason=\"%s\" ",h->lastStatusHistory[i]->reason); + } + trio_asprintf(&val,"%s\t\t\n", + old_val, s_str ? s_str : "", t_str ? t_str : "", r_str ? r_str : ""); + if (s_str) free(s_str); s_str = NULL; + if (t_str) free(t_str); t_str = NULL; + if (r_str) free(r_str); r_str = NULL; + if (old_val) free(old_val); + old_val = val; val = NULL; + i++; + } + } + val = old_val; old_val = NULL; + if (val) { + av = calloc(2, sizeof(glite_jp_attrval_t)); + av[0].name = strdup(attr); + av[0].value = strdup(val); + av[0].size = -1; + av[0].timestamp = h->status.lastUpdateTime.tv_sec; + free(val); + } + } else if (strcmp(attr, GLITE_JP_LB_fullStatusHistory) == 0) { + int i,j; + char *val, *old_val, *s_str, *t_str, *r_str; + struct tm *t; + + val = s_str = t_str = r_str = NULL; + old_val = strdup(""); + t = calloc(1, sizeof(*t)); + i = 0; + while (h->fullStatusHistory[i]) { + s_str = edg_wll_StatToString(h->fullStatusHistory[i]->state); + for (j = 0; s_str[j]; j++) s_str[j] = toupper(s_str[j]); + if (gmtime_r(&h->fullStatusHistory[i]->timestamp.tv_sec,t) != NULL) { + /* dateTime format: yyyy-mm-ddThh:mm:ss:uuuuuu */ + trio_asprintf(&t_str,"timestamp=\"%04d-%02d-%02dT%02d:%02d:%02d.%06d\" ", + 1900+t->tm_year, 1+t->tm_mon, t->tm_mday, + t->tm_hour, t->tm_min, t->tm_sec, + h->fullStatusHistory[i]->timestamp.tv_usec); + } + if (h->fullStatusHistory[i]->reason) { + trio_asprintf(&r_str,"reason=\"%s\" ",h->fullStatusHistory[i]->reason); + } + trio_asprintf(&val,"%s\t\t\n", + old_val, s_str ? s_str : "", t_str ? t_str : "", r_str ? r_str : ""); + if (s_str) free(s_str); s_str = NULL; + if (t_str) free(t_str); t_str = NULL; + if (r_str) free(r_str); r_str = NULL; + if (old_val) free(old_val); + old_val = val; val = NULL; + i++; + } + val = old_val; old_val = NULL; + if (val) { + av = calloc(2, sizeof(glite_jp_attrval_t)); + av[0].name = strdup(attr); + av[0].value = strdup(val); + av[0].size = -1; + av[0].timestamp = h->status.lastUpdateTime.tv_sec; + free(val); + } + } else if (strcmp(ns, GLITE_JP_LBTAG_NS) == 0) { + tag = strrchr(attr, ':'); + if (h->events && tag) { + tag++; + i = 0; + n_tags = 0; + + while (h->events[i]) { + if ((h->events[i]->type == EDG_WLL_EVENT_USERTAG) && + (strcasecmp(h->events[i]->userTag.name, tag) == 0) ) { +/* XXX: LB tag names are case-insensitive */ + av = realloc(av, (n_tags+2) * sizeof(glite_jp_attrval_t)); + memset(&av[n_tags], 0, 2 * sizeof(glite_jp_attrval_t)); + + av[n_tags].name = strdup(attr); + av[n_tags].value = check_strdup(h->events[i]->userTag.value); + av[n_tags].timestamp = + h->events[i]->any.timestamp.tv_sec; + av[n_tags].size = -1; + + n_tags++; + } + i++; + } + } + } else if (strcmp(attr, GLITE_JP_LB_JDL) == 0) { + if (h->status.jdl) { + av = calloc(2, sizeof(glite_jp_attrval_t)); + av[0].name = strdup(attr); + av[0].value = strdup(h->status.jdl); + av[0].size = -1; + av[0].timestamp = h->status.lastUpdateTime.tv_sec; + } + } else { + char et[BUFSIZ]; + *attrval = NULL; + err.code = EINVAL; + snprintf(et,sizeof et,"No such attribute '%s'.",attr); + et[BUFSIZ-1] = 0; + err.desc = et; + return glite_jp_stack_error(ctx,&err); + } + + free(ns); + + if (av && av[0].value) { + for (i=0; av[i].name; i++) av[i].origin = GLITE_JP_ATTR_ORIG_FILE; + *attrval = av; + return 0; + } else { + char et[BUFSIZ]; + *attrval = NULL; + err.code = ENOENT; + snprintf(et,sizeof et,"Value unknown for attribute '%s'.",attr); + et[BUFSIZ-1] = 0; + err.desc = et; + if (av) glite_jp_attrval_free(av,1); // XXX: probably not needed + return glite_jp_stack_error(ctx,&err); + } +} + +static int lb_filecom(void *fpctx, void *handle){ + glite_jp_context_t ctx = (glite_jp_context_t) fpctx; + lb_handle *h = (lb_handle *) handle; + glite_jp_attrval_t attr[2]; + memset(attr, 0, 2 * sizeof(glite_jp_attrval_t)); + + if (h->events) { + int i = 0; + while (h->events[i]) { + if (h->events[i]->type == EDG_WLL_EVENT_USERTAG && + strchr(h->events[i]->userTag.name,':')) + { + //printf("%s, %s\n", edg_wlc_JobIdUnparse(h->status.jobId), h->status.jobId); + attr[0].name = h->events[i]->userTag.name; + attr[0].value = h->events[i]->userTag.value; + attr[0].binary = 0; + attr[0].origin = GLITE_JP_ATTR_ORIG_USER; + attr[0].timestamp = time(NULL); + attr[0].origin_detail = NULL; /* XXX */ + attr[1].name = NULL; + glite_jppsbe_append_tag(ctx, edg_wlc_JobIdUnparse(h->status.jobId), attr); + } + i++; + } + } + + return 0; +} + +static int lb_status(void *handle) { + + lb_handle *h = (lb_handle *) handle; + intJobStat *js; + int maxnstates, nstates, i, be_strict = 0, retval; + char *errstring; + edg_wll_JobStatCode old_state = EDG_WLL_JOB_UNDEF; + + js = calloc(1, sizeof(intJobStat)); + init_intJobStat(js); + + edg_wll_SortPEvents(h->events); + + maxnstates = INITIAL_NUMBER_STATES; + nstates = 0; + h->fullStatusHistory = calloc(maxnstates, sizeof(lb_historyStatus *)); + h->lastStatusHistory = NULL; + h->finalStatus = NULL; + i = 0; + while (h->events[i]) + { + /* realloc the fullStatusHistory if needed */ + if (nstates >= maxnstates) { + maxnstates <<= 1; + h->fullStatusHistory = realloc(h->fullStatusHistory, maxnstates * sizeof(lb_historyStatus *)); + } + + /* job owner and jobId not filled from events normally */ + if (h->events[i]->any.type == EDG_WLL_EVENT_REGJOB) { + js->pub.owner = check_strdup(h->events[i]->any.user); + if (edg_wlc_JobIdDup(h->events[i]->any.jobId,&js->pub.jobId)) { + goto err; + } + } + /* Process Event and update the state */ + if (processEvent(js, h->events[i], 0, be_strict, &errstring) == RET_FATAL) { + goto err; + } + + /* if the state has changed, update the status history */ + if (js->pub.state != old_state) { + h->fullStatusHistory[nstates] = calloc(1,sizeof(lb_historyStatus)); + h->fullStatusHistory[nstates]->state = js->pub.state; + h->fullStatusHistory[nstates]->timestamp.tv_sec = js->pub.stateEnterTime.tv_sec; + h->fullStatusHistory[nstates]->timestamp.tv_usec = js->pub.stateEnterTime.tv_usec; + h->fullStatusHistory[nstates]->reason = check_strdup(js->pub.reason); + /* lastStatusHistory starts from the last WAITING state */ + if (js->pub.state == EDG_WLL_JOB_WAITING) { + h->lastStatusHistory = &(h->fullStatusHistory[nstates]); + } + /* finalStatus is the one preceeding the CLEARED state */ + if ( (js->pub.state == EDG_WLL_JOB_CLEARED) && (nstates > 0) ) { + h->finalStatus = h->fullStatusHistory[nstates-1]; + } + old_state = js->pub.state; + nstates++; + } + + i++; + } + h->fullStatusHistory[nstates] = NULL; + /* if there is no CLEARED state, finalStatus is just the last status + and if there is no such thing, leave h->finalStatus NULL and for the attribute + try to read something from the h->status */ + if ( (h->finalStatus == NULL) && (nstates > 0) ) { + h->finalStatus = h->fullStatusHistory[nstates-1]; + } + + /* fill in also subjobs */ + if (js->pub.children_num > 0) { + edg_wll_Context context; + edg_wlc_JobId *subjobs; + + if ((retval = edg_wll_InitContext(&context)) != 0) return retval; + subjobs = calloc(js->pub.children_num, sizeof (*subjobs)); + if ((retval = edg_wll_GenerateSubjobIds(context, + js->pub.jobId, js->pub.children_num, js->pub.seed, &subjobs) ) != 0 ) { + goto err; + } + js->pub.children = calloc(js->pub.children_num + 1, sizeof (*js->pub.children)); + for (i=0; ipub.children_num; i++) { + js->pub.children[i] = edg_wlc_JobIdUnparse(subjobs[i]); + } + edg_wll_FreeContext(context); + free(subjobs); + } + + memcpy(&h->status, &js->pub, sizeof(edg_wll_JobStat)); + + // not very clean, but working + memset(&js->pub, 0, sizeof(edg_wll_JobStat)); + destroy_intJobStat(js); + + return 0; + +err: + destroy_intJobStat(js); + return -1; +} + + +/* + * realloc the line to double size if needed + * + * \return 0 if failed, did nothing + * \return 1 if success + */ +/*int check_realloc_line(char **line, size_t *maxlen, size_t len) { + void *tmp; + + if (len > *maxlen) { + *maxlen <<= 1; + tmp = realloc(*line, *maxlen); + if (!tmp) return 0; + *line = tmp; + } + + return 1; +} +*/ + +/* + * read next line from stream + * + * \return error code + */ +/*static int read_line(glite_jp_context_t ctx, void *handle, lb_buffer_t *buffer, char **line) { + + size_t maxlen, len, i; + ssize_t nbytes; + int retval, z, end; + + maxlen = BUFSIZ; + i = 0; + len = 0; + *line = malloc(maxlen); + end = 0; + + do { + // read next portion + if (buffer->pos >= buffer->size) { + buffer->pos = 0; + buffer->size = 0; + if ((retval = glite_jppsbe_pread(ctx, handle, buffer->buf, BUFSIZ, buffer->offset, &nbytes)) == 0) { + if (nbytes < 0) { + retval = EINVAL; + goto fail; + } else { + if (nbytes) { + buffer->size = (size_t)nbytes; + buffer->offset += nbytes; + } else end = 1; + } + } else goto fail; + } + + // we have buffer->size - buffer->pos bytes + i = buffer->pos; + do { + if (i >= buffer->size) z = '\0'; + else { + z = buffer->buf[i]; + if (z == '\n') z = '\0'; + } + len++; + + if (!check_realloc_line(line, &maxlen, len)) { + retval = ENOMEM; + goto fail; + } + (*line)[len - 1] = z; + i++; + } while (z && i < buffer->size); + buffer->pos = i; + } while (len && (*line)[len - 1] != '\0'); + + if ((!len || !(*line)[0]) && end) { + free(*line); + *line = NULL; + } + + return 0; + +fail: + free(*line); + *line = NULL; + return retval; +} +*/ + + +static int compare_pevents_by_seq(const void *a, const void *b) +{ + const edg_wll_Event **e = (const edg_wll_Event **) a; + const edg_wll_Event **f = (const edg_wll_Event **) b; + return compare_events_by_seq(*e,*f); +} + + +static void edg_wll_SortPEvents(edg_wll_Event **e) +{ + edg_wll_Event **p; + int n; + + if (!e) return; + p = e; + for (n=0; *p; n++) { + p++; + } + qsort(e,n,sizeof(*e),compare_pevents_by_seq); +} + diff --git a/org.glite.lb.state-machine/src/process_event.c b/org.glite.lb.state-machine/src/process_event.c new file mode 100644 index 0000000..006c8d1 --- /dev/null +++ b/org.glite.lb.state-machine/src/process_event.c @@ -0,0 +1,1011 @@ +#ident "$Header$" + +#include +#include +#include +#include +#include +#include + +#include "glite/lb/context-int.h" + +#include "glite/lbu/trio.h" + +#include "intjobstat.h" +#include "seqcode_aux.h" + + +/* TBD: share in whole logging or workload */ +#ifdef __GNUC__ +#define UNUSED_VAR __attribute__((unused)) +#else +#define UNUSED_VAR +#endif + +static int processEvent_glite(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring); +int processEvent_PBS(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring); +int processEvent_Condor(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring); + +int add_stringlist(char ***lptr, const char *new_item); + +int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring) +{ + if (js->pub.jobtype == -1 && e->type == EDG_WLL_EVENT_REGJOB) + switch (e->regJob.jobtype) { + case EDG_WLL_REGJOB_SIMPLE: + js->pub.jobtype = EDG_WLL_STAT_SIMPLE; + break; + case EDG_WLL_REGJOB_DAG: + case EDG_WLL_REGJOB_PARTITIONABLE: + case EDG_WLL_REGJOB_PARTITIONED: + js->pub.jobtype = EDG_WLL_STAT_DAG; + break; + case EDG_WLL_REGJOB_COLLECTION: + js->pub.jobtype = EDG_WLL_STAT_COLLECTION; + break; + case EDG_WLL_REGJOB_PBS: + js->pub.jobtype = EDG_WLL_STAT_PBS; + break; + case EDG_WLL_REGJOB_CONDOR: + js->pub.jobtype = EDG_WLL_STAT_CONDOR; + break; + default: + trio_asprintf(errstring,"unknown job type %d in registration",e->regJob.jobtype); + return RET_FAIL; + } + + switch (js->pub.jobtype) { + case EDG_WLL_STAT_SIMPLE: + case EDG_WLL_STAT_DAG: + case EDG_WLL_STAT_COLLECTION: + return processEvent_glite(js,e,ev_seq,strict,errstring); + case EDG_WLL_STAT_PBS: + return processEvent_PBS(js,e,ev_seq,strict,errstring); + case EDG_WLL_STAT_CONDOR: + return processEvent_Condor(js,e,ev_seq,strict,errstring); + case -1: return RET_UNREG; + default: + trio_asprintf(errstring,"undefined job type %d",js->pub.jobtype); + return RET_FAIL; + } +} + +#define rep(a,b) { free(a); a = (b == NULL) ? NULL : strdup(b); } +#define rep_cond(a,b) { if (b) { free(a); a = strdup(b); } } + +static void free_stringlist(char ***lptr) +{ + char **itptr; + int i; + + if (*lptr) { + for (i = 0, itptr = *lptr; itptr[i] != NULL; i++) + free(itptr[i]); + free(itptr); + *lptr = NULL; + } +} + +static int add_taglist(edg_wll_TagValue **lptr, const char *new_item, const char *new_item2) +{ + edg_wll_TagValue *itptr; + int i; + + if (*lptr == NULL) { + itptr = (edg_wll_TagValue *) calloc(2,sizeof(edg_wll_TagValue)); + itptr[0].tag = strdup(new_item); + itptr[0].value = strdup(new_item2); + *lptr = itptr; + return 1; + } else { + for (i = 0, itptr = *lptr; itptr[i].tag != NULL; i++) + if ( !strcasecmp(itptr[i].tag, new_item) ) + { + free(itptr[i].value); + itptr[i].value = strdup(new_item2); + return 1; + } + itptr = (edg_wll_TagValue *) realloc(*lptr, (i+2)*sizeof(edg_wll_TagValue)); + if (itptr != NULL) { + itptr[i].tag = strdup(new_item); + itptr[i].value = strdup(new_item2); + itptr[i+1].tag = NULL; + itptr[i+1].value = NULL; + *lptr = itptr; + return 1; + } else { + return 0; + } + } +} + + +static void update_branch_state(char *b, char *d, char *c, char *j, branch_state **bs) +{ + int i = 0, branch; + + + if (!b) + return; + else + branch = component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER); + + if (*bs != NULL) { + while ((*bs)[i].branch) { + if (branch == (*bs)[i].branch) { + if (d) rep((*bs)[i].destination, d); + if (c) rep((*bs)[i].ce_node, c); + if (j) rep((*bs)[i].jdl, j); + + return; + } + i++; + } + } + + *bs = (branch_state *) realloc(*bs, (i+2)*sizeof(branch_state)); + memset(&((*bs)[i]), 0, 2*sizeof(branch_state)); + + (*bs)[i].branch = branch; + rep((*bs)[i].destination, d); + rep((*bs)[i].ce_node, c); + rep((*bs)[i].jdl, j); +} + + +static void free_branch_state(branch_state **bs) +{ + int i = 0; + + if (*bs == NULL) return; + + while ((*bs)[i].branch) { + free((*bs)[i].destination); + free((*bs)[i].ce_node); + free((*bs)[i].jdl); + i++; + } + free(*bs); + *bs = NULL; +} + +static int compare_branch_states(const void *a, const void *b) +{ + branch_state *c = (branch_state *) a; + branch_state *d = (branch_state *) b; + + if (c->branch < d->branch) return -1; + if (c->branch == d->branch) return 0; + /* avoid warning: if (c->branch > d->branch) */ return 1; +} + +static void load_branch_state(intJobStat *js) +{ + int i, j, branch; + + + if ( (!js->branch_tag_seqcode) || (!js->branch_states) ) return; + + branch = component_seqcode(js->branch_tag_seqcode, EDG_WLL_SOURCE_WORKLOAD_MANAGER); + + // count elements + i = 0; + while (js->branch_states[i].branch) i++; + + // sort them + qsort(js->branch_states, (size_t) i, sizeof(branch_state), + compare_branch_states); + + // find row corresponding to ReallyRunning WM seq.code (aka branch) + i = 0; + while (js->branch_states[i].branch) { + if (js->branch_states[i].branch == branch) break; + i++; + } + + // copy this and two before branches data to final state + // (each field - dest,ce,jdl - comes from different event) + // (and these events have most likely different WM seq.codes) + // (even belonging into one logical branch) + // (the newer the more important - so i-th element is copied as last) + // (and may overwrite data from previous elements) + for (j = i - 2; j <= i; j++) { + if (j >= 0) { + if (js->branch_states[j].destination) + rep(js->pub.destination, js->branch_states[j].destination); + if (js->branch_states[j].ce_node) + rep(js->pub.ce_node, js->branch_states[j].ce_node); + if (js->branch_states[j].jdl) + rep(js->pub.matched_jdl, js->branch_states[j].jdl); + } + } +} + +// clear branches (deep resub. or abort) +static void reset_branch(intJobStat *js, edg_wll_Event *e) +{ + js->resubmit_type = EDG_WLL_RESUBMISSION_WILLRESUB; + free_stringlist(&js->pub.possible_destinations); + free_stringlist(&js->pub.possible_ce_nodes); + free_branch_state(&js->branch_states); + js->pub.payload_running = 0; + rep(js->branch_tag_seqcode, NULL); + rep(js->deep_resubmit_seqcode, e->any.seqcode); +} + +static char* location_string(const char *source, const char *host, const char *instance) +{ + char *ret; + trio_asprintf(&ret, "%s/%s/%s", source, host, instance); + return ret; +} + +/* is seq. number of 'es' before WMS higher then 'js' */ +static int after_enter_wm(const char *es,const char *js) +{ + return ((component_seqcode(es,EDG_WLL_SOURCE_NETWORK_SERVER) > + component_seqcode(js,EDG_WLL_SOURCE_NETWORK_SERVER)) + || + (component_seqcode(es,EDG_WLL_SOURCE_USER_INTERFACE) > + component_seqcode(js,EDG_WLL_SOURCE_USER_INTERFACE))); +} + + +static int badEvent(intJobStat *js UNUSED_VAR, edg_wll_Event *e, int ev_seq UNUSED_VAR) +{ + char *str; + + str = edg_wll_EventToString(e->any.type); + fprintf(stderr, "edg_wll_JobStatus: bad event: type %d (%s)\n", + e->any.type, (str == NULL) ? "unknown" : str); + free(str); + return RET_FATAL; +} + + +// (?) || (0 && 1) => true if (res == RET_OK) +#define USABLE(res,strict) ((res) == RET_OK || ( (res) == RET_SOON && !strict)) + +// (?) || (1 && 1) => always true +#define USABLE_DATA(res,strict) ((res) == RET_OK || ( (res) != RET_FATAL && !strict)) + +#define USABLE_BRANCH(fine_res) ((fine_res) != RET_TOOOLD && (fine_res) != RET_BADBRANCH) +#define LRMS_STATE(state) ((state) == EDG_WLL_JOB_RUNNING || (state) == EDG_WLL_JOB_DONE) +#define PARSABLE_SEQCODE(code) (component_seqcode((code),0) >= 0) + +static int processEvent_glite(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring) +{ + edg_wll_JobStatCode old_state = js->pub.state; + edg_wll_JobStatCode new_state = EDG_WLL_JOB_UNKNOWN; + int res = RET_OK, + fine_res = RET_OK; + + int lm_favour_lrms = 0; + + // Aborted may not be terminal state for collection in some cases + // i.e. if some Done/failed subjob is resubmitted + if ( (old_state == EDG_WLL_JOB_ABORTED && e->any.type != EDG_WLL_EVENT_COLLECTIONSTATE) || + old_state == EDG_WLL_JOB_CANCELLED || + old_state == EDG_WLL_JOB_CLEARED) { + res = RET_LATE; + } + +/* new event coming from NS or UI => forget about any resubmission loops */ + if (e->type != EDG_WLL_EVENT_CANCEL && + js->last_seqcode && + after_enter_wm(e->any.seqcode,js->last_seqcode)) + { + rep(js->branch_tag_seqcode,NULL); + rep(js->deep_resubmit_seqcode,NULL); + rep(js->last_branch_seqcode,NULL); + } + + if (js->deep_resubmit_seqcode && + before_deep_resubmission(e->any.seqcode, js->deep_resubmit_seqcode)) { + res = RET_LATE; + fine_res = RET_TOOOLD; + } + else if (js->branch_tag_seqcode) { // ReallyRunning ev. arrived + if (same_branch(e->any.seqcode, js->branch_tag_seqcode)) { + if ((js->last_branch_seqcode != NULL) && + edg_wll_compare_seq(e->any.seqcode, js->last_branch_seqcode) < 0) { + res = RET_LATE; + } + fine_res = RET_GOODBRANCH; + } + else { + res = RET_LATE; + fine_res = RET_BADBRANCH; + } + } + else if ((js->last_seqcode != NULL) && + edg_wll_compare_seq(e->any.seqcode, js->last_seqcode) < 0) { + res = RET_LATE; + } + + + switch (e->any.type) { + case EDG_WLL_EVENT_TRANSFER: + if (e->transfer.result == EDG_WLL_TRANSFER_OK) { + switch (e->transfer.source) { + case EDG_WLL_SOURCE_USER_INTERFACE: + new_state = EDG_WLL_JOB_WAITING; break; + case EDG_WLL_SOURCE_JOB_SUBMISSION: + /* if (LRMS_STATE(old_state)) res = RET_LATE; */ + new_state = EDG_WLL_JOB_READY; break; + case EDG_WLL_SOURCE_LOG_MONITOR: + if (LRMS_STATE(old_state)) { + js->pub.stateEnterTimes[1 + EDG_WLL_JOB_SCHEDULED] = + e->any.timestamp.tv_sec; + res = RET_LATE; + } + new_state = EDG_WLL_JOB_SCHEDULED; + lm_favour_lrms = 1; + break; + default: + goto bad_event; break; + } + } else if (e->transfer.result == EDG_WLL_TRANSFER_FAIL) { + /* transfer failed */ + switch (e->transfer.source) { + case EDG_WLL_SOURCE_USER_INTERFACE: + new_state = EDG_WLL_JOB_SUBMITTED; break; + case EDG_WLL_SOURCE_JOB_SUBMISSION: + if (LRMS_STATE(old_state)) res = RET_LATE; + new_state = EDG_WLL_JOB_READY; break; + case EDG_WLL_SOURCE_LOG_MONITOR: + if (LRMS_STATE(old_state)) res = RET_LATE; + new_state = EDG_WLL_JOB_READY; break; + default: + goto bad_event; break; + } + } else { + /* e->transfer.result == EDG_WLL_TRANSFER_START */ + res = RET_IGNORE; + } + if (USABLE(res, strict)) { + js->pub.state = new_state; + rep(js->pub.reason, e->transfer.reason); + + free(js->pub.location); + if (e->transfer.result == EDG_WLL_TRANSFER_OK) { + js->pub.location = location_string( + edg_wll_SourceToString(e->transfer.destination), + e->transfer.dest_host, + e->transfer.dest_instance); + } else { + js->pub.location = location_string( + edg_wll_SourceToString(e->transfer.source), + e->transfer.host, + e->transfer.src_instance); + } + } + if (USABLE_DATA(res, strict)) { + switch (e->transfer.source) { + case EDG_WLL_SOURCE_USER_INTERFACE: + rep(js->pub.jdl, e->transfer.job); break; + case EDG_WLL_SOURCE_JOB_SUBMISSION: + rep(js->pub.condor_jdl, e->transfer.job); break; + case EDG_WLL_SOURCE_LOG_MONITOR: + rep(js->pub.rsl, e->transfer.job); break; + default: + goto bad_event; break; + + } + } + break; + case EDG_WLL_EVENT_ACCEPTED: + switch (e->accepted.source) { + case EDG_WLL_SOURCE_NETWORK_SERVER: + new_state = EDG_WLL_JOB_WAITING; break; + case EDG_WLL_SOURCE_LOG_MONITOR: + if (LRMS_STATE(old_state)) res = RET_LATE; + new_state = EDG_WLL_JOB_READY; + lm_favour_lrms = 1; + break; + case EDG_WLL_SOURCE_LRMS: + new_state = EDG_WLL_JOB_SCHEDULED; break; + default: + goto bad_event; break; + } + if (USABLE(res, strict)) { + js->pub.state = new_state; + free(js->pub.location); + js->pub.location = location_string( + edg_wll_SourceToString(e->accepted.source), + e->accepted.host, + e->accepted.src_instance); + } + if (USABLE_DATA(res, strict)) { + switch (e->accepted.source) { + case EDG_WLL_SOURCE_NETWORK_SERVER: + break; /* no WM id */ + case EDG_WLL_SOURCE_LOG_MONITOR: + rep(js->pub.condorId, e->accepted.local_jobid); break; + case EDG_WLL_SOURCE_LRMS: + /* XXX localId */ + rep(js->pub.globusId, e->accepted.local_jobid); break; + default: + goto bad_event; break; + } + } + break; + case EDG_WLL_EVENT_REFUSED: + switch (e->refused.source) { + case EDG_WLL_SOURCE_NETWORK_SERVER: + new_state = EDG_WLL_JOB_SUBMITTED; break; + case EDG_WLL_SOURCE_LOG_MONITOR: + new_state = EDG_WLL_JOB_READY; break; + case EDG_WLL_SOURCE_LRMS: + new_state = EDG_WLL_JOB_READY; break; + default: + goto bad_event; break; + } + if (USABLE(res, strict)) { + js->pub.state = new_state; + rep(js->pub.reason, e->refused.reason); + + free(js->pub.location); + js->pub.location = location_string( + edg_wll_SourceToString(e->refused.from), + e->refused.from_host, + e->refused.from_instance); + } + break; + case EDG_WLL_EVENT_ENQUEUED: + if (e->enQueued.result == EDG_WLL_ENQUEUED_OK) { + switch (e->enQueued.source) { + case EDG_WLL_SOURCE_NETWORK_SERVER: + new_state = EDG_WLL_JOB_WAITING; break; + case EDG_WLL_SOURCE_WORKLOAD_MANAGER: + if (LRMS_STATE(old_state)) res = RET_LATE; + update_branch_state(e->any.seqcode, NULL, + NULL, e->enQueued.job, &js->branch_states); + new_state = EDG_WLL_JOB_READY; break; + case EDG_WLL_SOURCE_LOG_MONITOR: + new_state = EDG_WLL_JOB_WAITING; break; + default: + goto bad_event; break; + } + } else if (e->enQueued.result == EDG_WLL_ENQUEUED_FAIL) { + switch (e->enQueued.source) { + case EDG_WLL_SOURCE_NETWORK_SERVER: + new_state = EDG_WLL_JOB_WAITING; break; + case EDG_WLL_SOURCE_WORKLOAD_MANAGER: + new_state = EDG_WLL_JOB_WAITING; break; + case EDG_WLL_SOURCE_LOG_MONITOR: + new_state = old_state; break; + default: + goto bad_event; break; + } + } else { + /* e->enQueued.result == EDG_WLL_ENQUEUED_START */ + res = RET_IGNORE; + } + if (USABLE(res, strict)) { + js->pub.state = new_state; + rep(js->pub.reason, e->enQueued.reason); + + free(js->pub.location); + if (e->enQueued.result == EDG_WLL_ENQUEUED_OK) { + js->pub.location = location_string( + e->enQueued.queue, + e->enQueued.host, + e->enQueued.src_instance); + if (e->enQueued.source == EDG_WLL_SOURCE_LOG_MONITOR) + js->pub.resubmitted = 1; + } else { + js->pub.location = location_string( + edg_wll_SourceToString(e->enQueued.source), + e->enQueued.host, + e->enQueued.src_instance); + } + } + if (USABLE_DATA(res, strict)) { + switch (e->enQueued.source) { + case EDG_WLL_SOURCE_NETWORK_SERVER: + rep(js->pub.jdl, e->enQueued.job); break; + case EDG_WLL_SOURCE_WORKLOAD_MANAGER: + if (USABLE_BRANCH(res)) { + rep(js->pub.matched_jdl, e->enQueued.job); + } + break; + case EDG_WLL_SOURCE_LOG_MONITOR: + /* no interim JDL here */ + break; + default: + goto bad_event; break; + } + } + break; + case EDG_WLL_EVENT_DEQUEUED: + switch (e->deQueued.source) { + case EDG_WLL_SOURCE_WORKLOAD_MANAGER: + new_state = EDG_WLL_JOB_WAITING; break; + case EDG_WLL_SOURCE_JOB_SUBMISSION: + if (LRMS_STATE(old_state)) res = RET_LATE; + new_state = EDG_WLL_JOB_READY; break; + default: + goto bad_event; break; + } + if (USABLE(res, strict)) { + js->pub.state = new_state; + free(js->pub.location); + js->pub.location = location_string( + edg_wll_SourceToString(e->deQueued.source), + e->deQueued.host, + e->deQueued.src_instance); + } + if (USABLE_DATA(res, strict)) { + /* no WM/JSS local jobid */ + } + break; + case EDG_WLL_EVENT_HELPERCALL: + if (USABLE(res, strict)) { + js->pub.state = EDG_WLL_JOB_WAITING; + free(js->pub.location); + js->pub.location = location_string( + e->helperCall.helper_name, + e->helperCall.host, + e->helperCall.src_instance); + /* roles and params used only for debugging */ + } + break; + case EDG_WLL_EVENT_HELPERRETURN: + if (USABLE(res, strict)) { + js->pub.state = EDG_WLL_JOB_WAITING; + free(js->pub.location); + js->pub.location = location_string( + edg_wll_SourceToString(EDG_WLL_SOURCE_WORKLOAD_MANAGER), + e->helperReturn.host, + e->helperReturn.src_instance); + /* roles and retvals used only for debugging */ + } + break; + case EDG_WLL_EVENT_RUNNING: + if (USABLE(res, strict)) { + js->pub.state = EDG_WLL_JOB_RUNNING; + free(js->pub.location); + js->pub.location = location_string( + edg_wll_SourceToString(EDG_WLL_SOURCE_LRMS), + "worknode", + e->running.node); + } + if (USABLE_DATA(res, strict)) { + if (USABLE_BRANCH(fine_res)) { + rep(js->pub.ce_node, e->running.node); + } + /* why? if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) { */ + if (e->running.node) { + update_branch_state(e->any.seqcode, NULL, + e->running.node, NULL, &js->branch_states); + add_stringlist(&js->pub.possible_ce_nodes, + e->running.node); + } + /* } */ + } + break; + case EDG_WLL_EVENT_REALLYRUNNING: + /* consistence check -- should not receive two contradicting ReallyRunning's within single + deep resub cycle */ + if (fine_res == RET_BADBRANCH) { + syslog(LOG_ERR,"ReallyRunning on bad branch %s", + e->any.source == EDG_WLL_SOURCE_LOG_MONITOR ? e->reallyRunning.wn_seq : e->any.seqcode); + break; + } + /* select the branch unless TOOOLD, i.e. before deep resubmission */ + if (!(res == RET_LATE && fine_res == RET_TOOOLD)) { + if (e->any.source == EDG_WLL_SOURCE_LRMS) { + rep(js->branch_tag_seqcode, e->any.seqcode); + if (res == RET_OK) { + rep(js->last_branch_seqcode, e->any.seqcode); + js->pub.state = EDG_WLL_JOB_RUNNING; + } + } + if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) { + if (!js->branch_tag_seqcode) { + if (PARSABLE_SEQCODE(e->reallyRunning.wn_seq)) { + rep(js->branch_tag_seqcode, e->reallyRunning.wn_seq); + } else + goto bad_event; + } + if (!js->last_branch_seqcode) { + if (PARSABLE_SEQCODE(e->reallyRunning.wn_seq)) { + if (res == RET_OK) { + rep(js->last_branch_seqcode, e->reallyRunning.wn_seq); + js->pub.state = EDG_WLL_JOB_RUNNING; + } + } else + goto bad_event; + } + } + + /* XXX: best effort -- if we are lucky, ReallyRunning is on the last shallow cycle, + so we take in account events processed so far */ + if (res == RET_LATE && !js->last_branch_seqcode) { + if (same_branch(js->last_seqcode,js->branch_tag_seqcode)) + rep(js->last_branch_seqcode,js->last_seqcode); + } + + js->pub.payload_running = 1; + load_branch_state(js); + } +#if 0 + if (USABLE_DATA(res, strict)) { + js->pub.state = EDG_WLL_JOB_RUNNING; + free(js->pub.location); + js->pub.location = location_string( + edg_wll_SourceToString(EDG_WLL_SOURCE_LRMS), + "worknode", + e->running.node); + js->pub.payload_running = 1; + if (e->any.source == EDG_WLL_SOURCE_LRMS) { + rep(js->branch_tag_seqcode, e->any.seqcode); + rep(js->last_branch_seqcode, e->any.seqcode); + } + if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) { + if (!js->branch_tag_seqcode) { + if (PARSABLE_SEQCODE(e->reallyRunning.wn_seq)) { + rep(js->branch_tag_seqcode, e->reallyRunning.wn_seq); + } else + goto bad_event; + } + if (!js->last_branch_seqcode) { + if (PARSABLE_SEQCODE(e->reallyRunning.wn_seq)) { + rep(js->last_branch_seqcode, e->reallyRunning.wn_seq); + } else + goto bad_event; + } + } + load_branch_state(js); + } +#endif + break; + case EDG_WLL_EVENT_SUSPEND: + if (USABLE(res, strict)) { + if (js->pub.state == EDG_WLL_JOB_RUNNING) { + js->pub.suspended = 1; + rep(js->pub.suspend_reason, e->suspend.reason); + } + } + break; + case EDG_WLL_EVENT_RESUME: + if (USABLE(res, strict)) { + if (js->pub.state == EDG_WLL_JOB_RUNNING) { + js->pub.suspended = 0; + rep(js->pub.suspend_reason, e->resume.reason); + } + } + break; + case EDG_WLL_EVENT_RESUBMISSION: + if (USABLE(res, strict)) { + if (e->resubmission.result == EDG_WLL_RESUBMISSION_WONTRESUB) { + rep(js->pub.reason, e->resubmission.reason); + } + } + if (USABLE_DATA(res, strict)) { + if (e->resubmission.result == EDG_WLL_RESUBMISSION_WONTRESUB) { + js->resubmit_type = EDG_WLL_RESUBMISSION_WONTRESUB; + } + else + if (e->resubmission.result == EDG_WLL_RESUBMISSION_WILLRESUB && + e->any.source == EDG_WLL_SOURCE_WORKLOAD_MANAGER) { + reset_branch(js, e); + } + else + if (e->resubmission.result == EDG_WLL_RESUBMISSION_SHALLOW) { + js->resubmit_type = EDG_WLL_RESUBMISSION_SHALLOW; + // deep resubmit stays forever deadline for events + // rep(js->deep_resubmit_seqcode, NULL); + } + } + break; + case EDG_WLL_EVENT_DONE: + if (e->any.source == EDG_WLL_SOURCE_LRMS) { + /* Done from JobWrapper is not sufficient for transition + * to DONE state according its current definition */ + break; + } + if (USABLE(res, strict)) { + js->pub.state = EDG_WLL_JOB_DONE; + rep(js->pub.reason, e->done.reason); + if (fine_res == RET_GOODBRANCH) { + js->pub.payload_running = 0; + } + switch (e->done.status_code) { + case EDG_WLL_DONE_CANCELLED: + js->pub.state = EDG_WLL_JOB_CANCELLED; + case EDG_WLL_DONE_OK: + rep(js->pub.location, "none"); break; + default: + free(js->pub.location); + js->pub.location = location_string( + edg_wll_SourceToString(e->done.source), + e->done.host, + e->done.src_instance); + } + } + if (USABLE_DATA(res, strict)) { + switch (e->done.status_code) { + case EDG_WLL_DONE_OK: + js->pub.exit_code = e->done.exit_code; + js->pub.done_code = EDG_WLL_STAT_OK; break; + case EDG_WLL_DONE_CANCELLED: + js->pub.exit_code = 0; + js->pub.done_code = EDG_WLL_STAT_CANCELLED; break; + case EDG_WLL_DONE_FAILED: + js->pub.exit_code = 0; + js->pub.done_code = EDG_WLL_STAT_FAILED; break; + default: + goto bad_event; break; + } + } + break; + case EDG_WLL_EVENT_CANCEL: + if (fine_res != RET_BADBRANCH) { + if (js->last_cancel_seqcode != NULL && + edg_wll_compare_seq(e->any.seqcode, js->last_cancel_seqcode) < 0) { + res = RET_LATE; + } + } + else { + res = RET_LATE; + } + if (USABLE(res, strict)) { + switch (e->cancel.status_code) { + case EDG_WLL_CANCEL_REQ: + js->pub.cancelling = 1; break; + case EDG_WLL_CANCEL_DONE: + js->pub.state = EDG_WLL_JOB_CANCELLED; + js->pub.remove_from_proxy = 1; + rep(js->pub.reason, e->cancel.reason); + rep(js->last_seqcode, e->any.seqcode); + rep(js->pub.location, "none"); + /* fall though */ + case EDG_WLL_CANCEL_ABORT: + js->pub.cancelling = 0; break; + default: + /* do nothing */ + break; + + } + } + if (USABLE_DATA(res, strict)) { + rep(js->pub.cancelReason, e->cancel.reason); + } + break; + case EDG_WLL_EVENT_ABORT: + // XXX: accept Abort from WM in every case + // setting res make USABLE macro true (awful !!) + if (e->any.source == EDG_WLL_SOURCE_WORKLOAD_MANAGER) res = RET_OK; + if (USABLE(res, strict)) { + js->pub.state = EDG_WLL_JOB_ABORTED; + js->pub.remove_from_proxy = 1; + rep(js->pub.reason, e->abort.reason); + rep(js->pub.location, "none"); + + reset_branch(js, e); + } + break; + + case EDG_WLL_EVENT_CLEAR: + if (USABLE(res, strict)) { + js->pub.state = EDG_WLL_JOB_CLEARED; + js->pub.remove_from_proxy = 1; + rep(js->pub.location, "none"); + switch (e->clear.reason) { + case EDG_WLL_CLEAR_USER: + rep(js->pub.reason, "user retrieved output sandbox"); + break; + case EDG_WLL_CLEAR_TIMEOUT: + rep(js->pub.reason, "timed out, resource purge forced"); + break; + case EDG_WLL_CLEAR_NOOUTPUT: + rep(js->pub.reason, "no output was generated"); + break; + default: + goto bad_event; break; + + } + } + break; + case EDG_WLL_EVENT_PURGE: + /* ignore, meta-information only */ + break; + case EDG_WLL_EVENT_MATCH: + if (USABLE(res, strict)) { + js->pub.state = EDG_WLL_JOB_WAITING; + js->pub.location = location_string( + edg_wll_SourceToString(EDG_WLL_SOURCE_WORKLOAD_MANAGER), + e->match.host, + e->match.src_instance); + } + if (USABLE_DATA(res, strict)) { + if (USABLE_BRANCH(fine_res)) { + rep(js->pub.destination, e->match.dest_id); + } + if (e->match.dest_id) { + update_branch_state(e->any.seqcode, e->match.dest_id, + NULL, NULL, &js->branch_states); + add_stringlist(&js->pub.possible_destinations, + e->match.dest_id); + } + } + break; + case EDG_WLL_EVENT_PENDING: + if (USABLE(res, strict)) { + js->pub.state = EDG_WLL_JOB_WAITING; + rep(js->pub.reason, e->pending.reason); + js->pub.location = location_string( + edg_wll_SourceToString(EDG_WLL_SOURCE_WORKLOAD_MANAGER), + e->match.host, + e->match.src_instance); + } + break; + case EDG_WLL_EVENT_REGJOB: + if (USABLE(res, strict)) { + js->pub.state = EDG_WLL_JOB_SUBMITTED; + } + if (USABLE_DATA(res, strict)) { + rep_cond(js->pub.jdl, e->regJob.jdl); + edg_wlc_JobIdFree(js->pub.parent_job); + edg_wlc_JobIdDup(e->regJob.parent, + &js->pub.parent_job); + rep(js->pub.network_server, e->regJob.ns); + js->pub.children_num = e->regJob.nsubjobs; + switch (e->regJob.jobtype) { + case EDG_WLL_REGJOB_DAG: + case EDG_WLL_REGJOB_PARTITIONED: + js->pub.jobtype = EDG_WLL_STAT_DAG; + js->pub.children_hist[EDG_WLL_JOB_UNKNOWN+1] = js->pub.children_num; + break; + case EDG_WLL_REGJOB_COLLECTION: + js->pub.jobtype = EDG_WLL_STAT_COLLECTION; + js->pub.children_hist[EDG_WLL_JOB_UNKNOWN+1] = js->pub.children_num; + break; + default: + break; + } + rep(js->pub.seed, e->regJob.seed); + } + break; + case EDG_WLL_EVENT_USERTAG: + if (USABLE_DATA(res, strict)) { + if (e->userTag.name != NULL && e->userTag.value != NULL) { + add_taglist(&js->pub.user_tags, + e->userTag.name, e->userTag.value); + } else { + goto bad_event; + } + } + break; + case EDG_WLL_EVENT_LISTENER: + /* ignore, listener port is not part of job status */ + break; + case EDG_WLL_EVENT_CURDESCR: + case EDG_WLL_EVENT_CHKPT: + case EDG_WLL_EVENT_CHANGEACL: + /* ignore, only for event log */ + break; + case EDG_WLL_EVENT_COLLECTIONSTATE: + new_state = edg_wll_StringToStat(e->collectionState.state); + if (USABLE(res, strict)) { + js->pub.state = new_state; + if (new_state == EDG_WLL_JOB_DONE) + js->pub.done_code = e->collectionState.done_code; + } + break; + default: + goto bad_event; + break; + } + + if (USABLE(res,strict)) { + js->pub.lastUpdateTime = e->any.timestamp; + if (old_state != js->pub.state) { + js->pub.stateEnterTime = js->pub.lastUpdateTime; + js->pub.stateEnterTimes[1 + js->pub.state] + = (int)js->pub.lastUpdateTime.tv_sec; + } + if (e->any.type == EDG_WLL_EVENT_CANCEL) { + rep(js->last_cancel_seqcode, e->any.seqcode); + } else { + +/* the first set of LM events (Accept, Transfer/XX -> LRMS) + should not should shift the state (to Ready, Scheduled) but NOT to + update js->last_seqcode completely, in order not to block following + LRMS events which are likely to arrive later but should still affect + job state (as there may be no more LM events due to the Condor bug). + However, don't ignore the incoming seqcode completely, to catch up + with possibly delayed WM/JSS events */ + + if (lm_favour_lrms) { + free(js->last_seqcode); + js->last_seqcode = set_component_seqcode(e->any.seqcode,EDG_WLL_SOURCE_LOG_MONITOR,0); + } + else rep(js->last_seqcode, e->any.seqcode); + } + + if (js->pub.state != EDG_WLL_JOB_RUNNING) { + js->pub.suspended = 0; + rep(js->pub.suspend_reason, NULL); + } + + if (fine_res == RET_GOODBRANCH) { + rep(js->last_branch_seqcode, e->any.seqcode); + } + } + + if (USABLE_DATA(res,strict)) { + if (e->any.source == EDG_WLL_SOURCE_NETWORK_SERVER && + js->pub.network_server == NULL) { + char *inst; + inst = e->any.src_instance; + trio_asprintf(&js->pub.network_server, "%s%s%s", + e->any.host, + inst != NULL ? ":" : " ", + inst != NULL ? inst : ""); + } + } + + return res; + +bad_event: + badEvent(js,e,ev_seq); + return RET_SUSPECT; +} + +int add_stringlist(char ***lptr, const char *new_item) +{ + char **itptr; + int i; + + if (*lptr == NULL) { + itptr = (char **) malloc(2*sizeof(char *)); + itptr[0] = strdup(new_item); + itptr[1] = NULL; + *lptr = itptr; + return 1; + } else { + for (i = 0, itptr = *lptr; itptr[i] != NULL; i++); + itptr = (char **) realloc(*lptr, (i+2)*sizeof(char *)); + if (itptr != NULL) { + itptr[i] = strdup(new_item); + itptr[i+1] = NULL; + *lptr = itptr; + return 1; + } else { + return 0; + } + } +} + +void destroy_intJobStat_extension(intJobStat *p) +{ + if (p->last_seqcode) free(p->last_seqcode); + if (p->last_cancel_seqcode) free(p->last_cancel_seqcode); + if (p->branch_tag_seqcode) free(p->branch_tag_seqcode); + if (p->last_branch_seqcode) free(p->last_branch_seqcode); + if (p->deep_resubmit_seqcode) free(p->deep_resubmit_seqcode); + free_branch_state(&p->branch_states); + memset(p,0,sizeof(*p)); +} + +void destroy_intJobStat(intJobStat *p) +{ + edg_wll_FreeStatus(&p->pub); + destroy_intJobStat_extension(p); + memset(p, 0, sizeof(intJobStat)); +} + +void init_intJobStat(intJobStat *p) +{ + memset(p, 0, sizeof(intJobStat)); + p->pub.jobtype = -1 /* why? EDG_WLL_STAT_SIMPLE */; + p->pub.children_hist = (int*) calloc(1+EDG_WLL_NUMBER_OF_STATCODES, sizeof(int)); + p->pub.children_hist[0] = EDG_WLL_NUMBER_OF_STATCODES; + p->pub.stateEnterTimes = (int*) calloc(1+EDG_WLL_NUMBER_OF_STATCODES, sizeof(int)); + p->pub.stateEnterTimes[0] = EDG_WLL_NUMBER_OF_STATCODES; + /* TBD: generate */ +} + diff --git a/org.glite.lb.state-machine/src/process_event_condor.c b/org.glite.lb.state-machine/src/process_event_condor.c new file mode 100644 index 0000000..bdd63f4 --- /dev/null +++ b/org.glite.lb.state-machine/src/process_event_condor.c @@ -0,0 +1,207 @@ +#ident "$Header$" + +#include +#include +#include +#include +#include +#include + +#include "glite/lb/context-int.h" + +#include "intjobstat.h" +#include "seqcode_aux.h" + +/* TBD: share in whole logging or workload */ +#ifdef __GNUC__ +#define UNUSED_VAR __attribute__((unused)) +#else +#define UNUSED_VAR +#endif + +// XXX: maybe not needed any more +// if not, remove also last_condor_event_timestamp from intJobStat +static int compare_timestamps(struct timeval a, struct timeval b) +{ + if ( (a.tv_sec > b.tv_sec) || + ((a.tv_sec == b.tv_sec) && (a.tv_usec > b.tv_usec)) ) return 1; + if ( (a.tv_sec < b.tv_sec) || + ((a.tv_sec == b.tv_sec) && (a.tv_usec < b.tv_usec)) ) return -1; + return 0; +} + + +// XXX move this defines into some common place to be reusable +#define USABLE(res) ((res) == RET_OK) +#define USABLE_DATA(res) (1) +#define rep(a,b) { free(a); a = (b == NULL) ? NULL : strdup(b); } +#define rep_cond(a,b) { if (b) { free(a); a = strdup(b); } } + +int processEvent_Condor(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring) +{ + edg_wll_JobStatCode old_state = js->pub.state; + int res = RET_OK; + + + if ((js->last_seqcode != NULL) && + (edg_wll_compare_condor_seq(js->last_seqcode, e->any.seqcode) > 0) ) { + res = RET_LATE; + } + + switch (e->any.type) { + case EDG_WLL_EVENT_REGJOB: + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_SUBMITTED; + rep(js->pub.condor_status, "Idle"); + } + if (USABLE_DATA(res)) { + rep(js->pub.jdl, e->regJob.jdl); + } + break; + case EDG_WLL_EVENT_CONDORMATCH: + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_READY; + rep(js->pub.condor_status, "Idle"); + } + if (USABLE_DATA(res)) { + rep_cond(js->pub.condor_dest_host,e->CondorMatch.dest_host); + } + break; + case EDG_WLL_EVENT_CONDORREJECT: + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_ABORTED; + rep(js->pub.condor_status, "Unexpanded"); + } + if (USABLE_DATA(res)) { + switch(e->CondorReject.status_code) { + case EDG_WLL_CONDORREJECT_NOMATCH: + rep(js->pub.condor_reason,"No match found."); + break; + case EDG_WLL_CONDORREJECT_OTHER: + default: + break; + } + } + break; + case EDG_WLL_EVENT_CONDORSHADOWSTARTED: + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_READY; + rep(js->pub.condor_status, "Idle"); + } + if (USABLE_DATA(res)) { + switch (get_condor_event_source(e->any.seqcode)) { + case EDG_WLL_CONDOR_EVENT_SOURCE_SCHED: + js->pub.condor_shadow_pid = e->CondorShadowStarted.shadow_pid; + break; + default: + break; + } + } + break; + case EDG_WLL_EVENT_CONDORSHADOWEXITED: + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_DONE; + rep(js->pub.condor_status, "Completed"); + } + if (USABLE_DATA(res)) { + switch (get_condor_event_source(e->any.seqcode)) { + case EDG_WLL_CONDOR_EVENT_SOURCE_SHADOW: + js->pub.condor_shadow_exit_status = e->CondorShadowExited.shadow_exit_status; + break; + default: + break; + } + } + break; + case EDG_WLL_EVENT_CONDORSTARTERSTARTED: + if (USABLE(res)) { + switch (get_condor_event_source(e->any.seqcode)) { + case EDG_WLL_CONDOR_EVENT_SOURCE_START: + js->pub.state = EDG_WLL_JOB_SCHEDULED; + rep(js->pub.condor_status, "Idle"); + break; + case EDG_WLL_CONDOR_EVENT_SOURCE_STARTER: + js->pub.state = EDG_WLL_JOB_RUNNING; + rep(js->pub.condor_status, "Running"); + break; + default: + break; + } + } + if (USABLE_DATA(res)) { + switch (get_condor_event_source(e->any.seqcode)) { + case EDG_WLL_CONDOR_EVENT_SOURCE_STARTER: + rep(js->pub.condor_universe, e->CondorStarterStarted.universe); + js->pub.condor_starter_pid = e->CondorStarterStarted.starter_pid; + break; + default: + break; + } + } + break; + case EDG_WLL_EVENT_CONDORSTARTEREXITED: + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_DONE; + rep(js->pub.condor_status, "Completed"); + } + if (USABLE_DATA(res)) { + switch (get_condor_event_source(e->any.seqcode)) { + case EDG_WLL_CONDOR_EVENT_SOURCE_START: + js->pub.condor_starter_pid = e->CondorStarterExited.starter_pid; + js->pub.condor_starter_exit_status = e->CondorStarterExited.starter_exit_status; + break; + case EDG_WLL_CONDOR_EVENT_SOURCE_STARTER: + js->pub.condor_starter_pid = e->CondorStarterExited.starter_pid; + js->pub.condor_job_pid = e->CondorStarterExited.job_pid; + js->pub.condor_job_exit_status = e->CondorStarterExited.job_exit_status; + break; + default: + break; + } + } + break; + case EDG_WLL_EVENT_CONDORRESOURCEUSAGE: + if (USABLE(res)) { + } + if (USABLE_DATA(res)) { + } + break; + case EDG_WLL_EVENT_CONDORERROR: + if (USABLE(res)) { + } + if (USABLE_DATA(res)) { + } + break; + + default: + break; + } + +/* XXX : just debug output - remove */ + + printf("processEvent_Condor(): %s (%s), state: %s --> %s\n ", + edg_wll_EventToString(e->any.type), + (res == RET_LATE) ? "RET_LATE" : "RET_OK", + edg_wll_StatToString(old_state), + edg_wll_StatToString(js->pub.state) ); + printf("\t%s\n",e->any.seqcode); + printf("\t(last=%s)\n",js->last_seqcode); + +/*----------------------------------*/ + + if (USABLE(res)) { + rep(js->last_seqcode, e->any.seqcode); + + js->pub.lastUpdateTime = e->any.timestamp; + if (old_state != js->pub.state) { + js->pub.stateEnterTime = js->pub.lastUpdateTime; + js->pub.stateEnterTimes[1 + js->pub.state] + = (int)js->pub.lastUpdateTime.tv_sec; + } + } + if (! js->pub.location) js->pub.location = strdup("this is CONDOR"); + + + return RET_OK; +} + diff --git a/org.glite.lb.state-machine/src/process_event_pbs.c b/org.glite.lb.state-machine/src/process_event_pbs.c new file mode 100644 index 0000000..842c5d4 --- /dev/null +++ b/org.glite.lb.state-machine/src/process_event_pbs.c @@ -0,0 +1,230 @@ +#ident "$Header$" + +#include +#include +#include +#include +#include +#include + +#include "glite/lb/context-int.h" + +#include "glite/lbu/trio.h" + +#include "intjobstat.h" +#include "seqcode_aux.h" + +/* TBD: share in whole logging or workload */ +#ifdef __GNUC__ +#define UNUSED_VAR __attribute__((unused)) +#else +#define UNUSED_VAR +#endif + +// XXX: maybe not needed any more +// if not, remove also last_pbs_event_timestamp from intJobStat +static int compare_timestamps(struct timeval a, struct timeval b) +{ + if ( (a.tv_sec > b.tv_sec) || + ((a.tv_sec == b.tv_sec) && (a.tv_usec > b.tv_usec)) ) return 1; + if ( (a.tv_sec < b.tv_sec) || + ((a.tv_sec == b.tv_sec) && (a.tv_usec < b.tv_usec)) ) return -1; + return 0; +} + + +// XXX move this defines into some common place to be reusable +#define USABLE(res) ((res) == RET_OK) +#define USABLE_DATA(res) (1) +#define rep(a,b) { free(a); a = (b == NULL) ? NULL : strdup(b); } +#define rep_cond(a,b) { if (b) { free(a); a = strdup(b); } } + +int processEvent_PBS(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring) +{ + edg_wll_JobStatCode old_state = js->pub.state; + int res = RET_OK; + + + if ((js->last_seqcode != NULL) && + (edg_wll_compare_pbs_seq(js->last_seqcode, e->any.seqcode) > 0) ) { + res = RET_LATE; + } + + switch (e->any.type) { + case EDG_WLL_EVENT_REGJOB: + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_SUBMITTED; + rep(js->pub.pbs_state, "Q"); + } + if (USABLE_DATA(res)) { + } + break; + case EDG_WLL_EVENT_PBSQUEUED: + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_WAITING; + rep(js->pub.pbs_state, "Q"); + } + if (USABLE_DATA(res)) { + if (!js->pub.pbs_queue) + js->pub.pbs_queue = strdup(e->PBSQueued.queue); + assert(!strcmp(js->pub.pbs_queue, e->PBSQueued.queue)); + rep_cond(js->pub.pbs_owner,e->PBSQueued.owner); + rep_cond(js->pub.pbs_name,e->PBSQueued.name); + } + break; + case EDG_WLL_EVENT_PBSMATCH: + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_READY; + rep(js->pub.pbs_state, "Q"); + } + if (USABLE_DATA(res)) { + rep_cond(js->pub.pbs_dest_host,e->PBSMatch.dest_host); + } + break; + case EDG_WLL_EVENT_PBSPENDING: + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_WAITING; + rep(js->pub.pbs_state, "Q"); + js->pbs_reruning = 0; // reset possible reruning flag + } + if (USABLE_DATA(res)) { + rep_cond(js->pub.pbs_reason,e->PBSPending.reason); + } + break; + case EDG_WLL_EVENT_PBSRUN: + if (USABLE(res)) { + switch (get_pbs_event_source(e->any.seqcode)) { + case EDG_WLL_PBS_EVENT_SOURCE_SERVER: + js->pub.state = EDG_WLL_JOB_SCHEDULED; + rep(js->pub.pbs_state, "Q"); + break; + case EDG_WLL_PBS_EVENT_SOURCE_MOM: + js->pub.state = EDG_WLL_JOB_RUNNING; + rep(js->pub.pbs_state, "R"); + break; + default: + assert(0); // running event from strange source + break; + } + } + if (USABLE_DATA(res)) { + rep_cond(js->pub.pbs_scheduler, e->PBSRun.scheduler); + rep_cond(js->pub.pbs_dest_host, e->PBSRun.dest_host); + js->pub.pbs_pid = e->PBSRun.pid; + } + break; + case EDG_WLL_EVENT_PBSRERUN: + if (USABLE(res)) { + switch (get_pbs_event_source(e->any.seqcode)) { + case EDG_WLL_PBS_EVENT_SOURCE_SERVER: + js->pub.state = EDG_WLL_JOB_WAITING; + rep(js->pub.pbs_state, "Q"); + break; + case EDG_WLL_PBS_EVENT_SOURCE_MOM: + js->pub.state = EDG_WLL_JOB_WAITING; + rep(js->pub.pbs_state, "E"); + js->pbs_reruning = 1; + break; + default: + assert(0); // running event from strande source + break; + } + } + if (USABLE_DATA(res)) { + } + break; + case EDG_WLL_EVENT_PBSDONE: + if (USABLE(res)) { + switch (get_pbs_event_source(e->any.seqcode)) { + case EDG_WLL_PBS_EVENT_SOURCE_SERVER: + js->pub.state = EDG_WLL_JOB_DONE; + js->pub.done_code = EDG_WLL_STAT_OK; + rep(js->pub.pbs_state, "C"); + break; + case EDG_WLL_PBS_EVENT_SOURCE_MOM: + if (!js->pbs_reruning) { + js->pub.state = EDG_WLL_JOB_DONE; + js->pub.done_code = EDG_WLL_STAT_OK; + rep(js->pub.pbs_state, "C"); + } + break; + default: + assert(0); //done event from strange source + break; + } + } + if (USABLE_DATA(res)) { + js->pub.pbs_exit_status = e->PBSDone.exit_status; + } + break; + case EDG_WLL_EVENT_PBSRESOURCEUSAGE: + if (USABLE(res)) { + // signalize state done, done_code uknown + js->pub.state = EDG_WLL_JOB_DONE; + rep(js->pub.pbs_state, "C"); + } + if (USABLE_DATA(res)) { + char *new_resource_usage; + + trio_asprintf(&new_resource_usage,"%s%s\t%s = %f [%s]", + (js->pub.pbs_resource_usage) ? js->pub.pbs_resource_usage : "", + (js->pub.pbs_resource_usage) ? "\n": "", + e->PBSResourceUsage.name, + e->PBSResourceUsage.quantity, + e->PBSResourceUsage.unit); + + if (js->pub.pbs_resource_usage) free(js->pub.pbs_resource_usage); + js->pub.pbs_resource_usage = new_resource_usage; + } + break; + case EDG_WLL_EVENT_PBSERROR: + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_DONE; + js->pub.done_code = EDG_WLL_STAT_FAILED; + rep(js->pub.pbs_state, "C"); + } + if (USABLE_DATA(res)) { + char *new_error_desc; + + trio_asprintf(&new_error_desc,"%s%s\t%s", + (js->pub.pbs_error_desc) ? js->pub.pbs_error_desc : "", + (js->pub.pbs_error_desc) ? "\n" : "", + e->PBSError.error_desc); + + if (js->pub.pbs_error_desc) free(js->pub.pbs_error_desc); + js->pub.pbs_error_desc = new_error_desc; + } + break; + + default: + break; + } + +/* XXX : just debug output - remove */ + + printf("processEvent_PBS(): %s (%s), state: %s --> %s\n ", + edg_wll_EventToString(e->any.type), + (res == RET_LATE) ? "RET_LATE" : "RET_OK", + edg_wll_StatToString(old_state), + edg_wll_StatToString(js->pub.state) ); + printf("\t%s\n",e->any.seqcode); + printf("\t(last=%s)\n",js->last_seqcode); + +/*----------------------------------*/ + + if (USABLE(res)) { + rep(js->last_seqcode, e->any.seqcode); + + js->pub.lastUpdateTime = e->any.timestamp; + if (old_state != js->pub.state) { + js->pub.stateEnterTime = js->pub.lastUpdateTime; + js->pub.stateEnterTimes[1 + js->pub.state] + = (int)js->pub.lastUpdateTime.tv_sec; + } + } + if (! js->pub.location) js->pub.location = strdup("this is PBS"); + + + return RET_OK; +} + diff --git a/org.glite.lb.state-machine/src/seqcode_aux.c b/org.glite.lb.state-machine/src/seqcode_aux.c new file mode 100644 index 0000000..f1d9b90 --- /dev/null +++ b/org.glite.lb.state-machine/src/seqcode_aux.c @@ -0,0 +1,281 @@ +#ident "$Header$" + +#include +#include +#include +#include + +#include "glite/lbu/trio.h" +#include "glite/lb/context-int.h" + +#include "intjobstat.h" +#include "seqcode_aux.h" + + +/* +#include +#include +#include +#include +#include +#include +#include +#include + +#include "glite/jobid/cjobid.h" +#include "glite/lbu/trio.h" +#include "glite/lbu/db.h" +#include "glite/lb/context-int.h" + +#include "store.h" +#include "index.h" +#include "jobstat.h" +#include "get_events.h" +*/ + +int component_seqcode(const char *a, edg_wll_Source index) +{ + unsigned int c[EDG_WLL_SOURCE__LAST]; + int res; + char sc[EDG_WLL_SEQ_SIZE]; + + if (!strstr(a, "LBS")) snprintf(sc,EDG_WLL_SEQ_SIZE,"%s:LBS=000000",a); + else snprintf(sc,EDG_WLL_SEQ_SIZE,"%s",a); + + res = sscanf(sc, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d:LBS=%d", + &c[EDG_WLL_SOURCE_USER_INTERFACE], + &c[EDG_WLL_SOURCE_NETWORK_SERVER], + &c[EDG_WLL_SOURCE_WORKLOAD_MANAGER], + &c[EDG_WLL_SOURCE_BIG_HELPER], + &c[EDG_WLL_SOURCE_JOB_SUBMISSION], + &c[EDG_WLL_SOURCE_LOG_MONITOR], + &c[EDG_WLL_SOURCE_LRMS], + &c[EDG_WLL_SOURCE_APPLICATION], + &c[EDG_WLL_SOURCE_LB_SERVER]); + if (res != EDG_WLL_SOURCE__LAST-1) { +/* FIXME: syslog(LOG_ERR, "unparsable sequence code %s\n", sc); */ + fprintf(stderr, "unparsable sequence code %s\n", sc); + return -1; + } + + return(c[index]); +} + +char * set_component_seqcode(char *a,edg_wll_Source index,int val) +{ + unsigned int c[EDG_WLL_SOURCE__LAST]; + int res; + char *ret; + char sc[EDG_WLL_SEQ_SIZE]; + + if (!strstr(a, "LBS")) snprintf(sc,EDG_WLL_SEQ_SIZE,"%s:LBS=000000",a); + else snprintf(sc,EDG_WLL_SEQ_SIZE,"%s",a); + + res = sscanf(sc, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d:LBS=%d", + &c[EDG_WLL_SOURCE_USER_INTERFACE], + &c[EDG_WLL_SOURCE_NETWORK_SERVER], + &c[EDG_WLL_SOURCE_WORKLOAD_MANAGER], + &c[EDG_WLL_SOURCE_BIG_HELPER], + &c[EDG_WLL_SOURCE_JOB_SUBMISSION], + &c[EDG_WLL_SOURCE_LOG_MONITOR], + &c[EDG_WLL_SOURCE_LRMS], + &c[EDG_WLL_SOURCE_APPLICATION], + &c[EDG_WLL_SOURCE_LB_SERVER]); + if (res != EDG_WLL_SOURCE__LAST-1) { +/* FIXME: syslog(LOG_ERR, "unparsable sequence code %s\n", sc); */ + fprintf(stderr, "unparsable sequence code %s\n", sc); + return NULL; + } + + c[index] = val; + trio_asprintf(&ret,"UI=%06d:NS=%010d:WM=%06d:BH=%010d:JSS=%06d" + ":LM=%06d:LRMS=%06d:APP=%06d:LBS=%06d", + c[EDG_WLL_SOURCE_USER_INTERFACE], + c[EDG_WLL_SOURCE_NETWORK_SERVER], + c[EDG_WLL_SOURCE_WORKLOAD_MANAGER], + c[EDG_WLL_SOURCE_BIG_HELPER], + c[EDG_WLL_SOURCE_JOB_SUBMISSION], + c[EDG_WLL_SOURCE_LOG_MONITOR], + c[EDG_WLL_SOURCE_LRMS], + c[EDG_WLL_SOURCE_APPLICATION], + c[EDG_WLL_SOURCE_LB_SERVER]); + return ret; +} + +int before_deep_resubmission(const char *a, const char *b) +{ + if (component_seqcode(a, EDG_WLL_SOURCE_WORKLOAD_MANAGER) < + component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER) ) + return(1); + else + return(0); + +} + +int same_branch(const char *a, const char *b) +{ + if (component_seqcode(a, EDG_WLL_SOURCE_WORKLOAD_MANAGER) == + component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER) ) + return(1); + else + return(0); +} + +int edg_wll_compare_pbs_seq(const char *a,const char *b) +{ + char timestamp_a[14], pos_a[10], src_a; + char timestamp_b[14], pos_b[10], src_b; + int ev_code_a, ev_code_b; + int res; + + res = sscanf(a,"TIMESTAMP=%14s:POS=%10s:EV.CODE=%3d:SRC=%c", timestamp_a, pos_a, &ev_code_a, &src_a); + + if (res != 4) { +/* FIXME: syslog(LOG_ERR, "unparsable sequence code %s\n", a); */ + fprintf(stderr, "unparsable sequence code %s\n", a); + return -1; + } + + res = sscanf(b,"TIMESTAMP=%14s:POS=%10s:EV.CODE=%3d:SRC=%c", timestamp_b, pos_b, &ev_code_b, &src_b); + + if (res != 4) { +/* FIXME: syslog(LOG_ERR, "unparsable sequence code %s\n", b); */ + fprintf(stderr, "unparsable sequence code %s\n", b); + return -1; + } + + /* wild card for PBSJobReg - this event should always come as firt one */ + /* bacause it hold job.type, which is necessary for further event processing */ + if (ev_code_a == EDG_WLL_EVENT_REGJOB) return -1; + if (ev_code_b == EDG_WLL_EVENT_REGJOB) return 1; + + /* sort event w.t.r. to timestamps */ + if ((res = strcmp(timestamp_a,timestamp_b)) != 0) { + return res; + } + else { + /* if timestamps equal, sort if w.t.r. to file position */ + /* if you both events come from the same log file */ + if (src_a == src_b) { + /* zero mean in fact duplicate events in log */ + return strcmp(pos_a,pos_b); + } + /* if the events come from diffrent log files */ + /* it is possible to prioritize some src log file */ + else { + /* prioritize events from pbs_mom */ + if (src_a == 'm') return 1; + if (src_b == 'm') return -1; + + /* then prioritize events from pbs_server */ + if (src_a == 's') return 1; + if (src_b == 's') return -1; + + /* other priorities comes here... */ + } + } + + return 0; +} + +edg_wll_PBSEventSource get_pbs_event_source(const char *pbs_seq_num) { + switch (pbs_seq_num[EDG_WLL_SEQ_PBS_SIZE - 2]) { + case 'c': return(EDG_WLL_PBS_EVENT_SOURCE_SCHEDULER); + case 's': return(EDG_WLL_PBS_EVENT_SOURCE_SERVER); + case 'm': return(EDG_WLL_PBS_EVENT_SOURCE_MOM); + case 'a': return(EDG_WLL_PBS_EVENT_SOURCE_ACCOUNTING); + default: return(EDG_WLL_PBS_EVENT_SOURCE_UNDEF); + } +} + +edg_wll_CondorEventSource get_condor_event_source(const char *condor_seq_num) { + switch (condor_seq_num[EDG_WLL_SEQ_CONDOR_SIZE - 2]) { + case 'L': return(EDG_WLL_CONDOR_EVENT_SOURCE_COLLECTOR); + case 'M': return(EDG_WLL_CONDOR_EVENT_SOURCE_MASTER); + case 'm': return(EDG_WLL_CONDOR_EVENT_SOURCE_MATCH); + case 'N': return(EDG_WLL_CONDOR_EVENT_SOURCE_NEGOTIATOR); + case 'C': return(EDG_WLL_CONDOR_EVENT_SOURCE_SCHED); + case 'H': return(EDG_WLL_CONDOR_EVENT_SOURCE_SHADOW); + case 's': return(EDG_WLL_CONDOR_EVENT_SOURCE_STARTER); + case 'S': return(EDG_WLL_CONDOR_EVENT_SOURCE_START); + case 'j': return(EDG_WLL_CONDOR_EVENT_SOURCE_JOBQUEUE); + default: return(EDG_WLL_CONDOR_EVENT_SOURCE_UNDEF); + } +} + +int edg_wll_compare_seq(const char *a, const char *b) +{ + unsigned int c[EDG_WLL_SOURCE__LAST]; + unsigned int d[EDG_WLL_SOURCE__LAST]; + int res, i; + char sca[EDG_WLL_SEQ_SIZE], scb[EDG_WLL_SEQ_SIZE]; + + + if ( (strstr(a,"TIMESTAMP=") == a) && (strstr(b,"TIMESTAMP=") == b) ) + return edg_wll_compare_pbs_seq(a,b); + + if (!strstr(a, "LBS")) snprintf(sca,EDG_WLL_SEQ_SIZE,"%s:LBS=000000",a); + else snprintf(sca,EDG_WLL_SEQ_SIZE,"%s",a); + if (!strstr(b, "LBS")) snprintf(scb,EDG_WLL_SEQ_SIZE,"%s:LBS=000000",b); + else snprintf(scb,EDG_WLL_SEQ_SIZE,"%s",b); + + assert(EDG_WLL_SOURCE__LAST == 10); + + res = sscanf(sca, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d:LBS=%d", + &c[EDG_WLL_SOURCE_USER_INTERFACE], + &c[EDG_WLL_SOURCE_NETWORK_SERVER], + &c[EDG_WLL_SOURCE_WORKLOAD_MANAGER], + &c[EDG_WLL_SOURCE_BIG_HELPER], + &c[EDG_WLL_SOURCE_JOB_SUBMISSION], + &c[EDG_WLL_SOURCE_LOG_MONITOR], + &c[EDG_WLL_SOURCE_LRMS], + &c[EDG_WLL_SOURCE_APPLICATION], + &c[EDG_WLL_SOURCE_LB_SERVER]); + if (res != EDG_WLL_SOURCE__LAST-1) { +/* FIXME: syslog(LOG_ERR, "unparsable sequence code %s\n", sca); */ + fprintf(stderr, "unparsable sequence code %s\n", sca); + return -1; + } + + res = sscanf(scb, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d:LBS=%d", + &d[EDG_WLL_SOURCE_USER_INTERFACE], + &d[EDG_WLL_SOURCE_NETWORK_SERVER], + &d[EDG_WLL_SOURCE_WORKLOAD_MANAGER], + &d[EDG_WLL_SOURCE_BIG_HELPER], + &d[EDG_WLL_SOURCE_JOB_SUBMISSION], + &d[EDG_WLL_SOURCE_LOG_MONITOR], + &d[EDG_WLL_SOURCE_LRMS], + &d[EDG_WLL_SOURCE_APPLICATION], + &d[EDG_WLL_SOURCE_LB_SERVER]); + if (res != EDG_WLL_SOURCE__LAST-1) { +/* FIXME: syslog(LOG_ERR, "unparsable sequence code %s\n", scb); */ + fprintf(stderr, "unparsable sequence code %s\n", scb); + return 1; + } + + for (i = EDG_WLL_SOURCE_USER_INTERFACE ; i < EDG_WLL_SOURCE__LAST; i++) { + if (c[i] < d[i]) return -1; + if (c[i] > d[i]) return 1; + } + + return 0; +} + + +int compare_events_by_seq(const void *a, const void *b) +{ + const edg_wll_Event *e = (edg_wll_Event *) a; + const edg_wll_Event *f = (edg_wll_Event *) b; + int ret; + + + ret = edg_wll_compare_seq(e->any.seqcode, f->any.seqcode); + if (ret) return ret; + + if (e->any.timestamp.tv_sec < f->any.timestamp.tv_sec) return -1; + if (e->any.timestamp.tv_sec > f->any.timestamp.tv_sec) return 1; + if (e->any.timestamp.tv_usec < f->any.timestamp.tv_usec) return -1; + if (e->any.timestamp.tv_usec > f->any.timestamp.tv_usec) return 1; + return 0; +} + -- 1.8.2.3