Builds fine.
default all: compile
-# disable lb plugin in order to build also with 3.1 JP
-BUILD_PLUGIN=yes
-
GLITE_LB_SERVER_WITH_WS=yes
ifdef LB_STANDALONE
LDFLAGS:= -L${stagedir}/lib
endif
+# LB_MACHINE_LIB:=${stagedir}/lib/libglite_lb_statemachine.a
+LB_MACHINE_LIB:=-L${stagedir}/lib -lglite_lb_statemachine
+
ifdef LBS_DB_PROFILE
CFLAGS:=${CFLAGS} -DLBS_DB_PROFILE
endif
${VOMS_LIBS}
COMMON_LIBS:= -L${stagedir}/lib -lglite_lb_common_${nothrflavour} -lglite_security_gss_${nothrflavour} -lglite_lbu_trio
-PLUGIN_LIBS:= -L${stagedir}/lib -lglite_lb_common_${nothrflavour} -lglite_lbu_trio \
- ${classadslib} -lstdc++ ${expatlib} -lexpat
-
-PLUGIN_LOBJS:= lb_plugin.lo jobstat_supp.lo process_event.lo process_event_pbs.lo process_event_condor.lo
BKSERVER_BASE_OBJS:= \
- bkserverd.o il_lbproxy.o get_events.o index.o jobstat.o jobstat_supp.o process_event.o process_event_pbs.o process_event_condor.o \
+ bkserverd.o il_lbproxy.o get_events.o index.o jobstat.o jobstat_supp.o \
seqcode.o write2rgma.o lb_html.o lb_http.o lb_proto.o lb_xml_parse.o \
lb_xml_parse_V21.o \
lock.o openserver.o query.o userjobs.o db_store.o request.o store.o \
BKSERVER_LIBS= \
${SRVBONES_LIB} \
${LB_COMMON_LIB} \
+ ${LB_MACHINE_LIB} \
${LB_UTILS_DB_LIB} \
${GSOAP_LIB} \
${EXT_LIBS} \
BKSERVER_LIBS= \
${SRVBONES_LIB} \
${LB_COMMON_LIB} \
+ ${LB_MACHINE_LIB} \
${LB_UTILS_DB_LIB} \
-lglite_security_gss_${nothrflavour} \
${EXT_LIBS} \
endif
INDEX_OBJS:= index.o index_parse.o jobstat_supp.o openserver.o \
- jobstat.o process_event.o process_event_pbs.o process_event_condor.o query.o lock.o get_events.o write2rgma.o index_lex.o \
+ jobstat.o query.o lock.o get_events.o write2rgma.o index_lex.o \
lb_authz.o store.o bkindex.o stats.o\
request.o db_store.o srv_purge.o notif_match.o il_lbproxy.o dump.o lb_xml_parse.o il_notification.o lb_proto.o server_state.o lb_xml_parse_V21.o lb_html.o notification.o seqcode.o userjobs.o load.o db_calls.o db_supp.lo
-INDEX_LIBS:= ${SRVBONES_LIB} ${COMMON_LIBS} ${EXT_LIBS} ${LB_UTILS_DB_LIB}
+INDEX_LIBS:= ${SRVBONES_LIB} ${COMMON_LIBS} ${LB_MACHINE_LIB} ${EXT_LIBS} ${LB_UTILS_DB_LIB}
WS_CLIENT_OBJS:= $(GSOAP_FILES_PREFIX)C.o $(GSOAP_FILES_PREFIX)Client.o ws_fault.o ws_typeref.o
WS_CLIENT_LIBS:= ${GSOAP_LIB} -lglite_lb_common_${nothrflavour} \
MAN = $(MAN_GZ:.gz=)
LIB_OBJS_BK:= \
- il_lbproxy.o get_events.o index.o jobstat.o jobstat_supp.o process_event.o process_event_pbs.o process_event_condor.o \
+ il_lbproxy.o get_events.o index.o jobstat.o jobstat_supp.o \
seqcode.o lb_html.o lb_http.o lb_proto.o lb_xml_parse.o \
lb_xml_parse_V21.o \
lock.o openserver.o query.o userjobs.o db_store.o request.o store.o \
db_calls.o db_supp.o
MONDB_OBJS:=mon-db.o ${LIB_OBJS_BK}
-MONDB_LIBS:=${COMMON_LIBS} ${LB_UTILS_DB_LIB} ${EXT_LIBS}
+MONDB_LIBS:=${COMMON_LIBS} ${LB_MACHINE_LIB} ${LB_UTILS_DB_LIB} ${EXT_LIBS}
MAN_GZ:= glite-lb-bkindex.8.gz glite-lb-bkserverd.8.gz
MAN = $(MAN_GZ:.gz=)
glite-lb-bkindex: ${INDEX_OBJS}
${LINKXX} -o $@ ${INDEX_OBJS} ${INDEX_LIBS}
-glite_lb_plugin.la: ${PLUGIN_LOBJS}
- ${SOLINK} -o $@ ${PLUGIN_LOBJS} ${PLUGIN_LIBS}
-
glite-lb-mon-db: ${MONDB_OBJS}
${LINKXX} -o $@ ${MONDB_OBJS} ${MONDB_LIBS}
-ifdef LB_STANDALONE
- PLUGIN_LIB=
-else
-ifndef BUILD_PLUGIN
- PLUGIN_LIB=
-else
- PLUGIN_LIB=glite_lb_plugin.la
-endif
-endif
-
${MAN_GZ}: ${MAN}
cp $? .
gzip $(notdir $?)
-compile: generate glite-lb-bkserverd glite-lb-bkindex glite-lb-mon-db ${STATIC_LIB_BK} ${PLUGIN_LIB} ${MAN_GZ}
+compile: generate glite-lb-bkserverd glite-lb-bkindex glite-lb-mon-db ${STATIC_LIB_BK} ${MAN_GZ}
-generate: store.c index.c jp_job_attrs.h
+generate: store.c index.c
check: compile test.xml test.query
-echo check.query not complete yet
ar crv $@ ${LIB_OBJS_BK}
ranlib $@
-jp_job_attrs.h: job-attrs.xsd jp_job_attrs.xsl
- ${XSLTPROC} ../src/jp_job_attrs.xsl $< >$@
-
doc: ${MAN_GZ}
${MAN_GZ}: ${MAN}
done
${INSTALL} -m 755 ${top_srcdir}/config/startup ${PREFIX}/etc/init.d/glite-lb-bkserverd
- ${INSTALL} -m 644 ${top_srcdir}/interface/job-attrs.xsd ${PREFIX}/interface
- ${INSTALL} -m 644 ${top_srcdir}/interface/job-record.xsd ${PREFIX}/interface
if [ ${stagedir} != ${PREFIX} ]; then ${INSTALL} -m 755 ${stagedir}/bin/glite-lb-notif-interlogd ${PREFIX}/bin; fi
mkdir -p ${PREFIX}/include/${globalprefix}/${lbprefix}
(cd ${top_srcdir}/interface && install -m 644 ${HDRS} ${PREFIX}/include/${globalprefix}/${lbprefix})
install -m 644 ${STATIC_LIB_BK} ${PREFIX}/lib
- if [ x${LB_STANDALONE} = x -a x${PLUGIN_LIB} != x ]; then \
- ${INSTALL} -m 755 ${PLUGIN_LIB} ${PREFIX}/lib; \
- ${INSTALL} -m 644 jp_job_attrs.h ${PREFIX}/include/${globalprefix}/${lbprefix} ; \
- fi
${INSTALL} -m 644 ${top_srcdir}/interface/srv_perf.h ${PREFIX}/include/${globalprefix}/${lbprefix}
- ${INSTALL} -m 644 ${MAN_GZ} ${PREFIX}/share/man/man8
+ ${INSTALL} -m 644 ${MAN_GZ} ${PREFIX}/share/man/man8
clean:
- rm -rvf *.c *.h *.ch *.xh *.xml *.nsmap *.o *.lo .libs glite-lb-* ${STATIC_LIB_BK} ${PLUGIN_LIB} test* ${MAN_GZ}
+ rm -rvf *.c *.h *.ch *.xh *.xml *.nsmap *.o *.lo .libs glite-lb-* ${STATIC_LIB_BK} test* ${MAN_GZ}
rm -rvf log.xml project/ rpmbuild/ RPMS/ tgz/
%.c: %.c.T
%.lo: %.c
${COMPILE} -o $@ -c $<
-lb_plugin.lo: lb_plugin.c jp_job_attrs.h
- ${COMPILE} -DPLUGIN_DEBUG -o $@ -c $<
-
soap_version.h:
${gsoap_bin_prefix}/soapcpp2 /dev/null
perl -ne '$$. == 2 && /.*([0-9])\.([0-9])\.([0-9]).*/ && printf "#define GSOAP_VERSION %d%02d%02d\n",$$1,$$2,$$3' soapH.h >$@
+++ /dev/null
-<?xml version="1.0"?>
-<!DOCTYPE schema PUBLIC "-//W3C//DTD XMLSCHEMA 200102//EN" "http://www.w3.org/2001/XMLSchema.dtd">
-
-<!-- $Header$ -->
-
-<xs:schema
- xmlns:xs="http://www.w3.org/2001/XMLSchema"
- xmlns:a="http://egee.cesnet.cz/en/Schema/LB/Attributes"
- targetNamespace="http://egee.cesnet.cz/en/Schema/LB/Attributes"
- version="1.0"
- elementFormDefault="qualified"
- attributeFormDefault="unqualified"
->
- <xs:simpleType name="statusType">
- <xs:restriction base="xs:string">
- <xs:enumeration value="SUBMITTED" />
- <xs:enumeration value="WAITING" />
- <xs:enumeration value="READY" />
- <xs:enumeration value="SCHEDULED" />
- <xs:enumeration value="RUNNING" />
- <xs:enumeration value="DONE" />
- <xs:enumeration value="CLEARED" />
- <xs:enumeration value="ABORTED" />
- <xs:enumeration value="CANCELLED" />
- <xs:enumeration value="UNKNOWN" />
- </xs:restriction>
- </xs:simpleType>
-
- <xs:simpleType name="doneType">
- <xs:restriction base="xs:string">
- <xs:enumeration value="OK"/>
- <xs:enumeration value="FAIL"/>
- <xs:enumeration value="CANCEL"/>
- </xs:restriction>
- </xs:simpleType>
-
- <xs:complexType name="historyStatusType">
- <xs:complexContent>
- <xs:restriction base="xs:anyType">
- <xs:attribute name="name" type="a:statusType" use="required"/>
- <xs:attribute name="timestamp" type="xs:dateTime" use="required"/>
- <xs:attribute name="reason" type="xs:string" use="optional"/>
- </xs:restriction>
- </xs:complexContent>
- </xs:complexType>
-
- <xs:simpleType name="jobTypeType">
- <xs:restriction base="xs:string">
- <xs:enumeration value="SIMPLE"/>
- <xs:enumeration value="DAG"/>
- </xs:restriction>
- </xs:simpleType>
-
- <xs:complexType name="statusSequenceType">
- <xs:sequence>
- <xs:element name="status" type="a:historyStatusType" minOccurs="0" maxOccurs="unbounded"/>
- </xs:sequence>
- </xs:complexType>
-
- <xs:complexType name="jobIdSequenceType">
- <xs:sequence>
- <xs:element name="jobId" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
- </xs:sequence>
- </xs:complexType>
-
-
-
- <xs:element name="jobId" type="xs:string" />
- <xs:element name="LBserver" type="xs:string" />
-
- <xs:element name="user" type="xs:string">
- <xs:annotation> <xs:documentation>Job owner according to LB</xs:documentation> </xs:annotation>
- </xs:element>
-
- <!-- Parent jobId of DAG -->
- <xs:element name="parent" type="xs:string" />
-
- <!-- the following 6 elements have to be retrieved from JDL -->
- <xs:element name="VO" type="xs:string" />
- <xs:element name="aTag" type="xs:string" />
- <xs:element name="rQType" type="xs:string" />
- <xs:element name="eDuration" type="xs:duration" />
- <xs:element name="eNodes" type="xs:int" />
- <xs:element name="eProc" type="xs:int" />
-
- <xs:element name="RB" type="xs:string" />
- <xs:element name="CE" type="xs:string" />
- <xs:element name="host" type="xs:string" /> <!-- worker node -->
-
- <!-- Genevieve's spec has "site" and "country" here, but I've got no idea how to retrieve them -->
- <xs:element name="UIHost" type="xs:string" />
-
- <!-- not mandatory, currently LB hasn't got the info -->
- <xs:element name="CPUTime" type="xs:duration" />
- <xs:element name="NProc" type="xs:int" />
-
- <!-- In LB the real final status is Cleared
- However, Done, Aborted, or Cancelled should be reported here -->
- <xs:element name="finalStatus" type="a:statusType" />
- <xs:element name="finalStatusDate" type="xs:dateTime" />
- <xs:element name="finalStatusReason" type="xs:string" />
- <xs:element name="LRMSDoneStatus" type="a:doneType" />
- <xs:element name="LRMSStatusReason" type="xs:string" />
-
- <xs:element name="retryCount" type="xs:int" />
- <xs:element name="additionalReason" type="xs:string" />
-
- <xs:element name="jobType" type="a:jobTypeType" />
- <xs:element name="nsubjobs" type="xs:int" />
- <xs:element name="subjobs" type="a:jobIdSequenceType" />
-
- <!-- timestamps of the state history of the last resubmission cycle,
- i.e. it is guaranteed that each state apears here only once.
- Cf. stateEnterTimes in LB JobStatus -->
- <xs:element name="lastStatusHistory" type="a:statusSequenceType" />
-
- <!-- timestamps of the whole state history, including all resubmission cycles -->
- <xs:element name="fullStatusHistory" type="a:statusSequenceType" />
-
- <xs:element name="JDL" type="xs:string" />
-
-<!-- No idea where to get these from:
-
- ENVIRONMENT
-
- testbed production, preproduction, specific
- release middleware release (LCG, g-lite...)
- version version of middleware
- job_history_version in case of structure changes
--->
-
-</xs:schema>
+++ /dev/null
-<?xml version="1.0"?>
-<!DOCTYPE schema PUBLIC "-//W3C//DTD XMLSCHEMA 200102//EN" "http://www.w3.org/2001/XMLSchema.dtd">
-
-<!-- $Header$ -->
-
-<xs:schema
- xmlns:xs="http://www.w3.org/2001/XMLSchema"
- xmlns:jr="http://egee.cesnet.cz/en/Schema/LB/JobRecord"
- xmlns:a="http://egee.cesnet.cz/en/Schema/LB/Attributes"
- targetNamespace="http://egee.cesnet.cz/en/Schema/LB/JobRecord"
- version="1.0"
- elementFormDefault="qualified"
- attributeFormDefault="unqualified"
->
-
- <xs:import
- namespace="http://egee.cesnet.cz/en/Schema/LB/Attributes"
- schemaLocation="http://egee.cesnet.cz/en/Schema/LB/Attributes"
- />
-
- <xs:complexType name="jobRecordType">
- <xs:sequence>
-
- <xs:element ref="a:jobId" minOccurs="1" maxOccurs="1"/>
- <xs:element ref="a:user" minOccurs="1" maxOccurs="1"/>
- <xs:element ref="a:parent" minOccurs="0" maxOccurs="1"/>
-
- <!-- the following 6 elements have to be retrieved from JDL -->
- <xs:element ref="a:VO" minOccurs="0" maxOccurs="1"/>
- <xs:element ref="a:aTag" minOccurs="0" maxOccurs="1"/>
- <xs:element ref="a:rQType" minOccurs="0" maxOccurs="1"/>
- <xs:element ref="a:eDuration" minOccurs="0" maxOccurs="1"/>
- <xs:element ref="a:eNodes" minOccurs="0" maxOccurs="1"/>
- <xs:element ref="a:eProc" minOccurs="0" maxOccurs="1"/>
-
- <xs:element ref="a:RB" minOccurs="1" maxOccurs="1"/>
- <xs:element ref="a:CE" minOccurs="0" maxOccurs="1"/>
- <xs:element ref="a:host" minOccurs="0" maxOccurs="1"/> <!-- worker node -->
-
- <!-- Genevieve's spec has "site" and "country" here, but I've got no idea how to retrieve them -->
- <xs:element ref="a:UIHost" minOccurs="0" maxOccurs="1"/>
-
- <!-- not mandatory, currently LB hasn't got the info -->
- <xs:element ref="a:CPUTime" minOccurs="0" maxOccurs="1"/>
- <xs:element ref="a:NProc" minOccurs="0" maxOccurs="1"/>
-
- <!-- In LB the real final status is Cleared
- However, Done, Aborted, or Cancelled should be reported here -->
- <xs:element ref="a:finalStatus" minOccurs="1" maxOccurs="1"/>
- <xs:element ref="a:finalStatusDate" minOccurs="1" maxOccurs="1"/>
- <xs:element ref="a:finalStatusReason" minOccurs="0" maxOccurs="1"/>
- <xs:element ref="a:LRMSDoneStatus" minOccurs="0" maxOccurs="1"/>
- <xs:element ref="a:LRMSStatusReason" minOccurs="0" maxOccurs="1"/>
-
- <xs:element ref="a:retryCount" minOccurs="1" maxOccurs="1"/>
- <xs:element ref="a:additionalReason" minOccurs="0" maxOccurs="1"/>
-
- <xs:element ref="a:jobType" minOccurs="1" maxOccurs="1"/>
- <xs:element ref="a:nsubjobs" minOccurs="0" maxOccurs="1"/>
- <xs:element ref="a:subjobs" minOccurs="0" maxOccurs="1"/>
-
- <!-- timestamps of the state history of the last resubmission cycle,
- i.e. it is guaranteed that each state apears here only once.
- Cf. stateEnterTimes in LB JobStatus -->
-
- <xs:element ref="a:lastStatusHistory" minOccurs="1" maxOccurs="1"/>
-
- <!-- timestamps of the whole state history, including all resubmission cycles -->
- <xs:element ref="a:fullStatusHistory" minOccurs="1" maxOccurs="1"/>
-
- <xs:element ref="a:JDL" minOccurs="0" maxOccurs="1"/>
- </xs:sequence>
-
- <!-- No idea where to get these from:
-
- ENVIRONMENT
-
- testbed production, preproduction, specific
- release middleware release (LCG, g-lite...)
- version version of middleware
- job_history_version in case of structure changes
- -->
-
- <xs:attribute name="jobid" type="xs:string" use="required"/>
- </xs:complexType>
-
-
- <xs:element name="jobRecord" type="jr:jobRecordType"/>
-
-</xs:schema>
#include "glite/lbu/trio.h"
#include "glite/lb/events.h"
#include "glite/lb/context-int.h"
+#include "glite/lb/intjobstat.h"
#include "get_events.h"
#include "store.h"
#ident "$Header$"
#include "glite/lb/jobstat.h"
-
-/*
- * Internal representation of job state
- * (includes edg_wll_JobStat API structure)
- */
-
-/* convention: revision X.XX - DESCRIPTION */
-/* where X.XX is version from indent + 1 (version after commit) */
-/* and DESCRIPTION is short hit why version changed */
-
-#define INTSTAT_VERSION "revision 1.31 - proxy merge"
-
-
-// Internal error codes
-
-#define RET_FAIL 0
-#define RET_OK 1
-#define RET_FATAL RET_FAIL
-#define RET_SOON 2
-#define RET_LATE 3
-#define RET_BADSEQ 4
-#define RET_SUSPECT 5
-#define RET_IGNORE 6
-#define RET_BADBRANCH 7
-#define RET_GOODBRANCH 8
-#define RET_TOOOLD 9
-#define RET_UNREG 10
-#define RET_INTERNAL 100
-
-
-// shallow resubmission container - holds state of each branch
-// (useful when state restore is needed after ReallyRunning event)
-//
-typedef struct _branch_state {
- int branch;
- char *destination;
- char *ce_node;
- char *jdl;
- /*!! if adding new field, modify also free_branch_state() */
-} branch_state;
-
-
-typedef struct _intJobStat {
- edg_wll_JobStat pub;
- int resubmit_type;
- char *last_seqcode;
- char *last_cancel_seqcode;
- char *branch_tag_seqcode;
- char *last_branch_seqcode;
- char *deep_resubmit_seqcode;
- branch_state *branch_states; // branch zero terminated array
-
- struct timeval last_pbs_event_timestamp;
- int pbs_reruning; // true if rerun event arrived
-
- /*!! if adding new field, modify also destroy_intJobStat_extension() */
- } intJobStat;
-
-typedef enum _edg_wll_PBSEventSource {
- EDG_WLL_PBS_EVENT_SOURCE_UNDEF = 0,
- EDG_WLL_PBS_EVENT_SOURCE_SCHEDULER,
- EDG_WLL_PBS_EVENT_SOURCE_SERVER,
- EDG_WLL_PBS_EVENT_SOURCE_MOM,
- EDG_WLL_PBS_EVENT_SOURCE_ACCOUNTING,
- EDG_WLL_PBS_EVENT_SOURCE__LAST
-} edg_wll_PBSEventSource;
-
-typedef enum _edg_wll_CondorEventSource {
- EDG_WLL_CONDOR_EVENT_SOURCE_UNDEF = 0,
- EDG_WLL_CONDOR_EVENT_SOURCE_COLLECTOR,
- EDG_WLL_CONDOR_EVENT_SOURCE_MASTER,
- EDG_WLL_CONDOR_EVENT_SOURCE_MATCH,
- EDG_WLL_CONDOR_EVENT_SOURCE_NEGOTIATOR,
- EDG_WLL_CONDOR_EVENT_SOURCE_SCHED,
- EDG_WLL_CONDOR_EVENT_SOURCE_SHADOW,
- EDG_WLL_CONDOR_EVENT_SOURCE_STARTER,
- EDG_WLL_CONDOR_EVENT_SOURCE_START,
- EDG_WLL_CONDOR_EVENT_SOURCE_JOBQUEUE,
- EDG_WLL_CONDOR_EVENT_SOURCE__LAST
-} edg_wll_CondorEventSource;
-
-typedef enum _subjobClassCodes {
- SUBJOB_CLASS_UNDEF = 0,
- SUBJOB_CLASS_RUNNING,
- SUBJOB_CLASS_DONE,
- SUBJOB_CLASS_ABORTED,
- SUBJOB_CLASS_CLEARED,
- SUBJOB_CLASS_REST
-} subjobClassCodes;
+#include "glite/lb/intjobstat.h"
int edg_wll_JobStatus(edg_wll_Context, glite_jobid_const_t, int, edg_wll_JobStat *);
-void destroy_intJobStat(intJobStat *);
-void destroy_intJobStat_extension(intJobStat *p);
-
int edg_wll_intJobStatus( edg_wll_Context, glite_jobid_const_t, int, intJobStat *, int);
edg_wll_ErrorCode edg_wll_StoreIntState(edg_wll_Context, intJobStat *, int);
void write2rgma_chgstatus(edg_wll_JobStat *, char *);
char* write2rgma_statline(edg_wll_JobStat *);
-int before_deep_resubmission(const char *, const char *);
-int same_branch(const char *, const char *);
-int component_seqcode(const char *a, edg_wll_Source index);
-char * set_component_seqcode(char *s,edg_wll_Source index,int val);
-int processEvent(intJobStat *, edg_wll_Event *, int, int, char **);
-int processEvent_PBS(intJobStat *, edg_wll_Event *, int, int, char **);
-int processEvent_Condor(intJobStat *, edg_wll_Event *, int, int, char **);
-
int add_stringlist(char ***, const char *);
-int edg_wll_compare_seq(const char *, const char *);
-int edg_wll_compare_pbs_seq(const char *,const char *);
-#define edg_wll_compare_condor_seq edg_wll_compare_pbs_seq
-edg_wll_PBSEventSource get_pbs_event_source(const char *pbs_seq_num);
-edg_wll_CondorEventSource get_condor_event_source(const char *pbs_seq_num);
-
-void init_intJobStat(intJobStat *p);
edg_wll_ErrorCode edg_wll_GetSubjobHistogram(edg_wll_Context, edg_wlc_JobId parent_jobid, int *hist);
edg_wll_ErrorCode edg_wll_StoreSubjobHistogram(edg_wll_Context, edg_wlc_JobId parent_jobid, intJobStat *ijs);
#include "glite/lbu/trio.h"
#include "glite/lbu/db.h"
#include "glite/lb/context-int.h"
+#include "glite/lb/intjobstat.h"
+#include "glite/lb/seqcode_aux.h"
#include "store.h"
#include "index.h"
}
-int component_seqcode(const char *a, edg_wll_Source index)
-{
- unsigned int c[EDG_WLL_SOURCE__LAST];
- int res;
- char sc[EDG_WLL_SEQ_SIZE];
-
- if (!strstr(a, "LBS")) snprintf(sc,EDG_WLL_SEQ_SIZE,"%s:LBS=000000",a);
- else snprintf(sc,EDG_WLL_SEQ_SIZE,"%s",a);
-
- res = sscanf(sc, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d:LBS=%d",
- &c[EDG_WLL_SOURCE_USER_INTERFACE],
- &c[EDG_WLL_SOURCE_NETWORK_SERVER],
- &c[EDG_WLL_SOURCE_WORKLOAD_MANAGER],
- &c[EDG_WLL_SOURCE_BIG_HELPER],
- &c[EDG_WLL_SOURCE_JOB_SUBMISSION],
- &c[EDG_WLL_SOURCE_LOG_MONITOR],
- &c[EDG_WLL_SOURCE_LRMS],
- &c[EDG_WLL_SOURCE_APPLICATION],
- &c[EDG_WLL_SOURCE_LB_SERVER]);
- if (res != EDG_WLL_SOURCE__LAST-1) {
- syslog(LOG_ERR, "unparsable sequence code %s\n", sc);
- fprintf(stderr, "unparsable sequence code %s\n", sc);
- return -1;
- }
-
- return(c[index]);
-}
-
-char * set_component_seqcode(char *a,edg_wll_Source index,int val)
-{
- unsigned int c[EDG_WLL_SOURCE__LAST];
- int res;
- char *ret;
- char sc[EDG_WLL_SEQ_SIZE];
-
- if (!strstr(a, "LBS")) snprintf(sc,EDG_WLL_SEQ_SIZE,"%s:LBS=000000",a);
- else snprintf(sc,EDG_WLL_SEQ_SIZE,"%s",a);
-
- res = sscanf(sc, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d:LBS=%d",
- &c[EDG_WLL_SOURCE_USER_INTERFACE],
- &c[EDG_WLL_SOURCE_NETWORK_SERVER],
- &c[EDG_WLL_SOURCE_WORKLOAD_MANAGER],
- &c[EDG_WLL_SOURCE_BIG_HELPER],
- &c[EDG_WLL_SOURCE_JOB_SUBMISSION],
- &c[EDG_WLL_SOURCE_LOG_MONITOR],
- &c[EDG_WLL_SOURCE_LRMS],
- &c[EDG_WLL_SOURCE_APPLICATION],
- &c[EDG_WLL_SOURCE_LB_SERVER]);
- if (res != EDG_WLL_SOURCE__LAST-1) {
- syslog(LOG_ERR, "unparsable sequence code %s\n", sc);
- fprintf(stderr, "unparsable sequence code %s\n", sc);
- return NULL;
- }
-
- c[index] = val;
- trio_asprintf(&ret,"UI=%06d:NS=%010d:WM=%06d:BH=%010d:JSS=%06d"
- ":LM=%06d:LRMS=%06d:APP=%06d:LBS=%06d",
- c[EDG_WLL_SOURCE_USER_INTERFACE],
- c[EDG_WLL_SOURCE_NETWORK_SERVER],
- c[EDG_WLL_SOURCE_WORKLOAD_MANAGER],
- c[EDG_WLL_SOURCE_BIG_HELPER],
- c[EDG_WLL_SOURCE_JOB_SUBMISSION],
- c[EDG_WLL_SOURCE_LOG_MONITOR],
- c[EDG_WLL_SOURCE_LRMS],
- c[EDG_WLL_SOURCE_APPLICATION],
- c[EDG_WLL_SOURCE_LB_SERVER]);
- return ret;
-}
-
-int before_deep_resubmission(const char *a, const char *b)
-{
- if (component_seqcode(a, EDG_WLL_SOURCE_WORKLOAD_MANAGER) <
- component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER) )
- return(1);
- else
- return(0);
-
-}
-
-int same_branch(const char *a, const char *b)
-{
- if (component_seqcode(a, EDG_WLL_SOURCE_WORKLOAD_MANAGER) ==
- component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER) )
- return(1);
- else
- return(0);
-}
-
-int edg_wll_compare_pbs_seq(const char *a,const char *b)
-{
- char timestamp_a[14], pos_a[10], src_a;
- char timestamp_b[14], pos_b[10], src_b;
- int ev_code_a, ev_code_b;
- int res;
-
- res = sscanf(a,"TIMESTAMP=%14s:POS=%10s:EV.CODE=%3d:SRC=%c", timestamp_a, pos_a, &ev_code_a, &src_a);
-
- if (res != 4) {
- syslog(LOG_ERR, "unparsable sequence code %s\n", a);
- fprintf(stderr, "unparsable sequence code %s\n", a);
- return -1;
- }
-
- res = sscanf(b,"TIMESTAMP=%14s:POS=%10s:EV.CODE=%3d:SRC=%c", timestamp_b, pos_b, &ev_code_b, &src_b);
-
- if (res != 4) {
- syslog(LOG_ERR, "unparsable sequence code %s\n", b);
- fprintf(stderr, "unparsable sequence code %s\n", b);
- return -1;
- }
-
- /* wild card for PBSJobReg - this event should always come as firt one */
- /* bacause it hold job.type, which is necessary for further event processing */
- if (ev_code_a == EDG_WLL_EVENT_REGJOB) return -1;
- if (ev_code_b == EDG_WLL_EVENT_REGJOB) return 1;
-
- /* sort event w.t.r. to timestamps */
- if ((res = strcmp(timestamp_a,timestamp_b)) != 0) {
- return res;
- }
- else {
- /* if timestamps equal, sort if w.t.r. to file position */
- /* if you both events come from the same log file */
- if (src_a == src_b) {
- /* zero mean in fact duplicate events in log */
- return strcmp(pos_a,pos_b);
- }
- /* if the events come from diffrent log files */
- /* it is possible to prioritize some src log file */
- else {
- /* prioritize events from pbs_mom */
- if (src_a == 'm') return 1;
- if (src_b == 'm') return -1;
-
- /* then prioritize events from pbs_server */
- if (src_a == 's') return 1;
- if (src_b == 's') return -1;
-
- /* other priorities comes here... */
- }
- }
-
- return 0;
-}
-
-edg_wll_PBSEventSource get_pbs_event_source(const char *pbs_seq_num) {
- switch (pbs_seq_num[EDG_WLL_SEQ_PBS_SIZE - 2]) {
- case 'c': return(EDG_WLL_PBS_EVENT_SOURCE_SCHEDULER);
- case 's': return(EDG_WLL_PBS_EVENT_SOURCE_SERVER);
- case 'm': return(EDG_WLL_PBS_EVENT_SOURCE_MOM);
- case 'a': return(EDG_WLL_PBS_EVENT_SOURCE_ACCOUNTING);
- default: return(EDG_WLL_PBS_EVENT_SOURCE_UNDEF);
- }
-}
-
-edg_wll_CondorEventSource get_condor_event_source(const char *condor_seq_num) {
- switch (condor_seq_num[EDG_WLL_SEQ_CONDOR_SIZE - 2]) {
- case 'L': return(EDG_WLL_CONDOR_EVENT_SOURCE_COLLECTOR);
- case 'M': return(EDG_WLL_CONDOR_EVENT_SOURCE_MASTER);
- case 'm': return(EDG_WLL_CONDOR_EVENT_SOURCE_MATCH);
- case 'N': return(EDG_WLL_CONDOR_EVENT_SOURCE_NEGOTIATOR);
- case 'C': return(EDG_WLL_CONDOR_EVENT_SOURCE_SCHED);
- case 'H': return(EDG_WLL_CONDOR_EVENT_SOURCE_SHADOW);
- case 's': return(EDG_WLL_CONDOR_EVENT_SOURCE_STARTER);
- case 'S': return(EDG_WLL_CONDOR_EVENT_SOURCE_START);
- case 'j': return(EDG_WLL_CONDOR_EVENT_SOURCE_JOBQUEUE);
- default: return(EDG_WLL_CONDOR_EVENT_SOURCE_UNDEF);
- }
-}
-
-int edg_wll_compare_seq(const char *a, const char *b)
-{
- unsigned int c[EDG_WLL_SOURCE__LAST];
- unsigned int d[EDG_WLL_SOURCE__LAST];
- int res, i;
- char sca[EDG_WLL_SEQ_SIZE], scb[EDG_WLL_SEQ_SIZE];
-
-
- if ( (strstr(a,"TIMESTAMP=") == a) && (strstr(b,"TIMESTAMP=") == b) )
- return edg_wll_compare_pbs_seq(a,b);
-
- if (!strstr(a, "LBS")) snprintf(sca,EDG_WLL_SEQ_SIZE,"%s:LBS=000000",a);
- else snprintf(sca,EDG_WLL_SEQ_SIZE,"%s",a);
- if (!strstr(b, "LBS")) snprintf(scb,EDG_WLL_SEQ_SIZE,"%s:LBS=000000",b);
- else snprintf(scb,EDG_WLL_SEQ_SIZE,"%s",b);
-
- assert(EDG_WLL_SOURCE__LAST == 10);
-
- res = sscanf(sca, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d:LBS=%d",
- &c[EDG_WLL_SOURCE_USER_INTERFACE],
- &c[EDG_WLL_SOURCE_NETWORK_SERVER],
- &c[EDG_WLL_SOURCE_WORKLOAD_MANAGER],
- &c[EDG_WLL_SOURCE_BIG_HELPER],
- &c[EDG_WLL_SOURCE_JOB_SUBMISSION],
- &c[EDG_WLL_SOURCE_LOG_MONITOR],
- &c[EDG_WLL_SOURCE_LRMS],
- &c[EDG_WLL_SOURCE_APPLICATION],
- &c[EDG_WLL_SOURCE_LB_SERVER]);
- if (res != EDG_WLL_SOURCE__LAST-1) {
- syslog(LOG_ERR, "unparsable sequence code %s\n", sca);
- fprintf(stderr, "unparsable sequence code %s\n", sca);
- return -1;
- }
-
- res = sscanf(scb, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d:LBS=%d",
- &d[EDG_WLL_SOURCE_USER_INTERFACE],
- &d[EDG_WLL_SOURCE_NETWORK_SERVER],
- &d[EDG_WLL_SOURCE_WORKLOAD_MANAGER],
- &d[EDG_WLL_SOURCE_BIG_HELPER],
- &d[EDG_WLL_SOURCE_JOB_SUBMISSION],
- &d[EDG_WLL_SOURCE_LOG_MONITOR],
- &d[EDG_WLL_SOURCE_LRMS],
- &d[EDG_WLL_SOURCE_APPLICATION],
- &d[EDG_WLL_SOURCE_LB_SERVER]);
- if (res != EDG_WLL_SOURCE__LAST-1) {
- syslog(LOG_ERR, "unparsable sequence code %s\n", scb);
- fprintf(stderr, "unparsable sequence code %s\n", scb);
- return 1;
- }
-
- for (i = EDG_WLL_SOURCE_USER_INTERFACE ; i < EDG_WLL_SOURCE__LAST; i++) {
- if (c[i] < d[i]) return -1;
- if (c[i] > d[i]) return 1;
- }
-
- return 0;
-}
-
-static int compare_events_by_seq(const void *a, const void *b)
-{
- const edg_wll_Event *e = (edg_wll_Event *) a;
- const edg_wll_Event *f = (edg_wll_Event *) b;
- int ret;
-
-
- ret = edg_wll_compare_seq(e->any.seqcode, f->any.seqcode);
- if (ret) return ret;
-
- if (e->any.timestamp.tv_sec < f->any.timestamp.tv_sec) return -1;
- if (e->any.timestamp.tv_sec > f->any.timestamp.tv_sec) return 1;
- if (e->any.timestamp.tv_usec < f->any.timestamp.tv_usec) return -1;
- if (e->any.timestamp.tv_usec > f->any.timestamp.tv_usec) return 1;
- return 0;
-}
-
static int compare_pevents_by_seq(const void *a, const void *b)
{
const edg_wll_Event **e = (const edg_wll_Event **) a;
}
-void init_intJobStat(intJobStat *p)
-{
- memset(p, 0, sizeof(intJobStat));
- p->pub.jobtype = -1 /* why? EDG_WLL_STAT_SIMPLE */;
- p->pub.children_hist = (int*) calloc(1+EDG_WLL_NUMBER_OF_STATCODES, sizeof(int));
- p->pub.children_hist[0] = EDG_WLL_NUMBER_OF_STATCODES;
- p->pub.stateEnterTimes = (int*) calloc(1+EDG_WLL_NUMBER_OF_STATCODES, sizeof(int));
- p->pub.stateEnterTimes[0] = EDG_WLL_NUMBER_OF_STATCODES;
- /* TBD: generate */
-}
-
+++ /dev/null
-<?xml version="1.0"?>
-
-<xsl:stylesheet version="1.0"
- xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
- xmlns:xs="http://www.w3.org/2001/XMLSchema"
->
-
-<xsl:output method="text"/>
-
-<xsl:template match="xs:schema">
-#ifndef GLITE_LB_JP_JOB_ATTR_H
-#define GLITE_LB_JP_JOB_ATTR_H
-#define GLITE_JP_LB_NS "http://egee.cesnet.cz/en/Schema/LB/Attributes"
-#define GLITE_JP_LB_JDL_NS "http://egee.cesnet.cz/en/Schema/LB/Attributes:JDL"
- <xsl:apply-templates select="xs:element"/>
-#define GLITE_JP_LB_CLASSAD_NS "http://jdl"
-#endif
-</xsl:template>
-
-<xsl:template match="xs:element">
-/** <xsl:value-of select="xs:documentation/text()"/> */
-#define GLITE_JP_LB_<xsl:value-of select="@name"/> GLITE_JP_LB_NS ":<xsl:value-of select="@name"/>"
-</xsl:template>
-
-</xsl:stylesheet>
+++ /dev/null
-#ident "$Header$"
-
-#include <stdlib.h>
-#include <assert.h>
-#include <stdarg.h>
-#include <string.h>
-#include <stdio.h>
-#include <errno.h>
-#include <time.h>
-#include <ctype.h>
-
-#include <cclassad.h>
-
-#include "glite/lbu/trio.h"
-#include "glite/lb/context.h"
-#include "glite/lb/context-int.h"
-#include "glite/lb/jobstat.h"
-#include "glite/lb/events.h"
-#include "glite/lb/events_parse.h"
-
-#include "jobstat.h"
-#include "get_events.h"
-
-#include "glite/jp/types.h"
-#include "glite/jp/context.h"
-#include "glite/jp/file_plugin.h"
-#include "glite/jp/builtin_plugins.h"
-#include "glite/jp/backend.h"
-#include "glite/jp/attr.h"
-#include "glite/jp/utils.h"
-#include "glite/jp/known_attr.h"
-#include "jp_job_attrs.h"
-
-#define INITIAL_NUMBER_EVENTS 100
-#define INITIAL_NUMBER_STATES EDG_WLL_NUMBER_OF_STATCODES
-#define LB_PLUGIN_NAMESPACE "urn:org.glite.lb"
-
-/*typedef struct _lb_buffer_t {
- char *buf;
- size_t pos, size;
- off_t offset;
-} lb_buffer_t;*/
-
-typedef struct _lb_historyStatus {
- edg_wll_JobStatCode state;
- struct timeval timestamp;
- char *reason;
-} lb_historyStatus;
-
-typedef struct _lb_handle {
- edg_wll_Event **events;
- edg_wll_JobStat status;
- lb_historyStatus **fullStatusHistory, **lastStatusHistory, *finalStatus;
- glite_jpps_fplug_data_t* classad_plugin;
-} lb_handle;
-
-#define check_strdup(s) ((s) ? strdup(s) : NULL)
-
-extern int processEvent(intJobStat *, edg_wll_Event *, int, int, char **);
-
-static int lb_query(void *fpctx, void *handle, const char *attr, glite_jp_attrval_t **attrval);
-static int lb_open(void *fpctx, void *bhandle, const char *uri, void **handle);
-static int lb_close(void *fpctx, void *handle);
-static int lb_filecom(void *fpctx, void *handle);
-static int lb_status(void *handle);
-//static int read_line(glite_jp_context_t ctx, void *handle, lb_buffer_t *buffer, char **line);
-
-static int lb_dummy(void *fpctx, void *handle, int oper, ...) {
- puts("lb_dummy() - generic call not used; for testing purposes only...");
- return -1;
-}
-
-int init(glite_jp_context_t ctx, glite_jpps_fplug_data_t *data) {
-
- data->fpctx = ctx;
-
- data->uris = calloc(2,sizeof *data->uris);
- data->uris[0] = strdup(GLITE_JP_FILETYPE_LB);
-
- data->classes = calloc(2,sizeof *data->classes);
- data->classes[0] = strdup("lb");
-
- data->namespaces = calloc(4, sizeof *data->namespaces);
- data->namespaces[0] = strdup(GLITE_JP_LB_NS);
- data->namespaces[1] = strdup(GLITE_JP_LB_JDL_NS);
- data->namespaces[2] = strdup(GLITE_JP_LBTAG_NS);
-
- data->ops.open = lb_open;
- data->ops.close = lb_close;
- data->ops.filecom = lb_filecom;
- data->ops.attr = lb_query;
- data->ops.generic = lb_dummy;
-
-#ifdef PLUGIN_DEBUG
- fprintf(stderr,"lb_plugin: init OK\n");
-#endif
- return 0;
-}
-
-
-void done(glite_jp_context_t ctx, glite_jpps_fplug_data_t *data) {
- free(data->uris[0]);
- free(data->classes[0]);
- free(data->namespaces[0]);
- free(data->namespaces[1]);
- free(data->namespaces[2]);
- free(data->uris);
- free(data->classes);
- free(data->namespaces);
- memset(data, 0, sizeof(*data));
-}
-
-
-static int lb_open(void *fpctx, void *bhandle, const char *uri, void **handle) {
-
- lb_handle *h;
- rl_buffer_t buffer;
- glite_jp_context_t ctx = (glite_jp_context_t) fpctx;
- char *line;
- int retval;
- edg_wll_Context context;
- int nevents, maxnevents, i;
- glite_jp_error_t err;
- char *id0 = NULL,*id = NULL;
-
- glite_jp_clear_error(ctx);
- h = calloc(1, sizeof(lb_handle));
-
- if ((retval = edg_wll_InitContext(&context)) != 0) return retval;
-
- // read the file given by bhandle
- // parse events into h->events array
- memset(&buffer, 0, sizeof(buffer));
- buffer.buf = malloc(BUFSIZ);
- maxnevents = INITIAL_NUMBER_EVENTS;
- nevents = 0;
- h->events = malloc(maxnevents * sizeof(edg_wll_Event *));
-
- if ((retval = glite_jppsbe_readline(ctx, bhandle, &buffer, &line)) != 0) {
- err.code = retval;
- err.desc = "reading LB logline";
- err.source = "lb_plugin.c:read_line()";
- glite_jp_stack_error(ctx,&err);
- goto fail;
- }
- while (line) {
-#ifdef PLUGIN_DEBUG
- //fprintf(stderr,"lb_plugin opened\n", line);
-#endif
-
- if (line[0]) {
- if (nevents >= maxnevents) {
- maxnevents <<= 1;
- h->events = realloc(h->events, maxnevents * sizeof(edg_wll_Event *));
- }
- if ((retval = edg_wll_ParseEvent(context, line, &h->events[nevents])) != 0) {
- char *ed;
- free(line);
- err.code = retval;
- edg_wll_Error(context,NULL,&ed);
- err.desc = ed;
- err.source = "edg_wll_ParseEvent()";
- glite_jp_stack_error(ctx,&err);
- free(ed);
- goto fail;
- }
- if (nevents == 0) {
- id0 = edg_wlc_JobIdGetUnique(h->events[nevents]->any.jobId );
- } else {
- id = edg_wlc_JobIdGetUnique(h->events[nevents]->any.jobId );
- if (strcmp(id0,id) != 0) {
- char et[BUFSIZ];
- retval = EINVAL;
- err.code = retval;
- snprintf(et,sizeof et,"Attempt to process different jobs. Id '%s' (event n.%d) differs from '%s'",id,nevents+1,id0);
- et[BUFSIZ-1] = 0;
- err.desc = et;
- err.source = "lb_plugin.c:edg_wlc_JobIdGetUnique()";
- glite_jp_stack_error(ctx,&err);
- goto fail;
- }
- }
-
- if (id) free(id); id = NULL;
- nevents++;
- }
- free(line);
-
- if ((retval = glite_jppsbe_readline(ctx, bhandle, &buffer, &line)) != 0) {
- err.code = retval;
- err.desc = "reading LB logline";
- err.source = "lb_plugin.c:read_line()";
- glite_jp_stack_error(ctx,&err);
- goto fail;
- }
- }
- free(line);
-
- free(buffer.buf);
- edg_wll_FreeContext(context);
-
- if (nevents >= maxnevents) {
- maxnevents <<= 1;
- h->events = realloc(h->events, maxnevents * sizeof(edg_wll_Event *));
- }
- h->events[nevents] = NULL;
-
-#ifdef PLUGIN_DEBUG
- fprintf(stderr,"lb_plugin: opened %d events\n", nevents);
-#endif
-
- // find classad plugin, if it is loaded
- int j;
- h->classad_plugin = NULL;
- for (i=0; ctx->plugins[i]; i++){
- glite_jpps_fplug_data_t *pd = ctx->plugins[i];
- if (pd->namespaces)
- for (j=0; pd->classes[j]; j++)
- if (! strcmp(pd->classes[j], "classad")){
- h->classad_plugin = pd;
- goto cont;
- }
- }
-cont:
-
- /* count state and status history of the job given by the loaded events */
- if ((retval = lb_status(h)) != 0) goto fail;
-
- *handle = (void *)h;
-
- return 0;
-
-fail:
-#ifdef PLUGIN_DEBUG
- fprintf(stderr,"lb_plugin: open ERROR\n");
-#endif
- for (i = 0; i < nevents; i++) {
- edg_wll_FreeEvent(h->events[i]);
- free(h->events[i]);
- }
- free(h->events);
- free(buffer.buf);
- if (id0) free(id0);
- if (id) free(id);
- edg_wll_FreeContext(context);
- free(h);
- *handle = NULL;
- err.code = EIO;
- err.desc = NULL;
- err.source = __FUNCTION__;
- glite_jp_stack_error(ctx,&err);
-
- return retval;
-}
-
-
-static int lb_close(void *fpctx,void *handle) {
-
- lb_handle *h = (lb_handle *) handle;
- int i;
-
- // Free allocated stuctures
- if (h->events) {
- i = 0;
- while (h->events[i]) {
- edg_wll_FreeEvent(h->events[i]);
- free(h->events[i]);
- i++;
- }
- free(h->events);
- }
-
- if (h->status.state != EDG_WLL_JOB_UNDEF)
- edg_wll_FreeStatus(&h->status);
-
- if (h->fullStatusHistory) {
- i = 0;
- while (h->fullStatusHistory[i]) {
- if (h->fullStatusHistory[i]->reason) free(h->fullStatusHistory[i]->reason);
- free (h->fullStatusHistory[i]);
- i++;
- }
- h->fullStatusHistory = NULL;
- h->lastStatusHistory = NULL;
- h->finalStatus = NULL;
- }
-
- free(h);
-
-#ifdef PLUGIN_DEBUG
- fprintf(stderr,"lb_plugin: close OK\n");
-#endif
- return 0;
-}
-
-static int get_classad_attr(const char* attr, glite_jp_context_t ctx, lb_handle *h, glite_jp_attrval_t **av){
- printf("attr = %s\n", attr);
- glite_jp_error_t err;
- glite_jp_clear_error(ctx);
- memset(&err,0,sizeof err);
- err.source = __FUNCTION__;
-
- if (! h->classad_plugin){
- err.code = ENOENT;
- err.desc = strdup("Classad plugin has not been loaded.");
- return glite_jp_stack_error(ctx,&err);
- }
- // Get the attribute from JDL
- int i = 0;
- while (h->events[i]){
- if (h->events[i]->type == EDG_WLL_EVENT_REGJOB
- && h->events[i]->regJob.jdl
- && h->events[i]->regJob.jdl[0])
- {
- void *beh;
- if (! h->classad_plugin->ops.open_str(h->classad_plugin->fpctx, h->events[i]->regJob.jdl, "", "", &beh)){
- if (! h->classad_plugin->ops.attr(h->classad_plugin->fpctx, beh, attr, av))
- (*av)[0].timestamp = h->events[i]->any.timestamp.tv_sec;
- else{
- h->classad_plugin->ops.close(h->classad_plugin->fpctx, beh);
- err.code = ENOENT;
- err.desc = strdup("Classad attribute not found.");
- return glite_jp_stack_error(ctx,&err);
- }
- h->classad_plugin->ops.close(h->classad_plugin->fpctx, beh);
- }
- }
- i++;
- }
- return 0;
-}
-
-static int lb_query(void *fpctx,void *handle, const char *attr,glite_jp_attrval_t **attrval) {
- lb_handle *h = (lb_handle *) handle;
- glite_jp_context_t ctx = (glite_jp_context_t) fpctx;
- glite_jp_error_t err;
- glite_jp_attrval_t *av = NULL;
- int i, j, n_tags;
- char *ns = glite_jpps_get_namespace(attr);
- char *tag;
-
- glite_jp_clear_error(ctx);
- memset(&err,0,sizeof err);
- err.source = __FUNCTION__;
-
- if ((h->events == NULL) ||
- (h->status.state == EDG_WLL_JOB_UNDEF) ||
- (h->fullStatusHistory == NULL) ) {
- *attrval = NULL;
- err.code = ENOENT;
- err.desc = strdup("There is no job information to query.");
- return glite_jp_stack_error(ctx,&err);
- }
-
- if (strcmp(ns, GLITE_JP_LB_JDL_NS) == 0){
- if (get_classad_attr(attr, ctx, h, &av)){
- *attrval = NULL;
- err.code = ENOENT;
- err.desc = strdup("Cannot get attribute from classad.");
- free(ns);
- return glite_jp_stack_error(ctx,&err);
- }
- }
- else if (strcmp(attr, GLITE_JP_LB_user) == 0) {
- if (h->status.owner) {
- av = calloc(2, sizeof(glite_jp_attrval_t));
- av[0].name = strdup(attr);
- av[0].value = strdup(h->status.owner);
- av[0].size = -1;
- av[0].timestamp = h->status.lastUpdateTime.tv_sec;
- }
- } else if (strcmp(attr, GLITE_JP_LB_jobId) == 0) {
- if (h->status.jobId) {
- av = calloc(2, sizeof(glite_jp_attrval_t));
- av[0].name = strdup(attr);
- av[0].value = edg_wlc_JobIdUnparse(h->status.jobId);
- av[0].size = -1;
- av[0].timestamp = h->status.lastUpdateTime.tv_sec;
- }
- } else if (strcmp(attr, GLITE_JP_LB_LBserver) == 0) {
- av = calloc(2, sizeof(glite_jp_attrval_t));
- av[0].name = strdup(attr);
- av[0].value = edg_wlc_JobIdGetServer(h->status.jobId);
- av[0].size = -1;
- av[0].timestamp = h->status.lastUpdateTime.tv_sec;
- } else if (strcmp(attr, GLITE_JP_LB_parent) == 0) {
- if (h->status.parent_job) {
- av = calloc(2, sizeof(glite_jp_attrval_t));
- av[0].name = strdup(attr);
- av[0].value = edg_wlc_JobIdUnparse(h->status.parent_job);
- av[0].size = -1;
- av[0].timestamp = h->status.lastUpdateTime.tv_sec;
- }
- } else if (strcmp(attr, GLITE_JP_LB_VO) == 0) {
- if (get_classad_attr(":VirtualOrganisation", ctx, h, &av)){
- printf("error");
- *attrval = NULL;
- err.code = ENOENT;
- err.desc = strdup("Cannot get attribute from classad.");
- free(ns);
- return glite_jp_stack_error(ctx,&err);
- }
- } else if (strcmp(attr, GLITE_JP_LB_eNodes) == 0) {
- if (get_classad_attr(":max_nodes_running", ctx, h, &av)){
- printf("error");
- *attrval = NULL;
- err.code = ENOENT;
- err.desc = strdup("Cannot get attribute from classad.");
- free(ns);
- return glite_jp_stack_error(ctx,&err);
- }
- } else if (strcmp(attr, GLITE_JP_LB_eProc) == 0) {
- if (get_classad_attr(":NodeNumber", ctx, h, &av)){
- printf("error");
- *attrval = NULL;
- err.code = ENOENT;
- err.desc = strdup("Cannot get attribute from classad.");
- free(ns);
- return glite_jp_stack_error(ctx,&err);
- }
- } else if (strcmp(attr, GLITE_JP_LB_aTag) == 0 ||
- strcmp(attr, GLITE_JP_LB_rQType) == 0 ||
- strcmp(attr, GLITE_JP_LB_eDuration) == 0) {
- /* have to be retrieved from JDL, but probably obsolete and not needed at all */
- char et[BUFSIZ];
- *attrval = NULL;
- err.code = ENOSYS;
- snprintf(et,sizeof et,"Attribute '%s' not implemented yet.",attr);
- et[BUFSIZ-1] = 0;
- err.desc = et;
- return glite_jp_stack_error(ctx,&err);
- } else if (strcmp(attr, GLITE_JP_LB_RB) == 0) {
- if (h->status.network_server) {
- av = calloc(2, sizeof(glite_jp_attrval_t));
- av[0].name = strdup(attr);
- av[0].value = strdup(h->status.network_server);
- av[0].size = -1;
- av[0].timestamp = h->status.lastUpdateTime.tv_sec;
- }
- } else if (strcmp(attr, GLITE_JP_LB_CE) == 0) {
- if (h->status.destination) {
- av = calloc(2, sizeof(glite_jp_attrval_t));
- av[0].name = strdup(attr);
- av[0].value = strdup(h->status.destination);
- av[0].size = -1;
- av[0].timestamp = h->status.lastUpdateTime.tv_sec;
- }
- } else if (strcmp(attr, GLITE_JP_LB_host) == 0) {
- if (h->status.ce_node) {
- av = calloc(2, sizeof(glite_jp_attrval_t));
- av[0].name = strdup(attr);
- av[0].value = strdup(h->status.ce_node);
- av[0].size = -1;
- av[0].timestamp = h->status.lastUpdateTime.tv_sec;
- }
- } else if (strcmp(attr, GLITE_JP_LB_UIHost) == 0) {
- i = 0;
- while (h->events[i]) {
- if (h->events[i]->type == EDG_WLL_EVENT_REGJOB) {
- if (h->events[i]->any.host) {
- av = calloc(2, sizeof(glite_jp_attrval_t));
- av[0].name = strdup(attr);
- av[0].value = strdup(h->events[i]->any.host);
- av[0].size = -1;
- av[0].timestamp = h->events[i]->any.timestamp.tv_sec;
- }
- break;
- }
- i++;
- }
- } else if (strcmp(attr, GLITE_JP_LB_CPUTime) == 0) {
- if (h->status.cpuTime) {
- av = calloc(2, sizeof(glite_jp_attrval_t));
- av[0].name = strdup(attr);
- trio_asprintf(&av[0].value,"%d", h->status.cpuTime);
- av[0].size = -1;
- av[0].timestamp = h->status.lastUpdateTime.tv_sec;
- }
- } else if (strcmp(attr, GLITE_JP_LB_NProc) == 0) {
- /* currently LB hasn't got the info */
- char et[BUFSIZ];
- *attrval = NULL;
- err.code = ENOSYS;
- snprintf(et,sizeof et,"Attribute '%s' not implemented yet.",attr);
- et[BUFSIZ-1] = 0;
- err.desc = et;
- return glite_jp_stack_error(ctx,&err);
- } else if (strcmp(attr, GLITE_JP_LB_finalStatus) == 0) {
- av = calloc(2, sizeof(glite_jp_attrval_t));
- av[0].name = strdup(attr);
- if (h->finalStatus) {
- av[0].value = edg_wll_StatToString(h->finalStatus->state);
- av[0].timestamp = h->finalStatus->timestamp.tv_sec;
- } else {
- av[0].value = edg_wll_StatToString(h->status.state);
- av[0].timestamp = h->status.lastUpdateTime.tv_sec;
- }
- av[0].size = -1;
- } else if (strcmp(attr, GLITE_JP_LB_finalStatusDate) == 0) {
- struct tm *t = NULL;
- if ( (h->finalStatus) &&
- ((t = gmtime(&h->finalStatus->timestamp.tv_sec)) != NULL) ) {
- av = calloc(2, sizeof(glite_jp_attrval_t));
- av[0].name = strdup(attr);
- /* dateTime format: yyyy-mm-ddThh:mm:ss.uuuuuu */
- trio_asprintf(&av[0].value,"%04d-%02d-%02dT%02d:%02d:%02d.%06d",
- 1900+t->tm_year, 1+t->tm_mon, t->tm_mday,
- t->tm_hour, t->tm_min, t->tm_sec,
- h->finalStatus->timestamp.tv_usec);
- av[0].size = -1;
- av[0].timestamp = h->finalStatus->timestamp.tv_sec;
- } else if ((t = gmtime(&h->status.lastUpdateTime.tv_sec)) != NULL) {
- av = calloc(2, sizeof(glite_jp_attrval_t));
- av[0].name = strdup(attr);
- /* dateTime format: yyyy-mm-ddThh:mm:ss.uuuuuu */
- trio_asprintf(&av[0].value,"%04d-%02d-%02dT%02d:%02d:%02d.%06d",
- 1900+t->tm_year, 1+t->tm_mon, t->tm_mday,
- t->tm_hour, t->tm_min, t->tm_sec,
- h->status.lastUpdateTime.tv_usec);
- av[0].size = -1;
- av[0].timestamp = h->status.lastUpdateTime.tv_sec;
- }
- } else if (strcmp(attr, GLITE_JP_LB_finalStatusReason) == 0) {
- if (h->finalStatus && h->finalStatus->reason) {
- av = calloc(2, sizeof(glite_jp_attrval_t));
- av[0].name = strdup(attr);
- av[0].value = strdup(h->finalStatus->reason);
- av[0].size = -1;
- av[0].timestamp = h->finalStatus->timestamp.tv_sec;
- } else if (h->status.reason) {
- av = calloc(2, sizeof(glite_jp_attrval_t));
- av[0].name = strdup(attr);
- av[0].value = strdup(h->status.reason);
- av[0].size = -1;
- av[0].timestamp = h->status.lastUpdateTime.tv_sec;
- }
- } else if (strcmp(attr, GLITE_JP_LB_LRMSDoneStatus) == 0) {
- i = 0;
- j = -1;
- while (h->events[i]) {
- if ( (h->events[i]->type == EDG_WLL_EVENT_DONE) &&
- (h->events[i]->any.source == EDG_WLL_SOURCE_LRMS) )
- j = i;
- i++;
- }
- av = calloc(2, sizeof(glite_jp_attrval_t));
- av[0].name = strdup(attr);
- av[0].size = -1;
- if ( j != -1) {
- av[0].value = edg_wll_DoneStatus_codeToString(h->events[j]->done.status_code);
- av[0].timestamp = h->events[j]->any.timestamp.tv_sec;
- } else {
- av[0].value = edg_wll_DoneStatus_codeToString(h->status.done_code);
- av[0].timestamp = h->status.lastUpdateTime.tv_sec;
- }
- } else if (strcmp(attr, GLITE_JP_LB_LRMSStatusReason) == 0) {
- i = 0;
- j = -1;
- while (h->events[i]) {
- if ( (h->events[i]->type == EDG_WLL_EVENT_DONE) &&
- (h->events[i]->any.source == EDG_WLL_SOURCE_LRMS) )
- j = i;
- i++;
- }
- if ( ( j != -1) && (h->events[j]->done.reason) ) {
- av = calloc(2, sizeof(glite_jp_attrval_t));
- av[0].name = strdup(attr);
- av[0].value = strdup(h->events[j]->done.reason);
- av[0].size = -1;
- av[0].timestamp = h->events[j]->any.timestamp.tv_sec;
- }
- } else if (strcmp(attr, GLITE_JP_LB_retryCount) == 0) {
- av = calloc(2, sizeof(glite_jp_attrval_t));
- av[0].name = strdup(attr);
- trio_asprintf(&av[0].value,"%d", h->status.resubmitted);
- av[0].size = -1;
- av[0].timestamp = h->status.lastUpdateTime.tv_sec;
- } else if (strcmp(attr, GLITE_JP_LB_additionalReason) == 0) {
- /* what is it? */
- char et[BUFSIZ];
- *attrval = NULL;
- err.code = ENOSYS;
- snprintf(et,sizeof et,"Attribute '%s' not implemented yet.",attr);
- et[BUFSIZ-1] = 0;
- err.desc = et;
- return glite_jp_stack_error(ctx,&err);
- } else if (strcmp(attr, GLITE_JP_LB_jobType) == 0) {
- av = calloc(2, sizeof(glite_jp_attrval_t));
- av[0].name = strdup(attr);
- switch (h->status.jobtype) {
- case EDG_WLL_STAT_SIMPLE:
- av[0].value = strdup("SIMPLE"); break;
- case EDG_WLL_STAT_DAG:
- av[0].value = strdup("DAG"); break;
- default:
- av[0].value = strdup("UNKNOWN"); break;
- }
- av[0].size = -1;
- av[0].timestamp = h->status.lastUpdateTime.tv_sec;
- } else if (strcmp(attr, GLITE_JP_LB_nsubjobs) == 0) {
- av = calloc(2, sizeof(glite_jp_attrval_t));
- av[0].name = strdup(attr);
- trio_asprintf(&av[0].value,"%d", h->status.children_num);
- av[0].size = -1;
- av[0].timestamp = h->status.lastUpdateTime.tv_sec;
- } else if (strcmp(attr, GLITE_JP_LB_subjobs) == 0) {
- if (h->status.children_num > 0) {
- char *val = NULL, *old_val;
-
- old_val = strdup ("");
- for (i=0; i<h->status.children_num; i++) {
- trio_asprintf(&val,"%s\t\t<jobId>%s</jobId>\n",
- old_val, h->status.children[i] ? h->status.children[i] : "");
- if (old_val) free(old_val);
- old_val = val; val = NULL;
- }
- av = calloc(2, sizeof(glite_jp_attrval_t));
- av[0].name = strdup(attr);
- av[0].value = check_strdup(old_val);
- av[0].size = -1;
- av[0].timestamp = h->status.lastUpdateTime.tv_sec;
- } else {
- char et[BUFSIZ];
- *attrval = NULL;
- err.code = ENOENT;
- snprintf(et,sizeof et,"Value unknown for attribute '%s', there are no subjobs.",attr);
- et[BUFSIZ-1] = 0;
- err.desc = et;
- return glite_jp_stack_error(ctx,&err);
- }
- } else if (strcmp(attr, GLITE_JP_LB_lastStatusHistory) == 0) {
- int i,j;
- char *val, *old_val, *s_str, *t_str, *r_str;
- struct tm *t;
-
- val = s_str = t_str = r_str = NULL;
- old_val = strdup("");
- t = calloc(1, sizeof(*t));
- /* first record is Submitted - hopefully in fullStatusHistory[0] */
- if ((h->fullStatusHistory[0] &&
- (h->fullStatusHistory[0]->state == EDG_WLL_JOB_SUBMITTED)) ) {
-
- s_str = edg_wll_StatToString(h->fullStatusHistory[0]->state);
- for (j = 0; s_str[j]; j++) s_str[j] = toupper(s_str[j]);
- if (gmtime_r(&h->fullStatusHistory[0]->timestamp.tv_sec,t) != NULL) {
- /* dateTime format: yyyy-mm-ddThh:mm:ss.uuuuuu */
- trio_asprintf(&t_str,"timestamp=\"%04d-%02d-%02dT%02d:%02d:%02d.%06d\" ",
- 1900+t->tm_year, 1+t->tm_mon, t->tm_mday,
- t->tm_hour, t->tm_min, t->tm_sec,
- h->fullStatusHistory[0]->timestamp.tv_usec);
- }
- if (h->fullStatusHistory[0]->reason) {
- trio_asprintf(&r_str,"reason=\"%s\" ",h->fullStatusHistory[0]->reason);
- }
- trio_asprintf(&val,"%s\t\t<status xmlns=\"" GLITE_JP_LB_NS "\" name=\"%s\" %s%s/>\n",
- old_val, s_str ? s_str : "", t_str ? t_str : "", r_str ? r_str : "");
- if (s_str) free(s_str);
- if (t_str) free(t_str);
- if (r_str) free(t_str);
- if (old_val) free(old_val);
- old_val = val; val = NULL;
- }
- /* and the rest is from last Waiting to the end - i.e. all lastStatusHistory[] */
- if (h->lastStatusHistory) {
- i = 0;
- while (h->lastStatusHistory[i]) {
- s_str = edg_wll_StatToString(h->lastStatusHistory[i]->state);
- for (j = 0; s_str[j]; j++) s_str[j] = toupper(s_str[j]);
- if (gmtime_r(&h->lastStatusHistory[i]->timestamp.tv_sec,t) != NULL) {
- /* dateTime format: yyyy-mm-ddThh:mm:ss.uuuuuu */
- trio_asprintf(&t_str,"timestamp=\"%04d-%02d-%02dT%02d:%02d:%02d.%06d\" ",
- 1900+t->tm_year, 1+t->tm_mon, t->tm_mday,
- t->tm_hour, t->tm_min, t->tm_sec,
- h->lastStatusHistory[i]->timestamp.tv_usec);
- }
- if (h->lastStatusHistory[i]->reason) {
- trio_asprintf(&r_str,"reason=\"%s\" ",h->lastStatusHistory[i]->reason);
- }
- trio_asprintf(&val,"%s\t\t<status xmlns=\"" GLITE_JP_LB_NS "\" name=\"%s\" %s%s/>\n",
- old_val, s_str ? s_str : "", t_str ? t_str : "", r_str ? r_str : "");
- if (s_str) free(s_str); s_str = NULL;
- if (t_str) free(t_str); t_str = NULL;
- if (r_str) free(r_str); r_str = NULL;
- if (old_val) free(old_val);
- old_val = val; val = NULL;
- i++;
- }
- }
- val = old_val; old_val = NULL;
- if (val) {
- av = calloc(2, sizeof(glite_jp_attrval_t));
- av[0].name = strdup(attr);
- av[0].value = strdup(val);
- av[0].size = -1;
- av[0].timestamp = h->status.lastUpdateTime.tv_sec;
- free(val);
- }
- } else if (strcmp(attr, GLITE_JP_LB_fullStatusHistory) == 0) {
- int i,j;
- char *val, *old_val, *s_str, *t_str, *r_str;
- struct tm *t;
-
- val = s_str = t_str = r_str = NULL;
- old_val = strdup("");
- t = calloc(1, sizeof(*t));
- i = 0;
- while (h->fullStatusHistory[i]) {
- s_str = edg_wll_StatToString(h->fullStatusHistory[i]->state);
- for (j = 0; s_str[j]; j++) s_str[j] = toupper(s_str[j]);
- if (gmtime_r(&h->fullStatusHistory[i]->timestamp.tv_sec,t) != NULL) {
- /* dateTime format: yyyy-mm-ddThh:mm:ss:uuuuuu */
- trio_asprintf(&t_str,"timestamp=\"%04d-%02d-%02dT%02d:%02d:%02d.%06d\" ",
- 1900+t->tm_year, 1+t->tm_mon, t->tm_mday,
- t->tm_hour, t->tm_min, t->tm_sec,
- h->fullStatusHistory[i]->timestamp.tv_usec);
- }
- if (h->fullStatusHistory[i]->reason) {
- trio_asprintf(&r_str,"reason=\"%s\" ",h->fullStatusHistory[i]->reason);
- }
- trio_asprintf(&val,"%s\t\t<status xmlns=\"" GLITE_JP_LB_NS "\" name=\"%s\" %s%s/>\n",
- old_val, s_str ? s_str : "", t_str ? t_str : "", r_str ? r_str : "");
- if (s_str) free(s_str); s_str = NULL;
- if (t_str) free(t_str); t_str = NULL;
- if (r_str) free(r_str); r_str = NULL;
- if (old_val) free(old_val);
- old_val = val; val = NULL;
- i++;
- }
- val = old_val; old_val = NULL;
- if (val) {
- av = calloc(2, sizeof(glite_jp_attrval_t));
- av[0].name = strdup(attr);
- av[0].value = strdup(val);
- av[0].size = -1;
- av[0].timestamp = h->status.lastUpdateTime.tv_sec;
- free(val);
- }
- } else if (strcmp(ns, GLITE_JP_LBTAG_NS) == 0) {
- tag = strrchr(attr, ':');
- if (h->events && tag) {
- tag++;
- i = 0;
- n_tags = 0;
-
- while (h->events[i]) {
- if ((h->events[i]->type == EDG_WLL_EVENT_USERTAG) &&
- (strcasecmp(h->events[i]->userTag.name, tag) == 0) ) {
-/* XXX: LB tag names are case-insensitive */
- av = realloc(av, (n_tags+2) * sizeof(glite_jp_attrval_t));
- memset(&av[n_tags], 0, 2 * sizeof(glite_jp_attrval_t));
-
- av[n_tags].name = strdup(attr);
- av[n_tags].value = check_strdup(h->events[i]->userTag.value);
- av[n_tags].timestamp =
- h->events[i]->any.timestamp.tv_sec;
- av[n_tags].size = -1;
-
- n_tags++;
- }
- i++;
- }
- }
- } else if (strcmp(attr, GLITE_JP_LB_JDL) == 0) {
- if (h->status.jdl) {
- av = calloc(2, sizeof(glite_jp_attrval_t));
- av[0].name = strdup(attr);
- av[0].value = strdup(h->status.jdl);
- av[0].size = -1;
- av[0].timestamp = h->status.lastUpdateTime.tv_sec;
- }
- } else {
- char et[BUFSIZ];
- *attrval = NULL;
- err.code = EINVAL;
- snprintf(et,sizeof et,"No such attribute '%s'.",attr);
- et[BUFSIZ-1] = 0;
- err.desc = et;
- return glite_jp_stack_error(ctx,&err);
- }
-
- free(ns);
-
- if (av && av[0].value) {
- for (i=0; av[i].name; i++) av[i].origin = GLITE_JP_ATTR_ORIG_FILE;
- *attrval = av;
- return 0;
- } else {
- char et[BUFSIZ];
- *attrval = NULL;
- err.code = ENOENT;
- snprintf(et,sizeof et,"Value unknown for attribute '%s'.",attr);
- et[BUFSIZ-1] = 0;
- err.desc = et;
- if (av) glite_jp_attrval_free(av,1); // XXX: probably not needed
- return glite_jp_stack_error(ctx,&err);
- }
-}
-
-static int lb_filecom(void *fpctx, void *handle){
- glite_jp_context_t ctx = (glite_jp_context_t) fpctx;
- lb_handle *h = (lb_handle *) handle;
- glite_jp_attrval_t attr[2];
- memset(attr, 0, 2 * sizeof(glite_jp_attrval_t));
-
- if (h->events) {
- int i = 0;
- while (h->events[i]) {
- if (h->events[i]->type == EDG_WLL_EVENT_USERTAG &&
- strchr(h->events[i]->userTag.name,':'))
- {
- //printf("%s, %s\n", edg_wlc_JobIdUnparse(h->status.jobId), h->status.jobId);
- attr[0].name = h->events[i]->userTag.name;
- attr[0].value = h->events[i]->userTag.value;
- attr[0].binary = 0;
- attr[0].origin = GLITE_JP_ATTR_ORIG_USER;
- attr[0].timestamp = time(NULL);
- attr[0].origin_detail = NULL; /* XXX */
- attr[1].name = NULL;
- glite_jppsbe_append_tag(ctx, edg_wlc_JobIdUnparse(h->status.jobId), attr);
- }
- i++;
- }
- }
-
- return 0;
-}
-
-static int lb_status(void *handle) {
-
- lb_handle *h = (lb_handle *) handle;
- intJobStat *js;
- int maxnstates, nstates, i, be_strict = 0, retval;
- char *errstring;
- edg_wll_JobStatCode old_state = EDG_WLL_JOB_UNDEF;
- int lastStatusHistoryIndex = -1;
-
- js = calloc(1, sizeof(intJobStat));
- init_intJobStat(js);
-
- edg_wll_SortPEvents(h->events);
-
- maxnstates = INITIAL_NUMBER_STATES;
- nstates = 0;
- h->fullStatusHistory = calloc(maxnstates, sizeof(lb_historyStatus *));
- h->lastStatusHistory = NULL;
- h->finalStatus = NULL;
- i = 0;
- while (h->events[i])
- {
- /* realloc the fullStatusHistory if needed */
- if (nstates >= maxnstates) {
- maxnstates <<= 1;
- h->fullStatusHistory = realloc(h->fullStatusHistory, maxnstates * sizeof(lb_historyStatus *));
- if (lastStatusHistoryIndex > -1)
- h->lastStatusHistory = &(h->fullStatusHistory[lastStatusHistoryIndex]);
- }
-
- /* job owner and jobId not filled from events normally */
- if (h->events[i]->any.type == EDG_WLL_EVENT_REGJOB) {
- js->pub.owner = check_strdup(h->events[i]->any.user);
- if (edg_wlc_JobIdDup(h->events[i]->any.jobId,&js->pub.jobId)) {
- goto err;
- }
- }
- /* Process Event and update the state */
- if (processEvent(js, h->events[i], 0, be_strict, &errstring) == RET_FATAL) {
- goto err;
- }
-
- /* if the state has changed, update the status history */
- if (js->pub.state != old_state) {
- h->fullStatusHistory[nstates] = calloc(1,sizeof(lb_historyStatus));
- h->fullStatusHistory[nstates]->state = js->pub.state;
- h->fullStatusHistory[nstates]->timestamp.tv_sec = js->pub.stateEnterTime.tv_sec;
- h->fullStatusHistory[nstates]->timestamp.tv_usec = js->pub.stateEnterTime.tv_usec;
- h->fullStatusHistory[nstates]->reason = check_strdup(js->pub.reason);
- /* lastStatusHistory starts from the last WAITING state */
- if (js->pub.state == EDG_WLL_JOB_WAITING) {
- h->lastStatusHistory = &(h->fullStatusHistory[nstates]);
- lastStatusHistoryIndex = nstates;
- }
- /* finalStatus is the one preceeding the CLEARED state */
- if ( (js->pub.state == EDG_WLL_JOB_CLEARED) && (nstates > 0) ) {
- h->finalStatus = h->fullStatusHistory[nstates-1];
- }
- old_state = js->pub.state;
- nstates++;
- }
-
- i++;
- }
- h->fullStatusHistory[nstates] = NULL;
- /* if there is no CLEARED state, finalStatus is just the last status
- and if there is no such thing, leave h->finalStatus NULL and for the attribute
- try to read something from the h->status */
- if ( (h->finalStatus == NULL) && (nstates > 0) ) {
- h->finalStatus = h->fullStatusHistory[nstates-1];
- }
-
- /* fill in also subjobs */
- if (js->pub.children_num > 0) {
- edg_wll_Context context;
- edg_wlc_JobId *subjobs;
-
- if ((retval = edg_wll_InitContext(&context)) != 0) return retval;
- subjobs = calloc(js->pub.children_num, sizeof (*subjobs));
- if ((retval = edg_wll_GenerateSubjobIds(context,
- js->pub.jobId, js->pub.children_num, js->pub.seed, &subjobs) ) != 0 ) {
- goto err;
- }
- js->pub.children = calloc(js->pub.children_num + 1, sizeof (*js->pub.children));
- for (i=0; i<js->pub.children_num; i++) {
- js->pub.children[i] = edg_wlc_JobIdUnparse(subjobs[i]);
- }
- edg_wll_FreeContext(context);
- free(subjobs);
- }
-
- memcpy(&h->status, &js->pub, sizeof(edg_wll_JobStat));
-
- // not very clean, but working
- memset(&js->pub, 0, sizeof(edg_wll_JobStat));
- destroy_intJobStat(js);
-
- return 0;
-
-err:
- destroy_intJobStat(js);
- return -1;
-}
-
-
-/*
- * realloc the line to double size if needed
- *
- * \return 0 if failed, did nothing
- * \return 1 if success
- */
-/*int check_realloc_line(char **line, size_t *maxlen, size_t len) {
- void *tmp;
-
- if (len > *maxlen) {
- *maxlen <<= 1;
- tmp = realloc(*line, *maxlen);
- if (!tmp) return 0;
- *line = tmp;
- }
-
- return 1;
-}
-*/
-
-/*
- * read next line from stream
- *
- * \return error code
- */
-/*static int read_line(glite_jp_context_t ctx, void *handle, lb_buffer_t *buffer, char **line) {
-
- size_t maxlen, len, i;
- ssize_t nbytes;
- int retval, z, end;
-
- maxlen = BUFSIZ;
- i = 0;
- len = 0;
- *line = malloc(maxlen);
- end = 0;
-
- do {
- // read next portion
- if (buffer->pos >= buffer->size) {
- buffer->pos = 0;
- buffer->size = 0;
- if ((retval = glite_jppsbe_pread(ctx, handle, buffer->buf, BUFSIZ, buffer->offset, &nbytes)) == 0) {
- if (nbytes < 0) {
- retval = EINVAL;
- goto fail;
- } else {
- if (nbytes) {
- buffer->size = (size_t)nbytes;
- buffer->offset += nbytes;
- } else end = 1;
- }
- } else goto fail;
- }
-
- // we have buffer->size - buffer->pos bytes
- i = buffer->pos;
- do {
- if (i >= buffer->size) z = '\0';
- else {
- z = buffer->buf[i];
- if (z == '\n') z = '\0';
- }
- len++;
-
- if (!check_realloc_line(line, &maxlen, len)) {
- retval = ENOMEM;
- goto fail;
- }
- (*line)[len - 1] = z;
- i++;
- } while (z && i < buffer->size);
- buffer->pos = i;
- } while (len && (*line)[len - 1] != '\0');
-
- if ((!len || !(*line)[0]) && end) {
- free(*line);
- *line = NULL;
- }
-
- return 0;
-
-fail:
- free(*line);
- *line = NULL;
- return retval;
-}
-*/
+++ /dev/null
-#ident "$Header$"
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <errno.h>
-#include <assert.h>
-#include <syslog.h>
-
-#include "glite/lb/context-int.h"
-
-#include "jobstat.h"
-#include "lock.h"
-
-/* TBD: share in whole logging or workload */
-#ifdef __GNUC__
-#define UNUSED_VAR __attribute__((unused))
-#else
-#define UNUSED_VAR
-#endif
-
-static int processEvent_glite(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring);
-
-int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring)
-{
- if (js->pub.jobtype == -1 && e->type == EDG_WLL_EVENT_REGJOB)
- switch (e->regJob.jobtype) {
- case EDG_WLL_REGJOB_SIMPLE:
- js->pub.jobtype = EDG_WLL_STAT_SIMPLE;
- break;
- case EDG_WLL_REGJOB_DAG:
- case EDG_WLL_REGJOB_PARTITIONABLE:
- case EDG_WLL_REGJOB_PARTITIONED:
- js->pub.jobtype = EDG_WLL_STAT_DAG;
- break;
- case EDG_WLL_REGJOB_COLLECTION:
- js->pub.jobtype = EDG_WLL_STAT_COLLECTION;
- break;
- case EDG_WLL_REGJOB_PBS:
- js->pub.jobtype = EDG_WLL_STAT_PBS;
- break;
- case EDG_WLL_REGJOB_CONDOR:
- js->pub.jobtype = EDG_WLL_STAT_CONDOR;
- break;
- default:
- asprintf(errstring,"unknown job type %d in registration",e->regJob.jobtype);
- return RET_FAIL;
- }
-
- switch (js->pub.jobtype) {
- case EDG_WLL_STAT_SIMPLE:
- case EDG_WLL_STAT_DAG:
- case EDG_WLL_STAT_COLLECTION:
- return processEvent_glite(js,e,ev_seq,strict,errstring);
- case EDG_WLL_STAT_PBS:
- return processEvent_PBS(js,e,ev_seq,strict,errstring);
- case EDG_WLL_STAT_CONDOR:
- return processEvent_Condor(js,e,ev_seq,strict,errstring);
- case -1: return RET_UNREG;
- default:
- asprintf(errstring,"undefined job type %d",js->pub.jobtype);
- return RET_FAIL;
- }
-}
-
-#define rep(a,b) { free(a); a = (b == NULL) ? NULL : strdup(b); }
-#define rep_cond(a,b) { if (b) { free(a); a = strdup(b); } }
-
-static void free_stringlist(char ***lptr)
-{
- char **itptr;
- int i;
-
- if (*lptr) {
- for (i = 0, itptr = *lptr; itptr[i] != NULL; i++)
- free(itptr[i]);
- free(itptr);
- *lptr = NULL;
- }
-}
-
-static int add_taglist(edg_wll_TagValue **lptr, const char *new_item, const char *new_item2)
-{
- edg_wll_TagValue *itptr;
- int i;
-
- if (*lptr == NULL) {
- itptr = (edg_wll_TagValue *) calloc(2,sizeof(edg_wll_TagValue));
- itptr[0].tag = strdup(new_item);
- itptr[0].value = strdup(new_item2);
- *lptr = itptr;
- return 1;
- } else {
- for (i = 0, itptr = *lptr; itptr[i].tag != NULL; i++)
- if ( !strcasecmp(itptr[i].tag, new_item) )
- {
- free(itptr[i].value);
- itptr[i].value = strdup(new_item2);
- return 1;
- }
- itptr = (edg_wll_TagValue *) realloc(*lptr, (i+2)*sizeof(edg_wll_TagValue));
- if (itptr != NULL) {
- itptr[i].tag = strdup(new_item);
- itptr[i].value = strdup(new_item2);
- itptr[i+1].tag = NULL;
- itptr[i+1].value = NULL;
- *lptr = itptr;
- return 1;
- } else {
- return 0;
- }
- }
-}
-
-
-static void update_branch_state(char *b, char *d, char *c, char *j, branch_state **bs)
-{
- int i = 0, branch;
-
-
- if (!b)
- return;
- else
- branch = component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER);
-
- if (*bs != NULL) {
- while ((*bs)[i].branch) {
- if (branch == (*bs)[i].branch) {
- if (d) rep((*bs)[i].destination, d);
- if (c) rep((*bs)[i].ce_node, c);
- if (j) rep((*bs)[i].jdl, j);
-
- return;
- }
- i++;
- }
- }
-
- *bs = (branch_state *) realloc(*bs, (i+2)*sizeof(branch_state));
- memset(&((*bs)[i]), 0, 2*sizeof(branch_state));
-
- (*bs)[i].branch = branch;
- rep((*bs)[i].destination, d);
- rep((*bs)[i].ce_node, c);
- rep((*bs)[i].jdl, j);
-}
-
-
-static void free_branch_state(branch_state **bs)
-{
- int i = 0;
-
- if (*bs == NULL) return;
-
- while ((*bs)[i].branch) {
- free((*bs)[i].destination);
- free((*bs)[i].ce_node);
- free((*bs)[i].jdl);
- i++;
- }
- free(*bs);
- *bs = NULL;
-}
-
-static int compare_branch_states(const void *a, const void *b)
-{
- branch_state *c = (branch_state *) a;
- branch_state *d = (branch_state *) b;
-
- if (c->branch < d->branch) return -1;
- if (c->branch == d->branch) return 0;
- /* avoid warning: if (c->branch > d->branch) */ return 1;
-}
-
-static void load_branch_state(intJobStat *js)
-{
- int i, j, branch;
-
-
- if ( (!js->branch_tag_seqcode) || (!js->branch_states) ) return;
-
- branch = component_seqcode(js->branch_tag_seqcode, EDG_WLL_SOURCE_WORKLOAD_MANAGER);
-
- // count elements
- i = 0;
- while (js->branch_states[i].branch) i++;
-
- // sort them
- qsort(js->branch_states, (size_t) i, sizeof(branch_state),
- compare_branch_states);
-
- // find row corresponding to ReallyRunning WM seq.code (aka branch)
- i = 0;
- while (js->branch_states[i].branch) {
- if (js->branch_states[i].branch == branch) break;
- i++;
- }
-
- // copy this and two before branches data to final state
- // (each field - dest,ce,jdl - comes from different event)
- // (and these events have most likely different WM seq.codes)
- // (even belonging into one logical branch)
- // (the newer the more important - so i-th element is copied as last)
- // (and may overwrite data from previous elements)
- for (j = i - 2; j <= i; j++) {
- if (j >= 0) {
- if (js->branch_states[j].destination)
- rep(js->pub.destination, js->branch_states[j].destination);
- if (js->branch_states[j].ce_node)
- rep(js->pub.ce_node, js->branch_states[j].ce_node);
- if (js->branch_states[j].jdl)
- rep(js->pub.matched_jdl, js->branch_states[j].jdl);
- }
- }
-}
-
-// clear branches (deep resub. or abort)
-static void reset_branch(intJobStat *js, edg_wll_Event *e)
-{
- js->resubmit_type = EDG_WLL_RESUBMISSION_WILLRESUB;
- free_stringlist(&js->pub.possible_destinations);
- free_stringlist(&js->pub.possible_ce_nodes);
- free_branch_state(&js->branch_states);
- js->pub.payload_running = 0;
- rep(js->branch_tag_seqcode, NULL);
- rep(js->deep_resubmit_seqcode, e->any.seqcode);
-}
-
-static char* location_string(const char *source, const char *host, const char *instance)
-{
- char *ret;
- asprintf(&ret, "%s/%s/%s", source, host, instance);
- return ret;
-}
-
-/* is seq. number of 'es' before WMS higher then 'js' */
-static int after_enter_wm(const char *es,const char *js)
-{
- return ((component_seqcode(es,EDG_WLL_SOURCE_NETWORK_SERVER) >
- component_seqcode(js,EDG_WLL_SOURCE_NETWORK_SERVER))
- ||
- (component_seqcode(es,EDG_WLL_SOURCE_USER_INTERFACE) >
- component_seqcode(js,EDG_WLL_SOURCE_USER_INTERFACE)));
-}
-
-
-static int badEvent(intJobStat *js UNUSED_VAR, edg_wll_Event *e, int ev_seq UNUSED_VAR)
-{
- char *str;
-
- str = edg_wll_EventToString(e->any.type);
- fprintf(stderr, "edg_wll_JobStatus: bad event: type %d (%s)\n",
- e->any.type, (str == NULL) ? "unknown" : str);
- free(str);
- return RET_FATAL;
-}
-
-
-// (?) || (0 && 1) => true if (res == RET_OK)
-#define USABLE(res,strict) ((res) == RET_OK || ( (res) == RET_SOON && !strict))
-
-// (?) || (1 && 1) => always true
-#define USABLE_DATA(res,strict) ((res) == RET_OK || ( (res) != RET_FATAL && !strict))
-
-#define USABLE_BRANCH(fine_res) ((fine_res) != RET_TOOOLD && (fine_res) != RET_BADBRANCH)
-#define LRMS_STATE(state) ((state) == EDG_WLL_JOB_RUNNING || (state) == EDG_WLL_JOB_DONE)
-#define PARSABLE_SEQCODE(code) (component_seqcode((code),0) >= 0)
-
-static int processEvent_glite(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring)
-{
- edg_wll_JobStatCode old_state = js->pub.state;
- edg_wll_JobStatCode new_state = EDG_WLL_JOB_UNKNOWN;
- int res = RET_OK,
- fine_res = RET_OK;
-
- int lm_favour_lrms = 0;
-
- // Aborted may not be terminal state for collection in some cases
- // i.e. if some Done/failed subjob is resubmitted
- if ( (old_state == EDG_WLL_JOB_ABORTED && e->any.type != EDG_WLL_EVENT_COLLECTIONSTATE) ||
- old_state == EDG_WLL_JOB_CANCELLED ||
- old_state == EDG_WLL_JOB_CLEARED) {
- res = RET_LATE;
- }
-
-/* new event coming from NS or UI => forget about any resubmission loops */
- if (e->type != EDG_WLL_EVENT_CANCEL &&
- js->last_seqcode &&
- after_enter_wm(e->any.seqcode,js->last_seqcode))
- {
- rep(js->branch_tag_seqcode,NULL);
- rep(js->deep_resubmit_seqcode,NULL);
- rep(js->last_branch_seqcode,NULL);
- }
-
- if (js->deep_resubmit_seqcode &&
- before_deep_resubmission(e->any.seqcode, js->deep_resubmit_seqcode)) {
- res = RET_LATE;
- fine_res = RET_TOOOLD;
- }
- else if (js->branch_tag_seqcode) { // ReallyRunning ev. arrived
- if (same_branch(e->any.seqcode, js->branch_tag_seqcode)) {
- if ((js->last_branch_seqcode != NULL) &&
- edg_wll_compare_seq(e->any.seqcode, js->last_branch_seqcode) < 0) {
- res = RET_LATE;
- }
- fine_res = RET_GOODBRANCH;
- }
- else {
- res = RET_LATE;
- fine_res = RET_BADBRANCH;
- }
- }
- else if ((js->last_seqcode != NULL) &&
- edg_wll_compare_seq(e->any.seqcode, js->last_seqcode) < 0) {
- res = RET_LATE;
- }
-
-
- switch (e->any.type) {
- case EDG_WLL_EVENT_TRANSFER:
- if (e->transfer.result == EDG_WLL_TRANSFER_OK) {
- switch (e->transfer.source) {
- case EDG_WLL_SOURCE_USER_INTERFACE:
- new_state = EDG_WLL_JOB_WAITING; break;
- case EDG_WLL_SOURCE_JOB_SUBMISSION:
- /* if (LRMS_STATE(old_state)) res = RET_LATE; */
- new_state = EDG_WLL_JOB_READY; break;
- case EDG_WLL_SOURCE_LOG_MONITOR:
- if (LRMS_STATE(old_state)) {
- js->pub.stateEnterTimes[1 + EDG_WLL_JOB_SCHEDULED] =
- e->any.timestamp.tv_sec;
- res = RET_LATE;
- }
- new_state = EDG_WLL_JOB_SCHEDULED;
- lm_favour_lrms = 1;
- break;
- default:
- goto bad_event; break;
- }
- } else if (e->transfer.result == EDG_WLL_TRANSFER_FAIL) {
- /* transfer failed */
- switch (e->transfer.source) {
- case EDG_WLL_SOURCE_USER_INTERFACE:
- new_state = EDG_WLL_JOB_SUBMITTED; break;
- case EDG_WLL_SOURCE_JOB_SUBMISSION:
- if (LRMS_STATE(old_state)) res = RET_LATE;
- new_state = EDG_WLL_JOB_READY; break;
- case EDG_WLL_SOURCE_LOG_MONITOR:
- if (LRMS_STATE(old_state)) res = RET_LATE;
- new_state = EDG_WLL_JOB_READY; break;
- default:
- goto bad_event; break;
- }
- } else {
- /* e->transfer.result == EDG_WLL_TRANSFER_START */
- res = RET_IGNORE;
- }
- if (USABLE(res, strict)) {
- js->pub.state = new_state;
- rep(js->pub.reason, e->transfer.reason);
-
- free(js->pub.location);
- if (e->transfer.result == EDG_WLL_TRANSFER_OK) {
- js->pub.location = location_string(
- edg_wll_SourceToString(e->transfer.destination),
- e->transfer.dest_host,
- e->transfer.dest_instance);
- } else {
- js->pub.location = location_string(
- edg_wll_SourceToString(e->transfer.source),
- e->transfer.host,
- e->transfer.src_instance);
- }
- }
- if (USABLE_DATA(res, strict)) {
- switch (e->transfer.source) {
- case EDG_WLL_SOURCE_USER_INTERFACE:
- rep(js->pub.jdl, e->transfer.job); break;
- case EDG_WLL_SOURCE_JOB_SUBMISSION:
- rep(js->pub.condor_jdl, e->transfer.job); break;
- case EDG_WLL_SOURCE_LOG_MONITOR:
- rep(js->pub.rsl, e->transfer.job); break;
- default:
- goto bad_event; break;
-
- }
- }
- break;
- case EDG_WLL_EVENT_ACCEPTED:
- switch (e->accepted.source) {
- case EDG_WLL_SOURCE_NETWORK_SERVER:
- new_state = EDG_WLL_JOB_WAITING; break;
- case EDG_WLL_SOURCE_LOG_MONITOR:
- if (LRMS_STATE(old_state)) res = RET_LATE;
- new_state = EDG_WLL_JOB_READY;
- lm_favour_lrms = 1;
- break;
- case EDG_WLL_SOURCE_LRMS:
- new_state = EDG_WLL_JOB_SCHEDULED; break;
- default:
- goto bad_event; break;
- }
- if (USABLE(res, strict)) {
- js->pub.state = new_state;
- free(js->pub.location);
- js->pub.location = location_string(
- edg_wll_SourceToString(e->accepted.source),
- e->accepted.host,
- e->accepted.src_instance);
- }
- if (USABLE_DATA(res, strict)) {
- switch (e->accepted.source) {
- case EDG_WLL_SOURCE_NETWORK_SERVER:
- break; /* no WM id */
- case EDG_WLL_SOURCE_LOG_MONITOR:
- rep(js->pub.condorId, e->accepted.local_jobid); break;
- case EDG_WLL_SOURCE_LRMS:
- /* XXX localId */
- rep(js->pub.globusId, e->accepted.local_jobid); break;
- default:
- goto bad_event; break;
- }
- }
- break;
- case EDG_WLL_EVENT_REFUSED:
- switch (e->refused.source) {
- case EDG_WLL_SOURCE_NETWORK_SERVER:
- new_state = EDG_WLL_JOB_SUBMITTED; break;
- case EDG_WLL_SOURCE_LOG_MONITOR:
- new_state = EDG_WLL_JOB_READY; break;
- case EDG_WLL_SOURCE_LRMS:
- new_state = EDG_WLL_JOB_READY; break;
- default:
- goto bad_event; break;
- }
- if (USABLE(res, strict)) {
- js->pub.state = new_state;
- rep(js->pub.reason, e->refused.reason);
-
- free(js->pub.location);
- js->pub.location = location_string(
- edg_wll_SourceToString(e->refused.from),
- e->refused.from_host,
- e->refused.from_instance);
- }
- break;
- case EDG_WLL_EVENT_ENQUEUED:
- if (e->enQueued.result == EDG_WLL_ENQUEUED_OK) {
- switch (e->enQueued.source) {
- case EDG_WLL_SOURCE_NETWORK_SERVER:
- new_state = EDG_WLL_JOB_WAITING; break;
- case EDG_WLL_SOURCE_WORKLOAD_MANAGER:
- if (LRMS_STATE(old_state)) res = RET_LATE;
- update_branch_state(e->any.seqcode, NULL,
- NULL, e->enQueued.job, &js->branch_states);
- new_state = EDG_WLL_JOB_READY; break;
- case EDG_WLL_SOURCE_LOG_MONITOR:
- new_state = EDG_WLL_JOB_WAITING; break;
- default:
- goto bad_event; break;
- }
- } else if (e->enQueued.result == EDG_WLL_ENQUEUED_FAIL) {
- switch (e->enQueued.source) {
- case EDG_WLL_SOURCE_NETWORK_SERVER:
- new_state = EDG_WLL_JOB_WAITING; break;
- case EDG_WLL_SOURCE_WORKLOAD_MANAGER:
- new_state = EDG_WLL_JOB_WAITING; break;
- case EDG_WLL_SOURCE_LOG_MONITOR:
- new_state = old_state; break;
- default:
- goto bad_event; break;
- }
- } else {
- /* e->enQueued.result == EDG_WLL_ENQUEUED_START */
- res = RET_IGNORE;
- }
- if (USABLE(res, strict)) {
- js->pub.state = new_state;
- rep(js->pub.reason, e->enQueued.reason);
-
- free(js->pub.location);
- if (e->enQueued.result == EDG_WLL_ENQUEUED_OK) {
- js->pub.location = location_string(
- e->enQueued.queue,
- e->enQueued.host,
- e->enQueued.src_instance);
- if (e->enQueued.source == EDG_WLL_SOURCE_LOG_MONITOR)
- js->pub.resubmitted = 1;
- } else {
- js->pub.location = location_string(
- edg_wll_SourceToString(e->enQueued.source),
- e->enQueued.host,
- e->enQueued.src_instance);
- }
- }
- if (USABLE_DATA(res, strict)) {
- switch (e->enQueued.source) {
- case EDG_WLL_SOURCE_NETWORK_SERVER:
- rep(js->pub.jdl, e->enQueued.job); break;
- case EDG_WLL_SOURCE_WORKLOAD_MANAGER:
- if (USABLE_BRANCH(res)) {
- rep(js->pub.matched_jdl, e->enQueued.job);
- }
- break;
- case EDG_WLL_SOURCE_LOG_MONITOR:
- /* no interim JDL here */
- break;
- default:
- goto bad_event; break;
- }
- }
- break;
- case EDG_WLL_EVENT_DEQUEUED:
- switch (e->deQueued.source) {
- case EDG_WLL_SOURCE_WORKLOAD_MANAGER:
- new_state = EDG_WLL_JOB_WAITING; break;
- case EDG_WLL_SOURCE_JOB_SUBMISSION:
- if (LRMS_STATE(old_state)) res = RET_LATE;
- new_state = EDG_WLL_JOB_READY; break;
- default:
- goto bad_event; break;
- }
- if (USABLE(res, strict)) {
- js->pub.state = new_state;
- free(js->pub.location);
- js->pub.location = location_string(
- edg_wll_SourceToString(e->deQueued.source),
- e->deQueued.host,
- e->deQueued.src_instance);
- }
- if (USABLE_DATA(res, strict)) {
- /* no WM/JSS local jobid */
- }
- break;
- case EDG_WLL_EVENT_HELPERCALL:
- if (USABLE(res, strict)) {
- js->pub.state = EDG_WLL_JOB_WAITING;
- free(js->pub.location);
- js->pub.location = location_string(
- e->helperCall.helper_name,
- e->helperCall.host,
- e->helperCall.src_instance);
- /* roles and params used only for debugging */
- }
- break;
- case EDG_WLL_EVENT_HELPERRETURN:
- if (USABLE(res, strict)) {
- js->pub.state = EDG_WLL_JOB_WAITING;
- free(js->pub.location);
- js->pub.location = location_string(
- edg_wll_SourceToString(EDG_WLL_SOURCE_WORKLOAD_MANAGER),
- e->helperReturn.host,
- e->helperReturn.src_instance);
- /* roles and retvals used only for debugging */
- }
- break;
- case EDG_WLL_EVENT_RUNNING:
- if (USABLE(res, strict)) {
- js->pub.state = EDG_WLL_JOB_RUNNING;
- free(js->pub.location);
- js->pub.location = location_string(
- edg_wll_SourceToString(EDG_WLL_SOURCE_LRMS),
- "worknode",
- e->running.node);
- }
- if (USABLE_DATA(res, strict)) {
- if (USABLE_BRANCH(fine_res)) {
- rep(js->pub.ce_node, e->running.node);
- }
- /* why? if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) { */
- if (e->running.node) {
- update_branch_state(e->any.seqcode, NULL,
- e->running.node, NULL, &js->branch_states);
- add_stringlist(&js->pub.possible_ce_nodes,
- e->running.node);
- }
- /* } */
- }
- break;
- case EDG_WLL_EVENT_REALLYRUNNING:
- /* consistence check -- should not receive two contradicting ReallyRunning's within single
- deep resub cycle */
- if (fine_res == RET_BADBRANCH) {
- syslog(LOG_ERR,"ReallyRunning on bad branch %s",
- e->any.source == EDG_WLL_SOURCE_LOG_MONITOR ? e->reallyRunning.wn_seq : e->any.seqcode);
- break;
- }
- /* select the branch unless TOOOLD, i.e. before deep resubmission */
- if (!(res == RET_LATE && fine_res == RET_TOOOLD)) {
- if (e->any.source == EDG_WLL_SOURCE_LRMS) {
- rep(js->branch_tag_seqcode, e->any.seqcode);
- if (res == RET_OK) {
- rep(js->last_branch_seqcode, e->any.seqcode);
- js->pub.state = EDG_WLL_JOB_RUNNING;
- }
- }
- if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) {
- if (!js->branch_tag_seqcode) {
- if (PARSABLE_SEQCODE(e->reallyRunning.wn_seq)) {
- rep(js->branch_tag_seqcode, e->reallyRunning.wn_seq);
- } else
- goto bad_event;
- }
- if (!js->last_branch_seqcode) {
- if (PARSABLE_SEQCODE(e->reallyRunning.wn_seq)) {
- if (res == RET_OK) {
- rep(js->last_branch_seqcode, e->reallyRunning.wn_seq);
- js->pub.state = EDG_WLL_JOB_RUNNING;
- }
- } else
- goto bad_event;
- }
- }
-
- /* XXX: best effort -- if we are lucky, ReallyRunning is on the last shallow cycle,
- so we take in account events processed so far */
- if (res == RET_LATE && !js->last_branch_seqcode) {
- if (same_branch(js->last_seqcode,js->branch_tag_seqcode))
- rep(js->last_branch_seqcode,js->last_seqcode);
- }
-
- js->pub.payload_running = 1;
- load_branch_state(js);
- }
-#if 0
- if (USABLE_DATA(res, strict)) {
- js->pub.state = EDG_WLL_JOB_RUNNING;
- free(js->pub.location);
- js->pub.location = location_string(
- edg_wll_SourceToString(EDG_WLL_SOURCE_LRMS),
- "worknode",
- e->running.node);
- js->pub.payload_running = 1;
- if (e->any.source == EDG_WLL_SOURCE_LRMS) {
- rep(js->branch_tag_seqcode, e->any.seqcode);
- rep(js->last_branch_seqcode, e->any.seqcode);
- }
- if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) {
- if (!js->branch_tag_seqcode) {
- if (PARSABLE_SEQCODE(e->reallyRunning.wn_seq)) {
- rep(js->branch_tag_seqcode, e->reallyRunning.wn_seq);
- } else
- goto bad_event;
- }
- if (!js->last_branch_seqcode) {
- if (PARSABLE_SEQCODE(e->reallyRunning.wn_seq)) {
- rep(js->last_branch_seqcode, e->reallyRunning.wn_seq);
- } else
- goto bad_event;
- }
- }
- load_branch_state(js);
- }
-#endif
- break;
- case EDG_WLL_EVENT_SUSPEND:
- if (USABLE(res, strict)) {
- if (js->pub.state == EDG_WLL_JOB_RUNNING) {
- js->pub.suspended = 1;
- rep(js->pub.suspend_reason, e->suspend.reason);
- }
- }
- break;
- case EDG_WLL_EVENT_RESUME:
- if (USABLE(res, strict)) {
- if (js->pub.state == EDG_WLL_JOB_RUNNING) {
- js->pub.suspended = 0;
- rep(js->pub.suspend_reason, e->resume.reason);
- }
- }
- break;
- case EDG_WLL_EVENT_RESUBMISSION:
- if (USABLE(res, strict)) {
- if (e->resubmission.result == EDG_WLL_RESUBMISSION_WONTRESUB) {
- rep(js->pub.reason, e->resubmission.reason);
- }
- }
- if (USABLE_DATA(res, strict)) {
- if (e->resubmission.result == EDG_WLL_RESUBMISSION_WONTRESUB) {
- js->resubmit_type = EDG_WLL_RESUBMISSION_WONTRESUB;
- }
- else
- if (e->resubmission.result == EDG_WLL_RESUBMISSION_WILLRESUB &&
- e->any.source == EDG_WLL_SOURCE_WORKLOAD_MANAGER) {
- reset_branch(js, e);
- }
- else
- if (e->resubmission.result == EDG_WLL_RESUBMISSION_SHALLOW) {
- js->resubmit_type = EDG_WLL_RESUBMISSION_SHALLOW;
- // deep resubmit stays forever deadline for events
- // rep(js->deep_resubmit_seqcode, NULL);
- }
- }
- break;
- case EDG_WLL_EVENT_DONE:
- if (e->any.source == EDG_WLL_SOURCE_LRMS) {
- /* Done from JobWrapper is not sufficient for transition
- * to DONE state according its current definition */
- break;
- }
- if (USABLE(res, strict)) {
- js->pub.state = EDG_WLL_JOB_DONE;
- rep(js->pub.reason, e->done.reason);
- if (fine_res == RET_GOODBRANCH) {
- js->pub.payload_running = 0;
- }
- switch (e->done.status_code) {
- case EDG_WLL_DONE_CANCELLED:
- js->pub.state = EDG_WLL_JOB_CANCELLED;
- case EDG_WLL_DONE_OK:
- rep(js->pub.location, "none"); break;
- default:
- free(js->pub.location);
- js->pub.location = location_string(
- edg_wll_SourceToString(e->done.source),
- e->done.host,
- e->done.src_instance);
- }
- }
- if (USABLE_DATA(res, strict)) {
- switch (e->done.status_code) {
- case EDG_WLL_DONE_OK:
- js->pub.exit_code = e->done.exit_code;
- js->pub.done_code = EDG_WLL_STAT_OK; break;
- case EDG_WLL_DONE_CANCELLED:
- js->pub.exit_code = 0;
- js->pub.done_code = EDG_WLL_STAT_CANCELLED; break;
- case EDG_WLL_DONE_FAILED:
- js->pub.exit_code = 0;
- js->pub.done_code = EDG_WLL_STAT_FAILED; break;
- default:
- goto bad_event; break;
- }
- }
- break;
- case EDG_WLL_EVENT_CANCEL:
- if (fine_res != RET_BADBRANCH) {
- if (js->last_cancel_seqcode != NULL &&
- edg_wll_compare_seq(e->any.seqcode, js->last_cancel_seqcode) < 0) {
- res = RET_LATE;
- }
- }
- else {
- res = RET_LATE;
- }
- if (USABLE(res, strict)) {
- switch (e->cancel.status_code) {
- case EDG_WLL_CANCEL_REQ:
- js->pub.cancelling = 1; break;
- case EDG_WLL_CANCEL_DONE:
- js->pub.state = EDG_WLL_JOB_CANCELLED;
- js->pub.remove_from_proxy = 1;
- rep(js->pub.reason, e->cancel.reason);
- rep(js->last_seqcode, e->any.seqcode);
- rep(js->pub.location, "none");
- /* fall though */
- case EDG_WLL_CANCEL_ABORT:
- js->pub.cancelling = 0; break;
- default:
- /* do nothing */
- break;
-
- }
- }
- if (USABLE_DATA(res, strict)) {
- rep(js->pub.cancelReason, e->cancel.reason);
- }
- break;
- case EDG_WLL_EVENT_ABORT:
- // XXX: accept Abort from WM in every case
- // setting res make USABLE macro true (awful !!)
- if (e->any.source == EDG_WLL_SOURCE_WORKLOAD_MANAGER) res = RET_OK;
- if (USABLE(res, strict)) {
- js->pub.state = EDG_WLL_JOB_ABORTED;
- js->pub.remove_from_proxy = 1;
- rep(js->pub.reason, e->abort.reason);
- rep(js->pub.location, "none");
-
- reset_branch(js, e);
- }
- break;
-
- case EDG_WLL_EVENT_CLEAR:
- if (USABLE(res, strict)) {
- js->pub.state = EDG_WLL_JOB_CLEARED;
- js->pub.remove_from_proxy = 1;
- rep(js->pub.location, "none");
- switch (e->clear.reason) {
- case EDG_WLL_CLEAR_USER:
- rep(js->pub.reason, "user retrieved output sandbox");
- break;
- case EDG_WLL_CLEAR_TIMEOUT:
- rep(js->pub.reason, "timed out, resource purge forced");
- break;
- case EDG_WLL_CLEAR_NOOUTPUT:
- rep(js->pub.reason, "no output was generated");
- break;
- default:
- goto bad_event; break;
-
- }
- }
- break;
- case EDG_WLL_EVENT_PURGE:
- /* ignore, meta-information only */
- break;
- case EDG_WLL_EVENT_MATCH:
- if (USABLE(res, strict)) {
- js->pub.state = EDG_WLL_JOB_WAITING;
- js->pub.location = location_string(
- edg_wll_SourceToString(EDG_WLL_SOURCE_WORKLOAD_MANAGER),
- e->match.host,
- e->match.src_instance);
- }
- if (USABLE_DATA(res, strict)) {
- if (USABLE_BRANCH(fine_res)) {
- rep(js->pub.destination, e->match.dest_id);
- }
- if (e->match.dest_id) {
- update_branch_state(e->any.seqcode, e->match.dest_id,
- NULL, NULL, &js->branch_states);
- add_stringlist(&js->pub.possible_destinations,
- e->match.dest_id);
- }
- }
- break;
- case EDG_WLL_EVENT_PENDING:
- if (USABLE(res, strict)) {
- js->pub.state = EDG_WLL_JOB_WAITING;
- rep(js->pub.reason, e->pending.reason);
- js->pub.location = location_string(
- edg_wll_SourceToString(EDG_WLL_SOURCE_WORKLOAD_MANAGER),
- e->match.host,
- e->match.src_instance);
- }
- break;
- case EDG_WLL_EVENT_REGJOB:
- if (USABLE(res, strict)) {
- js->pub.state = EDG_WLL_JOB_SUBMITTED;
- }
- if (USABLE_DATA(res, strict)) {
- rep_cond(js->pub.jdl, e->regJob.jdl);
- edg_wlc_JobIdFree(js->pub.parent_job);
- edg_wlc_JobIdDup(e->regJob.parent,
- &js->pub.parent_job);
- rep(js->pub.network_server, e->regJob.ns);
- js->pub.children_num = e->regJob.nsubjobs;
- switch (e->regJob.jobtype) {
- case EDG_WLL_REGJOB_DAG:
- case EDG_WLL_REGJOB_PARTITIONED:
- js->pub.jobtype = EDG_WLL_STAT_DAG;
- js->pub.children_hist[EDG_WLL_JOB_UNKNOWN+1] = js->pub.children_num;
- break;
- case EDG_WLL_REGJOB_COLLECTION:
- js->pub.jobtype = EDG_WLL_STAT_COLLECTION;
- js->pub.children_hist[EDG_WLL_JOB_UNKNOWN+1] = js->pub.children_num;
- break;
- default:
- break;
- }
- rep(js->pub.seed, e->regJob.seed);
- }
- break;
- case EDG_WLL_EVENT_USERTAG:
- if (USABLE_DATA(res, strict)) {
- if (e->userTag.name != NULL && e->userTag.value != NULL) {
- add_taglist(&js->pub.user_tags,
- e->userTag.name, e->userTag.value);
- } else {
- goto bad_event;
- }
- }
- break;
- case EDG_WLL_EVENT_LISTENER:
- /* ignore, listener port is not part of job status */
- break;
- case EDG_WLL_EVENT_CURDESCR:
- case EDG_WLL_EVENT_CHKPT:
- case EDG_WLL_EVENT_CHANGEACL:
- /* ignore, only for event log */
- break;
- case EDG_WLL_EVENT_COLLECTIONSTATE:
- new_state = edg_wll_StringToStat(e->collectionState.state);
- if (USABLE(res, strict)) {
- js->pub.state = new_state;
- if (new_state == EDG_WLL_JOB_DONE)
- js->pub.done_code = e->collectionState.done_code;
- }
- break;
- default:
- goto bad_event;
- break;
- }
-
- if (USABLE(res,strict)) {
- js->pub.lastUpdateTime = e->any.timestamp;
- if (old_state != js->pub.state) {
- js->pub.stateEnterTime = js->pub.lastUpdateTime;
- js->pub.stateEnterTimes[1 + js->pub.state]
- = (int)js->pub.lastUpdateTime.tv_sec;
- }
- if (e->any.type == EDG_WLL_EVENT_CANCEL) {
- rep(js->last_cancel_seqcode, e->any.seqcode);
- } else {
-
-/* the first set of LM events (Accept, Transfer/- -> LRMS)
- should not should shift the state (to Ready, Scheduled) but NOT to
- update js->last_seqcode completely, in order not to block following
- LRMS events which are likely to arrive later but should still affect
- job state (as there may be no more LM events due to the Condor bug).
- However, don't ignore the incoming seqcode completely, to catch up
- with possibly delayed WM/JSS events */
-
- if (lm_favour_lrms) {
- free(js->last_seqcode);
- js->last_seqcode = set_component_seqcode(e->any.seqcode,EDG_WLL_SOURCE_LOG_MONITOR,0);
- }
- else rep(js->last_seqcode, e->any.seqcode);
- }
-
- if (js->pub.state != EDG_WLL_JOB_RUNNING) {
- js->pub.suspended = 0;
- rep(js->pub.suspend_reason, NULL);
- }
-
- if (fine_res == RET_GOODBRANCH) {
- rep(js->last_branch_seqcode, e->any.seqcode);
- }
- }
-
- if (USABLE_DATA(res,strict)) {
- if (e->any.source == EDG_WLL_SOURCE_NETWORK_SERVER &&
- js->pub.network_server == NULL) {
- char *inst;
- inst = e->any.src_instance;
- asprintf(&js->pub.network_server, "%s%s%s",
- e->any.host,
- inst != NULL ? ":" : " ",
- inst != NULL ? inst : "");
- }
- }
-
- return res;
-
-bad_event:
- badEvent(js,e,ev_seq);
- return RET_SUSPECT;
-}
-
-int add_stringlist(char ***lptr, const char *new_item)
-{
- char **itptr;
- int i;
-
- if (*lptr == NULL) {
- itptr = (char **) malloc(2*sizeof(char *));
- itptr[0] = strdup(new_item);
- itptr[1] = NULL;
- *lptr = itptr;
- return 1;
- } else {
- for (i = 0, itptr = *lptr; itptr[i] != NULL; i++);
- itptr = (char **) realloc(*lptr, (i+2)*sizeof(char *));
- if (itptr != NULL) {
- itptr[i] = strdup(new_item);
- itptr[i+1] = NULL;
- *lptr = itptr;
- return 1;
- } else {
- return 0;
- }
- }
-}
-
-void destroy_intJobStat_extension(intJobStat *p)
-{
- if (p->last_seqcode) free(p->last_seqcode);
- if (p->last_cancel_seqcode) free(p->last_cancel_seqcode);
- if (p->branch_tag_seqcode) free(p->branch_tag_seqcode);
- if (p->last_branch_seqcode) free(p->last_branch_seqcode);
- if (p->deep_resubmit_seqcode) free(p->deep_resubmit_seqcode);
- free_branch_state(&p->branch_states);
- memset(p,0,sizeof(*p));
-}
-
-void destroy_intJobStat(intJobStat *p)
-{
- edg_wll_FreeStatus(&p->pub);
- destroy_intJobStat_extension(p);
- memset(p, 0, sizeof(intJobStat));
-}
+++ /dev/null
-#ident "$Header$"
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <errno.h>
-#include <assert.h>
-#include <syslog.h>
-
-#include "glite/lb/context-int.h"
-
-#include "jobstat.h"
-#include "lock.h"
-
-/* TBD: share in whole logging or workload */
-#ifdef __GNUC__
-#define UNUSED_VAR __attribute__((unused))
-#else
-#define UNUSED_VAR
-#endif
-
-// XXX: maybe not needed any more
-// if not, remove also last_condor_event_timestamp from intJobStat
-static int compare_timestamps(struct timeval a, struct timeval b)
-{
- if ( (a.tv_sec > b.tv_sec) ||
- ((a.tv_sec == b.tv_sec) && (a.tv_usec > b.tv_usec)) ) return 1;
- if ( (a.tv_sec < b.tv_sec) ||
- ((a.tv_sec == b.tv_sec) && (a.tv_usec < b.tv_usec)) ) return -1;
- return 0;
-}
-
-
-// XXX move this defines into some common place to be reusable
-#define USABLE(res) ((res) == RET_OK)
-#define USABLE_DATA(res) (1)
-#define rep(a,b) { free(a); a = (b == NULL) ? NULL : strdup(b); }
-#define rep_cond(a,b) { if (b) { free(a); a = strdup(b); } }
-
-int processEvent_Condor(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring)
-{
- edg_wll_JobStatCode old_state = js->pub.state;
- int res = RET_OK;
-
-
- if ((js->last_seqcode != NULL) &&
- (edg_wll_compare_condor_seq(js->last_seqcode, e->any.seqcode) > 0) ) {
- res = RET_LATE;
- }
-
- switch (e->any.type) {
- case EDG_WLL_EVENT_REGJOB:
- if (USABLE(res)) {
- js->pub.state = EDG_WLL_JOB_SUBMITTED;
- rep(js->pub.condor_status, "Idle");
- }
- if (USABLE_DATA(res)) {
- rep(js->pub.jdl, e->regJob.jdl);
- }
- break;
- case EDG_WLL_EVENT_CONDORMATCH:
- if (USABLE(res)) {
- js->pub.state = EDG_WLL_JOB_READY;
- rep(js->pub.condor_status, "Idle");
- }
- if (USABLE_DATA(res)) {
- rep_cond(js->pub.condor_dest_host,e->CondorMatch.dest_host);
- }
- break;
- case EDG_WLL_EVENT_CONDORREJECT:
- if (USABLE(res)) {
- js->pub.state = EDG_WLL_JOB_ABORTED;
- rep(js->pub.condor_status, "Unexpanded");
- }
- if (USABLE_DATA(res)) {
- switch(e->CondorReject.status_code) {
- case EDG_WLL_CONDORREJECT_NOMATCH:
- rep(js->pub.condor_reason,"No match found.");
- break;
- case EDG_WLL_CONDORREJECT_OTHER:
- default:
- break;
- }
- }
- break;
- case EDG_WLL_EVENT_CONDORSHADOWSTARTED:
- if (USABLE(res)) {
- js->pub.state = EDG_WLL_JOB_READY;
- rep(js->pub.condor_status, "Idle");
- }
- if (USABLE_DATA(res)) {
- switch (get_condor_event_source(e->any.seqcode)) {
- case EDG_WLL_CONDOR_EVENT_SOURCE_SCHED:
- js->pub.condor_shadow_pid = e->CondorShadowStarted.shadow_pid;
- break;
- default:
- break;
- }
- }
- break;
- case EDG_WLL_EVENT_CONDORSHADOWEXITED:
- if (USABLE(res)) {
- js->pub.state = EDG_WLL_JOB_DONE;
- rep(js->pub.condor_status, "Completed");
- }
- if (USABLE_DATA(res)) {
- switch (get_condor_event_source(e->any.seqcode)) {
- case EDG_WLL_CONDOR_EVENT_SOURCE_SHADOW:
- js->pub.condor_shadow_exit_status = e->CondorShadowExited.shadow_exit_status;
- break;
- default:
- break;
- }
- }
- break;
- case EDG_WLL_EVENT_CONDORSTARTERSTARTED:
- if (USABLE(res)) {
- switch (get_condor_event_source(e->any.seqcode)) {
- case EDG_WLL_CONDOR_EVENT_SOURCE_START:
- js->pub.state = EDG_WLL_JOB_SCHEDULED;
- rep(js->pub.condor_status, "Idle");
- break;
- case EDG_WLL_CONDOR_EVENT_SOURCE_STARTER:
- js->pub.state = EDG_WLL_JOB_RUNNING;
- rep(js->pub.condor_status, "Running");
- break;
- default:
- break;
- }
- }
- if (USABLE_DATA(res)) {
- switch (get_condor_event_source(e->any.seqcode)) {
- case EDG_WLL_CONDOR_EVENT_SOURCE_STARTER:
- rep(js->pub.condor_universe, e->CondorStarterStarted.universe);
- js->pub.condor_starter_pid = e->CondorStarterStarted.starter_pid;
- break;
- default:
- break;
- }
- }
- break;
- case EDG_WLL_EVENT_CONDORSTARTEREXITED:
- if (USABLE(res)) {
- js->pub.state = EDG_WLL_JOB_DONE;
- rep(js->pub.condor_status, "Completed");
- }
- if (USABLE_DATA(res)) {
- switch (get_condor_event_source(e->any.seqcode)) {
- case EDG_WLL_CONDOR_EVENT_SOURCE_START:
- js->pub.condor_starter_pid = e->CondorStarterExited.starter_pid;
- js->pub.condor_starter_exit_status = e->CondorStarterExited.starter_exit_status;
- break;
- case EDG_WLL_CONDOR_EVENT_SOURCE_STARTER:
- js->pub.condor_starter_pid = e->CondorStarterExited.starter_pid;
- js->pub.condor_job_pid = e->CondorStarterExited.job_pid;
- js->pub.condor_job_exit_status = e->CondorStarterExited.job_exit_status;
- break;
- default:
- break;
- }
- }
- break;
- case EDG_WLL_EVENT_CONDORRESOURCEUSAGE:
- if (USABLE(res)) {
- }
- if (USABLE_DATA(res)) {
- }
- break;
- case EDG_WLL_EVENT_CONDORERROR:
- if (USABLE(res)) {
- }
- if (USABLE_DATA(res)) {
- }
- break;
-
- default:
- break;
- }
-
-/* XXX : just debug output - remove */
-
- printf("processEvent_Condor(): %s (%s), state: %s --> %s\n ",
- edg_wll_EventToString(e->any.type),
- (res == RET_LATE) ? "RET_LATE" : "RET_OK",
- edg_wll_StatToString(old_state),
- edg_wll_StatToString(js->pub.state) );
- printf("\t%s\n",e->any.seqcode);
- printf("\t(last=%s)\n",js->last_seqcode);
-
-/*----------------------------------*/
-
- if (USABLE(res)) {
- rep(js->last_seqcode, e->any.seqcode);
-
- js->pub.lastUpdateTime = e->any.timestamp;
- if (old_state != js->pub.state) {
- js->pub.stateEnterTime = js->pub.lastUpdateTime;
- js->pub.stateEnterTimes[1 + js->pub.state]
- = (int)js->pub.lastUpdateTime.tv_sec;
- }
- }
- if (! js->pub.location) js->pub.location = strdup("this is CONDOR");
-
-
- return RET_OK;
-}
-
+++ /dev/null
-#ident "$Header$"
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <errno.h>
-#include <assert.h>
-#include <syslog.h>
-
-#include "glite/lb/context-int.h"
-
-#include "jobstat.h"
-#include "lock.h"
-
-/* TBD: share in whole logging or workload */
-#ifdef __GNUC__
-#define UNUSED_VAR __attribute__((unused))
-#else
-#define UNUSED_VAR
-#endif
-
-// XXX: maybe not needed any more
-// if not, remove also last_pbs_event_timestamp from intJobStat
-static int compare_timestamps(struct timeval a, struct timeval b)
-{
- if ( (a.tv_sec > b.tv_sec) ||
- ((a.tv_sec == b.tv_sec) && (a.tv_usec > b.tv_usec)) ) return 1;
- if ( (a.tv_sec < b.tv_sec) ||
- ((a.tv_sec == b.tv_sec) && (a.tv_usec < b.tv_usec)) ) return -1;
- return 0;
-}
-
-
-// XXX move this defines into some common place to be reusable
-#define USABLE(res) ((res) == RET_OK)
-#define USABLE_DATA(res) (1)
-#define rep(a,b) { free(a); a = (b == NULL) ? NULL : strdup(b); }
-#define rep_cond(a,b) { if (b) { free(a); a = strdup(b); } }
-
-int processEvent_PBS(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring)
-{
- edg_wll_JobStatCode old_state = js->pub.state;
- int res = RET_OK;
-
-
- if ((js->last_seqcode != NULL) &&
- (edg_wll_compare_pbs_seq(js->last_seqcode, e->any.seqcode) > 0) ) {
- res = RET_LATE;
- }
-
- switch (e->any.type) {
- case EDG_WLL_EVENT_REGJOB:
- if (USABLE(res)) {
- js->pub.state = EDG_WLL_JOB_SUBMITTED;
- rep(js->pub.pbs_state, "Q");
- }
- if (USABLE_DATA(res)) {
- }
- break;
- case EDG_WLL_EVENT_PBSQUEUED:
- if (USABLE(res)) {
- js->pub.state = EDG_WLL_JOB_WAITING;
- rep(js->pub.pbs_state, "Q");
- }
- if (USABLE_DATA(res)) {
- if (!js->pub.pbs_queue)
- js->pub.pbs_queue = strdup(e->PBSQueued.queue);
- assert(!strcmp(js->pub.pbs_queue, e->PBSQueued.queue));
- rep_cond(js->pub.pbs_owner,e->PBSQueued.owner);
- rep_cond(js->pub.pbs_name,e->PBSQueued.name);
- }
- break;
- case EDG_WLL_EVENT_PBSMATCH:
- if (USABLE(res)) {
- js->pub.state = EDG_WLL_JOB_READY;
- rep(js->pub.pbs_state, "Q");
- }
- if (USABLE_DATA(res)) {
- rep_cond(js->pub.pbs_dest_host,e->PBSMatch.dest_host);
- }
- break;
- case EDG_WLL_EVENT_PBSPENDING:
- if (USABLE(res)) {
- js->pub.state = EDG_WLL_JOB_WAITING;
- rep(js->pub.pbs_state, "Q");
- js->pbs_reruning = 0; // reset possible reruning flag
- }
- if (USABLE_DATA(res)) {
- rep_cond(js->pub.pbs_reason,e->PBSPending.reason);
- }
- break;
- case EDG_WLL_EVENT_PBSRUN:
- if (USABLE(res)) {
- switch (get_pbs_event_source(e->any.seqcode)) {
- case EDG_WLL_PBS_EVENT_SOURCE_SERVER:
- js->pub.state = EDG_WLL_JOB_SCHEDULED;
- rep(js->pub.pbs_state, "Q");
- break;
- case EDG_WLL_PBS_EVENT_SOURCE_MOM:
- js->pub.state = EDG_WLL_JOB_RUNNING;
- rep(js->pub.pbs_state, "R");
- break;
- default:
- assert(0); // running event from strange source
- break;
- }
- }
- if (USABLE_DATA(res)) {
- rep_cond(js->pub.pbs_scheduler, e->PBSRun.scheduler);
- rep_cond(js->pub.pbs_dest_host, e->PBSRun.dest_host);
- js->pub.pbs_pid = e->PBSRun.pid;
- }
- break;
- case EDG_WLL_EVENT_PBSRERUN:
- if (USABLE(res)) {
- switch (get_pbs_event_source(e->any.seqcode)) {
- case EDG_WLL_PBS_EVENT_SOURCE_SERVER:
- js->pub.state = EDG_WLL_JOB_WAITING;
- rep(js->pub.pbs_state, "Q");
- break;
- case EDG_WLL_PBS_EVENT_SOURCE_MOM:
- js->pub.state = EDG_WLL_JOB_WAITING;
- rep(js->pub.pbs_state, "E");
- js->pbs_reruning = 1;
- break;
- default:
- assert(0); // running event from strande source
- break;
- }
- }
- if (USABLE_DATA(res)) {
- }
- break;
- case EDG_WLL_EVENT_PBSDONE:
- if (USABLE(res)) {
- switch (get_pbs_event_source(e->any.seqcode)) {
- case EDG_WLL_PBS_EVENT_SOURCE_SERVER:
- js->pub.state = EDG_WLL_JOB_DONE;
- js->pub.done_code = EDG_WLL_STAT_OK;
- rep(js->pub.pbs_state, "C");
- break;
- case EDG_WLL_PBS_EVENT_SOURCE_MOM:
- if (!js->pbs_reruning) {
- js->pub.state = EDG_WLL_JOB_DONE;
- js->pub.done_code = EDG_WLL_STAT_OK;
- rep(js->pub.pbs_state, "C");
- }
- break;
- default:
- assert(0); //done event from strange source
- break;
- }
- }
- if (USABLE_DATA(res)) {
- js->pub.pbs_exit_status = e->PBSDone.exit_status;
- }
- break;
- case EDG_WLL_EVENT_PBSRESOURCEUSAGE:
- if (USABLE(res)) {
- // signalize state done, done_code uknown
- js->pub.state = EDG_WLL_JOB_DONE;
- rep(js->pub.pbs_state, "C");
- }
- if (USABLE_DATA(res)) {
- char *new_resource_usage;
-
- asprintf(&new_resource_usage,"%s%s\t%s = %f [%s]",
- (js->pub.pbs_resource_usage) ? js->pub.pbs_resource_usage : "",
- (js->pub.pbs_resource_usage) ? "\n": "",
- e->PBSResourceUsage.name,
- e->PBSResourceUsage.quantity,
- e->PBSResourceUsage.unit);
-
- if (js->pub.pbs_resource_usage) free(js->pub.pbs_resource_usage);
- js->pub.pbs_resource_usage = new_resource_usage;
- }
- break;
- case EDG_WLL_EVENT_PBSERROR:
- if (USABLE(res)) {
- js->pub.state = EDG_WLL_JOB_DONE;
- js->pub.done_code = EDG_WLL_STAT_FAILED;
- rep(js->pub.pbs_state, "C");
- }
- if (USABLE_DATA(res)) {
- char *new_error_desc;
-
- asprintf(&new_error_desc,"%s%s\t%s",
- (js->pub.pbs_error_desc) ? js->pub.pbs_error_desc : "",
- (js->pub.pbs_error_desc) ? "\n" : "",
- e->PBSError.error_desc);
-
- if (js->pub.pbs_error_desc) free(js->pub.pbs_error_desc);
- js->pub.pbs_error_desc = new_error_desc;
- }
- break;
-
- default:
- break;
- }
-
-/* XXX : just debug output - remove */
-
- printf("processEvent_PBS(): %s (%s), state: %s --> %s\n ",
- edg_wll_EventToString(e->any.type),
- (res == RET_LATE) ? "RET_LATE" : "RET_OK",
- edg_wll_StatToString(old_state),
- edg_wll_StatToString(js->pub.state) );
- printf("\t%s\n",e->any.seqcode);
- printf("\t(last=%s)\n",js->last_seqcode);
-
-/*----------------------------------*/
-
- if (USABLE(res)) {
- rep(js->last_seqcode, e->any.seqcode);
-
- js->pub.lastUpdateTime = e->any.timestamp;
- if (old_state != js->pub.state) {
- js->pub.stateEnterTime = js->pub.lastUpdateTime;
- js->pub.stateEnterTimes[1 + js->pub.state]
- = (int)js->pub.lastUpdateTime.tv_sec;
- }
- }
- if (! js->pub.location) js->pub.location = strdup("this is PBS");
-
-
- return RET_OK;
-}
-