'merge_transactions_src'.
Sprout from lb_transactions_only 2008-03-11 12:31:43 UTC Miloš Mulač <mulac@civ.zcu.cz> 'prevent job_reg coredumping if no credentials found'
Cherrypick from master 2008-02-29 15:36:01 UTC Jan Pospíšil <honik@ntc.zcu.cz> 'stage and use job_attrs.h':
org.glite.lb.state-machine/LICENSE
org.glite.lb.state-machine/Makefile
org.glite.lb.state-machine/interface/intjobstat.h
org.glite.lb.state-machine/interface/lb-job-attrs.xsd
org.glite.lb.state-machine/interface/lb-job-record.xsd
org.glite.lb.state-machine/interface/seqcode_aux.h
org.glite.lb.state-machine/src/job_attrs.xsl
org.glite.lb.state-machine/src/lb_plugin.c
org.glite.lb.state-machine/src/process_event.c
org.glite.lb.state-machine/src/process_event_condor.c
org.glite.lb.state-machine/src/process_event_pbs.c
org.glite.lb.state-machine/src/seqcode_aux.c
--- /dev/null
+LICENSE file for EGEE Middleware
+================================
+
+Copyright (c) 2004 on behalf of the EU EGEE Project:
+The European Organization for Nuclear Research (CERN),
+Istituto Nazionale di Fisica Nucleare (INFN), Italy
+Datamat Spa, Italy
+Centre National de la Recherche Scientifique (CNRS), France
+CS Systeme d'Information (CSSI), France
+Royal Institute of Technology, Center for Parallel Computers (KTH-PDC), Sweden
+Universiteit van Amsterdam (UvA), Netherlands
+University of Helsinki (UH.HIP), Finlan
+University of Bergen (UiB), Norway
+Council for the Central Laboratory of the Research Councils (CCLRC), United Kingdom
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+1. Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+2. Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer in the
+documentation and/or other materials provided with the distribution.
+
+3. The end-user documentation included with the redistribution, if
+any, must include the following acknowledgment: "This product includes
+software developed by The EU EGEE Project (http://cern.ch/eu-egee/)."
+Alternatively, this acknowledgment may appear in the software itself, if
+and wherever such third-party acknowledgments normally appear.
+
+4. The names EGEE and the EU EGEE Project must not be
+used to endorse or promote products derived from this software without
+prior written permission. For written permission, please contact
+<email address>.
+
+5. You are under no obligation whatsoever to provide anyone with any
+bug fixes, patches, or upgrades to the features, functionality or
+performance of the Software ("Enhancements") that you may develop over
+time; however, if you choose to provide your Enhancements to The EU
+EGEE Project, or if you choose to otherwise publish or distribute your
+Enhancements, in source code form without contemporaneously requiring
+end users of The EU EGEE Proejct to enter into a separate written license
+agreement for such Enhancements, then you hereby grant The EU EGEE Project
+a non-exclusive, royalty-free perpetual license to install, use, copy,
+modify, prepare derivative works, incorporate into the EGEE Middleware
+or any other computer software, distribute, and sublicense your
+Enhancements or derivative works thereof, in binary and source code
+form (if any), whether developed by The EU EGEE Project or third parties.
+
+THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED
+WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL PROJECT OR ITS CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
+OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
+IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+This software consists of voluntary contributions made by many
+individuals on behalf of the EU EGEE Prject. For more information on The
+EU EGEE Project, please see http://cern.ch/eu-egee/. For more information on
+EGEE Middleware, please see http://egee-jra1.web.cern.ch/egee-jra1/
+
+
--- /dev/null
+# defaults
+top_srcdir=.
+stagedir=.
+distdir=.
+globalprefix=glite
+lbprefix=lb
+package=glite-lb-state-machine
+version=0.2.0
+PREFIX=/opt/glite
+
+-include Makefile.inc
+
+VPATH = ../src:../interface
+
+default all: compile
+
+CC=gcc
+CXX=g++
+AT3=${stagedir}/sbin/glite-lb-at3
+
+SUFFIXES = .T
+
+DEBUG:=-g -O0 -Wall
+
+CFLAGS:= \
+ ${DEBUG} \
+ -I${classads_prefix}/include \
+ -I${stagedir}/include -I${top_srcdir}/src -I. \
+ -I${top_srcdir}/interface
+
+COMPILE:=libtool --mode=compile ${CC} ${CFLAGS}
+LINK:=libtool --mode=link ${CC} -rpath ${stagedir}/lib ${LDFLAGS}
+SOLINK:=libtool --mode=link ${CC} -module ${LDFLAGS} -rpath ${stagedir}/lib
+LINKXX:=libtool --mode=link ${CXX} ${LDFLAGS}
+INSTALL:=libtool --mode=install install
+LINKXX:=libtool --mode=link ${CXX} -rpath ${stagedir}/lib ${LDFLAGS}
+XSLTPROC:=xsltproc --novalid
+
+EXT_LIBS:=
+COMMON_LIBS:= -L${stagedir}/lib -lglite_lb_common_${nothrflavour} -lglite_security_gss_${nothrflavour}
+PLUGIN_LIBS:= -L${stagedir}/lib -lglite_lb_common_${nothrflavour}\
+ ${classadslib} -lstdc++ ${expatlib} -lexpat\
+
+PLUGIN_LOBJS:=seqcode_aux.lo process_event.lo process_event_pbs.lo process_event_condor.lo lb_plugin.lo
+MACHINE_OBJS:=seqcode_aux.o process_event.o process_event_pbs.o process_event_condor.o
+
+PLUGIN_LIB=glite_lb_plugin.la
+MACHINE_LIB=libglite_lb_statemachine.a
+
+default all: compile
+
+compile: ${MACHINE_LIB} ${PLUGIN_LIB}
+
+${PLUGIN_LIB}: ${PLUGIN_LOBJS}
+ ${SOLINK} -o $@ ${PLUGIN_LOBJS} ${PLUGIN_LIBS}
+
+${MACHINE_LIB}: ${MACHINE_OBJS}
+ ar crv $@ ${MACHINE_OBJS}
+ -ranlib $@
+
+
+doc:
+
+stage: compile
+ $(MAKE) install PREFIX=${stagedir}
+
+install:
+ -mkdir -p ${PREFIX}/lib ${PREFIX}/interface ${PREFIX}/include/glite/lb
+ install -m 644 ${MACHINE_LIB} ${PREFIX}/lib
+ ${INSTALL} -m 755 ${PLUGIN_LIB} ${PREFIX}/lib
+ install -m 644 ${top_srcdir}/interface/lb-job-attrs.xsd ${PREFIX}/interface
+ install -m 644 ${top_srcdir}/interface/lb-job-record.xsd ${PREFIX}/interface
+ install -m 644 ${top_srcdir}/interface/intjobstat.h ${PREFIX}/include/glite/lb
+ install -m 644 ${top_srcdir}/interface/seqcode_aux.h ${PREFIX}/include/glite/lb
+ install -m 644 job_attrs.h ${PREFIX}/include/glite/lb
+
+clean:
+ rm -rvf .libs *.o *.lo ${PLUGIN_LIB} ${MACHINE_LIB} job_attrs.h
+ rm -rvf log.xml project/ rpmbuild/ RPMS/ tgz/
+
+%.c: %.c.T
+ rm -f $@
+ ${AT3} $< >$@ || rm -f $@
+ chmod -w $@ >/dev/null
+
+%.cpp: %.cpp.T
+ rm -f $@
+ ${AT3} $< >$@ || rm -f $@
+ chmod -w $@ >/dev/null
+
+%.o %.lo: %.c
+ ${COMPILE} -c $<
+
+%.lo: %.c
+ ${COMPILE} -o $@ -c $<
+
+lb_plugin.lo: lb_plugin.c job_attrs.h
+ ${COMPILE} -DPLUGIN_DEBUG -o $@ -c $<
+
+job_attrs.h: lb-job-attrs.xsd job_attrs.xsl
+ ${XSLTPROC} ../src/job_attrs.xsl $< >$@
+
--- /dev/null
+#ifndef GLITE_LB_INTJOBSTAT_H
+#define GLITE_LB_INTJOBSTAT_H
+
+#ident "$Header$"
+
+#include "glite/lb/jobstat.h"
+
+/*
+ * Internal representation of job state
+ * (includes edg_wll_JobStat API structure)
+ */
+
+/* convention: revision X.XX - DESCRIPTION */
+/* where X.XX is version from indent + 1 (version after commit) */
+/* and DESCRIPTION is short hit why version changed */
+
+#define INTSTAT_VERSION "revision 1.31 - proxy merge"
+
+
+// Internal error codes
+
+#define RET_FAIL 0
+#define RET_OK 1
+#define RET_FATAL RET_FAIL
+#define RET_SOON 2
+#define RET_LATE 3
+#define RET_BADSEQ 4
+#define RET_SUSPECT 5
+#define RET_IGNORE 6
+#define RET_BADBRANCH 7
+#define RET_GOODBRANCH 8
+#define RET_TOOOLD 9
+#define RET_UNREG 10
+#define RET_INTERNAL 100
+
+
+// shallow resubmission container - holds state of each branch
+// (useful when state restore is needed after ReallyRunning event)
+//
+typedef struct _branch_state {
+ int branch;
+ char *destination;
+ char *ce_node;
+ char *jdl;
+ /*!! if adding new field, modify also free_branch_state() */
+} branch_state;
+
+
+typedef struct _intJobStat {
+ edg_wll_JobStat pub;
+ int resubmit_type;
+ char *last_seqcode;
+ char *last_cancel_seqcode;
+ char *branch_tag_seqcode;
+ char *last_branch_seqcode;
+ char *deep_resubmit_seqcode;
+ branch_state *branch_states; // branch zero terminated array
+
+ struct timeval last_pbs_event_timestamp;
+ int pbs_reruning; // true if rerun event arrived
+
+ /*!! if adding new field, modify also destroy_intJobStat_extension() */
+ } intJobStat;
+
+typedef enum _edg_wll_PBSEventSource {
+ EDG_WLL_PBS_EVENT_SOURCE_UNDEF = 0,
+ EDG_WLL_PBS_EVENT_SOURCE_SCHEDULER,
+ EDG_WLL_PBS_EVENT_SOURCE_SERVER,
+ EDG_WLL_PBS_EVENT_SOURCE_MOM,
+ EDG_WLL_PBS_EVENT_SOURCE_ACCOUNTING,
+ EDG_WLL_PBS_EVENT_SOURCE__LAST
+} edg_wll_PBSEventSource;
+
+typedef enum _edg_wll_CondorEventSource {
+ EDG_WLL_CONDOR_EVENT_SOURCE_UNDEF = 0,
+ EDG_WLL_CONDOR_EVENT_SOURCE_COLLECTOR,
+ EDG_WLL_CONDOR_EVENT_SOURCE_MASTER,
+ EDG_WLL_CONDOR_EVENT_SOURCE_MATCH,
+ EDG_WLL_CONDOR_EVENT_SOURCE_NEGOTIATOR,
+ EDG_WLL_CONDOR_EVENT_SOURCE_SCHED,
+ EDG_WLL_CONDOR_EVENT_SOURCE_SHADOW,
+ EDG_WLL_CONDOR_EVENT_SOURCE_STARTER,
+ EDG_WLL_CONDOR_EVENT_SOURCE_START,
+ EDG_WLL_CONDOR_EVENT_SOURCE_JOBQUEUE,
+ EDG_WLL_CONDOR_EVENT_SOURCE__LAST
+} edg_wll_CondorEventSource;
+
+typedef enum _subjobClassCodes {
+ SUBJOB_CLASS_UNDEF = 0,
+ SUBJOB_CLASS_RUNNING,
+ SUBJOB_CLASS_DONE,
+ SUBJOB_CLASS_ABORTED,
+ SUBJOB_CLASS_CLEARED,
+ SUBJOB_CLASS_REST
+} subjobClassCodes;
+
+void destroy_intJobStat(intJobStat *);
+void destroy_intJobStat_extension(intJobStat *p);
+
+
+void init_intJobStat(intJobStat *p);
+
+
+#endif /* GLITE_LB_INTJOBSTAT_H */
--- /dev/null
+<?xml version="1.0"?>
+<!DOCTYPE schema PUBLIC "-//W3C//DTD XMLSCHEMA 200102//EN" "http://www.w3.org/2001/XMLSchema.dtd">
+
+<!-- $Header$ -->
+
+<xs:schema
+ xmlns:xs="http://www.w3.org/2001/XMLSchema"
+ xmlns:a="http://egee.cesnet.cz/en/Schema/LB/Attributes"
+ targetNamespace="http://egee.cesnet.cz/en/Schema/LB/Attributes"
+ version="1.0"
+ elementFormDefault="qualified"
+ attributeFormDefault="unqualified"
+>
+ <xs:simpleType name="statusType">
+ <xs:restriction base="xs:string">
+ <xs:enumeration value="SUBMITTED" />
+ <xs:enumeration value="WAITING" />
+ <xs:enumeration value="READY" />
+ <xs:enumeration value="SCHEDULED" />
+ <xs:enumeration value="RUNNING" />
+ <xs:enumeration value="DONE" />
+ <xs:enumeration value="CLEARED" />
+ <xs:enumeration value="ABORTED" />
+ <xs:enumeration value="CANCELLED" />
+ <xs:enumeration value="UNKNOWN" />
+ </xs:restriction>
+ </xs:simpleType>
+
+ <xs:simpleType name="doneType">
+ <xs:restriction base="xs:string">
+ <xs:enumeration value="OK"/>
+ <xs:enumeration value="FAIL"/>
+ <xs:enumeration value="CANCEL"/>
+ </xs:restriction>
+ </xs:simpleType>
+
+ <xs:complexType name="historyStatusType">
+ <xs:complexContent>
+ <xs:restriction base="xs:anyType">
+ <xs:attribute name="name" type="a:statusType" use="required"/>
+ <xs:attribute name="timestamp" type="xs:dateTime" use="required"/>
+ <xs:attribute name="reason" type="xs:string" use="optional"/>
+ </xs:restriction>
+ </xs:complexContent>
+ </xs:complexType>
+
+ <xs:simpleType name="jobTypeType">
+ <xs:restriction base="xs:string">
+ <xs:enumeration value="SIMPLE"/>
+ <xs:enumeration value="DAG"/>
+ </xs:restriction>
+ </xs:simpleType>
+
+ <xs:complexType name="statusSequenceType">
+ <xs:sequence>
+ <xs:element name="status" type="a:historyStatusType" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="jobIdSequenceType">
+ <xs:sequence>
+ <xs:element name="jobId" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+
+
+
+ <xs:element name="jobId" type="xs:string" />
+ <xs:element name="LBserver" type="xs:string" />
+
+ <xs:element name="user" type="xs:string">
+ <xs:annotation> <xs:documentation>Job owner according to LB</xs:documentation> </xs:annotation>
+ </xs:element>
+
+ <!-- Parent jobId of DAG -->
+ <xs:element name="parent" type="xs:string" />
+
+ <!-- the following 6 elements have to be retrieved from JDL -->
+ <xs:element name="VO" type="xs:string" />
+ <xs:element name="aTag" type="xs:string" />
+ <xs:element name="rQType" type="xs:string" />
+ <xs:element name="eDuration" type="xs:duration" />
+ <xs:element name="eNodes" type="xs:int" />
+ <xs:element name="eProc" type="xs:int" />
+
+ <xs:element name="RB" type="xs:string" />
+ <xs:element name="CE" type="xs:string" />
+ <xs:element name="host" type="xs:string" /> <!-- worker node -->
+
+ <!-- Genevieve's spec has "site" and "country" here, but I've got no idea how to retrieve them -->
+ <xs:element name="UIHost" type="xs:string" />
+
+ <!-- not mandatory, currently LB hasn't got the info -->
+ <xs:element name="CPUTime" type="xs:duration" />
+ <xs:element name="NProc" type="xs:int" />
+
+ <!-- In LB the real final status is Cleared
+ However, Done, Aborted, or Cancelled should be reported here -->
+ <xs:element name="finalStatus" type="a:statusType" />
+ <xs:element name="finalStatusDate" type="xs:dateTime" />
+ <xs:element name="finalStatusReason" type="xs:string" />
+ <xs:element name="LRMSDoneStatus" type="a:doneType" />
+ <xs:element name="LRMSStatusReason" type="xs:string" />
+
+ <xs:element name="retryCount" type="xs:int" />
+ <xs:element name="additionalReason" type="xs:string" />
+
+ <xs:element name="jobType" type="a:jobTypeType" />
+ <xs:element name="nsubjobs" type="xs:int" />
+ <xs:element name="subjobs" type="a:jobIdSequenceType" />
+
+ <!-- timestamps of the state history of the last resubmission cycle,
+ i.e. it is guaranteed that each state apears here only once.
+ Cf. stateEnterTimes in LB JobStatus -->
+ <xs:element name="lastStatusHistory" type="a:statusSequenceType" />
+
+ <!-- timestamps of the whole state history, including all resubmission cycles -->
+ <xs:element name="fullStatusHistory" type="a:statusSequenceType" />
+
+ <xs:element name="JDL" type="xs:string" />
+
+<!-- No idea where to get these from:
+
+ ENVIRONMENT
+
+ testbed production, preproduction, specific
+ release middleware release (LCG, g-lite...)
+ version version of middleware
+ job_history_version in case of structure changes
+-->
+
+</xs:schema>
--- /dev/null
+<?xml version="1.0"?>
+<!DOCTYPE schema PUBLIC "-//W3C//DTD XMLSCHEMA 200102//EN" "http://www.w3.org/2001/XMLSchema.dtd">
+
+<!-- $Header$ -->
+
+<xs:schema
+ xmlns:xs="http://www.w3.org/2001/XMLSchema"
+ xmlns:jr="http://egee.cesnet.cz/en/Schema/LB/JobRecord"
+ xmlns:a="http://egee.cesnet.cz/en/Schema/LB/Attributes"
+ targetNamespace="http://egee.cesnet.cz/en/Schema/LB/JobRecord"
+ version="1.0"
+ elementFormDefault="qualified"
+ attributeFormDefault="unqualified"
+>
+
+ <xs:import
+ namespace="http://egee.cesnet.cz/en/Schema/LB/Attributes"
+ schemaLocation="http://egee.cesnet.cz/en/Schema/LB/Attributes"
+ />
+
+ <xs:complexType name="jobRecordType">
+ <xs:sequence>
+
+ <xs:element ref="a:jobId" minOccurs="1" maxOccurs="1"/>
+ <xs:element ref="a:user" minOccurs="1" maxOccurs="1"/>
+ <xs:element ref="a:parent" minOccurs="0" maxOccurs="1"/>
+
+ <!-- the following 6 elements have to be retrieved from JDL -->
+ <xs:element ref="a:VO" minOccurs="0" maxOccurs="1"/>
+ <xs:element ref="a:aTag" minOccurs="0" maxOccurs="1"/>
+ <xs:element ref="a:rQType" minOccurs="0" maxOccurs="1"/>
+ <xs:element ref="a:eDuration" minOccurs="0" maxOccurs="1"/>
+ <xs:element ref="a:eNodes" minOccurs="0" maxOccurs="1"/>
+ <xs:element ref="a:eProc" minOccurs="0" maxOccurs="1"/>
+
+ <xs:element ref="a:RB" minOccurs="1" maxOccurs="1"/>
+ <xs:element ref="a:CE" minOccurs="0" maxOccurs="1"/>
+ <xs:element ref="a:host" minOccurs="0" maxOccurs="1"/> <!-- worker node -->
+
+ <!-- Genevieve's spec has "site" and "country" here, but I've got no idea how to retrieve them -->
+ <xs:element ref="a:UIHost" minOccurs="0" maxOccurs="1"/>
+
+ <!-- not mandatory, currently LB hasn't got the info -->
+ <xs:element ref="a:CPUTime" minOccurs="0" maxOccurs="1"/>
+ <xs:element ref="a:NProc" minOccurs="0" maxOccurs="1"/>
+
+ <!-- In LB the real final status is Cleared
+ However, Done, Aborted, or Cancelled should be reported here -->
+ <xs:element ref="a:finalStatus" minOccurs="1" maxOccurs="1"/>
+ <xs:element ref="a:finalStatusDate" minOccurs="1" maxOccurs="1"/>
+ <xs:element ref="a:finalStatusReason" minOccurs="0" maxOccurs="1"/>
+ <xs:element ref="a:LRMSDoneStatus" minOccurs="0" maxOccurs="1"/>
+ <xs:element ref="a:LRMSStatusReason" minOccurs="0" maxOccurs="1"/>
+
+ <xs:element ref="a:retryCount" minOccurs="1" maxOccurs="1"/>
+ <xs:element ref="a:additionalReason" minOccurs="0" maxOccurs="1"/>
+
+ <xs:element ref="a:jobType" minOccurs="1" maxOccurs="1"/>
+ <xs:element ref="a:nsubjobs" minOccurs="0" maxOccurs="1"/>
+ <xs:element ref="a:subjobs" minOccurs="0" maxOccurs="1"/>
+
+ <!-- timestamps of the state history of the last resubmission cycle,
+ i.e. it is guaranteed that each state apears here only once.
+ Cf. stateEnterTimes in LB JobStatus -->
+
+ <xs:element ref="a:lastStatusHistory" minOccurs="1" maxOccurs="1"/>
+
+ <!-- timestamps of the whole state history, including all resubmission cycles -->
+ <xs:element ref="a:fullStatusHistory" minOccurs="1" maxOccurs="1"/>
+
+ <xs:element ref="a:JDL" minOccurs="0" maxOccurs="1"/>
+ </xs:sequence>
+
+ <!-- No idea where to get these from:
+
+ ENVIRONMENT
+
+ testbed production, preproduction, specific
+ release middleware release (LCG, g-lite...)
+ version version of middleware
+ job_history_version in case of structure changes
+ -->
+
+ <xs:attribute name="jobid" type="xs:string" use="required"/>
+ </xs:complexType>
+
+
+ <xs:element name="jobRecord" type="jr:jobRecordType"/>
+
+</xs:schema>
--- /dev/null
+#ident "$Header$"
+
+int component_seqcode(const char *a, edg_wll_Source index);
+
+char * set_component_seqcode(char *a,edg_wll_Source index,int val);
+
+int before_deep_resubmission(const char *a, const char *b);
+
+int same_branch(const char *a, const char *b) ;
+
+int edg_wll_compare_pbs_seq(const char *a,const char *b);
+#define edg_wll_compare_condor_seq edg_wll_compare_pbs_seq
+
+edg_wll_PBSEventSource get_pbs_event_source(const char *pbs_seq_num) ;
+
+edg_wll_CondorEventSource get_condor_event_source(const char *condor_seq_num) ;
+
+int edg_wll_compare_seq(const char *a, const char *b);
+
+int compare_events_by_seq(const void *a, const void *b);
+
+
--- /dev/null
+<?xml version="1.0"?>
+
+<xsl:stylesheet version="1.0"
+ xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
+ xmlns:xs="http://www.w3.org/2001/XMLSchema"
+>
+
+<xsl:output method="text"/>
+
+<xsl:template match="xs:schema">
+#ifndef GLITE_LB_JP_JOB_ATTR_H
+#define GLITE_LB_JP_JOB_ATTR_H
+#define GLITE_JP_LB_NS "http://egee.cesnet.cz/en/Schema/LB/Attributes"
+#define GLITE_JP_LB_JDL_NS "http://egee.cesnet.cz/en/Schema/LB/Attributes:JDL"
+ <xsl:apply-templates select="xs:element"/>
+#define GLITE_JP_LB_CLASSAD_NS "http://jdl"
+#endif
+</xsl:template>
+
+<xsl:template match="xs:element">
+/** <xsl:value-of select="xs:documentation/text()"/> */
+#define GLITE_JP_LB_<xsl:value-of select="@name"/> GLITE_JP_LB_NS ":<xsl:value-of select="@name"/>"
+</xsl:template>
+
+</xsl:stylesheet>
--- /dev/null
+#ident "$Header$"
+
+#include <stdlib.h>
+#include <assert.h>
+#include <stdarg.h>
+#include <string.h>
+#include <stdio.h>
+#include <errno.h>
+#include <time.h>
+#include <ctype.h>
+
+#include <cclassad.h>
+
+#include "glite/lbu/trio.h"
+
+#include "glite/lb/jobstat.h"
+#include "glite/lb/events.h"
+#include "glite/lb/producer.h"
+#include "glite/lb/events_parse.h"
+
+
+#include "intjobstat.h"
+#include "seqcode_aux.h"
+
+/*
+#include "glite/lb/context.h"
+#include "glite/lb/jobstat.h"
+#include "glite/lb/events.h"
+#include "glite/lb/events_parse.h"
+#include "glite/lb/producer.h"
+
+#include "jobstat.h"
+#include "get_events.h"
+*/
+
+#include "glite/jp/types.h"
+#include "glite/jp/context.h"
+#include "glite/jp/file_plugin.h"
+#include "glite/jp/builtin_plugins.h"
+#include "glite/jp/backend.h"
+#include "glite/jp/attr.h"
+#include "glite/jp/utils.h"
+#include "glite/jp/known_attr.h"
+#include "job_attrs.h"
+
+#define INITIAL_NUMBER_EVENTS 100
+#define INITIAL_NUMBER_STATES EDG_WLL_NUMBER_OF_STATCODES
+
+/*typedef struct _lb_buffer_t {
+ char *buf;
+ size_t pos, size;
+ off_t offset;
+} lb_buffer_t;*/
+
+typedef struct _lb_historyStatus {
+ edg_wll_JobStatCode state;
+ struct timeval timestamp;
+ char *reason;
+} lb_historyStatus;
+
+typedef struct _lb_handle {
+ edg_wll_Event **events;
+ edg_wll_JobStat status;
+ lb_historyStatus **fullStatusHistory, **lastStatusHistory, *finalStatus;
+ glite_jpps_fplug_data_t* classad_plugin;
+} lb_handle;
+
+#define check_strdup(s) ((s) ? strdup(s) : NULL)
+
+extern int processEvent(intJobStat *, edg_wll_Event *, int, int, char **);
+static void edg_wll_SortPEvents(edg_wll_Event **);
+
+static int lb_query(void *fpctx, void *handle, const char *attr, glite_jp_attrval_t **attrval);
+static int lb_open(void *fpctx, void *bhandle, const char *uri, void **handle);
+static int lb_close(void *fpctx, void *handle);
+static int lb_filecom(void *fpctx, void *handle);
+static int lb_status(void *handle);
+//static int read_line(glite_jp_context_t ctx, void *handle, lb_buffer_t *buffer, char **line);
+
+static int lb_dummy(void *fpctx, void *handle, int oper, ...) {
+ puts("lb_dummy() - generic call not used; for testing purposes only...");
+ return -1;
+}
+
+int init(glite_jp_context_t ctx, glite_jpps_fplug_data_t *data) {
+
+ data->fpctx = ctx;
+
+ data->uris = calloc(2,sizeof *data->uris);
+ data->uris[0] = strdup(GLITE_JP_FILETYPE_LB);
+
+ data->classes = calloc(2,sizeof *data->classes);
+ data->classes[0] = strdup("lb");
+
+ data->namespaces = calloc(4, sizeof *data->namespaces);
+ data->namespaces[0] = strdup(GLITE_JP_LB_NS);
+ data->namespaces[1] = strdup(GLITE_JP_LB_JDL_NS);
+ data->namespaces[2] = strdup(GLITE_JP_LBTAG_NS);
+
+ data->ops.open = lb_open;
+ data->ops.close = lb_close;
+ data->ops.filecom = lb_filecom;
+ data->ops.attr = lb_query;
+ data->ops.generic = lb_dummy;
+
+#ifdef PLUGIN_DEBUG
+ fprintf(stderr,"lb_plugin: init OK\n");
+#endif
+ return 0;
+}
+
+
+void done(glite_jp_context_t ctx, glite_jpps_fplug_data_t *data) {
+ free(data->uris[0]);
+ free(data->classes[0]);
+ free(data->namespaces[0]);
+ free(data->namespaces[1]);
+ free(data->namespaces[2]);
+ free(data->uris);
+ free(data->classes);
+ free(data->namespaces);
+ memset(data, 0, sizeof(*data));
+}
+
+
+static int lb_open(void *fpctx, void *bhandle, const char *uri, void **handle) {
+
+ lb_handle *h;
+ rl_buffer_t buffer;
+ glite_jp_context_t ctx = (glite_jp_context_t) fpctx;
+ char *line;
+ int retval;
+ edg_wll_Context context;
+ int nevents, maxnevents, i;
+ glite_jp_error_t err;
+ char *id0 = NULL,*id = NULL;
+
+ glite_jp_clear_error(ctx);
+ h = calloc(1, sizeof(lb_handle));
+
+ if ((retval = edg_wll_InitContext(&context)) != 0) return retval;
+
+ // read the file given by bhandle
+ // parse events into h->events array
+ memset(&buffer, 0, sizeof(buffer));
+ buffer.buf = malloc(BUFSIZ);
+ maxnevents = INITIAL_NUMBER_EVENTS;
+ nevents = 0;
+ h->events = malloc(maxnevents * sizeof(edg_wll_Event *));
+
+ if ((retval = glite_jppsbe_readline(ctx, bhandle, &buffer, &line)) != 0) {
+ err.code = retval;
+ err.desc = "reading LB logline";
+ err.source = "lb_plugin.c:read_line()";
+ glite_jp_stack_error(ctx,&err);
+ goto fail;
+ }
+ while (line) {
+#ifdef PLUGIN_DEBUG
+ //fprintf(stderr,"lb_plugin opened\n", line);
+#endif
+
+ if (line[0]) {
+ if (nevents >= maxnevents) {
+ maxnevents <<= 1;
+ h->events = realloc(h->events, maxnevents * sizeof(edg_wll_Event *));
+ }
+ if ((retval = edg_wll_ParseEvent(context, line, &h->events[nevents])) != 0) {
+ char *ed;
+ free(line);
+ err.code = retval;
+ edg_wll_Error(context,NULL,&ed);
+ err.desc = ed;
+ err.source = "edg_wll_ParseEvent()";
+ glite_jp_stack_error(ctx,&err);
+ free(ed);
+ goto fail;
+ }
+ if (nevents == 0) {
+ id0 = edg_wlc_JobIdGetUnique(h->events[nevents]->any.jobId );
+ } else {
+ id = edg_wlc_JobIdGetUnique(h->events[nevents]->any.jobId );
+ if (strcmp(id0,id) != 0) {
+ char et[BUFSIZ];
+ retval = EINVAL;
+ err.code = retval;
+ snprintf(et,sizeof et,"Attempt to process different jobs. Id '%s' (event n.%d) differs from '%s'",id,nevents+1,id0);
+ et[BUFSIZ-1] = 0;
+ err.desc = et;
+ err.source = "lb_plugin.c:edg_wlc_JobIdGetUnique()";
+ glite_jp_stack_error(ctx,&err);
+ goto fail;
+ }
+ }
+
+ if (id) free(id); id = NULL;
+ nevents++;
+ }
+ free(line);
+
+ if ((retval = glite_jppsbe_readline(ctx, bhandle, &buffer, &line)) != 0) {
+ err.code = retval;
+ err.desc = "reading LB logline";
+ err.source = "lb_plugin.c:read_line()";
+ glite_jp_stack_error(ctx,&err);
+ goto fail;
+ }
+ }
+ free(line);
+
+ free(buffer.buf);
+ edg_wll_FreeContext(context);
+
+ if (nevents >= maxnevents) {
+ maxnevents <<= 1;
+ h->events = realloc(h->events, maxnevents * sizeof(edg_wll_Event *));
+ }
+ h->events[nevents] = NULL;
+
+#ifdef PLUGIN_DEBUG
+ fprintf(stderr,"lb_plugin: opened %d events\n", nevents);
+#endif
+
+ // find classad plugin, if it is loaded
+ int j;
+ h->classad_plugin = NULL;
+ for (i=0; ctx->plugins[i]; i++){
+ glite_jpps_fplug_data_t *pd = ctx->plugins[i];
+ if (pd->namespaces)
+ for (j=0; pd->classes[j]; j++)
+ if (! strcmp(pd->classes[j], "classad")){
+ h->classad_plugin = pd;
+ goto cont;
+ }
+ }
+cont:
+
+ /* count state and status history of the job given by the loaded events */
+ if ((retval = lb_status(h)) != 0) goto fail;
+
+ *handle = (void *)h;
+
+ return 0;
+
+fail:
+#ifdef PLUGIN_DEBUG
+ fprintf(stderr,"lb_plugin: open ERROR\n");
+#endif
+ for (i = 0; i < nevents; i++) {
+ edg_wll_FreeEvent(h->events[i]);
+ free(h->events[i]);
+ }
+ free(h->events);
+ free(buffer.buf);
+ if (id0) free(id0);
+ if (id) free(id);
+ edg_wll_FreeContext(context);
+ free(h);
+ *handle = NULL;
+ err.code = EIO;
+ err.desc = NULL;
+ err.source = __FUNCTION__;
+ glite_jp_stack_error(ctx,&err);
+
+ return retval;
+}
+
+
+static int lb_close(void *fpctx,void *handle) {
+
+ lb_handle *h = (lb_handle *) handle;
+ int i;
+
+ // Free allocated stuctures
+ if (h->events) {
+ i = 0;
+ while (h->events[i]) {
+ edg_wll_FreeEvent(h->events[i]);
+ free(h->events[i]);
+ i++;
+ }
+ free(h->events);
+ }
+
+ if (h->status.state != EDG_WLL_JOB_UNDEF)
+ edg_wll_FreeStatus(&h->status);
+
+ if (h->fullStatusHistory) {
+ i = 0;
+ while (h->fullStatusHistory[i]) {
+ if (h->fullStatusHistory[i]->reason) free(h->fullStatusHistory[i]->reason);
+ free (h->fullStatusHistory[i]);
+ i++;
+ }
+ h->fullStatusHistory = NULL;
+ h->lastStatusHistory = NULL;
+ h->finalStatus = NULL;
+ }
+
+ free(h);
+
+#ifdef PLUGIN_DEBUG
+ fprintf(stderr,"lb_plugin: close OK\n");
+#endif
+ return 0;
+}
+
+static int get_classad_attr(const char* attr, glite_jp_context_t ctx, lb_handle *h, glite_jp_attrval_t **av){
+ printf("attr = %s\n", attr);
+ glite_jp_error_t err;
+ glite_jp_clear_error(ctx);
+ memset(&err,0,sizeof err);
+ err.source = __FUNCTION__;
+
+ if (! h->classad_plugin){
+ err.code = ENOENT;
+ err.desc = strdup("Classad plugin has not been loaded.");
+ return glite_jp_stack_error(ctx,&err);
+ }
+ // Get the attribute from JDL
+ int i = 0;
+ while (h->events[i]){
+ if (h->events[i]->type == EDG_WLL_EVENT_REGJOB
+ && h->events[i]->regJob.jdl
+ && h->events[i]->regJob.jdl[0])
+ {
+ void *beh;
+ if (! h->classad_plugin->ops.open_str(h->classad_plugin->fpctx, h->events[i]->regJob.jdl, "", "", &beh)){
+ if (! h->classad_plugin->ops.attr(h->classad_plugin->fpctx, beh, attr, av))
+ (*av)[0].timestamp = h->events[i]->any.timestamp.tv_sec;
+ else{
+ h->classad_plugin->ops.close(h->classad_plugin->fpctx, beh);
+ err.code = ENOENT;
+ err.desc = strdup("Classad attribute not found.");
+ return glite_jp_stack_error(ctx,&err);
+ }
+ h->classad_plugin->ops.close(h->classad_plugin->fpctx, beh);
+ }
+ }
+ i++;
+ }
+ return 0;
+}
+
+static int lb_query(void *fpctx,void *handle, const char *attr,glite_jp_attrval_t **attrval) {
+ lb_handle *h = (lb_handle *) handle;
+ glite_jp_context_t ctx = (glite_jp_context_t) fpctx;
+ glite_jp_error_t err;
+ glite_jp_attrval_t *av = NULL;
+ int i, j, n_tags;
+ char *ns = glite_jpps_get_namespace(attr);
+ char *tag;
+
+ glite_jp_clear_error(ctx);
+ memset(&err,0,sizeof err);
+ err.source = __FUNCTION__;
+
+ if ((h->events == NULL) ||
+ (h->status.state == EDG_WLL_JOB_UNDEF) ||
+ (h->fullStatusHistory == NULL) ) {
+ *attrval = NULL;
+ err.code = ENOENT;
+ err.desc = strdup("There is no job information to query.");
+ return glite_jp_stack_error(ctx,&err);
+ }
+
+ if (strcmp(ns, GLITE_JP_LB_JDL_NS) == 0){
+ if (get_classad_attr(attr, ctx, h, &av)){
+ *attrval = NULL;
+ err.code = ENOENT;
+ err.desc = strdup("Cannot get attribute from classad.");
+ free(ns);
+ return glite_jp_stack_error(ctx,&err);
+ }
+ }
+ else if (strcmp(attr, GLITE_JP_LB_user) == 0) {
+ if (h->status.owner) {
+ av = calloc(2, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ av[0].value = strdup(h->status.owner);
+ av[0].size = -1;
+ av[0].timestamp = h->status.lastUpdateTime.tv_sec;
+ }
+ } else if (strcmp(attr, GLITE_JP_LB_jobId) == 0) {
+ if (h->status.jobId) {
+ av = calloc(2, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ av[0].value = edg_wlc_JobIdUnparse(h->status.jobId);
+ av[0].size = -1;
+ av[0].timestamp = h->status.lastUpdateTime.tv_sec;
+ }
+ } else if (strcmp(attr, GLITE_JP_LB_parent) == 0) {
+ if (h->status.parent_job) {
+ av = calloc(2, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ av[0].value = edg_wlc_JobIdUnparse(h->status.parent_job);
+ av[0].size = -1;
+ av[0].timestamp = h->status.lastUpdateTime.tv_sec;
+ }
+ } else if (strcmp(attr, GLITE_JP_LB_VO) == 0) {
+ if (get_classad_attr(":VirtualOrganisation", ctx, h, &av)){
+ printf("error");
+ *attrval = NULL;
+ err.code = ENOENT;
+ err.desc = strdup("Cannot get attribute from classad.");
+ free(ns);
+ return glite_jp_stack_error(ctx,&err);
+ }
+ } else if (strcmp(attr, GLITE_JP_LB_eNodes) == 0) {
+ if (get_classad_attr(":max_nodes_running", ctx, h, &av)){
+ printf("error");
+ *attrval = NULL;
+ err.code = ENOENT;
+ err.desc = strdup("Cannot get attribute from classad.");
+ free(ns);
+ return glite_jp_stack_error(ctx,&err);
+ }
+ } else if (strcmp(attr, GLITE_JP_LB_eProc) == 0) {
+ if (get_classad_attr(":NodeNumber", ctx, h, &av)){
+ printf("error");
+ *attrval = NULL;
+ err.code = ENOENT;
+ err.desc = strdup("Cannot get attribute from classad.");
+ free(ns);
+ return glite_jp_stack_error(ctx,&err);
+ }
+ } else if (strcmp(attr, GLITE_JP_LB_aTag) == 0 ||
+ strcmp(attr, GLITE_JP_LB_rQType) == 0 ||
+ strcmp(attr, GLITE_JP_LB_eDuration) == 0) {
+ /* have to be retrieved from JDL, but probably obsolete and not needed at all */
+ char et[BUFSIZ];
+ *attrval = NULL;
+ err.code = ENOSYS;
+ snprintf(et,sizeof et,"Attribute '%s' not implemented yet.",attr);
+ et[BUFSIZ-1] = 0;
+ err.desc = et;
+ return glite_jp_stack_error(ctx,&err);
+ } else if (strcmp(attr, GLITE_JP_LB_RB) == 0) {
+ if (h->status.network_server) {
+ av = calloc(2, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ av[0].value = strdup(h->status.network_server);
+ av[0].size = -1;
+ av[0].timestamp = h->status.lastUpdateTime.tv_sec;
+ }
+ } else if (strcmp(attr, GLITE_JP_LB_CE) == 0) {
+ if (h->status.destination) {
+ av = calloc(2, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ av[0].value = strdup(h->status.destination);
+ av[0].size = -1;
+ av[0].timestamp = h->status.lastUpdateTime.tv_sec;
+ }
+ } else if (strcmp(attr, GLITE_JP_LB_host) == 0) {
+ if (h->status.ce_node) {
+ av = calloc(2, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ av[0].value = strdup(h->status.ce_node);
+ av[0].size = -1;
+ av[0].timestamp = h->status.lastUpdateTime.tv_sec;
+ }
+ } else if (strcmp(attr, GLITE_JP_LB_UIHost) == 0) {
+ i = 0;
+ while (h->events[i]) {
+ if (h->events[i]->type == EDG_WLL_EVENT_REGJOB) {
+ if (h->events[i]->any.host) {
+ av = calloc(2, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ av[0].value = strdup(h->events[i]->any.host);
+ av[0].size = -1;
+ av[0].timestamp = h->events[i]->any.timestamp.tv_sec;
+ }
+ break;
+ }
+ i++;
+ }
+ } else if (strcmp(attr, GLITE_JP_LB_CPUTime) == 0) {
+ if (h->status.cpuTime) {
+ av = calloc(2, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ trio_asprintf(&av[0].value,"%d", h->status.cpuTime);
+ av[0].size = -1;
+ av[0].timestamp = h->status.lastUpdateTime.tv_sec;
+ }
+ } else if (strcmp(attr, GLITE_JP_LB_NProc) == 0) {
+ /* currently LB hasn't got the info */
+ char et[BUFSIZ];
+ *attrval = NULL;
+ err.code = ENOSYS;
+ snprintf(et,sizeof et,"Attribute '%s' not implemented yet.",attr);
+ et[BUFSIZ-1] = 0;
+ err.desc = et;
+ return glite_jp_stack_error(ctx,&err);
+ } else if (strcmp(attr, GLITE_JP_LB_finalStatus) == 0) {
+ av = calloc(2, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ if (h->finalStatus) {
+ av[0].value = edg_wll_StatToString(h->finalStatus->state);
+ av[0].timestamp = h->finalStatus->timestamp.tv_sec;
+ } else {
+ av[0].value = edg_wll_StatToString(h->status.state);
+ av[0].timestamp = h->status.lastUpdateTime.tv_sec;
+ }
+ av[0].size = -1;
+ } else if (strcmp(attr, GLITE_JP_LB_finalStatusDate) == 0) {
+ struct tm *t = NULL;
+ if ( (h->finalStatus) &&
+ ((t = gmtime(&h->finalStatus->timestamp.tv_sec)) != NULL) ) {
+ av = calloc(2, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ /* dateTime format: yyyy-mm-ddThh:mm:ss.uuuuuu */
+ trio_asprintf(&av[0].value,"%04d-%02d-%02dT%02d:%02d:%02d.%06d",
+ 1900+t->tm_year, 1+t->tm_mon, t->tm_mday,
+ t->tm_hour, t->tm_min, t->tm_sec,
+ h->finalStatus->timestamp.tv_usec);
+ av[0].size = -1;
+ av[0].timestamp = h->finalStatus->timestamp.tv_sec;
+ } else if ((t = gmtime(&h->status.lastUpdateTime.tv_sec)) != NULL) {
+ av = calloc(2, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ /* dateTime format: yyyy-mm-ddThh:mm:ss.uuuuuu */
+ trio_asprintf(&av[0].value,"%04d-%02d-%02dT%02d:%02d:%02d.%06d",
+ 1900+t->tm_year, 1+t->tm_mon, t->tm_mday,
+ t->tm_hour, t->tm_min, t->tm_sec,
+ h->status.lastUpdateTime.tv_usec);
+ av[0].size = -1;
+ av[0].timestamp = h->status.lastUpdateTime.tv_sec;
+ }
+ } else if (strcmp(attr, GLITE_JP_LB_finalStatusReason) == 0) {
+ if (h->finalStatus && h->finalStatus->reason) {
+ av = calloc(2, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ av[0].value = strdup(h->finalStatus->reason);
+ av[0].size = -1;
+ av[0].timestamp = h->finalStatus->timestamp.tv_sec;
+ } else if (h->status.reason) {
+ av = calloc(2, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ av[0].value = strdup(h->status.reason);
+ av[0].size = -1;
+ av[0].timestamp = h->status.lastUpdateTime.tv_sec;
+ }
+ } else if (strcmp(attr, GLITE_JP_LB_LRMSDoneStatus) == 0) {
+ i = 0;
+ j = -1;
+ while (h->events[i]) {
+ if ( (h->events[i]->type == EDG_WLL_EVENT_DONE) &&
+ (h->events[i]->any.source == EDG_WLL_SOURCE_LRMS) )
+ j = i;
+ i++;
+ }
+ av = calloc(2, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ av[0].size = -1;
+ if ( j != -1) {
+ av[0].value = edg_wll_DoneStatus_codeToString(h->events[j]->done.status_code);
+ av[0].timestamp = h->events[j]->any.timestamp.tv_sec;
+ } else {
+ av[0].value = edg_wll_DoneStatus_codeToString(h->status.done_code);
+ av[0].timestamp = h->status.lastUpdateTime.tv_sec;
+ }
+ } else if (strcmp(attr, GLITE_JP_LB_LRMSStatusReason) == 0) {
+ i = 0;
+ j = -1;
+ while (h->events[i]) {
+ if ( (h->events[i]->type == EDG_WLL_EVENT_DONE) &&
+ (h->events[i]->any.source == EDG_WLL_SOURCE_LRMS) )
+ j = i;
+ i++;
+ }
+ if ( ( j != -1) && (h->events[j]->done.reason) ) {
+ av = calloc(2, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ av[0].value = strdup(h->events[j]->done.reason);
+ av[0].size = -1;
+ av[0].timestamp = h->events[j]->any.timestamp.tv_sec;
+ }
+ } else if (strcmp(attr, GLITE_JP_LB_retryCount) == 0) {
+ av = calloc(2, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ trio_asprintf(&av[0].value,"%d", h->status.resubmitted);
+ av[0].size = -1;
+ av[0].timestamp = h->status.lastUpdateTime.tv_sec;
+ } else if (strcmp(attr, GLITE_JP_LB_additionalReason) == 0) {
+ /* what is it? */
+ char et[BUFSIZ];
+ *attrval = NULL;
+ err.code = ENOSYS;
+ snprintf(et,sizeof et,"Attribute '%s' not implemented yet.",attr);
+ et[BUFSIZ-1] = 0;
+ err.desc = et;
+ return glite_jp_stack_error(ctx,&err);
+ } else if (strcmp(attr, GLITE_JP_LB_jobType) == 0) {
+ av = calloc(2, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ switch (h->status.jobtype) {
+ case EDG_WLL_STAT_SIMPLE:
+ av[0].value = strdup("SIMPLE"); break;
+ case EDG_WLL_STAT_DAG:
+ av[0].value = strdup("DAG"); break;
+ default:
+ av[0].value = strdup("UNKNOWN"); break;
+ }
+ av[0].size = -1;
+ av[0].timestamp = h->status.lastUpdateTime.tv_sec;
+ } else if (strcmp(attr, GLITE_JP_LB_nsubjobs) == 0) {
+ av = calloc(2, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ trio_asprintf(&av[0].value,"%d", h->status.children_num);
+ av[0].size = -1;
+ av[0].timestamp = h->status.lastUpdateTime.tv_sec;
+ } else if (strcmp(attr, GLITE_JP_LB_subjobs) == 0) {
+ if (h->status.children_num > 0) {
+ char *val = NULL, *old_val;
+
+ old_val = strdup ("");
+ for (i=0; i<h->status.children_num; i++) {
+ trio_asprintf(&val,"%s\t\t<jobId>%s</jobId>\n",
+ old_val, h->status.children[i] ? h->status.children[i] : "");
+ if (old_val) free(old_val);
+ old_val = val; val = NULL;
+ }
+ av = calloc(2, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ av[0].value = check_strdup(old_val);
+ av[0].size = -1;
+ av[0].timestamp = h->status.lastUpdateTime.tv_sec;
+ } else {
+ char et[BUFSIZ];
+ *attrval = NULL;
+ err.code = ENOENT;
+ snprintf(et,sizeof et,"Value unknown for attribute '%s', there are no subjobs.",attr);
+ et[BUFSIZ-1] = 0;
+ err.desc = et;
+ return glite_jp_stack_error(ctx,&err);
+ }
+ } else if (strcmp(attr, GLITE_JP_LB_lastStatusHistory) == 0) {
+ int i,j;
+ char *val, *old_val, *s_str, *t_str, *r_str;
+ struct tm *t;
+
+ val = s_str = t_str = r_str = NULL;
+ old_val = strdup("");
+ t = calloc(1, sizeof(*t));
+ /* first record is Submitted - hopefully in fullStatusHistory[0] */
+ if ((h->fullStatusHistory[0] &&
+ (h->fullStatusHistory[0]->state == EDG_WLL_JOB_SUBMITTED)) ) {
+
+ s_str = edg_wll_StatToString(h->fullStatusHistory[0]->state);
+ for (j = 0; s_str[j]; j++) s_str[j] = toupper(s_str[j]);
+ if (gmtime_r(&h->fullStatusHistory[0]->timestamp.tv_sec,t) != NULL) {
+ /* dateTime format: yyyy-mm-ddThh:mm:ss.uuuuuu */
+ trio_asprintf(&t_str,"timestamp=\"%04d-%02d-%02dT%02d:%02d:%02d.%06d\" ",
+ 1900+t->tm_year, 1+t->tm_mon, t->tm_mday,
+ t->tm_hour, t->tm_min, t->tm_sec,
+ h->fullStatusHistory[0]->timestamp.tv_usec);
+ }
+ if (h->fullStatusHistory[0]->reason) {
+ trio_asprintf(&r_str,"reason=\"%s\" ",h->fullStatusHistory[0]->reason);
+ }
+ trio_asprintf(&val,"%s\t\t<status xmlns=\"" GLITE_JP_LB_NS "\" name=\"%s\" %s%s/>\n",
+ old_val, s_str ? s_str : "", t_str ? t_str : "", r_str ? r_str : "");
+ if (s_str) free(s_str);
+ if (t_str) free(t_str);
+ if (r_str) free(t_str);
+ if (old_val) free(old_val);
+ old_val = val; val = NULL;
+ }
+ /* and the rest is from last Waiting to the end - i.e. all lastStatusHistory[] */
+ if (h->lastStatusHistory) {
+ i = 0;
+ while (h->lastStatusHistory[i]) {
+ s_str = edg_wll_StatToString(h->lastStatusHistory[i]->state);
+ for (j = 0; s_str[j]; j++) s_str[j] = toupper(s_str[j]);
+ if (gmtime_r(&h->lastStatusHistory[i]->timestamp.tv_sec,t) != NULL) {
+ /* dateTime format: yyyy-mm-ddThh:mm:ss.uuuuuu */
+ trio_asprintf(&t_str,"timestamp=\"%04d-%02d-%02dT%02d:%02d:%02d.%06d\" ",
+ 1900+t->tm_year, 1+t->tm_mon, t->tm_mday,
+ t->tm_hour, t->tm_min, t->tm_sec,
+ h->lastStatusHistory[i]->timestamp.tv_usec);
+ }
+ if (h->lastStatusHistory[i]->reason) {
+ trio_asprintf(&r_str,"reason=\"%s\" ",h->lastStatusHistory[i]->reason);
+ }
+ trio_asprintf(&val,"%s\t\t<status xmlns=\"" GLITE_JP_LB_NS "\" name=\"%s\" %s%s/>\n",
+ old_val, s_str ? s_str : "", t_str ? t_str : "", r_str ? r_str : "");
+ if (s_str) free(s_str); s_str = NULL;
+ if (t_str) free(t_str); t_str = NULL;
+ if (r_str) free(r_str); r_str = NULL;
+ if (old_val) free(old_val);
+ old_val = val; val = NULL;
+ i++;
+ }
+ }
+ val = old_val; old_val = NULL;
+ if (val) {
+ av = calloc(2, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ av[0].value = strdup(val);
+ av[0].size = -1;
+ av[0].timestamp = h->status.lastUpdateTime.tv_sec;
+ free(val);
+ }
+ } else if (strcmp(attr, GLITE_JP_LB_fullStatusHistory) == 0) {
+ int i,j;
+ char *val, *old_val, *s_str, *t_str, *r_str;
+ struct tm *t;
+
+ val = s_str = t_str = r_str = NULL;
+ old_val = strdup("");
+ t = calloc(1, sizeof(*t));
+ i = 0;
+ while (h->fullStatusHistory[i]) {
+ s_str = edg_wll_StatToString(h->fullStatusHistory[i]->state);
+ for (j = 0; s_str[j]; j++) s_str[j] = toupper(s_str[j]);
+ if (gmtime_r(&h->fullStatusHistory[i]->timestamp.tv_sec,t) != NULL) {
+ /* dateTime format: yyyy-mm-ddThh:mm:ss:uuuuuu */
+ trio_asprintf(&t_str,"timestamp=\"%04d-%02d-%02dT%02d:%02d:%02d.%06d\" ",
+ 1900+t->tm_year, 1+t->tm_mon, t->tm_mday,
+ t->tm_hour, t->tm_min, t->tm_sec,
+ h->fullStatusHistory[i]->timestamp.tv_usec);
+ }
+ if (h->fullStatusHistory[i]->reason) {
+ trio_asprintf(&r_str,"reason=\"%s\" ",h->fullStatusHistory[i]->reason);
+ }
+ trio_asprintf(&val,"%s\t\t<status xmlns=\"" GLITE_JP_LB_NS "\" name=\"%s\" %s%s/>\n",
+ old_val, s_str ? s_str : "", t_str ? t_str : "", r_str ? r_str : "");
+ if (s_str) free(s_str); s_str = NULL;
+ if (t_str) free(t_str); t_str = NULL;
+ if (r_str) free(r_str); r_str = NULL;
+ if (old_val) free(old_val);
+ old_val = val; val = NULL;
+ i++;
+ }
+ val = old_val; old_val = NULL;
+ if (val) {
+ av = calloc(2, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ av[0].value = strdup(val);
+ av[0].size = -1;
+ av[0].timestamp = h->status.lastUpdateTime.tv_sec;
+ free(val);
+ }
+ } else if (strcmp(ns, GLITE_JP_LBTAG_NS) == 0) {
+ tag = strrchr(attr, ':');
+ if (h->events && tag) {
+ tag++;
+ i = 0;
+ n_tags = 0;
+
+ while (h->events[i]) {
+ if ((h->events[i]->type == EDG_WLL_EVENT_USERTAG) &&
+ (strcasecmp(h->events[i]->userTag.name, tag) == 0) ) {
+/* XXX: LB tag names are case-insensitive */
+ av = realloc(av, (n_tags+2) * sizeof(glite_jp_attrval_t));
+ memset(&av[n_tags], 0, 2 * sizeof(glite_jp_attrval_t));
+
+ av[n_tags].name = strdup(attr);
+ av[n_tags].value = check_strdup(h->events[i]->userTag.value);
+ av[n_tags].timestamp =
+ h->events[i]->any.timestamp.tv_sec;
+ av[n_tags].size = -1;
+
+ n_tags++;
+ }
+ i++;
+ }
+ }
+ } else if (strcmp(attr, GLITE_JP_LB_JDL) == 0) {
+ if (h->status.jdl) {
+ av = calloc(2, sizeof(glite_jp_attrval_t));
+ av[0].name = strdup(attr);
+ av[0].value = strdup(h->status.jdl);
+ av[0].size = -1;
+ av[0].timestamp = h->status.lastUpdateTime.tv_sec;
+ }
+ } else {
+ char et[BUFSIZ];
+ *attrval = NULL;
+ err.code = EINVAL;
+ snprintf(et,sizeof et,"No such attribute '%s'.",attr);
+ et[BUFSIZ-1] = 0;
+ err.desc = et;
+ return glite_jp_stack_error(ctx,&err);
+ }
+
+ free(ns);
+
+ if (av && av[0].value) {
+ for (i=0; av[i].name; i++) av[i].origin = GLITE_JP_ATTR_ORIG_FILE;
+ *attrval = av;
+ return 0;
+ } else {
+ char et[BUFSIZ];
+ *attrval = NULL;
+ err.code = ENOENT;
+ snprintf(et,sizeof et,"Value unknown for attribute '%s'.",attr);
+ et[BUFSIZ-1] = 0;
+ err.desc = et;
+ if (av) glite_jp_attrval_free(av,1); // XXX: probably not needed
+ return glite_jp_stack_error(ctx,&err);
+ }
+}
+
+static int lb_filecom(void *fpctx, void *handle){
+ glite_jp_context_t ctx = (glite_jp_context_t) fpctx;
+ lb_handle *h = (lb_handle *) handle;
+ glite_jp_attrval_t attr[2];
+ memset(attr, 0, 2 * sizeof(glite_jp_attrval_t));
+
+ if (h->events) {
+ int i = 0;
+ while (h->events[i]) {
+ if (h->events[i]->type == EDG_WLL_EVENT_USERTAG &&
+ strchr(h->events[i]->userTag.name,':'))
+ {
+ //printf("%s, %s\n", edg_wlc_JobIdUnparse(h->status.jobId), h->status.jobId);
+ attr[0].name = h->events[i]->userTag.name;
+ attr[0].value = h->events[i]->userTag.value;
+ attr[0].binary = 0;
+ attr[0].origin = GLITE_JP_ATTR_ORIG_USER;
+ attr[0].timestamp = time(NULL);
+ attr[0].origin_detail = NULL; /* XXX */
+ attr[1].name = NULL;
+ glite_jppsbe_append_tag(ctx, edg_wlc_JobIdUnparse(h->status.jobId), attr);
+ }
+ i++;
+ }
+ }
+
+ return 0;
+}
+
+static int lb_status(void *handle) {
+
+ lb_handle *h = (lb_handle *) handle;
+ intJobStat *js;
+ int maxnstates, nstates, i, be_strict = 0, retval;
+ char *errstring;
+ edg_wll_JobStatCode old_state = EDG_WLL_JOB_UNDEF;
+
+ js = calloc(1, sizeof(intJobStat));
+ init_intJobStat(js);
+
+ edg_wll_SortPEvents(h->events);
+
+ maxnstates = INITIAL_NUMBER_STATES;
+ nstates = 0;
+ h->fullStatusHistory = calloc(maxnstates, sizeof(lb_historyStatus *));
+ h->lastStatusHistory = NULL;
+ h->finalStatus = NULL;
+ i = 0;
+ while (h->events[i])
+ {
+ /* realloc the fullStatusHistory if needed */
+ if (nstates >= maxnstates) {
+ maxnstates <<= 1;
+ h->fullStatusHistory = realloc(h->fullStatusHistory, maxnstates * sizeof(lb_historyStatus *));
+ }
+
+ /* job owner and jobId not filled from events normally */
+ if (h->events[i]->any.type == EDG_WLL_EVENT_REGJOB) {
+ js->pub.owner = check_strdup(h->events[i]->any.user);
+ if (edg_wlc_JobIdDup(h->events[i]->any.jobId,&js->pub.jobId)) {
+ goto err;
+ }
+ }
+ /* Process Event and update the state */
+ if (processEvent(js, h->events[i], 0, be_strict, &errstring) == RET_FATAL) {
+ goto err;
+ }
+
+ /* if the state has changed, update the status history */
+ if (js->pub.state != old_state) {
+ h->fullStatusHistory[nstates] = calloc(1,sizeof(lb_historyStatus));
+ h->fullStatusHistory[nstates]->state = js->pub.state;
+ h->fullStatusHistory[nstates]->timestamp.tv_sec = js->pub.stateEnterTime.tv_sec;
+ h->fullStatusHistory[nstates]->timestamp.tv_usec = js->pub.stateEnterTime.tv_usec;
+ h->fullStatusHistory[nstates]->reason = check_strdup(js->pub.reason);
+ /* lastStatusHistory starts from the last WAITING state */
+ if (js->pub.state == EDG_WLL_JOB_WAITING) {
+ h->lastStatusHistory = &(h->fullStatusHistory[nstates]);
+ }
+ /* finalStatus is the one preceeding the CLEARED state */
+ if ( (js->pub.state == EDG_WLL_JOB_CLEARED) && (nstates > 0) ) {
+ h->finalStatus = h->fullStatusHistory[nstates-1];
+ }
+ old_state = js->pub.state;
+ nstates++;
+ }
+
+ i++;
+ }
+ h->fullStatusHistory[nstates] = NULL;
+ /* if there is no CLEARED state, finalStatus is just the last status
+ and if there is no such thing, leave h->finalStatus NULL and for the attribute
+ try to read something from the h->status */
+ if ( (h->finalStatus == NULL) && (nstates > 0) ) {
+ h->finalStatus = h->fullStatusHistory[nstates-1];
+ }
+
+ /* fill in also subjobs */
+ if (js->pub.children_num > 0) {
+ edg_wll_Context context;
+ edg_wlc_JobId *subjobs;
+
+ if ((retval = edg_wll_InitContext(&context)) != 0) return retval;
+ subjobs = calloc(js->pub.children_num, sizeof (*subjobs));
+ if ((retval = edg_wll_GenerateSubjobIds(context,
+ js->pub.jobId, js->pub.children_num, js->pub.seed, &subjobs) ) != 0 ) {
+ goto err;
+ }
+ js->pub.children = calloc(js->pub.children_num + 1, sizeof (*js->pub.children));
+ for (i=0; i<js->pub.children_num; i++) {
+ js->pub.children[i] = edg_wlc_JobIdUnparse(subjobs[i]);
+ }
+ edg_wll_FreeContext(context);
+ free(subjobs);
+ }
+
+ memcpy(&h->status, &js->pub, sizeof(edg_wll_JobStat));
+
+ // not very clean, but working
+ memset(&js->pub, 0, sizeof(edg_wll_JobStat));
+ destroy_intJobStat(js);
+
+ return 0;
+
+err:
+ destroy_intJobStat(js);
+ return -1;
+}
+
+
+/*
+ * realloc the line to double size if needed
+ *
+ * \return 0 if failed, did nothing
+ * \return 1 if success
+ */
+/*int check_realloc_line(char **line, size_t *maxlen, size_t len) {
+ void *tmp;
+
+ if (len > *maxlen) {
+ *maxlen <<= 1;
+ tmp = realloc(*line, *maxlen);
+ if (!tmp) return 0;
+ *line = tmp;
+ }
+
+ return 1;
+}
+*/
+
+/*
+ * read next line from stream
+ *
+ * \return error code
+ */
+/*static int read_line(glite_jp_context_t ctx, void *handle, lb_buffer_t *buffer, char **line) {
+
+ size_t maxlen, len, i;
+ ssize_t nbytes;
+ int retval, z, end;
+
+ maxlen = BUFSIZ;
+ i = 0;
+ len = 0;
+ *line = malloc(maxlen);
+ end = 0;
+
+ do {
+ // read next portion
+ if (buffer->pos >= buffer->size) {
+ buffer->pos = 0;
+ buffer->size = 0;
+ if ((retval = glite_jppsbe_pread(ctx, handle, buffer->buf, BUFSIZ, buffer->offset, &nbytes)) == 0) {
+ if (nbytes < 0) {
+ retval = EINVAL;
+ goto fail;
+ } else {
+ if (nbytes) {
+ buffer->size = (size_t)nbytes;
+ buffer->offset += nbytes;
+ } else end = 1;
+ }
+ } else goto fail;
+ }
+
+ // we have buffer->size - buffer->pos bytes
+ i = buffer->pos;
+ do {
+ if (i >= buffer->size) z = '\0';
+ else {
+ z = buffer->buf[i];
+ if (z == '\n') z = '\0';
+ }
+ len++;
+
+ if (!check_realloc_line(line, &maxlen, len)) {
+ retval = ENOMEM;
+ goto fail;
+ }
+ (*line)[len - 1] = z;
+ i++;
+ } while (z && i < buffer->size);
+ buffer->pos = i;
+ } while (len && (*line)[len - 1] != '\0');
+
+ if ((!len || !(*line)[0]) && end) {
+ free(*line);
+ *line = NULL;
+ }
+
+ return 0;
+
+fail:
+ free(*line);
+ *line = NULL;
+ return retval;
+}
+*/
+
+
+static int compare_pevents_by_seq(const void *a, const void *b)
+{
+ const edg_wll_Event **e = (const edg_wll_Event **) a;
+ const edg_wll_Event **f = (const edg_wll_Event **) b;
+ return compare_events_by_seq(*e,*f);
+}
+
+
+static void edg_wll_SortPEvents(edg_wll_Event **e)
+{
+ edg_wll_Event **p;
+ int n;
+
+ if (!e) return;
+ p = e;
+ for (n=0; *p; n++) {
+ p++;
+ }
+ qsort(e,n,sizeof(*e),compare_pevents_by_seq);
+}
+
--- /dev/null
+#ident "$Header$"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+#include <syslog.h>
+
+#include "glite/lb/context-int.h"
+
+#include "glite/lbu/trio.h"
+
+#include "intjobstat.h"
+#include "seqcode_aux.h"
+
+
+/* TBD: share in whole logging or workload */
+#ifdef __GNUC__
+#define UNUSED_VAR __attribute__((unused))
+#else
+#define UNUSED_VAR
+#endif
+
+static int processEvent_glite(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring);
+int processEvent_PBS(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring);
+int processEvent_Condor(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring);
+
+int add_stringlist(char ***lptr, const char *new_item);
+
+int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring)
+{
+ if (js->pub.jobtype == -1 && e->type == EDG_WLL_EVENT_REGJOB)
+ switch (e->regJob.jobtype) {
+ case EDG_WLL_REGJOB_SIMPLE:
+ js->pub.jobtype = EDG_WLL_STAT_SIMPLE;
+ break;
+ case EDG_WLL_REGJOB_DAG:
+ case EDG_WLL_REGJOB_PARTITIONABLE:
+ case EDG_WLL_REGJOB_PARTITIONED:
+ js->pub.jobtype = EDG_WLL_STAT_DAG;
+ break;
+ case EDG_WLL_REGJOB_COLLECTION:
+ js->pub.jobtype = EDG_WLL_STAT_COLLECTION;
+ break;
+ case EDG_WLL_REGJOB_PBS:
+ js->pub.jobtype = EDG_WLL_STAT_PBS;
+ break;
+ case EDG_WLL_REGJOB_CONDOR:
+ js->pub.jobtype = EDG_WLL_STAT_CONDOR;
+ break;
+ default:
+ trio_asprintf(errstring,"unknown job type %d in registration",e->regJob.jobtype);
+ return RET_FAIL;
+ }
+
+ switch (js->pub.jobtype) {
+ case EDG_WLL_STAT_SIMPLE:
+ case EDG_WLL_STAT_DAG:
+ case EDG_WLL_STAT_COLLECTION:
+ return processEvent_glite(js,e,ev_seq,strict,errstring);
+ case EDG_WLL_STAT_PBS:
+ return processEvent_PBS(js,e,ev_seq,strict,errstring);
+ case EDG_WLL_STAT_CONDOR:
+ return processEvent_Condor(js,e,ev_seq,strict,errstring);
+ case -1: return RET_UNREG;
+ default:
+ trio_asprintf(errstring,"undefined job type %d",js->pub.jobtype);
+ return RET_FAIL;
+ }
+}
+
+#define rep(a,b) { free(a); a = (b == NULL) ? NULL : strdup(b); }
+#define rep_cond(a,b) { if (b) { free(a); a = strdup(b); } }
+
+static void free_stringlist(char ***lptr)
+{
+ char **itptr;
+ int i;
+
+ if (*lptr) {
+ for (i = 0, itptr = *lptr; itptr[i] != NULL; i++)
+ free(itptr[i]);
+ free(itptr);
+ *lptr = NULL;
+ }
+}
+
+static int add_taglist(edg_wll_TagValue **lptr, const char *new_item, const char *new_item2)
+{
+ edg_wll_TagValue *itptr;
+ int i;
+
+ if (*lptr == NULL) {
+ itptr = (edg_wll_TagValue *) calloc(2,sizeof(edg_wll_TagValue));
+ itptr[0].tag = strdup(new_item);
+ itptr[0].value = strdup(new_item2);
+ *lptr = itptr;
+ return 1;
+ } else {
+ for (i = 0, itptr = *lptr; itptr[i].tag != NULL; i++)
+ if ( !strcasecmp(itptr[i].tag, new_item) )
+ {
+ free(itptr[i].value);
+ itptr[i].value = strdup(new_item2);
+ return 1;
+ }
+ itptr = (edg_wll_TagValue *) realloc(*lptr, (i+2)*sizeof(edg_wll_TagValue));
+ if (itptr != NULL) {
+ itptr[i].tag = strdup(new_item);
+ itptr[i].value = strdup(new_item2);
+ itptr[i+1].tag = NULL;
+ itptr[i+1].value = NULL;
+ *lptr = itptr;
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+}
+
+
+static void update_branch_state(char *b, char *d, char *c, char *j, branch_state **bs)
+{
+ int i = 0, branch;
+
+
+ if (!b)
+ return;
+ else
+ branch = component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER);
+
+ if (*bs != NULL) {
+ while ((*bs)[i].branch) {
+ if (branch == (*bs)[i].branch) {
+ if (d) rep((*bs)[i].destination, d);
+ if (c) rep((*bs)[i].ce_node, c);
+ if (j) rep((*bs)[i].jdl, j);
+
+ return;
+ }
+ i++;
+ }
+ }
+
+ *bs = (branch_state *) realloc(*bs, (i+2)*sizeof(branch_state));
+ memset(&((*bs)[i]), 0, 2*sizeof(branch_state));
+
+ (*bs)[i].branch = branch;
+ rep((*bs)[i].destination, d);
+ rep((*bs)[i].ce_node, c);
+ rep((*bs)[i].jdl, j);
+}
+
+
+static void free_branch_state(branch_state **bs)
+{
+ int i = 0;
+
+ if (*bs == NULL) return;
+
+ while ((*bs)[i].branch) {
+ free((*bs)[i].destination);
+ free((*bs)[i].ce_node);
+ free((*bs)[i].jdl);
+ i++;
+ }
+ free(*bs);
+ *bs = NULL;
+}
+
+static int compare_branch_states(const void *a, const void *b)
+{
+ branch_state *c = (branch_state *) a;
+ branch_state *d = (branch_state *) b;
+
+ if (c->branch < d->branch) return -1;
+ if (c->branch == d->branch) return 0;
+ /* avoid warning: if (c->branch > d->branch) */ return 1;
+}
+
+static void load_branch_state(intJobStat *js)
+{
+ int i, j, branch;
+
+
+ if ( (!js->branch_tag_seqcode) || (!js->branch_states) ) return;
+
+ branch = component_seqcode(js->branch_tag_seqcode, EDG_WLL_SOURCE_WORKLOAD_MANAGER);
+
+ // count elements
+ i = 0;
+ while (js->branch_states[i].branch) i++;
+
+ // sort them
+ qsort(js->branch_states, (size_t) i, sizeof(branch_state),
+ compare_branch_states);
+
+ // find row corresponding to ReallyRunning WM seq.code (aka branch)
+ i = 0;
+ while (js->branch_states[i].branch) {
+ if (js->branch_states[i].branch == branch) break;
+ i++;
+ }
+
+ // copy this and two before branches data to final state
+ // (each field - dest,ce,jdl - comes from different event)
+ // (and these events have most likely different WM seq.codes)
+ // (even belonging into one logical branch)
+ // (the newer the more important - so i-th element is copied as last)
+ // (and may overwrite data from previous elements)
+ for (j = i - 2; j <= i; j++) {
+ if (j >= 0) {
+ if (js->branch_states[j].destination)
+ rep(js->pub.destination, js->branch_states[j].destination);
+ if (js->branch_states[j].ce_node)
+ rep(js->pub.ce_node, js->branch_states[j].ce_node);
+ if (js->branch_states[j].jdl)
+ rep(js->pub.matched_jdl, js->branch_states[j].jdl);
+ }
+ }
+}
+
+// clear branches (deep resub. or abort)
+static void reset_branch(intJobStat *js, edg_wll_Event *e)
+{
+ js->resubmit_type = EDG_WLL_RESUBMISSION_WILLRESUB;
+ free_stringlist(&js->pub.possible_destinations);
+ free_stringlist(&js->pub.possible_ce_nodes);
+ free_branch_state(&js->branch_states);
+ js->pub.payload_running = 0;
+ rep(js->branch_tag_seqcode, NULL);
+ rep(js->deep_resubmit_seqcode, e->any.seqcode);
+}
+
+static char* location_string(const char *source, const char *host, const char *instance)
+{
+ char *ret;
+ trio_asprintf(&ret, "%s/%s/%s", source, host, instance);
+ return ret;
+}
+
+/* is seq. number of 'es' before WMS higher then 'js' */
+static int after_enter_wm(const char *es,const char *js)
+{
+ return ((component_seqcode(es,EDG_WLL_SOURCE_NETWORK_SERVER) >
+ component_seqcode(js,EDG_WLL_SOURCE_NETWORK_SERVER))
+ ||
+ (component_seqcode(es,EDG_WLL_SOURCE_USER_INTERFACE) >
+ component_seqcode(js,EDG_WLL_SOURCE_USER_INTERFACE)));
+}
+
+
+static int badEvent(intJobStat *js UNUSED_VAR, edg_wll_Event *e, int ev_seq UNUSED_VAR)
+{
+ char *str;
+
+ str = edg_wll_EventToString(e->any.type);
+ fprintf(stderr, "edg_wll_JobStatus: bad event: type %d (%s)\n",
+ e->any.type, (str == NULL) ? "unknown" : str);
+ free(str);
+ return RET_FATAL;
+}
+
+
+// (?) || (0 && 1) => true if (res == RET_OK)
+#define USABLE(res,strict) ((res) == RET_OK || ( (res) == RET_SOON && !strict))
+
+// (?) || (1 && 1) => always true
+#define USABLE_DATA(res,strict) ((res) == RET_OK || ( (res) != RET_FATAL && !strict))
+
+#define USABLE_BRANCH(fine_res) ((fine_res) != RET_TOOOLD && (fine_res) != RET_BADBRANCH)
+#define LRMS_STATE(state) ((state) == EDG_WLL_JOB_RUNNING || (state) == EDG_WLL_JOB_DONE)
+#define PARSABLE_SEQCODE(code) (component_seqcode((code),0) >= 0)
+
+static int processEvent_glite(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring)
+{
+ edg_wll_JobStatCode old_state = js->pub.state;
+ edg_wll_JobStatCode new_state = EDG_WLL_JOB_UNKNOWN;
+ int res = RET_OK,
+ fine_res = RET_OK;
+
+ int lm_favour_lrms = 0;
+
+ // Aborted may not be terminal state for collection in some cases
+ // i.e. if some Done/failed subjob is resubmitted
+ if ( (old_state == EDG_WLL_JOB_ABORTED && e->any.type != EDG_WLL_EVENT_COLLECTIONSTATE) ||
+ old_state == EDG_WLL_JOB_CANCELLED ||
+ old_state == EDG_WLL_JOB_CLEARED) {
+ res = RET_LATE;
+ }
+
+/* new event coming from NS or UI => forget about any resubmission loops */
+ if (e->type != EDG_WLL_EVENT_CANCEL &&
+ js->last_seqcode &&
+ after_enter_wm(e->any.seqcode,js->last_seqcode))
+ {
+ rep(js->branch_tag_seqcode,NULL);
+ rep(js->deep_resubmit_seqcode,NULL);
+ rep(js->last_branch_seqcode,NULL);
+ }
+
+ if (js->deep_resubmit_seqcode &&
+ before_deep_resubmission(e->any.seqcode, js->deep_resubmit_seqcode)) {
+ res = RET_LATE;
+ fine_res = RET_TOOOLD;
+ }
+ else if (js->branch_tag_seqcode) { // ReallyRunning ev. arrived
+ if (same_branch(e->any.seqcode, js->branch_tag_seqcode)) {
+ if ((js->last_branch_seqcode != NULL) &&
+ edg_wll_compare_seq(e->any.seqcode, js->last_branch_seqcode) < 0) {
+ res = RET_LATE;
+ }
+ fine_res = RET_GOODBRANCH;
+ }
+ else {
+ res = RET_LATE;
+ fine_res = RET_BADBRANCH;
+ }
+ }
+ else if ((js->last_seqcode != NULL) &&
+ edg_wll_compare_seq(e->any.seqcode, js->last_seqcode) < 0) {
+ res = RET_LATE;
+ }
+
+
+ switch (e->any.type) {
+ case EDG_WLL_EVENT_TRANSFER:
+ if (e->transfer.result == EDG_WLL_TRANSFER_OK) {
+ switch (e->transfer.source) {
+ case EDG_WLL_SOURCE_USER_INTERFACE:
+ new_state = EDG_WLL_JOB_WAITING; break;
+ case EDG_WLL_SOURCE_JOB_SUBMISSION:
+ /* if (LRMS_STATE(old_state)) res = RET_LATE; */
+ new_state = EDG_WLL_JOB_READY; break;
+ case EDG_WLL_SOURCE_LOG_MONITOR:
+ if (LRMS_STATE(old_state)) {
+ js->pub.stateEnterTimes[1 + EDG_WLL_JOB_SCHEDULED] =
+ e->any.timestamp.tv_sec;
+ res = RET_LATE;
+ }
+ new_state = EDG_WLL_JOB_SCHEDULED;
+ lm_favour_lrms = 1;
+ break;
+ default:
+ goto bad_event; break;
+ }
+ } else if (e->transfer.result == EDG_WLL_TRANSFER_FAIL) {
+ /* transfer failed */
+ switch (e->transfer.source) {
+ case EDG_WLL_SOURCE_USER_INTERFACE:
+ new_state = EDG_WLL_JOB_SUBMITTED; break;
+ case EDG_WLL_SOURCE_JOB_SUBMISSION:
+ if (LRMS_STATE(old_state)) res = RET_LATE;
+ new_state = EDG_WLL_JOB_READY; break;
+ case EDG_WLL_SOURCE_LOG_MONITOR:
+ if (LRMS_STATE(old_state)) res = RET_LATE;
+ new_state = EDG_WLL_JOB_READY; break;
+ default:
+ goto bad_event; break;
+ }
+ } else {
+ /* e->transfer.result == EDG_WLL_TRANSFER_START */
+ res = RET_IGNORE;
+ }
+ if (USABLE(res, strict)) {
+ js->pub.state = new_state;
+ rep(js->pub.reason, e->transfer.reason);
+
+ free(js->pub.location);
+ if (e->transfer.result == EDG_WLL_TRANSFER_OK) {
+ js->pub.location = location_string(
+ edg_wll_SourceToString(e->transfer.destination),
+ e->transfer.dest_host,
+ e->transfer.dest_instance);
+ } else {
+ js->pub.location = location_string(
+ edg_wll_SourceToString(e->transfer.source),
+ e->transfer.host,
+ e->transfer.src_instance);
+ }
+ }
+ if (USABLE_DATA(res, strict)) {
+ switch (e->transfer.source) {
+ case EDG_WLL_SOURCE_USER_INTERFACE:
+ rep(js->pub.jdl, e->transfer.job); break;
+ case EDG_WLL_SOURCE_JOB_SUBMISSION:
+ rep(js->pub.condor_jdl, e->transfer.job); break;
+ case EDG_WLL_SOURCE_LOG_MONITOR:
+ rep(js->pub.rsl, e->transfer.job); break;
+ default:
+ goto bad_event; break;
+
+ }
+ }
+ break;
+ case EDG_WLL_EVENT_ACCEPTED:
+ switch (e->accepted.source) {
+ case EDG_WLL_SOURCE_NETWORK_SERVER:
+ new_state = EDG_WLL_JOB_WAITING; break;
+ case EDG_WLL_SOURCE_LOG_MONITOR:
+ if (LRMS_STATE(old_state)) res = RET_LATE;
+ new_state = EDG_WLL_JOB_READY;
+ lm_favour_lrms = 1;
+ break;
+ case EDG_WLL_SOURCE_LRMS:
+ new_state = EDG_WLL_JOB_SCHEDULED; break;
+ default:
+ goto bad_event; break;
+ }
+ if (USABLE(res, strict)) {
+ js->pub.state = new_state;
+ free(js->pub.location);
+ js->pub.location = location_string(
+ edg_wll_SourceToString(e->accepted.source),
+ e->accepted.host,
+ e->accepted.src_instance);
+ }
+ if (USABLE_DATA(res, strict)) {
+ switch (e->accepted.source) {
+ case EDG_WLL_SOURCE_NETWORK_SERVER:
+ break; /* no WM id */
+ case EDG_WLL_SOURCE_LOG_MONITOR:
+ rep(js->pub.condorId, e->accepted.local_jobid); break;
+ case EDG_WLL_SOURCE_LRMS:
+ /* XXX localId */
+ rep(js->pub.globusId, e->accepted.local_jobid); break;
+ default:
+ goto bad_event; break;
+ }
+ }
+ break;
+ case EDG_WLL_EVENT_REFUSED:
+ switch (e->refused.source) {
+ case EDG_WLL_SOURCE_NETWORK_SERVER:
+ new_state = EDG_WLL_JOB_SUBMITTED; break;
+ case EDG_WLL_SOURCE_LOG_MONITOR:
+ new_state = EDG_WLL_JOB_READY; break;
+ case EDG_WLL_SOURCE_LRMS:
+ new_state = EDG_WLL_JOB_READY; break;
+ default:
+ goto bad_event; break;
+ }
+ if (USABLE(res, strict)) {
+ js->pub.state = new_state;
+ rep(js->pub.reason, e->refused.reason);
+
+ free(js->pub.location);
+ js->pub.location = location_string(
+ edg_wll_SourceToString(e->refused.from),
+ e->refused.from_host,
+ e->refused.from_instance);
+ }
+ break;
+ case EDG_WLL_EVENT_ENQUEUED:
+ if (e->enQueued.result == EDG_WLL_ENQUEUED_OK) {
+ switch (e->enQueued.source) {
+ case EDG_WLL_SOURCE_NETWORK_SERVER:
+ new_state = EDG_WLL_JOB_WAITING; break;
+ case EDG_WLL_SOURCE_WORKLOAD_MANAGER:
+ if (LRMS_STATE(old_state)) res = RET_LATE;
+ update_branch_state(e->any.seqcode, NULL,
+ NULL, e->enQueued.job, &js->branch_states);
+ new_state = EDG_WLL_JOB_READY; break;
+ case EDG_WLL_SOURCE_LOG_MONITOR:
+ new_state = EDG_WLL_JOB_WAITING; break;
+ default:
+ goto bad_event; break;
+ }
+ } else if (e->enQueued.result == EDG_WLL_ENQUEUED_FAIL) {
+ switch (e->enQueued.source) {
+ case EDG_WLL_SOURCE_NETWORK_SERVER:
+ new_state = EDG_WLL_JOB_WAITING; break;
+ case EDG_WLL_SOURCE_WORKLOAD_MANAGER:
+ new_state = EDG_WLL_JOB_WAITING; break;
+ case EDG_WLL_SOURCE_LOG_MONITOR:
+ new_state = old_state; break;
+ default:
+ goto bad_event; break;
+ }
+ } else {
+ /* e->enQueued.result == EDG_WLL_ENQUEUED_START */
+ res = RET_IGNORE;
+ }
+ if (USABLE(res, strict)) {
+ js->pub.state = new_state;
+ rep(js->pub.reason, e->enQueued.reason);
+
+ free(js->pub.location);
+ if (e->enQueued.result == EDG_WLL_ENQUEUED_OK) {
+ js->pub.location = location_string(
+ e->enQueued.queue,
+ e->enQueued.host,
+ e->enQueued.src_instance);
+ if (e->enQueued.source == EDG_WLL_SOURCE_LOG_MONITOR)
+ js->pub.resubmitted = 1;
+ } else {
+ js->pub.location = location_string(
+ edg_wll_SourceToString(e->enQueued.source),
+ e->enQueued.host,
+ e->enQueued.src_instance);
+ }
+ }
+ if (USABLE_DATA(res, strict)) {
+ switch (e->enQueued.source) {
+ case EDG_WLL_SOURCE_NETWORK_SERVER:
+ rep(js->pub.jdl, e->enQueued.job); break;
+ case EDG_WLL_SOURCE_WORKLOAD_MANAGER:
+ if (USABLE_BRANCH(res)) {
+ rep(js->pub.matched_jdl, e->enQueued.job);
+ }
+ break;
+ case EDG_WLL_SOURCE_LOG_MONITOR:
+ /* no interim JDL here */
+ break;
+ default:
+ goto bad_event; break;
+ }
+ }
+ break;
+ case EDG_WLL_EVENT_DEQUEUED:
+ switch (e->deQueued.source) {
+ case EDG_WLL_SOURCE_WORKLOAD_MANAGER:
+ new_state = EDG_WLL_JOB_WAITING; break;
+ case EDG_WLL_SOURCE_JOB_SUBMISSION:
+ if (LRMS_STATE(old_state)) res = RET_LATE;
+ new_state = EDG_WLL_JOB_READY; break;
+ default:
+ goto bad_event; break;
+ }
+ if (USABLE(res, strict)) {
+ js->pub.state = new_state;
+ free(js->pub.location);
+ js->pub.location = location_string(
+ edg_wll_SourceToString(e->deQueued.source),
+ e->deQueued.host,
+ e->deQueued.src_instance);
+ }
+ if (USABLE_DATA(res, strict)) {
+ /* no WM/JSS local jobid */
+ }
+ break;
+ case EDG_WLL_EVENT_HELPERCALL:
+ if (USABLE(res, strict)) {
+ js->pub.state = EDG_WLL_JOB_WAITING;
+ free(js->pub.location);
+ js->pub.location = location_string(
+ e->helperCall.helper_name,
+ e->helperCall.host,
+ e->helperCall.src_instance);
+ /* roles and params used only for debugging */
+ }
+ break;
+ case EDG_WLL_EVENT_HELPERRETURN:
+ if (USABLE(res, strict)) {
+ js->pub.state = EDG_WLL_JOB_WAITING;
+ free(js->pub.location);
+ js->pub.location = location_string(
+ edg_wll_SourceToString(EDG_WLL_SOURCE_WORKLOAD_MANAGER),
+ e->helperReturn.host,
+ e->helperReturn.src_instance);
+ /* roles and retvals used only for debugging */
+ }
+ break;
+ case EDG_WLL_EVENT_RUNNING:
+ if (USABLE(res, strict)) {
+ js->pub.state = EDG_WLL_JOB_RUNNING;
+ free(js->pub.location);
+ js->pub.location = location_string(
+ edg_wll_SourceToString(EDG_WLL_SOURCE_LRMS),
+ "worknode",
+ e->running.node);
+ }
+ if (USABLE_DATA(res, strict)) {
+ if (USABLE_BRANCH(fine_res)) {
+ rep(js->pub.ce_node, e->running.node);
+ }
+ /* why? if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) { */
+ if (e->running.node) {
+ update_branch_state(e->any.seqcode, NULL,
+ e->running.node, NULL, &js->branch_states);
+ add_stringlist(&js->pub.possible_ce_nodes,
+ e->running.node);
+ }
+ /* } */
+ }
+ break;
+ case EDG_WLL_EVENT_REALLYRUNNING:
+ /* consistence check -- should not receive two contradicting ReallyRunning's within single
+ deep resub cycle */
+ if (fine_res == RET_BADBRANCH) {
+ syslog(LOG_ERR,"ReallyRunning on bad branch %s",
+ e->any.source == EDG_WLL_SOURCE_LOG_MONITOR ? e->reallyRunning.wn_seq : e->any.seqcode);
+ break;
+ }
+ /* select the branch unless TOOOLD, i.e. before deep resubmission */
+ if (!(res == RET_LATE && fine_res == RET_TOOOLD)) {
+ if (e->any.source == EDG_WLL_SOURCE_LRMS) {
+ rep(js->branch_tag_seqcode, e->any.seqcode);
+ if (res == RET_OK) {
+ rep(js->last_branch_seqcode, e->any.seqcode);
+ js->pub.state = EDG_WLL_JOB_RUNNING;
+ }
+ }
+ if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) {
+ if (!js->branch_tag_seqcode) {
+ if (PARSABLE_SEQCODE(e->reallyRunning.wn_seq)) {
+ rep(js->branch_tag_seqcode, e->reallyRunning.wn_seq);
+ } else
+ goto bad_event;
+ }
+ if (!js->last_branch_seqcode) {
+ if (PARSABLE_SEQCODE(e->reallyRunning.wn_seq)) {
+ if (res == RET_OK) {
+ rep(js->last_branch_seqcode, e->reallyRunning.wn_seq);
+ js->pub.state = EDG_WLL_JOB_RUNNING;
+ }
+ } else
+ goto bad_event;
+ }
+ }
+
+ /* XXX: best effort -- if we are lucky, ReallyRunning is on the last shallow cycle,
+ so we take in account events processed so far */
+ if (res == RET_LATE && !js->last_branch_seqcode) {
+ if (same_branch(js->last_seqcode,js->branch_tag_seqcode))
+ rep(js->last_branch_seqcode,js->last_seqcode);
+ }
+
+ js->pub.payload_running = 1;
+ load_branch_state(js);
+ }
+#if 0
+ if (USABLE_DATA(res, strict)) {
+ js->pub.state = EDG_WLL_JOB_RUNNING;
+ free(js->pub.location);
+ js->pub.location = location_string(
+ edg_wll_SourceToString(EDG_WLL_SOURCE_LRMS),
+ "worknode",
+ e->running.node);
+ js->pub.payload_running = 1;
+ if (e->any.source == EDG_WLL_SOURCE_LRMS) {
+ rep(js->branch_tag_seqcode, e->any.seqcode);
+ rep(js->last_branch_seqcode, e->any.seqcode);
+ }
+ if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) {
+ if (!js->branch_tag_seqcode) {
+ if (PARSABLE_SEQCODE(e->reallyRunning.wn_seq)) {
+ rep(js->branch_tag_seqcode, e->reallyRunning.wn_seq);
+ } else
+ goto bad_event;
+ }
+ if (!js->last_branch_seqcode) {
+ if (PARSABLE_SEQCODE(e->reallyRunning.wn_seq)) {
+ rep(js->last_branch_seqcode, e->reallyRunning.wn_seq);
+ } else
+ goto bad_event;
+ }
+ }
+ load_branch_state(js);
+ }
+#endif
+ break;
+ case EDG_WLL_EVENT_SUSPEND:
+ if (USABLE(res, strict)) {
+ if (js->pub.state == EDG_WLL_JOB_RUNNING) {
+ js->pub.suspended = 1;
+ rep(js->pub.suspend_reason, e->suspend.reason);
+ }
+ }
+ break;
+ case EDG_WLL_EVENT_RESUME:
+ if (USABLE(res, strict)) {
+ if (js->pub.state == EDG_WLL_JOB_RUNNING) {
+ js->pub.suspended = 0;
+ rep(js->pub.suspend_reason, e->resume.reason);
+ }
+ }
+ break;
+ case EDG_WLL_EVENT_RESUBMISSION:
+ if (USABLE(res, strict)) {
+ if (e->resubmission.result == EDG_WLL_RESUBMISSION_WONTRESUB) {
+ rep(js->pub.reason, e->resubmission.reason);
+ }
+ }
+ if (USABLE_DATA(res, strict)) {
+ if (e->resubmission.result == EDG_WLL_RESUBMISSION_WONTRESUB) {
+ js->resubmit_type = EDG_WLL_RESUBMISSION_WONTRESUB;
+ }
+ else
+ if (e->resubmission.result == EDG_WLL_RESUBMISSION_WILLRESUB &&
+ e->any.source == EDG_WLL_SOURCE_WORKLOAD_MANAGER) {
+ reset_branch(js, e);
+ }
+ else
+ if (e->resubmission.result == EDG_WLL_RESUBMISSION_SHALLOW) {
+ js->resubmit_type = EDG_WLL_RESUBMISSION_SHALLOW;
+ // deep resubmit stays forever deadline for events
+ // rep(js->deep_resubmit_seqcode, NULL);
+ }
+ }
+ break;
+ case EDG_WLL_EVENT_DONE:
+ if (e->any.source == EDG_WLL_SOURCE_LRMS) {
+ /* Done from JobWrapper is not sufficient for transition
+ * to DONE state according its current definition */
+ break;
+ }
+ if (USABLE(res, strict)) {
+ js->pub.state = EDG_WLL_JOB_DONE;
+ rep(js->pub.reason, e->done.reason);
+ if (fine_res == RET_GOODBRANCH) {
+ js->pub.payload_running = 0;
+ }
+ switch (e->done.status_code) {
+ case EDG_WLL_DONE_CANCELLED:
+ js->pub.state = EDG_WLL_JOB_CANCELLED;
+ case EDG_WLL_DONE_OK:
+ rep(js->pub.location, "none"); break;
+ default:
+ free(js->pub.location);
+ js->pub.location = location_string(
+ edg_wll_SourceToString(e->done.source),
+ e->done.host,
+ e->done.src_instance);
+ }
+ }
+ if (USABLE_DATA(res, strict)) {
+ switch (e->done.status_code) {
+ case EDG_WLL_DONE_OK:
+ js->pub.exit_code = e->done.exit_code;
+ js->pub.done_code = EDG_WLL_STAT_OK; break;
+ case EDG_WLL_DONE_CANCELLED:
+ js->pub.exit_code = 0;
+ js->pub.done_code = EDG_WLL_STAT_CANCELLED; break;
+ case EDG_WLL_DONE_FAILED:
+ js->pub.exit_code = 0;
+ js->pub.done_code = EDG_WLL_STAT_FAILED; break;
+ default:
+ goto bad_event; break;
+ }
+ }
+ break;
+ case EDG_WLL_EVENT_CANCEL:
+ if (fine_res != RET_BADBRANCH) {
+ if (js->last_cancel_seqcode != NULL &&
+ edg_wll_compare_seq(e->any.seqcode, js->last_cancel_seqcode) < 0) {
+ res = RET_LATE;
+ }
+ }
+ else {
+ res = RET_LATE;
+ }
+ if (USABLE(res, strict)) {
+ switch (e->cancel.status_code) {
+ case EDG_WLL_CANCEL_REQ:
+ js->pub.cancelling = 1; break;
+ case EDG_WLL_CANCEL_DONE:
+ js->pub.state = EDG_WLL_JOB_CANCELLED;
+ js->pub.remove_from_proxy = 1;
+ rep(js->pub.reason, e->cancel.reason);
+ rep(js->last_seqcode, e->any.seqcode);
+ rep(js->pub.location, "none");
+ /* fall though */
+ case EDG_WLL_CANCEL_ABORT:
+ js->pub.cancelling = 0; break;
+ default:
+ /* do nothing */
+ break;
+
+ }
+ }
+ if (USABLE_DATA(res, strict)) {
+ rep(js->pub.cancelReason, e->cancel.reason);
+ }
+ break;
+ case EDG_WLL_EVENT_ABORT:
+ // XXX: accept Abort from WM in every case
+ // setting res make USABLE macro true (awful !!)
+ if (e->any.source == EDG_WLL_SOURCE_WORKLOAD_MANAGER) res = RET_OK;
+ if (USABLE(res, strict)) {
+ js->pub.state = EDG_WLL_JOB_ABORTED;
+ js->pub.remove_from_proxy = 1;
+ rep(js->pub.reason, e->abort.reason);
+ rep(js->pub.location, "none");
+
+ reset_branch(js, e);
+ }
+ break;
+
+ case EDG_WLL_EVENT_CLEAR:
+ if (USABLE(res, strict)) {
+ js->pub.state = EDG_WLL_JOB_CLEARED;
+ js->pub.remove_from_proxy = 1;
+ rep(js->pub.location, "none");
+ switch (e->clear.reason) {
+ case EDG_WLL_CLEAR_USER:
+ rep(js->pub.reason, "user retrieved output sandbox");
+ break;
+ case EDG_WLL_CLEAR_TIMEOUT:
+ rep(js->pub.reason, "timed out, resource purge forced");
+ break;
+ case EDG_WLL_CLEAR_NOOUTPUT:
+ rep(js->pub.reason, "no output was generated");
+ break;
+ default:
+ goto bad_event; break;
+
+ }
+ }
+ break;
+ case EDG_WLL_EVENT_PURGE:
+ /* ignore, meta-information only */
+ break;
+ case EDG_WLL_EVENT_MATCH:
+ if (USABLE(res, strict)) {
+ js->pub.state = EDG_WLL_JOB_WAITING;
+ js->pub.location = location_string(
+ edg_wll_SourceToString(EDG_WLL_SOURCE_WORKLOAD_MANAGER),
+ e->match.host,
+ e->match.src_instance);
+ }
+ if (USABLE_DATA(res, strict)) {
+ if (USABLE_BRANCH(fine_res)) {
+ rep(js->pub.destination, e->match.dest_id);
+ }
+ if (e->match.dest_id) {
+ update_branch_state(e->any.seqcode, e->match.dest_id,
+ NULL, NULL, &js->branch_states);
+ add_stringlist(&js->pub.possible_destinations,
+ e->match.dest_id);
+ }
+ }
+ break;
+ case EDG_WLL_EVENT_PENDING:
+ if (USABLE(res, strict)) {
+ js->pub.state = EDG_WLL_JOB_WAITING;
+ rep(js->pub.reason, e->pending.reason);
+ js->pub.location = location_string(
+ edg_wll_SourceToString(EDG_WLL_SOURCE_WORKLOAD_MANAGER),
+ e->match.host,
+ e->match.src_instance);
+ }
+ break;
+ case EDG_WLL_EVENT_REGJOB:
+ if (USABLE(res, strict)) {
+ js->pub.state = EDG_WLL_JOB_SUBMITTED;
+ }
+ if (USABLE_DATA(res, strict)) {
+ rep_cond(js->pub.jdl, e->regJob.jdl);
+ edg_wlc_JobIdFree(js->pub.parent_job);
+ edg_wlc_JobIdDup(e->regJob.parent,
+ &js->pub.parent_job);
+ rep(js->pub.network_server, e->regJob.ns);
+ js->pub.children_num = e->regJob.nsubjobs;
+ switch (e->regJob.jobtype) {
+ case EDG_WLL_REGJOB_DAG:
+ case EDG_WLL_REGJOB_PARTITIONED:
+ js->pub.jobtype = EDG_WLL_STAT_DAG;
+ js->pub.children_hist[EDG_WLL_JOB_UNKNOWN+1] = js->pub.children_num;
+ break;
+ case EDG_WLL_REGJOB_COLLECTION:
+ js->pub.jobtype = EDG_WLL_STAT_COLLECTION;
+ js->pub.children_hist[EDG_WLL_JOB_UNKNOWN+1] = js->pub.children_num;
+ break;
+ default:
+ break;
+ }
+ rep(js->pub.seed, e->regJob.seed);
+ }
+ break;
+ case EDG_WLL_EVENT_USERTAG:
+ if (USABLE_DATA(res, strict)) {
+ if (e->userTag.name != NULL && e->userTag.value != NULL) {
+ add_taglist(&js->pub.user_tags,
+ e->userTag.name, e->userTag.value);
+ } else {
+ goto bad_event;
+ }
+ }
+ break;
+ case EDG_WLL_EVENT_LISTENER:
+ /* ignore, listener port is not part of job status */
+ break;
+ case EDG_WLL_EVENT_CURDESCR:
+ case EDG_WLL_EVENT_CHKPT:
+ case EDG_WLL_EVENT_CHANGEACL:
+ /* ignore, only for event log */
+ break;
+ case EDG_WLL_EVENT_COLLECTIONSTATE:
+ new_state = edg_wll_StringToStat(e->collectionState.state);
+ if (USABLE(res, strict)) {
+ js->pub.state = new_state;
+ if (new_state == EDG_WLL_JOB_DONE)
+ js->pub.done_code = e->collectionState.done_code;
+ }
+ break;
+ default:
+ goto bad_event;
+ break;
+ }
+
+ if (USABLE(res,strict)) {
+ js->pub.lastUpdateTime = e->any.timestamp;
+ if (old_state != js->pub.state) {
+ js->pub.stateEnterTime = js->pub.lastUpdateTime;
+ js->pub.stateEnterTimes[1 + js->pub.state]
+ = (int)js->pub.lastUpdateTime.tv_sec;
+ }
+ if (e->any.type == EDG_WLL_EVENT_CANCEL) {
+ rep(js->last_cancel_seqcode, e->any.seqcode);
+ } else {
+
+/* the first set of LM events (Accept, Transfer/XX -> LRMS)
+ should not should shift the state (to Ready, Scheduled) but NOT to
+ update js->last_seqcode completely, in order not to block following
+ LRMS events which are likely to arrive later but should still affect
+ job state (as there may be no more LM events due to the Condor bug).
+ However, don't ignore the incoming seqcode completely, to catch up
+ with possibly delayed WM/JSS events */
+
+ if (lm_favour_lrms) {
+ free(js->last_seqcode);
+ js->last_seqcode = set_component_seqcode(e->any.seqcode,EDG_WLL_SOURCE_LOG_MONITOR,0);
+ }
+ else rep(js->last_seqcode, e->any.seqcode);
+ }
+
+ if (js->pub.state != EDG_WLL_JOB_RUNNING) {
+ js->pub.suspended = 0;
+ rep(js->pub.suspend_reason, NULL);
+ }
+
+ if (fine_res == RET_GOODBRANCH) {
+ rep(js->last_branch_seqcode, e->any.seqcode);
+ }
+ }
+
+ if (USABLE_DATA(res,strict)) {
+ if (e->any.source == EDG_WLL_SOURCE_NETWORK_SERVER &&
+ js->pub.network_server == NULL) {
+ char *inst;
+ inst = e->any.src_instance;
+ trio_asprintf(&js->pub.network_server, "%s%s%s",
+ e->any.host,
+ inst != NULL ? ":" : " ",
+ inst != NULL ? inst : "");
+ }
+ }
+
+ return res;
+
+bad_event:
+ badEvent(js,e,ev_seq);
+ return RET_SUSPECT;
+}
+
+int add_stringlist(char ***lptr, const char *new_item)
+{
+ char **itptr;
+ int i;
+
+ if (*lptr == NULL) {
+ itptr = (char **) malloc(2*sizeof(char *));
+ itptr[0] = strdup(new_item);
+ itptr[1] = NULL;
+ *lptr = itptr;
+ return 1;
+ } else {
+ for (i = 0, itptr = *lptr; itptr[i] != NULL; i++);
+ itptr = (char **) realloc(*lptr, (i+2)*sizeof(char *));
+ if (itptr != NULL) {
+ itptr[i] = strdup(new_item);
+ itptr[i+1] = NULL;
+ *lptr = itptr;
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+}
+
+void destroy_intJobStat_extension(intJobStat *p)
+{
+ if (p->last_seqcode) free(p->last_seqcode);
+ if (p->last_cancel_seqcode) free(p->last_cancel_seqcode);
+ if (p->branch_tag_seqcode) free(p->branch_tag_seqcode);
+ if (p->last_branch_seqcode) free(p->last_branch_seqcode);
+ if (p->deep_resubmit_seqcode) free(p->deep_resubmit_seqcode);
+ free_branch_state(&p->branch_states);
+ memset(p,0,sizeof(*p));
+}
+
+void destroy_intJobStat(intJobStat *p)
+{
+ edg_wll_FreeStatus(&p->pub);
+ destroy_intJobStat_extension(p);
+ memset(p, 0, sizeof(intJobStat));
+}
+
+void init_intJobStat(intJobStat *p)
+{
+ memset(p, 0, sizeof(intJobStat));
+ p->pub.jobtype = -1 /* why? EDG_WLL_STAT_SIMPLE */;
+ p->pub.children_hist = (int*) calloc(1+EDG_WLL_NUMBER_OF_STATCODES, sizeof(int));
+ p->pub.children_hist[0] = EDG_WLL_NUMBER_OF_STATCODES;
+ p->pub.stateEnterTimes = (int*) calloc(1+EDG_WLL_NUMBER_OF_STATCODES, sizeof(int));
+ p->pub.stateEnterTimes[0] = EDG_WLL_NUMBER_OF_STATCODES;
+ /* TBD: generate */
+}
+
--- /dev/null
+#ident "$Header$"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+#include <syslog.h>
+
+#include "glite/lb/context-int.h"
+
+#include "intjobstat.h"
+#include "seqcode_aux.h"
+
+/* TBD: share in whole logging or workload */
+#ifdef __GNUC__
+#define UNUSED_VAR __attribute__((unused))
+#else
+#define UNUSED_VAR
+#endif
+
+// XXX: maybe not needed any more
+// if not, remove also last_condor_event_timestamp from intJobStat
+static int compare_timestamps(struct timeval a, struct timeval b)
+{
+ if ( (a.tv_sec > b.tv_sec) ||
+ ((a.tv_sec == b.tv_sec) && (a.tv_usec > b.tv_usec)) ) return 1;
+ if ( (a.tv_sec < b.tv_sec) ||
+ ((a.tv_sec == b.tv_sec) && (a.tv_usec < b.tv_usec)) ) return -1;
+ return 0;
+}
+
+
+// XXX move this defines into some common place to be reusable
+#define USABLE(res) ((res) == RET_OK)
+#define USABLE_DATA(res) (1)
+#define rep(a,b) { free(a); a = (b == NULL) ? NULL : strdup(b); }
+#define rep_cond(a,b) { if (b) { free(a); a = strdup(b); } }
+
+int processEvent_Condor(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring)
+{
+ edg_wll_JobStatCode old_state = js->pub.state;
+ int res = RET_OK;
+
+
+ if ((js->last_seqcode != NULL) &&
+ (edg_wll_compare_condor_seq(js->last_seqcode, e->any.seqcode) > 0) ) {
+ res = RET_LATE;
+ }
+
+ switch (e->any.type) {
+ case EDG_WLL_EVENT_REGJOB:
+ if (USABLE(res)) {
+ js->pub.state = EDG_WLL_JOB_SUBMITTED;
+ rep(js->pub.condor_status, "Idle");
+ }
+ if (USABLE_DATA(res)) {
+ rep(js->pub.jdl, e->regJob.jdl);
+ }
+ break;
+ case EDG_WLL_EVENT_CONDORMATCH:
+ if (USABLE(res)) {
+ js->pub.state = EDG_WLL_JOB_READY;
+ rep(js->pub.condor_status, "Idle");
+ }
+ if (USABLE_DATA(res)) {
+ rep_cond(js->pub.condor_dest_host,e->CondorMatch.dest_host);
+ }
+ break;
+ case EDG_WLL_EVENT_CONDORREJECT:
+ if (USABLE(res)) {
+ js->pub.state = EDG_WLL_JOB_ABORTED;
+ rep(js->pub.condor_status, "Unexpanded");
+ }
+ if (USABLE_DATA(res)) {
+ switch(e->CondorReject.status_code) {
+ case EDG_WLL_CONDORREJECT_NOMATCH:
+ rep(js->pub.condor_reason,"No match found.");
+ break;
+ case EDG_WLL_CONDORREJECT_OTHER:
+ default:
+ break;
+ }
+ }
+ break;
+ case EDG_WLL_EVENT_CONDORSHADOWSTARTED:
+ if (USABLE(res)) {
+ js->pub.state = EDG_WLL_JOB_READY;
+ rep(js->pub.condor_status, "Idle");
+ }
+ if (USABLE_DATA(res)) {
+ switch (get_condor_event_source(e->any.seqcode)) {
+ case EDG_WLL_CONDOR_EVENT_SOURCE_SCHED:
+ js->pub.condor_shadow_pid = e->CondorShadowStarted.shadow_pid;
+ break;
+ default:
+ break;
+ }
+ }
+ break;
+ case EDG_WLL_EVENT_CONDORSHADOWEXITED:
+ if (USABLE(res)) {
+ js->pub.state = EDG_WLL_JOB_DONE;
+ rep(js->pub.condor_status, "Completed");
+ }
+ if (USABLE_DATA(res)) {
+ switch (get_condor_event_source(e->any.seqcode)) {
+ case EDG_WLL_CONDOR_EVENT_SOURCE_SHADOW:
+ js->pub.condor_shadow_exit_status = e->CondorShadowExited.shadow_exit_status;
+ break;
+ default:
+ break;
+ }
+ }
+ break;
+ case EDG_WLL_EVENT_CONDORSTARTERSTARTED:
+ if (USABLE(res)) {
+ switch (get_condor_event_source(e->any.seqcode)) {
+ case EDG_WLL_CONDOR_EVENT_SOURCE_START:
+ js->pub.state = EDG_WLL_JOB_SCHEDULED;
+ rep(js->pub.condor_status, "Idle");
+ break;
+ case EDG_WLL_CONDOR_EVENT_SOURCE_STARTER:
+ js->pub.state = EDG_WLL_JOB_RUNNING;
+ rep(js->pub.condor_status, "Running");
+ break;
+ default:
+ break;
+ }
+ }
+ if (USABLE_DATA(res)) {
+ switch (get_condor_event_source(e->any.seqcode)) {
+ case EDG_WLL_CONDOR_EVENT_SOURCE_STARTER:
+ rep(js->pub.condor_universe, e->CondorStarterStarted.universe);
+ js->pub.condor_starter_pid = e->CondorStarterStarted.starter_pid;
+ break;
+ default:
+ break;
+ }
+ }
+ break;
+ case EDG_WLL_EVENT_CONDORSTARTEREXITED:
+ if (USABLE(res)) {
+ js->pub.state = EDG_WLL_JOB_DONE;
+ rep(js->pub.condor_status, "Completed");
+ }
+ if (USABLE_DATA(res)) {
+ switch (get_condor_event_source(e->any.seqcode)) {
+ case EDG_WLL_CONDOR_EVENT_SOURCE_START:
+ js->pub.condor_starter_pid = e->CondorStarterExited.starter_pid;
+ js->pub.condor_starter_exit_status = e->CondorStarterExited.starter_exit_status;
+ break;
+ case EDG_WLL_CONDOR_EVENT_SOURCE_STARTER:
+ js->pub.condor_starter_pid = e->CondorStarterExited.starter_pid;
+ js->pub.condor_job_pid = e->CondorStarterExited.job_pid;
+ js->pub.condor_job_exit_status = e->CondorStarterExited.job_exit_status;
+ break;
+ default:
+ break;
+ }
+ }
+ break;
+ case EDG_WLL_EVENT_CONDORRESOURCEUSAGE:
+ if (USABLE(res)) {
+ }
+ if (USABLE_DATA(res)) {
+ }
+ break;
+ case EDG_WLL_EVENT_CONDORERROR:
+ if (USABLE(res)) {
+ }
+ if (USABLE_DATA(res)) {
+ }
+ break;
+
+ default:
+ break;
+ }
+
+/* XXX : just debug output - remove */
+
+ printf("processEvent_Condor(): %s (%s), state: %s --> %s\n ",
+ edg_wll_EventToString(e->any.type),
+ (res == RET_LATE) ? "RET_LATE" : "RET_OK",
+ edg_wll_StatToString(old_state),
+ edg_wll_StatToString(js->pub.state) );
+ printf("\t%s\n",e->any.seqcode);
+ printf("\t(last=%s)\n",js->last_seqcode);
+
+/*----------------------------------*/
+
+ if (USABLE(res)) {
+ rep(js->last_seqcode, e->any.seqcode);
+
+ js->pub.lastUpdateTime = e->any.timestamp;
+ if (old_state != js->pub.state) {
+ js->pub.stateEnterTime = js->pub.lastUpdateTime;
+ js->pub.stateEnterTimes[1 + js->pub.state]
+ = (int)js->pub.lastUpdateTime.tv_sec;
+ }
+ }
+ if (! js->pub.location) js->pub.location = strdup("this is CONDOR");
+
+
+ return RET_OK;
+}
+
--- /dev/null
+#ident "$Header$"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+#include <syslog.h>
+
+#include "glite/lb/context-int.h"
+
+#include "glite/lbu/trio.h"
+
+#include "intjobstat.h"
+#include "seqcode_aux.h"
+
+/* TBD: share in whole logging or workload */
+#ifdef __GNUC__
+#define UNUSED_VAR __attribute__((unused))
+#else
+#define UNUSED_VAR
+#endif
+
+// XXX: maybe not needed any more
+// if not, remove also last_pbs_event_timestamp from intJobStat
+static int compare_timestamps(struct timeval a, struct timeval b)
+{
+ if ( (a.tv_sec > b.tv_sec) ||
+ ((a.tv_sec == b.tv_sec) && (a.tv_usec > b.tv_usec)) ) return 1;
+ if ( (a.tv_sec < b.tv_sec) ||
+ ((a.tv_sec == b.tv_sec) && (a.tv_usec < b.tv_usec)) ) return -1;
+ return 0;
+}
+
+
+// XXX move this defines into some common place to be reusable
+#define USABLE(res) ((res) == RET_OK)
+#define USABLE_DATA(res) (1)
+#define rep(a,b) { free(a); a = (b == NULL) ? NULL : strdup(b); }
+#define rep_cond(a,b) { if (b) { free(a); a = strdup(b); } }
+
+int processEvent_PBS(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring)
+{
+ edg_wll_JobStatCode old_state = js->pub.state;
+ int res = RET_OK;
+
+
+ if ((js->last_seqcode != NULL) &&
+ (edg_wll_compare_pbs_seq(js->last_seqcode, e->any.seqcode) > 0) ) {
+ res = RET_LATE;
+ }
+
+ switch (e->any.type) {
+ case EDG_WLL_EVENT_REGJOB:
+ if (USABLE(res)) {
+ js->pub.state = EDG_WLL_JOB_SUBMITTED;
+ rep(js->pub.pbs_state, "Q");
+ }
+ if (USABLE_DATA(res)) {
+ }
+ break;
+ case EDG_WLL_EVENT_PBSQUEUED:
+ if (USABLE(res)) {
+ js->pub.state = EDG_WLL_JOB_WAITING;
+ rep(js->pub.pbs_state, "Q");
+ }
+ if (USABLE_DATA(res)) {
+ if (!js->pub.pbs_queue)
+ js->pub.pbs_queue = strdup(e->PBSQueued.queue);
+ assert(!strcmp(js->pub.pbs_queue, e->PBSQueued.queue));
+ rep_cond(js->pub.pbs_owner,e->PBSQueued.owner);
+ rep_cond(js->pub.pbs_name,e->PBSQueued.name);
+ }
+ break;
+ case EDG_WLL_EVENT_PBSMATCH:
+ if (USABLE(res)) {
+ js->pub.state = EDG_WLL_JOB_READY;
+ rep(js->pub.pbs_state, "Q");
+ }
+ if (USABLE_DATA(res)) {
+ rep_cond(js->pub.pbs_dest_host,e->PBSMatch.dest_host);
+ }
+ break;
+ case EDG_WLL_EVENT_PBSPENDING:
+ if (USABLE(res)) {
+ js->pub.state = EDG_WLL_JOB_WAITING;
+ rep(js->pub.pbs_state, "Q");
+ js->pbs_reruning = 0; // reset possible reruning flag
+ }
+ if (USABLE_DATA(res)) {
+ rep_cond(js->pub.pbs_reason,e->PBSPending.reason);
+ }
+ break;
+ case EDG_WLL_EVENT_PBSRUN:
+ if (USABLE(res)) {
+ switch (get_pbs_event_source(e->any.seqcode)) {
+ case EDG_WLL_PBS_EVENT_SOURCE_SERVER:
+ js->pub.state = EDG_WLL_JOB_SCHEDULED;
+ rep(js->pub.pbs_state, "Q");
+ break;
+ case EDG_WLL_PBS_EVENT_SOURCE_MOM:
+ js->pub.state = EDG_WLL_JOB_RUNNING;
+ rep(js->pub.pbs_state, "R");
+ break;
+ default:
+ assert(0); // running event from strange source
+ break;
+ }
+ }
+ if (USABLE_DATA(res)) {
+ rep_cond(js->pub.pbs_scheduler, e->PBSRun.scheduler);
+ rep_cond(js->pub.pbs_dest_host, e->PBSRun.dest_host);
+ js->pub.pbs_pid = e->PBSRun.pid;
+ }
+ break;
+ case EDG_WLL_EVENT_PBSRERUN:
+ if (USABLE(res)) {
+ switch (get_pbs_event_source(e->any.seqcode)) {
+ case EDG_WLL_PBS_EVENT_SOURCE_SERVER:
+ js->pub.state = EDG_WLL_JOB_WAITING;
+ rep(js->pub.pbs_state, "Q");
+ break;
+ case EDG_WLL_PBS_EVENT_SOURCE_MOM:
+ js->pub.state = EDG_WLL_JOB_WAITING;
+ rep(js->pub.pbs_state, "E");
+ js->pbs_reruning = 1;
+ break;
+ default:
+ assert(0); // running event from strande source
+ break;
+ }
+ }
+ if (USABLE_DATA(res)) {
+ }
+ break;
+ case EDG_WLL_EVENT_PBSDONE:
+ if (USABLE(res)) {
+ switch (get_pbs_event_source(e->any.seqcode)) {
+ case EDG_WLL_PBS_EVENT_SOURCE_SERVER:
+ js->pub.state = EDG_WLL_JOB_DONE;
+ js->pub.done_code = EDG_WLL_STAT_OK;
+ rep(js->pub.pbs_state, "C");
+ break;
+ case EDG_WLL_PBS_EVENT_SOURCE_MOM:
+ if (!js->pbs_reruning) {
+ js->pub.state = EDG_WLL_JOB_DONE;
+ js->pub.done_code = EDG_WLL_STAT_OK;
+ rep(js->pub.pbs_state, "C");
+ }
+ break;
+ default:
+ assert(0); //done event from strange source
+ break;
+ }
+ }
+ if (USABLE_DATA(res)) {
+ js->pub.pbs_exit_status = e->PBSDone.exit_status;
+ }
+ break;
+ case EDG_WLL_EVENT_PBSRESOURCEUSAGE:
+ if (USABLE(res)) {
+ // signalize state done, done_code uknown
+ js->pub.state = EDG_WLL_JOB_DONE;
+ rep(js->pub.pbs_state, "C");
+ }
+ if (USABLE_DATA(res)) {
+ char *new_resource_usage;
+
+ trio_asprintf(&new_resource_usage,"%s%s\t%s = %f [%s]",
+ (js->pub.pbs_resource_usage) ? js->pub.pbs_resource_usage : "",
+ (js->pub.pbs_resource_usage) ? "\n": "",
+ e->PBSResourceUsage.name,
+ e->PBSResourceUsage.quantity,
+ e->PBSResourceUsage.unit);
+
+ if (js->pub.pbs_resource_usage) free(js->pub.pbs_resource_usage);
+ js->pub.pbs_resource_usage = new_resource_usage;
+ }
+ break;
+ case EDG_WLL_EVENT_PBSERROR:
+ if (USABLE(res)) {
+ js->pub.state = EDG_WLL_JOB_DONE;
+ js->pub.done_code = EDG_WLL_STAT_FAILED;
+ rep(js->pub.pbs_state, "C");
+ }
+ if (USABLE_DATA(res)) {
+ char *new_error_desc;
+
+ trio_asprintf(&new_error_desc,"%s%s\t%s",
+ (js->pub.pbs_error_desc) ? js->pub.pbs_error_desc : "",
+ (js->pub.pbs_error_desc) ? "\n" : "",
+ e->PBSError.error_desc);
+
+ if (js->pub.pbs_error_desc) free(js->pub.pbs_error_desc);
+ js->pub.pbs_error_desc = new_error_desc;
+ }
+ break;
+
+ default:
+ break;
+ }
+
+/* XXX : just debug output - remove */
+
+ printf("processEvent_PBS(): %s (%s), state: %s --> %s\n ",
+ edg_wll_EventToString(e->any.type),
+ (res == RET_LATE) ? "RET_LATE" : "RET_OK",
+ edg_wll_StatToString(old_state),
+ edg_wll_StatToString(js->pub.state) );
+ printf("\t%s\n",e->any.seqcode);
+ printf("\t(last=%s)\n",js->last_seqcode);
+
+/*----------------------------------*/
+
+ if (USABLE(res)) {
+ rep(js->last_seqcode, e->any.seqcode);
+
+ js->pub.lastUpdateTime = e->any.timestamp;
+ if (old_state != js->pub.state) {
+ js->pub.stateEnterTime = js->pub.lastUpdateTime;
+ js->pub.stateEnterTimes[1 + js->pub.state]
+ = (int)js->pub.lastUpdateTime.tv_sec;
+ }
+ }
+ if (! js->pub.location) js->pub.location = strdup("this is PBS");
+
+
+ return RET_OK;
+}
+
--- /dev/null
+#ident "$Header$"
+
+#include <stdio.h>
+#include <string.h>
+#include <syslog.h>
+#include <assert.h>
+
+#include "glite/lbu/trio.h"
+#include "glite/lb/context-int.h"
+
+#include "intjobstat.h"
+#include "seqcode_aux.h"
+
+
+/*
+#include <inttypes.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+#include <stdarg.h>
+#include <regex.h>
+#include <syslog.h>
+
+#include "glite/jobid/cjobid.h"
+#include "glite/lbu/trio.h"
+#include "glite/lbu/db.h"
+#include "glite/lb/context-int.h"
+
+#include "store.h"
+#include "index.h"
+#include "jobstat.h"
+#include "get_events.h"
+*/
+
+int component_seqcode(const char *a, edg_wll_Source index)
+{
+ unsigned int c[EDG_WLL_SOURCE__LAST];
+ int res;
+ char sc[EDG_WLL_SEQ_SIZE];
+
+ if (!strstr(a, "LBS")) snprintf(sc,EDG_WLL_SEQ_SIZE,"%s:LBS=000000",a);
+ else snprintf(sc,EDG_WLL_SEQ_SIZE,"%s",a);
+
+ res = sscanf(sc, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d:LBS=%d",
+ &c[EDG_WLL_SOURCE_USER_INTERFACE],
+ &c[EDG_WLL_SOURCE_NETWORK_SERVER],
+ &c[EDG_WLL_SOURCE_WORKLOAD_MANAGER],
+ &c[EDG_WLL_SOURCE_BIG_HELPER],
+ &c[EDG_WLL_SOURCE_JOB_SUBMISSION],
+ &c[EDG_WLL_SOURCE_LOG_MONITOR],
+ &c[EDG_WLL_SOURCE_LRMS],
+ &c[EDG_WLL_SOURCE_APPLICATION],
+ &c[EDG_WLL_SOURCE_LB_SERVER]);
+ if (res != EDG_WLL_SOURCE__LAST-1) {
+/* FIXME: syslog(LOG_ERR, "unparsable sequence code %s\n", sc); */
+ fprintf(stderr, "unparsable sequence code %s\n", sc);
+ return -1;
+ }
+
+ return(c[index]);
+}
+
+char * set_component_seqcode(char *a,edg_wll_Source index,int val)
+{
+ unsigned int c[EDG_WLL_SOURCE__LAST];
+ int res;
+ char *ret;
+ char sc[EDG_WLL_SEQ_SIZE];
+
+ if (!strstr(a, "LBS")) snprintf(sc,EDG_WLL_SEQ_SIZE,"%s:LBS=000000",a);
+ else snprintf(sc,EDG_WLL_SEQ_SIZE,"%s",a);
+
+ res = sscanf(sc, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d:LBS=%d",
+ &c[EDG_WLL_SOURCE_USER_INTERFACE],
+ &c[EDG_WLL_SOURCE_NETWORK_SERVER],
+ &c[EDG_WLL_SOURCE_WORKLOAD_MANAGER],
+ &c[EDG_WLL_SOURCE_BIG_HELPER],
+ &c[EDG_WLL_SOURCE_JOB_SUBMISSION],
+ &c[EDG_WLL_SOURCE_LOG_MONITOR],
+ &c[EDG_WLL_SOURCE_LRMS],
+ &c[EDG_WLL_SOURCE_APPLICATION],
+ &c[EDG_WLL_SOURCE_LB_SERVER]);
+ if (res != EDG_WLL_SOURCE__LAST-1) {
+/* FIXME: syslog(LOG_ERR, "unparsable sequence code %s\n", sc); */
+ fprintf(stderr, "unparsable sequence code %s\n", sc);
+ return NULL;
+ }
+
+ c[index] = val;
+ trio_asprintf(&ret,"UI=%06d:NS=%010d:WM=%06d:BH=%010d:JSS=%06d"
+ ":LM=%06d:LRMS=%06d:APP=%06d:LBS=%06d",
+ c[EDG_WLL_SOURCE_USER_INTERFACE],
+ c[EDG_WLL_SOURCE_NETWORK_SERVER],
+ c[EDG_WLL_SOURCE_WORKLOAD_MANAGER],
+ c[EDG_WLL_SOURCE_BIG_HELPER],
+ c[EDG_WLL_SOURCE_JOB_SUBMISSION],
+ c[EDG_WLL_SOURCE_LOG_MONITOR],
+ c[EDG_WLL_SOURCE_LRMS],
+ c[EDG_WLL_SOURCE_APPLICATION],
+ c[EDG_WLL_SOURCE_LB_SERVER]);
+ return ret;
+}
+
+int before_deep_resubmission(const char *a, const char *b)
+{
+ if (component_seqcode(a, EDG_WLL_SOURCE_WORKLOAD_MANAGER) <
+ component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER) )
+ return(1);
+ else
+ return(0);
+
+}
+
+int same_branch(const char *a, const char *b)
+{
+ if (component_seqcode(a, EDG_WLL_SOURCE_WORKLOAD_MANAGER) ==
+ component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER) )
+ return(1);
+ else
+ return(0);
+}
+
+int edg_wll_compare_pbs_seq(const char *a,const char *b)
+{
+ char timestamp_a[14], pos_a[10], src_a;
+ char timestamp_b[14], pos_b[10], src_b;
+ int ev_code_a, ev_code_b;
+ int res;
+
+ res = sscanf(a,"TIMESTAMP=%14s:POS=%10s:EV.CODE=%3d:SRC=%c", timestamp_a, pos_a, &ev_code_a, &src_a);
+
+ if (res != 4) {
+/* FIXME: syslog(LOG_ERR, "unparsable sequence code %s\n", a); */
+ fprintf(stderr, "unparsable sequence code %s\n", a);
+ return -1;
+ }
+
+ res = sscanf(b,"TIMESTAMP=%14s:POS=%10s:EV.CODE=%3d:SRC=%c", timestamp_b, pos_b, &ev_code_b, &src_b);
+
+ if (res != 4) {
+/* FIXME: syslog(LOG_ERR, "unparsable sequence code %s\n", b); */
+ fprintf(stderr, "unparsable sequence code %s\n", b);
+ return -1;
+ }
+
+ /* wild card for PBSJobReg - this event should always come as firt one */
+ /* bacause it hold job.type, which is necessary for further event processing */
+ if (ev_code_a == EDG_WLL_EVENT_REGJOB) return -1;
+ if (ev_code_b == EDG_WLL_EVENT_REGJOB) return 1;
+
+ /* sort event w.t.r. to timestamps */
+ if ((res = strcmp(timestamp_a,timestamp_b)) != 0) {
+ return res;
+ }
+ else {
+ /* if timestamps equal, sort if w.t.r. to file position */
+ /* if you both events come from the same log file */
+ if (src_a == src_b) {
+ /* zero mean in fact duplicate events in log */
+ return strcmp(pos_a,pos_b);
+ }
+ /* if the events come from diffrent log files */
+ /* it is possible to prioritize some src log file */
+ else {
+ /* prioritize events from pbs_mom */
+ if (src_a == 'm') return 1;
+ if (src_b == 'm') return -1;
+
+ /* then prioritize events from pbs_server */
+ if (src_a == 's') return 1;
+ if (src_b == 's') return -1;
+
+ /* other priorities comes here... */
+ }
+ }
+
+ return 0;
+}
+
+edg_wll_PBSEventSource get_pbs_event_source(const char *pbs_seq_num) {
+ switch (pbs_seq_num[EDG_WLL_SEQ_PBS_SIZE - 2]) {
+ case 'c': return(EDG_WLL_PBS_EVENT_SOURCE_SCHEDULER);
+ case 's': return(EDG_WLL_PBS_EVENT_SOURCE_SERVER);
+ case 'm': return(EDG_WLL_PBS_EVENT_SOURCE_MOM);
+ case 'a': return(EDG_WLL_PBS_EVENT_SOURCE_ACCOUNTING);
+ default: return(EDG_WLL_PBS_EVENT_SOURCE_UNDEF);
+ }
+}
+
+edg_wll_CondorEventSource get_condor_event_source(const char *condor_seq_num) {
+ switch (condor_seq_num[EDG_WLL_SEQ_CONDOR_SIZE - 2]) {
+ case 'L': return(EDG_WLL_CONDOR_EVENT_SOURCE_COLLECTOR);
+ case 'M': return(EDG_WLL_CONDOR_EVENT_SOURCE_MASTER);
+ case 'm': return(EDG_WLL_CONDOR_EVENT_SOURCE_MATCH);
+ case 'N': return(EDG_WLL_CONDOR_EVENT_SOURCE_NEGOTIATOR);
+ case 'C': return(EDG_WLL_CONDOR_EVENT_SOURCE_SCHED);
+ case 'H': return(EDG_WLL_CONDOR_EVENT_SOURCE_SHADOW);
+ case 's': return(EDG_WLL_CONDOR_EVENT_SOURCE_STARTER);
+ case 'S': return(EDG_WLL_CONDOR_EVENT_SOURCE_START);
+ case 'j': return(EDG_WLL_CONDOR_EVENT_SOURCE_JOBQUEUE);
+ default: return(EDG_WLL_CONDOR_EVENT_SOURCE_UNDEF);
+ }
+}
+
+int edg_wll_compare_seq(const char *a, const char *b)
+{
+ unsigned int c[EDG_WLL_SOURCE__LAST];
+ unsigned int d[EDG_WLL_SOURCE__LAST];
+ int res, i;
+ char sca[EDG_WLL_SEQ_SIZE], scb[EDG_WLL_SEQ_SIZE];
+
+
+ if ( (strstr(a,"TIMESTAMP=") == a) && (strstr(b,"TIMESTAMP=") == b) )
+ return edg_wll_compare_pbs_seq(a,b);
+
+ if (!strstr(a, "LBS")) snprintf(sca,EDG_WLL_SEQ_SIZE,"%s:LBS=000000",a);
+ else snprintf(sca,EDG_WLL_SEQ_SIZE,"%s",a);
+ if (!strstr(b, "LBS")) snprintf(scb,EDG_WLL_SEQ_SIZE,"%s:LBS=000000",b);
+ else snprintf(scb,EDG_WLL_SEQ_SIZE,"%s",b);
+
+ assert(EDG_WLL_SOURCE__LAST == 10);
+
+ res = sscanf(sca, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d:LBS=%d",
+ &c[EDG_WLL_SOURCE_USER_INTERFACE],
+ &c[EDG_WLL_SOURCE_NETWORK_SERVER],
+ &c[EDG_WLL_SOURCE_WORKLOAD_MANAGER],
+ &c[EDG_WLL_SOURCE_BIG_HELPER],
+ &c[EDG_WLL_SOURCE_JOB_SUBMISSION],
+ &c[EDG_WLL_SOURCE_LOG_MONITOR],
+ &c[EDG_WLL_SOURCE_LRMS],
+ &c[EDG_WLL_SOURCE_APPLICATION],
+ &c[EDG_WLL_SOURCE_LB_SERVER]);
+ if (res != EDG_WLL_SOURCE__LAST-1) {
+/* FIXME: syslog(LOG_ERR, "unparsable sequence code %s\n", sca); */
+ fprintf(stderr, "unparsable sequence code %s\n", sca);
+ return -1;
+ }
+
+ res = sscanf(scb, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d:LBS=%d",
+ &d[EDG_WLL_SOURCE_USER_INTERFACE],
+ &d[EDG_WLL_SOURCE_NETWORK_SERVER],
+ &d[EDG_WLL_SOURCE_WORKLOAD_MANAGER],
+ &d[EDG_WLL_SOURCE_BIG_HELPER],
+ &d[EDG_WLL_SOURCE_JOB_SUBMISSION],
+ &d[EDG_WLL_SOURCE_LOG_MONITOR],
+ &d[EDG_WLL_SOURCE_LRMS],
+ &d[EDG_WLL_SOURCE_APPLICATION],
+ &d[EDG_WLL_SOURCE_LB_SERVER]);
+ if (res != EDG_WLL_SOURCE__LAST-1) {
+/* FIXME: syslog(LOG_ERR, "unparsable sequence code %s\n", scb); */
+ fprintf(stderr, "unparsable sequence code %s\n", scb);
+ return 1;
+ }
+
+ for (i = EDG_WLL_SOURCE_USER_INTERFACE ; i < EDG_WLL_SOURCE__LAST; i++) {
+ if (c[i] < d[i]) return -1;
+ if (c[i] > d[i]) return 1;
+ }
+
+ return 0;
+}
+
+
+int compare_events_by_seq(const void *a, const void *b)
+{
+ const edg_wll_Event *e = (edg_wll_Event *) a;
+ const edg_wll_Event *f = (edg_wll_Event *) b;
+ int ret;
+
+
+ ret = edg_wll_compare_seq(e->any.seqcode, f->any.seqcode);
+ if (ret) return ret;
+
+ if (e->any.timestamp.tv_sec < f->any.timestamp.tv_sec) return -1;
+ if (e->any.timestamp.tv_sec > f->any.timestamp.tv_sec) return 1;
+ if (e->any.timestamp.tv_usec < f->any.timestamp.tv_usec) return -1;
+ if (e->any.timestamp.tv_usec > f->any.timestamp.tv_usec) return 1;
+ return 0;
+}
+